docs/tech-notes/life_of_a_query.md
Original author: Andrei Matei (updated June 2021)
This document aims to explain the execution of a SQL query in CockroachDB, following the code paths through the various layers of the system: network protocol, SQL session management, query parsing, planning and optimization, execution, key/value service, transaction management, request routing, request processing, Raft consensus, on-disk storage engine, etc. The idea is to provide a high-level unifying view of the structure of the various components; none will be explored in particular depth but pointers to other documentation will be provided where such documentation exists. Code pointers will abound.
This document will generally not discuss design decisions, but rather focus on tracing through the actual (current) code. In the interest of brevity, it only covers the most significant and common parts of query execution, and omits a large amount of detail and special cases.
The intended audience is folks curious about a walk through the architecture of a modern database, presented differently than in a design doc. It will hopefully also be helpful for open source contributors and new Cockroach Labs engineers.
A SQL query arrives at the server through the PostgreSQL wire protocol. CockroachDB uses this protocol for compatibility with existing client drivers and applications.
The protocol is implemented by the pgwire package.
Once a client connects,
it is represented by pgwire.conn,
which wraps a net.Conn socket interface.
conn.serveImpl
implements the main "read, parse, execute" loop:
for the lifetime of the connection, we
read
a message (usually containing one or more SQL statements),
parse
the queries, pass
them
to connExecutor for execution, serialize and
buffer
the results, and
send
a result message to the client.
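To make the shape of this loop concrete, here is a toy version. It is only a sketch: the real conn.serveImpl speaks the binary Postgres protocol and hands statements to a connExecutor through a statement buffer, while this toy reads newline-delimited text and takes an execute callback as a stand-in.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
)

// serveConn is a toy rendition of the "read, parse, execute" loop.
func serveConn(conn net.Conn, execute func(stmt string) (string, error)) error {
	defer conn.Close()
	r := bufio.NewReader(conn)
	for {
		// Read one client message.
		line, err := r.ReadString('\n')
		if err != nil {
			return err
		}
		// "Parse" it into statements (semicolons terminate statements).
		for _, stmt := range strings.Split(line, ";") {
			if strings.TrimSpace(stmt) == "" {
				continue
			}
			// Execute, then serialize and send a result message.
			res, err := execute(stmt)
			if err != nil {
				res = "error: " + err.Error()
			}
			fmt.Fprintln(conn, res)
		}
	}
}

func main() {
	ln, err := net.Listen("tcp", "localhost:15432")
	if err != nil {
		panic(err)
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		go serveConn(conn, func(stmt string) (string, error) {
			return "ok: " + stmt, nil // stand-in for the connExecutor
		})
	}
}
```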
CockroachDB's SQL engine sits between the client connection and the core key/value service, translating SQL queries into key/value operations. It broadly goes through the following stages:
Parsing → Logical Planning & Optimization → Physical Planning → Execution
Parsing transforms the original SQL statement, represented as a string, into
an Abstract Syntax Tree
(AST) that is easier to work with. CockroachDB uses an
LALR parser that is
generated
by goyacc from a Yacc-like
grammar file.
This file specifies CockroachDB's SQL dialect, and was originally copied from
PostgreSQL and then modified.
As we've already seen, pgwire.conn parses
client statements using a parser.Parser instance.
The parser begins by scanning
the string into tokens — individual pieces such as a
string
or number.
It does this one by one
until it reaches a semicolon, which
terminates the statement. These tokens are then
lexed
for some simple post-processing, and the lexer is
passed
to the generated parser which iterates over the tokens and applies the SQL
grammar to them.
The result of this process is a parser.Statement,
which wraps
an AST tree.Statement,
a tree structure that models the SQL syntax. For example,
tree.SelectClause
models the familiar SELECT clause, with recognizable fields such as From and
Where. Many parts of the AST also contain
tree.Expr,
nested algebraic expressions such as tax / total * 100.
Note that the AST only represents the syntax of the query, and says nothing about how, or even if, it can be executed — for example, it has no idea whether a table exists or what datatype a column has. Figuring that out is the job of the planner.
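Since the parser is an ordinary Go package, we can invoke it ourselves and poke at the AST. A small sketch (import paths taken from the cockroach repo; the exact package layout may shift between versions):

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/parser"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

func main() {
	stmts, err := parser.Parse("SELECT name FROM companies WHERE employees > 1e6")
	if err != nil {
		panic(err)
	}
	// Each parsed statement wraps a tree.Statement AST.
	sel := stmts[0].AST.(*tree.Select).Select.(*tree.SelectClause)
	fmt.Println(tree.AsString(&sel.From))      // FROM companies
	fmt.Println(tree.AsString(sel.Where.Expr)) // employees > 1e6
}
```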
Once pgwire.conn has parsed the statement, it's placed in an sql.ExecStmt
command and pushed into a
statement buffer.
On the other side of this buffer is a sql.connExecutor
which was created during connection setup.
It processes
the commands in the statement buffer by repeatedly calling
execCmd(),
executing
any statements.
As execStmt()
executes statements, it records various
session data
such as the current database and user-defined settings,
and implements
a finite state machine
to track the transaction state:
did a transaction just begin or end, or did we encounter an error?
It then branches off depending on the state of the SQL transaction:
open,
aborted,
waiting for retry,
or none.
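As a rough mental model, the states look something like the following toy enum. This is a simplification: the real state machine in pkg/sql/conn_fsm.go is event-driven and carries extra payloads on its transitions.

```go
// A toy rendition of the transaction states the connExecutor's state
// machine moves between (simplified; not the real types).
type txnState int

const (
	stateNoTxn      txnState = iota // no transaction open
	stateOpen                       // inside an open transaction
	stateAborted                    // the transaction hit an error; awaiting ROLLBACK
	stateCommitWait                 // committed, waiting to release the client
)
```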
Simple statements are primarily handled here. For example, a
BEGIN statement
will open
a new kv.Txn transaction
in the key/value service (which we'll get back to later), and a
COMMIT statement
will commit
that transaction. If a query runs outside of a transaction,
it will use an implicit transaction.
Query execution uses this transaction to dispatch key/value requests.
Now that we have figured out what (KV) transaction we're running inside of, we are concerned with executing statements one at a time.
There is an impedance mismatch between the SQL connExecutor code,
which is stream-oriented (with statements being executed one at a time,
possibly within the scope of a SQL transaction), and CockroachDB's
key/value (KV) interface, which is request-oriented (with transactions
explicitly attached to every request).
To hint at the complications: it is
sometimes necessary
to retry statements in CockroachDB, usually because of serializability
violations or data contention. A single SQL statement executed outside of a SQL
transaction (i.e. an implicit transaction) can be retried automatically.
However, it is not safe to retry a SQL transaction spanning multiple client
statements, since they may be conditional on client logic (i.e. different
SELECT results may trigger different subsequent statements). In this case, we
bubble up a retryable error
to the client.
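From the client's perspective, handling such a retryable error amounts to re-running the whole transaction. A sketch using the cockroach-go client helper (the accounts table and the transfer logic are invented for this example):

```go
package main

import (
	"context"
	"database/sql"

	"github.com/cockroachdb/cockroach-go/v2/crdb"
)

// transferFunds wraps a multi-statement transaction in a client-side
// retry loop: if the server returns a retryable error, crdb.ExecuteTx
// rolls back and re-runs the closure from scratch.
func transferFunds(ctx context.Context, db *sql.DB, from, to string, amount int) error {
	return crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
		if _, err := tx.ExecContext(ctx,
			`UPDATE accounts SET balance = balance - $1 WHERE id = $2`,
			amount, from); err != nil {
			return err
		}
		_, err := tx.ExecContext(ctx,
			`UPDATE accounts SET balance = balance + $1 WHERE id = $2`,
			amount, to)
		return err
	})
}
```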
connExecutor serves as a coordinator between different components during
SQL statement execution. This primarily happens in
execStmtInOpenState(),
which dispatches to the execution engine in
dispatchToExecutionEngine().
During this process, it
builds and optimizes
a logical plan, then builds and executes
a physical plan. Once executed, the statement
result
is returned to the client via a
ClientComm.
We're now getting to the heart of the SQL engine. Query planning is the process
of converting the SQL query AST into a tree of
relational algebra operators
that will produce the desired result. There can be a large number of
functionally equivalent plans for a given query, with very different performance
characteristics, and the optimizer will try to figure out the most efficient
plan. Query planning is a complex topic that could easily fill several articles
of its own, so we won't go into any depth here — see the
opt package documentation
for further details.
The query plan for a SQL statement can be inspected via EXPLAIN. Let's
have a look at a concrete example:
```sql
CREATE TABLE countries (id STRING PRIMARY KEY, name STRING);

INSERT INTO countries VALUES
    ('us', 'United States'), ('cn', 'China'), ('de', 'Germany');

CREATE TABLE companies (
    name STRING,
    country STRING REFERENCES countries(id),
    employees INT
);

INSERT INTO companies VALUES
    ('McDonalds', 'us', 1.9e6),
    ('Apple', 'us', 0.15e6),
    ('State Grid', 'cn', 1.5e6),
    ('Sinopec', 'cn', 0.58e6),
    ('Volkswagen', 'de', 0.67e6);
```
```sql
EXPLAIN
SELECT companies.name AS company, countries.name AS country, employees
FROM companies JOIN countries ON companies.country = countries.id
WHERE employees > 1e6
ORDER BY employees DESC;
```

```
• sort
│ estimated row count: 3
│ order: -employees
│
└── • lookup join
    │ estimated row count: 3
    │ table: countries@primary
    │ equality: (country) = (id)
    │ equality cols are key
    │
    └── • filter
        │ estimated row count: 3
        │ filter: employees > 1000000
        │
        └── • scan
              estimated row count: 5 (100% of the table; stats collected 12 seconds ago)
              table: companies@primary
              spans: FULL SCAN
```
Each node in the plan is a relational operator, and results flow upwards from
one node to the next. We can see that the optimizer has decided to do a full table
scan of the companies table, filter the rows on employees, perform a
lookup join with the countries table, and finally sort the results
by employees in descending order. The optimizer could instead have chosen to e.g.
use a hash join, or do the sort right after the scan, but it thought this
plan would be more efficient.
The connExecutor builds a plan by using a sql.planner
and calling makeOptimizerPlan()
on it. Internally, it starts by converting
the AST into a "memo".
memo.Memo
is a data structure that can efficiently represent all of the different possible
query plan trees that the optimizer explores by using
memoization. Briefly, it does this
by representing each query expression (either a relational expression
opt.RelExpr such as scan or scalar expression opt.ScalarExpr such as =)
as a "memo group" with references between them. The optimizer can then expand
out each group into other equivalent expressions, such that each group
represents many different methods to achieve the same result, and try to figure
out which one would be best. Because it uses references between groups, it can
compute and store the equivalent expressions once for each group instead of
recursively repeating them for all possible combinations. For example, consider
the following query and corresponding memo groups:
```sql
SELECT * FROM companies, countries WHERE companies.country = countries.id
```

```
G6: [inner-join [G1 G2 G5]]
G5: [eq [G3 G4]]
G4: [variable countries.id]
G3: [variable companies.country]
G2: [scan countries]
G1: [scan companies]
```
Since there are many different ways of doing the join, we can expand out G6 to represent multiple possible plans, but without having to duplicate and recompute the other groups for each join:
```
G6: [inner-join [G1 G2 G5]] [inner-join [G2 G1 G5]] [lookup-join [G1 G2 G5]] ...
G5: [eq [G3 G4]]
G4: [variable countries.id]
G3: [variable companies.country]
G2: [scan countries]
G1: [scan companies]
```
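A toy data structure makes the sharing explicit. This is not the real memo.Memo, just an illustration of groups holding interchangeable expressions that reference child groups by ID:

```go
// Toy memo: expressions reference child groups by ID, so alternative
// expressions in a group share their children instead of copying them.
type groupID int

type expr struct {
	op       string    // e.g. "inner-join", "lookup-join", "scan", "eq"
	children []groupID // references into memo.groups
}

type group struct {
	members []expr // equivalent expressions producing the same result
}

type memo struct {
	groups []group // indexed by groupID
}

// addAlternative records another equivalent expression for a group,
// e.g. a lookup-join alternative alongside an inner-join.
func (m *memo) addAlternative(g groupID, e expr) {
	m.groups[g].members = append(m.groups[g].members, e)
}
```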
buildExecMemo()
first uses an optbuilder.Builder
to build
an initial normalized memo from the AST. If we keep tracing the internal
calls we eventually get somewhere interesting:
buildSelectClause()
which converts the SELECT clause from a tree.SelectClause AST into
a set of memo groups. Let's keep going deeper into
building the FROM clause
until we get to where it builds a table name.
This first resolves
the table name in the system catalog, which e.g. makes sure that the table
exists and that the user has access to it. It then uses the memo
factory to construct a ScanExpr
for the scan. Similarly,
building the WHERE clause
will resolve and build
a ScalarExpr for the predicate and
place it
in a FiltersExpr.
Other AST nodes are built in a similar manner. Of particular note, scalar
expressions (algebraic expressions such as tax / total * 100) are originally
represented by nested tree.Expr AST nodes and implement a
visitor pattern
for analysis and transformation. These can be found in many parts of a query
(e.g. SELECT, WHERE, and ORDER BY) and need common processing. This
typically happens in resolveAndBuildScalar(),
which recursively resolves names and types
to yield tree.TypedExpr (which can be recursively evaluated
to a scalar value), and then builds
a ScalarExpr memo expression for it.
When optbuilder.Builder.Build() returns a memo, it will have carried out
semantic analysis of the memo group expressions (e.g. resolving table and
column names against the catalog, and checking types), as well as
normalization.
Normalization deserves further discussion. The typical example is constant
folding, where a given constant scalar expression such as 1 + 2 * 3 can be
pre-evaluated (normalized) to 7 to avoid re-computing it during query
execution. We apply a large number of such normalization rules, and during plan
exploration and optimization we also need to apply similar transformation rules
to generate equivalent expressions for memo groups. We express these rules in a
custom domain-specific language called Optgen. We don't have time to go into
any detail here (see the
optgen package documentation),
but let's have a look at a simple normalization rule,
FoldInEmpty:
```
[FoldInEmpty, Normalize]
(In * (Tuple []))
=>
(False)
```
This looks for IN operators with an empty tuple on the right-hand side, and
replaces any matching expressions with FALSE since an empty tuple can never
contain anything — i.e. SELECT 'foo' IN () is equivalent to SELECT FALSE.
These normalization rules
are compiled
into Go code along with a
norm.Factory
that builds normalized memo expressions. We've seen this factory before:
the Builder used it
to construct a normalized ScanExpr node for a FROM clause table name.
Note that since ConstructScan() and the other factory methods immediately
apply the normalization rules, they may end up returning a different
kind of expression than the method name indicates — we never construct
a non-normalized memo expression.
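To illustrate, here is a toy factory method in the spirit of the generated code (all types here are stand-ins, not the real norm.Factory):

```go
// Stand-in scalar expression types (not the real memo package).
type scalarExpr interface{ isScalar() }

type tupleExpr struct{ elems []scalarExpr }
type inExpr struct{ left, right scalarExpr }
type falseExpr struct{}

func (*tupleExpr) isScalar() {}
func (*inExpr) isScalar()    {}
func (*falseExpr) isScalar() {}

type factory struct{}

// constructIn applies normalization rules before building an In
// expression, so callers may get back a different kind of expression.
func (f *factory) constructIn(left, right scalarExpr) scalarExpr {
	// [FoldInEmpty]: <expr> IN () folds to False.
	if tup, ok := right.(*tupleExpr); ok && len(tup.elems) == 0 {
		return &falseExpr{}
	}
	// No rule matched: build the In expression as requested.
	return &inExpr{left: left, right: right}
}
```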
Now that we've built a normalized memo, buildExecMemo() goes on to
optimize the plan using
xform.Optimizer.
This explores possible query plans by applying Optgen
transformation rules
to generate equivalent memo groups, and tries to pick the best combination through
cost-based optimization
based on table statistics.
The optimizer starts
with the root memo group and recursively calls
optimizeGroup()
on group members. While recursing, it
calls exploreGroup()
to generate equivalent memo group expressions, repeating the process until
all possibilities have been searched.
It also computes the cost
of each memo group expression using a
Coster,
which takes into account a number of factors such as the
relative costs
of CPU usage, sequential IO, and random IO as well as
table statistics.
For example, the cost of a scan
is roughly the estimated number of rows multiplied by the sequential
IO cost and per-row processing cost.
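In toy form (the real arithmetic and constants live in pkg/sql/opt/xform/coster.go; these numbers are only illustrative):

```go
// A toy coster in the spirit of the real one; constants are invented.
const (
	seqIOCostFactor = 1.0  // relative cost of sequential IO per row
	cpuCostFactor   = 0.01 // relative cost of processing one row
)

// scanCost estimates the cost of a full scan emitting rowCount rows.
func scanCost(rowCount float64) float64 {
	return rowCount * (seqIOCostFactor + cpuCostFactor)
}
```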
Finally, the optimizer picks
the lowest-cost memo tree and returns it.
Back in planner.makeOptimizerPlan(), we now have an optimized memo and
go on to convert it
into a query plan.
This plan is primarily made up of
planNodes
(such as scanNode
and filterNode)
that correspond to the memo expressions. Since this process is mostly
a direct transcription of the memo, we won't go into any detail here.
In the past, these planNodes would execute the query themselves
by recursively calling
Next()
to retrieve the next result row. However, they are now mostly a historical
artifact, and temporarily serve as an intermediate representation of the logical
query plan. The plan will be handed off to the DistSQL (distributed SQL) engine
which will generate a distributed physical query plan and execute it.
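For reference, the interface looks roughly like this (lightly abridged from pkg/sql/plan.go; runParams carries the query's execution context):

```go
// Lightly abridged; not the complete interface.
type planNode interface {
	startExec(params runParams) error
	// Next advances to the next row, returning false when exhausted.
	Next(params runParams) (bool, error)
	// Values returns the row produced by the last call to Next.
	Values() tree.Datums
	Close(ctx context.Context)
}
```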
Now that we have an optimized logical query plan, we're getting ready to
actually execute it. However, CockroachDB is a distributed system, and fetching
large amounts of data over the network can be slow. We'd much rather send the
computation to the data, for example by doing WHERE filtering on the nodes
containing the data.
This is managed by the DistSQL engine, which was inspired by Sawzall and MapReduce. The node running the DistSQL planner is called the "gateway node", and it will set up a flow for the query consisting of processors that exchange data via streams. To do so, it figures out which nodes contain the data we're interested in and configures them by sending RPC requests. Streams transmit row data either in memory between local processors or across the network via RPC, and converge on the gateway node where they are combined into the final result.
Let's use the example from the logical planning section, but move the
companies table to node 2 and countries to node 3, using node 1 as the gateway
node (having started the nodes with appropriate --locality node=X flags):
```sql
ALTER TABLE companies CONFIGURE ZONE USING num_replicas=1, constraints='[+node=2]';
ALTER TABLE countries CONFIGURE ZONE USING num_replicas=1, constraints='[+node=3]';
```
We can then obtain a DistSQL plan by using EXPLAIN(DISTSQL), which will
output a URL for a graphical diagram of the flow. We'll use the
join hint
INNER HASH JOIN to produce a more illustrative example.
```sql
EXPLAIN(DISTSQL)
SELECT companies.name AS company, countries.name AS country, employees
FROM companies INNER HASH JOIN countries ON companies.country = countries.id
WHERE employees > 1e6
ORDER BY employees DESC;
```

```
• sort
│ estimated row count: 3
│ order: -employees
│
└── • hash join
    │ estimated row count: 3
    │ equality: (country) = (id)
    │ right cols are key
    │
    ├── • filter
    │   │ estimated row count: 3
    │   │ filter: employees > 1000000
    │   │
    │   └── • scan
    │         estimated row count: 5 (100% of the table; stats collected 30 minutes ago)
    │         table: companies@primary
    │         spans: FULL SCAN
    │
    └── • scan
          estimated row count: 3 (100% of the table; stats collected 30 minutes ago)
          table: countries@primary
          spans: FULL SCAN
```
Here, we see that it sets up a TableReader processor for companies on
node 2, passing rows to a local Filterer processor that applies
employees > 1e6. Similarly, a TableReader for countries is set up on
node 3. The results are split using a deterministic hash, and sent to
HashJoiner processors on either node 2 or 3 which join the partial results
before sorting them via a local Sorter — this parallelizes these operations
across multiple nodes. Finally, the partial sorted results are sent to the
gateway node (node 1) which uses a No-op processor to combine them and return
the final result to the client.
Distributed SQL planning and execution is carried out by
DistSQLPlanner.
Once connExecutor has obtained the logical plan and
determined
to what extent it should be distributed (it may not always make sense to
fully distribute execution), it
passes the plan
to DistSQLPlanner.PlanAndRun()
along with the SQL session's KV transaction for accessing the KV service.
The DistSQL planner begins by creating
a PhysicalPlan
that specifies the query flow, including specifications for
processors
and streams.
Planning mainly involves recursing into the planNodes of the logical
plan and
building
PhysicalPlan nodes for them (the physical plan built for a planNode subtree is itself a valid plan).
Let's consider how it builds the example plan above, starting from the deepest
node:
scanNode(companies):
Creates
an initial TableReaderSpec
and then plans
its execution by figuring out
which node contains the data. This is looked up
with the help of a SpanResolverIterator,
using a kvcoord.RangeIterator
under the hood — we'll get back to this guy when we dive into the KV service,
but for now let's just say that "KV told us" which nodes have the data. If the table is
spread across many nodes we set up multiple table readers.
filterNode(employees > 1e6):
Attaches
a FiltererSpec
as a local post-processor for the TableReaderSpec.
scanNode(countries):
Same procedure as scanNode(companies).
joinNode:
Merges
the plans for the left and right subplans,
figures out
which nodes to run the join processors on,
creates
a base HashJoinerSpec
for the joiners, and adds a join stage.
This creates one join processor
on each node, hashes
the parent nodes' outputs to distribute across the joiners, and
merges
them into the correct join processor.
sortNode(-employees):
Attaches
a local SorterSpec
to the parent hash joiner processors.
The plan is then finalized
by adding
a final NoopCoreSpec
stage on the gateway node which merges
the results from the parent processors (preserving order) and
adds a projection
to generate the final result.
The physical plan is now ready, and DistSQLPlanner starts
executing it.
It first generates
a FlowSpec
for each node based on the physical plan by collecting the relevant
ProcessorSpecs,
sets them up
by sending
parallel SetupFlowRequest
RPCs, and waits
until the nodes are ready. It also
sets up and returns
the local end of the flow on the gateway node. Let's have a look at what
happens on the remote nodes.
distsql.ServerImpl
implements the DistSQL gRPC service.
The SetupFlow()
method ends up dispatching
to colflow.NewVectorizedFlow()
which sets up the flow using the vectorized execution engine
(the only engine we'll discuss here). This engine represents processors and
stream infrastructure as
colexecop.Operator,
where the Next()
method returns a set of processed results as a
Batch
of column vectors — this is typically called recursively on any child
operators to obtain input data.
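Abridged, the interface looks like this (exact signatures have moved around between versions):

```go
// Abridged from pkg/col/colexecop; treat the signatures as approximate.
type Operator interface {
	// Init prepares the operator (and, transitively, its inputs).
	Init(ctx context.Context)
	// Next returns the next batch of column vectors, pulling on child
	// operators as needed; a zero-length batch signals the end of data.
	Next() coldata.Batch
}
```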
The vectorized flow setup iterates
over the flow processor specs, first setting
up any processor inputs.
For local inputs it simply
returns a reference
to the operator, but for remote inputs it
creates
a colrpc.Inbox
operator which receives data through a gRPC
ProducerMessage
stream set up with a FlowStream()
call. The processor setup then
creates
an operator
based on
the processor spec type — we'll get back to these shortly when we run the flow.
Finally, it sets up any
processor outputs,
by either linking it
to local operators (as we saw for inputs) or
creating
a colrpc.Outbox
that sends batches to a remote Inbox operator
serialized
as Apache Arrow. In the case of multiple outputs
(as seen with the HashJoiners in our example), it
sets up
a HashRouter
which hashes the input data and routes it to the appropriate output
operator.
The gateway node has now set up a flow of processors and streams, implemented as vectorized operators exchanging column batches over gRPC streams. What happens when the DistSQL processor runs the flow?
At the root of the vectorized flow sits a
BatchFlowCoordinator.
When it's
run
it continually pulls
a batch from its input operator
by calling Next()
and pushes it
to its output BatchReceiver —
a DistSQLReceiver
which was connected to the client session
way back in connExecutor.execWithDistSQLEngine().
This triggers a cascade of recursive Next() calls on child operators all
the way down to the flow leaves. Using the example from earlier (see diagram),
these operators are:
- Noop: implemented by noopOperator, which does nothing.
- ordered: uses an OrderedSynchronizer operator to combine the ordered inputs from the two remote input streams.
- Inbox/Outbox: operator pairs that receive/send batches of data across gRPC streams.
- Sorter: implemented by sortOp, which buffers the input data and then sorts it.
- HashJoiner: implemented by hashJoiner, which builds a hash table from the left input and then probes it with data from the right input, emitting any matching combinations.
- by hash: implemented by HashRouter, which distributes the input data across its outputs.
- Filterer: implemented by a selection operator such as BoolVecToSelOp, which selects batch members that satisfy the filter predicate.
- TableReader: implemented by ColBatchScan, which fetches table data from the KV service using a cFetcher.

An interesting side note is that in order to avoid the overhead of runtime type
assertions and reflection normally needed because of Go's lack of generics, the
vectorized execution engine instead uses
"execgen
templates" to generate
typed operators for all combinations of input datatypes. See for example
sort_tmpl.go
which is the input template for the type-specific sort operators in
sort.eg.go.
We've now gotten to where the rubber meets the road — or rather, where
the SQL engine meets the KV service: the
cFetcher.
We won't explore the mapping of KV data to SQL data here (see the
encoding tech note
for details), but for our purposes we can say that, in broad terms, each row in
a table is stored as a serialized binary value with a key consisting of a table
identifier followed by the row's primary key value — something like
/companies/Apple. All rows for a table are therefore stored as a contiguous
key span ordered by primary key under a common prefix.
Recall that the TableReaderSpec already includes
the key span of the table scan from back when the DistSQLPlanner was figuring
out which nodes contained the table. The ColBatchScan operator
passes
this span to the cFetcher via
StartScan().
The cFetcher itself is primarily concerned with
decoding
KV data into SQL data structures, and
delegates
to KVFetcher.
It, in turn, is also primarily concerned with
decoding — this time of MVCC metadata — and
delegates further to txnKVFetcher
where the actual fetching happens.
txnKVFetcher, as the name implies, uses the kv.Txn transaction that
was originally set up for the SQL session back in the connExecutor
to send
KV gRPC requests. Specifically, it
submits
roachpb.ScanRequests
for the requested key span and
returns
the roachpb.ScanResponse
results.
We'll now leave the SQL engine behind, and dive into the key/value service.
The KV layer of CockroachDB is a transactional, distributed key/value storage service. A full description of its architecture is outside of the scope of this article, see the architecture overview for details.
Briefly, the KV service maintains a single ordered key/value map, split into multiple contiguous ranges with a target size of 512 MB. Each range corresponds to a separate Raft consensus cluster, where each Raft node is called a replica. A single CockroachDB node contains many replicas belonging to many ranges, using a common underlying on-disk data store. Cross-range transactions are achieved through a distributed 2-phase commit protocol coordinated by the KV client.
The KV service is request-oriented, with a
Protocol Buffers-based
gRPC API defined in api.proto.
In practice, the KV client always sends
roachpb.BatchRequest,
a generic request containing a collection of other requests. All
requests have a
Header
which contains routing information (e.g. which replica a request is
destined for) and transaction
information
(which transaction's context to execute the request in). The
corresponding response is, unsurprisingly,
roachpb.BatchResponse.
Clients send KV requests via the internal
kv.DB
client interface, typically by obtaining a
kv.Txn
transaction handle from
DB.NewTxn()
and calling methods on it such as
Get()
and Put().
Recall the SQL engine using this to execute statements in the context of a
transaction. KV transactions are ACID-compliant
with serializable
isolation, so their writes only take effect after calling
Commit(),
and they can be discarded with Rollback().
Since we often want to send multiple operations in the same request
(for performance), they are grouped in a
kv.Batch
which can be obtained via Txn.NewBatch().
Calling e.g. Batch.Get()
and Batch.Put()
adds those operations to the batch, and
Txn.Run()
sends the batch to the server as a BatchRequest, populating
Batch.Results
with the results. Convenience methods such as Txn.Get() simply
create an internal batch with a single request and run it.
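Putting it together, internal code typically interacts with the KV client along these lines (a sketch; db is a *kv.DB, and the keys and values are invented):

```go
// The closure may be re-run on retryable errors, and returning nil
// commits the transaction.
err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
	b := txn.NewBatch()
	b.Get("k1")       // queue a read
	b.Put("k2", "v2") // queue a write
	// Send both operations to the KV service as one BatchRequest.
	if err := txn.Run(ctx, b); err != nil {
		return err
	}
	fmt.Println(b.Results[0].Rows[0].Value) // the value read for "k1"
	return nil
})
```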
Let's see what happens inside Txn.Run(). Tracing the code, we
eventually get to
txn.db.sendUsingSender(ctx, batchRequest, txn.mu.sender)
which calls sender.Send(ctx, batchRequest) on the
passed sender. At this point, the request starts percolating through
a hierarchy of Senders —
objects that perform various peripheral tasks before routing
the request to a replica for execution.
Senders have a single method, Send(), which ultimately passes
the request to the next sender. Let's go down this "sending" rabbit
hole:
TxnCoordSender → DistSender → Node → Stores → Store → Replica
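All of these implement the same narrow interface (simplified from pkg/kv):

```go
// Every layer in the chain implements Send and forwards the (possibly
// rewritten) batch to the next layer.
type Sender interface {
	Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
}
```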
The first two run on the same node that received the SQL query and is doing the SQL processing (the "gateway node"), the others run on the nodes responsible for the data that is being accessed (the "range node").
Note that in the case of a DistSQL flow (see earlier section), additional
"leaf" transactions (of type LeafTxn)
are set up
on the remote processing nodes for interacting with KV, using their own local
TxnCoordSenders. However, writes always go via the "root" transaction on the
gateway node. We won't dwell on leaf transactions here, and only consider a root
transaction executing on the gateway node.
The top-most sender is the
TxnCoordSender,
which is responsible for managing transaction state (see the
"Transaction Management" section of the
design doc).
All operations for a transaction go through a TxnCoordSender instance,
and if it crashes then the transaction is aborted.
Internally it is subdivided into an interceptor stack
consisting of lockedSenders —
basically senders that share the TxnCoordSender.mu mutex and make up
a single critical section.
These interceptors are, from top to bottom:
- txnHeartbeater: creates and periodically heartbeats the transaction record to mark the transaction as alive, and detects external aborts of the transaction (e.g. by a conflicting transaction).
- txnSeqNumAllocator: assigns sequence numbers to the operations within a transaction, for ordering and idempotency.
- txnPipeliner: pipelines writes by asynchronously submitting them to consensus and coordinating them with later operations that depend on their results.
- txnSpanRefresher: keeps track of transactional reads, and in the case of serialization errors checks if its past reads are still valid at a higher transaction timestamp. If they are, the transaction can continue at the higher timestamp without propagating the error to the client and retrying the entire transaction.
- txnCommitter: manages commits and rollbacks and, in particular, implements the parallel commit protocol.
- txnMetricRecorder: records various transaction metrics.
- txnLockGatekeeper: unlocks the TxnCoordSender.mu while requests are in flight, and enforces a synchronous client protocol for relevant requests.
After traversing the TxnCoordSender stack, the request is passed on
to the DistSender.
The DistSender
is truly a workhorse: it handles the communication between the gateway
node and the (possibly many) range nodes, putting the "distributed" in
"distributed database". It receives BatchRequests, looks at the
requests inside the batch, figures out what range each command needs
to go to, finds the nodes/replicas responsible for that range, routes
the requests there and then collects and reassembles the results.
Let's go through the code a bit:
The request is subdivided into ranges: DistSender.Send() calls
DistSender.divideAndSendBatchToRanges()
which
iterates
over the constituent ranges of the request by using a
RangeIterator.
Recall that a single request, such as a ScanRequest, can refer to a key
span that might straddle many ranges.
A lot of things hide behind this innocent-looking iteration: the cluster's
range metadata needs to be accessed in order to find the mapping of keys to
ranges (info on this metadata can be found in the Range Metadata
section
of the design doc). Range metadata is stored as regular data in the
cluster, in a two-level index mapping range end keys to descriptors
about the replicas of the respective range (the ranges storing this
index are called "meta-ranges"). The RangeIterator logically
iterates over these descriptors, in range key order.
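Sketched in code, walking a span range by range looks something like this (method names from pkg/kv/kvclient/kvcoord; treat it as illustrative rather than exact, with error handling omitted):

```go
// ds is the *DistSender; startKey/endKey bound the span of interest.
ri := kvcoord.MakeRangeIterator(ds)
span := roachpb.RSpan{Key: startKey, EndKey: endKey}
for ri.Seek(ctx, span.Key, kvcoord.Ascending); ri.Valid(); ri.Next(ctx) {
	desc := ri.Desc() // the RangeDescriptor covering the current key
	fmt.Printf("r%d: [%s, %s)\n", desc.RangeID, desc.StartKey, desc.EndKey)
	if !ri.NeedAnother(span) {
		break // the current range covers the rest of the span
	}
}
```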
Brace yourself: when moving from one range to the next, the iterator calls
back into the DistSender,
which knows how to find the descriptor of the range responsible for
one particular key. The DistSender
delegates
range descriptor lookup to the
RangeCache
(an LRU tree cache indexed by range end key). This cache
desynchronizes with reality as ranges in a cluster split or move
around; when an entry is discovered to be stale, we'll see below that
the DistSender removes it from the cache.
In the happy case, the cache has information about a descriptor covering the
key we're interested in and it returns it.
In the unhappy case, it needs to look up
the range descriptor in the range database RangeCache.db. This is an
interface
which hides the DistSender itself,
which will recursively look up the range's meta-key by calling into DistSender.Send(), which calls into the range cache, and so on.
This recursion cannot go on forever. The descriptor of a regular range is in
a meta2-range (a second-level index range), and the descriptors for meta2-ranges
are present in the (one and only) meta1-range,
which is provided by Gossip.GetFirstRangeDescriptor().
Each sub-request (partial batch) is sent to its range. This is done
through a call to
DistSender.sendPartialBatchAsync()
which
truncates
all the requests in the batch to the current range and then
sends
the truncated batch to it. All these partial batches are sent concurrently.
sendPartialBatch() is where errors stemming from stale RangeCache
information are handled: the stale range descriptor is
evicted
from the cache and the partial batch is retried.
Sending a partial batch to a single range involves selecting the appropriate
replica for that range and making an RPC call to it. By default, each range is
replicated three ways, but only one of the three replicas is the "lease holder" —
the temporarily designated owner of that range, in charge of coordinating all
reads and writes to it (see the Range Leases
section
in the design doc). The leaseholder is also cached in RangeCache, and the information can get stale.
The DistSender method dealing with this is
sendToReplicas. It
will use the cache to send the request to the lease holder, but is
also prepared to try the other replicas, in order of
"proximity". The
replica that the cache says is the leaseholder is simply moved to the
front
of the list of replicas to be tried, and then RPCs are
sent
to the replicas one at a time, in order, until one succeeds. If the leaseholder
fails, we assume it is stale and evict it from the cache.
Actually sending the RPCs is hidden behind the
Transport interface.
Concretely, grpcTransport.SendNext()
makes gRPC calls to the nodes containing the destination replicas, via the
Internal service.
The (async) responses from the different replicas are
combined
into a single BatchResponse, which is ultimately returned from
DistSender.Send().
We've now gone through the relevant things that happen on the gateway node. Next, we'll have a look at what happens on the "remote" side — on each of the ranges.
We've seen how the DistSender splits BatchRequest into partial batches, each
containing commands local to a single replica, and how these commands are sent
to the lease holders of their ranges through RPCs. We're now moving to the
server side of these RPCs.
The Internal RPC service
is implemented by Node.
It doesn't do anything of great relevance itself, but
delegates
the request to
Stores
— a collection of on-disk data stores
(see the Architecture section
of the design doc). Stores implements the Sender interface, just like the
layers we've seen before, and
identifies
which particular store contains the destination replica (based on
request routing info
filled in by the DistSender) before
routing
the request there.
A Store
typically represents one physical disk device. For our purposes, it mostly
delegates
the request to a Replica, but one other interesting thing it does is to
update the upper bound
on the uncertainty interval for the current request in case other requests from
this transaction have already been processed on this node.
The uncertainty interval dictates which values are ambiguous due to clock skew between nodes, i.e. values for which we can't tell whether they were written before or after the serialization point of the current transaction. This code realizes that, if a request from the current transaction has already been processed on this node, then no value written after that node's timestamp at the time of that earlier request is ambiguous. For more details on uncertainty intervals, see the "Choosing a Timestamp" section of the design doc.
Recall that a range is a contiguous slice of the KV keyspace managed by
one instance of the Raft consensus algorithm.
A Replica
is one node in the range's Raft cluster, maintaining one copy of the range
state.
The Replica is the final Sender in our hierarchy. The role of all the other
Senders was, mostly, to route requests to the Replica currently acting as
the lease holder for the range (a primus inter pares Replica that takes on a
bunch of coordination responsibilities that we'll explore below).
A replica deals with read requests differently than write requests: reads are
evaluated directly, whereas writes will enter another big chapter in their life
and go through the Raft consensus protocol. This is readily apparent as
Replica.Send() quickly branches off
based on the request type. We'll talk about each of these paths in turn.
Before that, however, both paths are wrapped in
executeBatchWithConcurrencyRetries()
which handles concurrency control for the request batch: in a big
retry loop it will attempt to
acquire latches
(internal locks) via concurrency.Manager.SequenceReq() and then
execute the batch.
Any concurrency-related errors will be handled here, if possible, before
retrying the batch, such as write intent resolution,
transaction conflicts,
lease errors,
and so on.
Our request will most likely not arrive at the replica by itself, but along with
a variety of other requests sent by many different nodes, clients, and
transactions. The
concurrency.Manager
is responsible for deciding who goes when and in what order. Broadly, it will
allow readers to run concurrently while writers run alone, in FIFO order and
taking into account e.g. key spans, MVCC timestamps, and transaction isolation.
It supports "latches", single-request internal locks taken out for the duration
of request evaluation and replication, and "locks", cross-request transactional
locks (e.g. for an uncommitted write) that persist until the transaction is
finalized — both with read-only and read/write access modes.
First, the replica figures out
which latches and locks need to be taken out for the requests.
To do this, it looks up
the command for each request, and calls
its DeclareKeys method with the request header. This method is provided during
command registration
(see e.g. the Put command),
and will often delegate to either DefaultDeclareKey
(taking out latches) or
DefaultDeclareIsolatedKeys
(taking out locks and latches) for the keys
listed
in the header.
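For example, here is how the Put command is wired up, abridged from pkg/kv/kvserver/batcheval/cmd_put.go:

```go
// Registration ties the Put request type to its key declarations
// (isolated, i.e. locked, keys) and its evaluation function.
func init() {
	RegisterReadWriteCommand(roachpb.Put, DefaultDeclareIsolatedKeys, Put)
}
```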
Once the necessary latches and locks are known, the request will pass these to
the concurrency manager and
wait for its turn
by calling Manager.SequenceReq(). This will first attempt to
acquire latches
in the latch manager, which may require it to
wait
for them to be released by other requests first. It then goes on to
acquire locks
in the lock table,
waiting
for any conflicting transactions to complete — this will also attempt to push the lock holder's timestamp forward, which may allow our transaction to run "before it", or even abort it depending on transaction priorities.
Once the request has acquired its latches and locks, it is ready to execute. At this point, the request no longer has to worry about concurrent requests, as it is fully isolated. There is one notable exception, however: write intents (new values) written by concurrent transactions, which have not yet been fully migrated into the concurrency manager, and must be handled as they are encountered. We'll get back to these later.
The first thing both Replica.executeReadOnlyBatch() and Replica.executeWriteBatch()
do is call
Replica.checkExecutionCanProceed()
to perform a few pre-flight checks. Most importantly, it checks that the request
got to the right place, i.e. that the replica is the current lease holder —
remember that a lot of the routing was done based on caches or outright guesses.
The leaseholder check is performed by Replica.leaseGoodToGoRLocked(), a rabbit hole in its own right. Let's just say that, if the
current replica isn't the lease holder, it returns either
InvalidLeaseError,
which will cause
the replica to try acquiring the lease,
or NotLeaseHolderError,
which is returned to the DistSender and redirects
the request to the real leaseholder.
Requesting a lease is done through the
pendingLeaseRequest
helper struct, which coalesces multiple requests for the same lease
and eventually constructs a
RequestLeaseRequest
that is sent for execution
directly to the replica (as we've seen in other cases, bypassing all the senders
to avoid recursing infinitely). In case a lease is requested,
redirectOnOrAcquireLease will
wait
for that request to complete and check if it was successful. If it was, the
batch will be retried. If not, the request is redirected to the actual
leaseholder.
Once the lease situation has been settled, Replica.executeReadOnlyBatch()
is ready to actually start reading. It grabs
a storage engine handle and executes
the batch on it, eventually moving control to
Replica.evaluateBatch()
which calls Replica.evaluateCommand()
for each request in the batch. This
looks up
the command for the request method in a command registry,
and passes execution
to it.
One typical read request is a ScanRequest,
which is evaluated by
batcheval.Scan.
The code is rather brief, and mostly
calls
a corresponding function for the underlying storage engine.
We're getting to the bottom of the CockroachDB stack. The
Engine
is an interface abstracting away different on-disk data stores. From
version 21.1 onwards we only support Pebble,
an embedded key/value database developed in-house and inspired by
RocksDB/LevelDB.
RocksDB was used previously, but was replaced in pursuit of
performance, stability, and tighter integration.
Although the storage engine is a crucial part of servicing a request, for the
sake of brevity we won't go into any details on Pebble's internals here.
For reads, we typically make use of the
storage.Reader
interface, a subset of Engine. The Scan command from the previous
section passes this Reader to storage.MVCCScanToBytes(),
which is a helper function that fetches a list of key/value pairs in a
given key span from the storage engine.
The CockroachDB storage layer uses Multi-Version Concurrency Control (MVCC),
storing multiple versions of Protobuf-encoded Values
for a given key. Each version is identified by the value's
Timestamp,
and the latest version visible to a given transaction is given by its
read timestamp.
As a special case, new values (called write intents) are written
without a timestamp, and once the transaction commits these will
have to be cleaned up by rewriting them with the transaction's final
write timestamp —
more on this later.
The workhorse of MVCC reads is the
storage.MVCCIterator,
which iterates over MVCC key/value pairs in order. MVCCScanToBytes
makes use of it
with the help of pebbleMVCCScanner
to scan over the key/value versions
in the storage engine, collect
the appropriate version of each key, and
return them
to the caller.
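A sketch of what such a read looks like at this layer (signatures drift between versions, so treat this as approximate; reader is a storage.Reader, readTS the read timestamp, and txnProto the transaction record):

```go
// Scan [startKey, endKey) as of the transaction's read timestamp.
res, err := storage.MVCCScan(ctx, reader, startKey, endKey, readTS,
	storage.MVCCScanOptions{Txn: txnProto})
if err != nil {
	return err // possibly a *roachpb.WriteIntentError (see below)
}
for _, kv := range res.KVs {
	fmt.Println(kv.Key, kv.Value.Timestamp) // latest visible version
}
```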
However, if the scan encounters conflicting write intents from a different transaction, these will have to be dealt with first.
Whenever a request runs into write intents (i.e. uncommitted values) that
weren't written by its own transaction, it's facing a possible conflict with
another transaction. To resolve this conflict, the MVCC storage operation
returns a WriteIntentError containing the intents. This is
caught
further up the stack by Replica.executeBatchWithConcurrencyRetries, which
delegates
conflict resolution to concurrency.Manager.HandleWriterIntentError.
This will first add the intents to the lock table,
since it clearly did not know about them yet (or the request wouldn't have failed),
and then wait
for the conflicting transaction to complete. As part of this waiting, it will try to
push
the lock holder using the
IntentResolver
to resolve the conflicting intents.
To resolve an intent, we first need to figure out whether the transaction that owns it is still pending — it might already have committed or aborted. If it is still pending, then we can try to push it such that it restarts at a higher timestamp or gets aborted. If the push succeeds, the intents can be resolved by either replacing them with a committed value or discarding them, depending on the final transaction state. Otherwise, the current transaction must wait for the conflicting transaction to complete.
Figuring out the other transaction's status and pushing it happens in
IntentResolver.MaybePushTransaction,
where a series of PushTxnRequests are batched and sent to the cluster (using
the hierarchy of Senders to route them to the various transactions records). If
the conflicting transaction is still pending, the decision about whether or not
it gets pushed is made
deep in the PushTxnRequest processing
based on the relative priorities of the transactions. Resolving the intents themselves
is done through calls to
IntentResolver.ResolveIntent.
Once the conflicting intents are resolved, either by pushing the other
transaction or waiting for it to complete, we can continue where we left off
back in Replica.executeBatchWithConcurrencyRetries and
retry
the request.
Write requests are conceptually more interesting than reads because they're not simply serviced by one node/replica. Instead, they go through the Raft consensus protocol which eventually commits them to a replicated ordered command log that gets applied by all of the range's replicas (see the Raft section of the design doc for details). The replica that initiates this process is, just like in the read case, the lease holder. Execution on the lease holder is thus broken into two stages: before ("upstream of") Raft and below ("downstream of") Raft. The upstream stage will eventually block until the submitted Raft command is applied locally, at which point future reads are guaranteed to see its effects.
Write requests go through much of the same preparation as reads, such as concurrency control and lease acquisition/redirection. However, they have one additional step that is closely linked to the read path: the replica consults the timestamp cache, and moves the transaction's timestamp forward if necessary. This structure is a bounded in-memory cache containing the latest timestamp at which a given key span was read, and serves to protect against violations of snapshot isolation: a key write at a lower timestamp than a previous read must not succeed (see the "Read-Write Conflicts – Read Timestamp Cache" section in Matt's blog post). This cache is updated during the read path epilogue and write path proposal epilogue.
With that out of the way, we can evaluate the request and propose resulting Raft commands.
We'll describe the process below (it will be fun), but before we do, let's see
what the current method does afterwards. The call returns a channel that
we'll wait on.
This is the decoupling point that we've anticipated above — the point where we
cede control to the Raft machinery. The Replica submitting the proposals
accepts its role as merely one of many replicas and waits for the consensus
group to make progress in lock-step. The channel will receive a result when the
(local) replica has applied the respective commands, which can happen only after
the commands have been committed to the shared Raft log (a global operation).
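In outline (heavily simplified, with the argument list of evalAndPropose elided and names approximate):

```go
// Propose, then block until the command has been applied locally.
ch, abandon, _, pErr := r.evalAndPropose(ctx, ba /* lease, latch guard, ... */)
if pErr != nil {
	return nil, pErr
}
select {
case propResult := <-ch:
	// Raft has committed and locally applied the command.
	return propResult.Reply, propResult.Err
case <-ctx.Done():
	// Stop waiting, though the proposal may still apply later.
	abandon()
	return nil, roachpb.NewError(ctx.Err())
}
```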
As promised, let's see what happens inside Replica.evalAndPropose().
We first evaluate the KV request, which turns it into a Raft command.
This is done by calling
requestToProposal(),
which quickly calls
evaluateProposal(),
which in turn calls
evaluateWriteBatch().
This last method "simulates" the execution of the request, if you will, and
records all the would-be changes to the Engine into a "batch" (these batches
are how Pebble models transactions). This batch will be
serialized
into a Raft command. If we were to commit this batch now, the changes
would be live, but just on this one replica — a potential data consistency
violation. Instead, we abort it.
It will be resurrected when the command "comes out of Raft", as we'll see.
The simulation part takes place inside
evaluateWriteBatchWrapper().
This takes in the roachpb.BatchRequest (the KV request we've
been dealing with all along), allocates an engine.Batch,
and delegates to
evaluateBatch(). This
fellow finally
iterates
over the individual requests in the batch and, for each one, calls
evaluateCommand(). We've
seen evaluateCommand before, on the read path: it looks up the
corresponding command for each request's method and
calls it.
One such method would be
batcheval.Put,
which writes a value for a key. Inside it we see a call to the
engine
to perform the write — but remember, it's all performed inside a Pebble
transaction, the storage.Batch.
This was all for the purposes of recording the engine changes that need to be
proposed to Raft. Let's unwind the stack to evalAndPropose() (where we started
this section), and see what happens with the result of
requestToProposal().
For one, we grab proposalCh,
the channel which will be returned to the caller to wait for the proposal's
local application. This gets inserted into
the pending proposals map — a structure that will make the connection between a
command being applied and executeWriteBatch() which will be blocked on a
channel waiting for the local application. More importantly, the proposal is
passed to Replica.propose
which inserts it into the
proposal buffer
where it eventually ends up being proposed to raftGroup.
This raftGroup is a handle for the Raft consensus cluster, implemented by the
Etcd Raft library. We treat
this as a black box that we submit command proposals to, and it deals with
running them through Raft consensus, committing them to the replicated command
log, and applying them onto every replica.
One notable exception here is requests with the
AsyncConsensus flag set. These
are used by pipelined writes
and the parallel commit protocol
to avoid waiting for writes to go through Raft, which significantly improves
write throughput. Instead, as we briefly touched on earlier, the
TxnCoordSender will check that these writes went through as necessary, in
particular when committing the transaction. For these requests,
evalAndPropose() immediately sends a result
via proposalCh, causing executeWriteBatch() to return instead of
waiting for Raft application.
We've seen how commands are "put into" Raft, but how do they "come out"? To
integrate with the Etcd Raft library we implement a state machine that it
applies log commands to. A complete description of this is beyond the scope of
this article, but suffice it to say that this involves the
raftProcessor
interface, which is implemented by our old friend Store.
The important method is
Store.processReady(),
which eventually calls back into the Replica being modified via
handleRaftReadyRaftMuLocked().
This applies committed entries
by taking in the serialized storage.Batch and applying it to the
state machine
by committing the batch to Pebble.
The command has now been applied on this particular replica, but keep in
mind that this process happens on every replica.
Let's not forget about the waiting proposer, who is still blocked on the
proposal channel back in executeWriteBatch(). After applying the command, on
the proposing replica (i.e. typically the lease holder), we need to return the
result back through the channel. This happens
at the end
of command application, where AckOutcomeAndFinish()
gets called on every command: if the command was local, it will eventually
signal
the proposer by sending
the result.
We've now come full circle: the proposer unblocks and receives its result,
unwinding the stack to let the client know that the request is complete. The
response travels back through the hierarchy of Senders, from the lease holder
node to the gateway node, through the DistSQL flow and transaction coordinator,
via the session manager and pgwire implementation, before finally reaching the
SQL client.