Life of a SQL Query

Original author: Andrei Matei (updated June 2021)

Introduction

This document aims to explain the execution of a SQL query in CockroachDB, following the code paths through the various layers of the system: network protocol, SQL session management, query parsing, planning and optimization, execution, key/value service, transaction management, request routing, request processing, Raft consensus, on-disk storage engine, etc. The idea is to provide a high-level unifying view of the structure of the various components; none will be explored in particular depth but pointers to other documentation will be provided where such documentation exists. Code pointers will abound.

This document will generally not discuss design decisions, but rather focus on tracing through the actual (current) code. In the interest of brevity, it only covers the most significant and common parts of query execution, and omits a large amount of detail and special cases.

The intended audience is folks curious about a walk through the architecture of a modern database, presented differently than in a design doc. It will hopefully also be helpful for open source contributors and new Cockroach Labs engineers.

PostgreSQL Client Protocol

A SQL query arrives at the server through the PostgreSQL wire protocol. CockroachDB uses this protocol for compatibility with existing client drivers and applications.

The protocol is implemented by the pgwire package. Once a client connects, it is represented by pgwire.conn, which wraps a net.Conn socket interface. conn.serveImpl implements the main "read, parse, execute" loop: for the lifetime of the connection, we read a message (usually containing one or more SQL statements), parse the queries, pass them to connExecutor for execution, serialize and buffer the results, and send a result message to the client.

SQL Processing

CockroachDB's SQL engine sits between the client connection and the core key/value service, translating SQL queries into key/value operations. It broadly goes through the following stages:

Parsing → Logical Planning & Optimization → Physical Planning → Execution

Parsing

Parsing transforms the original SQL statement, represented as a string, into an Abstract Syntax Tree (AST) that is easier to work with. CockroachDB uses an LALR parser that is generated by goyacc from a Yacc-like grammar file. This file specifies CockroachDB's SQL dialect, and was originally copied from PostgreSQL and then modified.

As we've already seen, pgwire.conn parses client statements using a parser.Parser instance. The parser begins by scanning the string into tokens — individual pieces such as a string or number. It does this one by one until it reaches a semicolon which terminates the statement. These tokens are then lexed for some simple post-processing, and the lexer is passed to the generated parser which iterates over the tokens and applies the SQL grammar to them.
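To make the scanning step concrete, here is a minimal sketch in Python (not CockroachDB's Go scanner). The token classes and the `scan_statement` name are invented for this example, and the real scanner recognizes far more than these four kinds of token. It splits the input into tokens one by one and stops at the semicolon that terminates the statement.

```python
import re

# Hypothetical token classes; the real scanner recognizes many more.
TOKEN_RE = re.compile(
    r"""\s*(?:
        (?P<number>\d+(?:\.\d+)?)
      | (?P<string>'(?:[^']|'')*')
      | (?P<ident>[A-Za-z_][A-Za-z0-9_]*)
      | (?P<op>[<>=!]=|[-+*/<>=(),.;])
    )""",
    re.VERBOSE,
)


def scan_statement(sql, pos=0):
    """Return (tokens, next_pos) for the statement that starts at pos."""
    tokens = []
    while pos < len(sql):
        match = TOKEN_RE.match(sql, pos)
        if match is None:
            if sql[pos:].strip() == "":
                break  # only trailing whitespace is left
            raise ValueError(f"unexpected character at offset {pos}")
        pos = match.end()
        kind = match.lastgroup
        text = match.group(kind)
        if kind == "op" and text == ";":
            break  # the semicolon terminates the statement
        tokens.append((kind, text))
    return tokens, pos
```

Calling `scan_statement` again with the returned position scans the next statement in the same message, which mirrors how a single client message can carry several statements.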

The result of this process is a parser.Statement, which wraps an AST tree.Statement, a tree structure that models the SQL syntax. For example, tree.SelectClause models the familiar SELECT clause, with recognizable fields such as From and Where. Many parts of the AST also contain tree.Expr, nested algebraic expressions such as tax / total * 100.

Note that the AST only represents the syntax of the query, and says nothing about how, or even if, it can be executed — for example, it has no idea whether a table exists or what datatype a column has. Figuring that out is the job of the planner.

Session Management

Once pgwire.conn has parsed the statement, it's placed in an sql.ExecStmt command and pushed into a statement buffer. On the other side of this buffer is a sql.connExecutor which was created during connection setup. It processes the commands in the statement buffer by repeatedly calling execCmd(), executing any statements.

As execStmt() executes statements, it records various session data such as the current database and user-defined settings, and implements a finite state machine to track the transaction state: did a transaction just begin or end, or did we encounter an error? It then branches off depending on the state of the SQL transaction: open, aborted, waiting for retry, or none.

Simple statements are primarily handled here. For example, a BEGIN statement will open a new kv.Txn transaction in the key/value service (which we'll get back to later), and a COMMIT statement will commit that transaction. If a query runs outside of a transaction, it will use an implicit transaction. Query execution uses this transaction to dispatch key/value requests.
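The transaction state tracking can be pictured as a small finite state machine. The Python sketch below is an illustration only: the transition table is a simplified assumption, it leaves out the "waiting for retry" state, and the `Session` class and its method names are hypothetical rather than taken from connExecutor.

```python
NO_TXN, OPEN, ABORTED = "none", "open", "aborted"

# (current state, event) -> next state. A simplified, assumed transition table.
TRANSITIONS = {
    (NO_TXN, "BEGIN"): OPEN,
    (OPEN, "COMMIT"): NO_TXN,
    (OPEN, "ROLLBACK"): NO_TXN,
    (OPEN, "error"): ABORTED,
    (ABORTED, "ROLLBACK"): NO_TXN,
}


class Session:
    def __init__(self):
        self.state = NO_TXN
        self.outcomes = []

    def exec_stmt(self, stmt, fails=False):
        if self.state == ABORTED and stmt != "ROLLBACK":
            # In this sketch, an aborted transaction only accepts ROLLBACK.
            self.outcomes.append("rejected")
            return
        if self.state == NO_TXN and stmt != "BEGIN":
            # Implicit transaction: it begins and ends with this one statement.
            self.outcomes.append("error" if fails else "ok (implicit)")
            return
        if stmt in ("BEGIN", "COMMIT", "ROLLBACK"):
            event = stmt
        else:
            event = "error" if fails else None
        if event is not None:
            self.state = TRANSITIONS.get((self.state, event), self.state)
        self.outcomes.append("error" if fails else "ok")
```

A failing statement inside an explicit transaction moves the session to the aborted state, where later statements are rejected until the client rolls back.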

Statement Execution

Now that we have figured out what (KV) transaction we're running inside of, we are concerned with executing statements one at a time.

There is an impedance mismatch when interfacing SQL connExecutor code, which is stream-oriented (with statements being executed one at a time possibly within the scope of a SQL transaction), and CockroachDB's key/value (KV) interface, which is request-oriented (with transactions explicitly attached to every request).

To hint at the complications: it is sometimes necessary to retry statements in CockroachDB, usually because of serializability violations or data contention. A single SQL statement executed outside of a SQL transaction (i.e. an implicit transaction) can be retried automatically. However, it is not safe to retry a SQL transaction spanning multiple client statements, since they may be conditional on client logic (e.g. different SELECT results may trigger different subsequent statements). In this case, we bubble up a retryable error to the client.
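The retry rule can be sketched in a few lines of Python. This is a deliberately simplified model, not the connExecutor logic: `RetryableError`, `run_statement`, and the `max_attempts` limit are all assumptions made for the example.

```python
class RetryableError(Exception):
    """Stand-in for a retryable error reported by the KV layer."""


def run_statement(execute, implicit_txn, max_attempts=5):
    """Run one statement; execute is a hypothetical callable doing the work."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return execute()
        except RetryableError:
            # Only a statement in an implicit transaction is safe to re-run
            # without consulting the client; otherwise the error bubbles up.
            if not implicit_txn or attempt >= max_attempts:
                raise
```

With `implicit_txn=True` the caller never sees a transient failure as long as some attempt succeeds, whereas with `implicit_txn=False` the first retryable error reaches the client, which has to re-issue its statements itself.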

connExecutor serves as a coordinator between different components during SQL statement execution. This primarily happens in execStmtInOpenState(), which dispatches to the execution engine in dispatchToExecutionEngine(). During this process, it builds and optimizes a logical plan, then builds and executes a physical plan. Once executed, the statement result is returned to the client via a ClientComm.

Logical Planning and Optimization

We're now getting to the heart of the SQL engine. Query planning is the process of converting the SQL query AST into a tree of relational algebra operators that will produce the desired result. There can be a large number of functionally equivalent plans for a given query, with very different performance characteristics, and the optimizer will try to figure out the most efficient plan. Query planning is a complex topic that could easily fill several articles of its own, so we won't go into any depth here — see the opt package documentation for further details.

The query plan for a SQL statement can be inspected via EXPLAIN. Let's have a look at a concrete example:

```sql
CREATE TABLE countries (id STRING PRIMARY KEY, name STRING);
INSERT INTO countries VALUES
    ('us', 'United States'), ('cn', 'China'), ('de', 'Germany');
CREATE TABLE companies (
    name STRING,
    country STRING REFERENCES countries(id),
    employees INT
);
INSERT INTO companies VALUES
    ('McDonalds', 'us', 1.9e6),
    ('Apple', 'us', 0.15e6),
    ('State Grid', 'cn', 1.5e6),
    ('Sinopec', 'cn', 0.58e6),
    ('Volkswagen', 'de', 0.67e6);

EXPLAIN
    SELECT companies.name AS company, countries.name AS country, employees
    FROM companies JOIN countries ON companies.country = countries.id
    WHERE employees > 1e6
    ORDER BY employees DESC;
```

```
  • sort
  │ estimated row count: 3
  │ order: -employees
  │
  └── • lookup join
      │ estimated row count: 3
      │ table: countries@primary
      │ equality: (country) = (id)
      │ equality cols are key
      │
      └── • filter
          │ estimated row count: 3
          │ filter: employees > 1000000
          │
          └── • scan
                estimated row count: 5 (100% of the table; stats collected 12 seconds ago)
                table: companies@primary
                spans: FULL SCAN
```

Each node in the plan is a relational operator, and results flow upwards from one node to the next. We can see that the optimizer has decided to do a full table scan of the companies table, filter the rows on employees, perform a lookup join with the countries table, and finally sort the results by employees in descending order. The optimizer could instead have chosen to e.g. use a hash join, or do the sort right after the scan, but it estimated that this plan would be cheaper.

The connExecutor builds a plan by using a sql.planner and calling makeOptimizerPlan() on it. Internally, it starts by converting the AST into a "memo".

memo.Memo is a data structure that can efficiently represent all of the different possible query plan trees that the optimizer explores by using memoization. Briefly, it does this by representing each query expression (either a relational expression opt.RelExpr such as scan or scalar expression opt.ScalarExpr such as =) as a "memo group" with references between them. The optimizer can then expand out each group into other equivalent expressions, such that each group represents many different methods to achieve the same result, and try to figure out which one would be best. Because it uses references between groups, it can compute and store the equivalent expressions once for each group instead of recursively repeating them for all possible combinations. For example, consider the following query and corresponding memo groups:

SELECT * FROM companies, countries WHERE companies.country = countries.id

G6: [inner-join [G1 G2 G5]]
G5: [eq [G3 G4]]
G4: [variable countries.id]
G3: [variable companies.country]
G2: [scan countries]
G1: [scan companies]

Since there are many different ways of doing the join, we can expand out G6 to represent multiple possible plans, but without having to duplicate and recompute the other groups for each join:

G6: [inner-join [G1 G2 G5]] [inner-join [G2 G1 G5]] [lookup-join [G1 G2 G5]] ...
G5: [eq [G3 G4]]
G4: [variable countries.id]
G3: [variable companies.country]
G2: [scan countries]
G1: [scan companies]
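A toy version of this structure shows why the sharing matters. The Python sketch below is not memo.Memo: the dictionary layout and the `count_plans` helper are assumptions made for illustration. Each group holds a list of equivalent expressions, and each expression refers to its children by group name only.

```python
from math import prod

# Each group maps to a list of equivalent expressions: (operator, child groups).
memo = {
    "G1": [("scan companies", [])],
    "G2": [("scan countries", [])],
    "G3": [("variable companies.country", [])],
    "G4": [("variable countries.id", [])],
    "G5": [("eq", ["G3", "G4"])],
    "G6": [("inner-join", ["G1", "G2", "G5"])],
}


def count_plans(group, cache=None):
    """Count the distinct plan trees a group represents, visiting each group once."""
    cache = {} if cache is None else cache
    if group not in cache:
        cache[group] = sum(
            prod(count_plans(child, cache) for child in children)
            for _, children in memo[group]
        )
    return cache[group]


# Exploration adds alternatives to G6 only; G1 to G5 are stored once and shared.
memo["G6"] += [
    ("inner-join", ["G2", "G1", "G5"]),
    ("lookup-join", ["G1", "G2", "G5"]),
]
print(count_plans("G6"))  # 3
```

If exploration later added a second way to scan companies to G1, the number of plans represented by G6 would double without any of its three join expressions being copied.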

buildExecMemo() first uses an optbuilder.Builder to build an initial normalized memo from the AST. If we keep tracing the internal calls we eventually get somewhere interesting: buildSelectClause() which converts the SELECT clause from a tree.SelectClause AST into a set of memo groups. Let's keep going deeper into building the FROM clause until we get to where it builds a table name. This first resolves the table name in the system catalog, which e.g. makes sure that the table exists and that the user has access to it. It then uses the memo factory to construct a ScanExpr for the scan. Similarly, building the WHERE clause will resolve and build a ScalarExpr for the predicate and place it in a FiltersExpr.

Other AST nodes are built in a similar manner. Of particular note, scalar expressions (algebraic expressions such as tax / total * 100) are originally represented by nested tree.Expr AST nodes and implement a visitor pattern for analysis and transformation. These can be found in many parts of a query (e.g. SELECT, WHERE, and ORDER BY) and need common processing. This typically happens in resolveAndBuildScalar(), which recursively resolves names and types to yield tree.TypedExpr (which can be recursively evaluated to a scalar value), and then builds a ScalarExpr memo expression for it.

When optbuilder.Builder.Build() returns a memo, it will have carried out the following analysis and preparation of the memo group expressions:

  • Name resolution
  • Type inference and checking (see also the typing RFC)
  • Normalization

Normalization deserves further discussion. The typical example is constant folding, where a given constant scalar expression such as 1 + 2 * 3 can be pre-evaluated (normalized) to 7 to avoid re-computing it during query execution. We apply a large number of such normalization rules, and during plan exploration and optimization we also need to apply similar transformation rules to generate equivalent expressions for memo groups. We express these rules in a custom domain-specific language called Optgen. We don't have time to go into any detail here (see the optgen package documentation), but let's have a look at a simple normalization rule, FoldInEmpty:

[FoldInEmpty, Normalize]
(In * (Tuple []))
=>
(False)

This looks for IN operators with an empty tuple on the right-hand side, and replaces any matching expressions with FALSE since an empty tuple can never contain anything — i.e. SELECT 'foo' IN () is equivalent to SELECT FALSE.

These normalization rules are compiled into Go code along with a norm.Factory that builds normalized memo expressions. We've seen this factory before: the Builder used it to construct a normalized ScanExpr node for a FROM clause table name. Note that since ConstructScan() and the other factory methods immediately apply the normalization rules, they may end up returning a different kind of expression than the method name indicates — we never construct a non-normalized memo expression.
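The "normalize on construction" idea can be illustrated with a small Python sketch. The `Factory` class and its `construct_*` methods below are hypothetical stand-ins, not the generated norm.Factory, and they hard-code two rules by hand: constant folding for addition, and an analogue of FoldInEmpty.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Const:
    value: object


@dataclass(frozen=True)
class Var:
    name: str


@dataclass(frozen=True)
class Plus:
    left: object
    right: object


@dataclass(frozen=True)
class In:
    left: object
    items: tuple


class Factory:
    """Hypothetical factory: every construct_* method normalizes eagerly."""

    def construct_plus(self, left, right):
        # Constant folding: both operands are known at planning time.
        if isinstance(left, Const) and isinstance(right, Const):
            return Const(left.value + right.value)
        return Plus(left, right)

    def construct_in(self, left, items):
        # Analogue of FoldInEmpty: IN over an empty tuple is always false.
        if len(items) == 0:
            return Const(False)
        return In(left, tuple(items))
```

Note that `construct_in` may return a `Const` rather than an `In`, which is the behavior described above: the caller asks for one kind of expression and receives whatever its normalized form happens to be.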

Now that we've built a normalized memo, buildExecMemo() goes on to optimize the plan using xform.Optimizer. This explores possible query plans by applying Optgen transformation rules to generate equivalent memo groups, and tries to pick the best combination through cost-based optimization based on table statistics.

The optimizer starts with the root memo group and recursively calls optimizeGroup() on group members. While recursing, it calls exploreGroup() to generate equivalent memo group expressions, repeating the process until all possibilities have been searched. It also computes the cost of each memo group expression using a Coster, which takes into account a number of factors such as the relative costs of CPU usage, sequential IO, and random IO as well as table statistics. For example, the cost of a scan is roughly the estimated number of rows multiplied by the sequential IO cost and per-row processing cost. Finally, the optimizer picks the lowest-cost memo tree and returns it.
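As a rough sketch of the cost-based search, the Python below picks the cheapest expression in each group by recursing into child groups and memoizing the result. Everything here is an assumption for illustration: the cost constants, the group names, and the per-operator costs are invented, and the real optimizer also tracks required physical properties such as ordering, which this sketch ignores.

```python
SEQ_IO_COST = 100  # assumed relative cost units, not CockroachDB's constants
CPU_COST = 1


def scan_cost(rows):
    # Roughly: estimated rows times (sequential IO cost + per-row processing cost).
    return rows * (SEQ_IO_COST + CPU_COST)


def optimize_group(memo, group, best=None):
    """Return (cost, plan) for the cheapest expression in the group."""
    best = {} if best is None else best
    if group in best:
        return best[group]
    candidates = []
    for op, children, local_cost in memo[group]:
        child_results = [optimize_group(memo, child, best) for child in children]
        total = local_cost + sum(cost for cost, _ in child_results)
        plan = (op, [child_plan for _, child_plan in child_results])
        candidates.append((total, plan))
    best[group] = min(candidates, key=lambda candidate: candidate[0])
    return best[group]


# Each expression is (operator, child groups, made-up local cost).
memo = {
    "companies": [("scan companies", [], scan_cost(5))],
    "countries": [("scan countries", [], scan_cost(3))],
    "join": [
        ("hash-join", ["companies", "countries"], 50),
        ("lookup-join", ["companies"], 200),
    ],
}
cost, plan = optimize_group(memo, "join")
```

With these invented numbers the lookup join wins because it avoids scanning countries; different statistics or constants would tip the choice the other way.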

Back in planner.makeOptimizerPlan(), we now have an optimized memo and go on to convert it into a query plan. This plan is primarily made up of planNodes (such as scanNode and filterNode) that correspond to the memo expressions. Since this process is mostly a direct transcription of the memo, we won't go into any detail here.

In the past, these planNodes would execute the query themselves by recursively calling Next() to retrieve the next result row. However, they are now mostly a historical artifact, and temporarily serve as an intermediate representation of the logical query plan. The plan will be handed off to the DistSQL (distributed SQL) engine which will generate a distributed physical query plan and execute it.

Physical Planning and Execution

Now that we have an optimized logical query plan, we're getting ready to actually execute it. However, CockroachDB is a distributed system, and fetching large amounts of data over the network can be slow. We'd much rather send the computation to the data, for example by doing WHERE filtering on the nodes containing the data.

This is managed by the DistSQL engine, which was inspired by Sawzall and MapReduce. The node running the DistSQL planner is called the "gateway node", and it will set up a flow for the query consisting of processors that exchange data via streams. To do so, it figures out which nodes contain the data we're interested in and configures them by sending RPC requests. Streams transmit row data either in memory between local processors or across the network via RPC, and converge on the gateway node where they are combined into the final result.

Let's use the example from the logical planning section, but move the companies table to node 2 and countries to node 3, using node 1 as the gateway node (having started the nodes with appropriate --locality node=X flags):

```sql
ALTER TABLE companies CONFIGURE ZONE USING num_replicas=1, constraints='[+node=2]';
ALTER TABLE countries CONFIGURE ZONE USING num_replicas=1, constraints='[+node=3]';
```

We can then obtain a DistSQL plan by using EXPLAIN(DISTSQL), which will output a URL for a graphical diagram of the flow. We'll use the join hint INNER HASH JOIN to produce a more illustrative example.

```sql
EXPLAIN(DISTSQL)
    SELECT companies.name AS company, countries.name AS country, employees
    FROM companies INNER HASH JOIN countries ON companies.country = countries.id
    WHERE employees > 1e6
    ORDER BY employees DESC;
```

```
  • sort
  │ estimated row count: 3
  │ order: -employees
  │
  └── • hash join
      │ estimated row count: 3
      │ equality: (country) = (id)
      │ right cols are key
      │
      ├── • filter
      │   │ estimated row count: 3
      │   │ filter: employees > 1000000
      │   │
      │   └── • scan
      │         estimated row count: 5 (100% of the table; stats collected 30 minutes ago)
      │         table: companies@primary
      │         spans: FULL SCAN
      │
      └── • scan
            estimated row count: 3 (100% of the table; stats collected 30 minutes ago)
            table: countries@primary
            spans: FULL SCAN
```

Here, we see that it sets up a TableReader processor for companies on node 2, passing rows to a local Filterer processor that applies employees > 1e6. Similarly, a TableReader for countries is set up on node 3. The results are split using a deterministic hash, and sent to HashJoiner processors on either node 2 or 3 which join the partial results before sorting them via a local Sorter — this parallelizes these operations across multiple nodes. Finally, the partial sorted results are sent to the gateway node (node 1) which uses a No-op processor to combine them and return the final result to the client.
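The data movement in this flow can be mimicked in plain Python. This is a single-process sketch under stated assumptions: `route` and `hash_join` are hypothetical helpers, CRC32 stands in for whatever deterministic hash the router uses, and the two "joiners" are just list indexes rather than nodes.

```python
import heapq
import zlib


def route(rows, key_index, num_joiners):
    """Split rows into per-joiner buckets by a deterministic hash of the join key."""
    buckets = [[] for _ in range(num_joiners)]
    for row in rows:
        h = zlib.crc32(row[key_index].encode())
        buckets[h % num_joiners].append(row)
    return buckets


def hash_join(left, right, left_key, right_key):
    """Build a hash table on the right input, then probe it with the left input."""
    table = {}
    for row in right:
        table.setdefault(row[right_key], []).append(row)
    return [l + r for l in left for r in table.get(l[left_key], [])]


# Rows that survive the employees > 1e6 filter, and the full countries table.
companies = [("McDonalds", "us", 1900000), ("State Grid", "cn", 1500000)]
countries = [("us", "United States"), ("cn", "China"), ("de", "Germany")]

left_buckets = route(companies, 1, 2)
right_buckets = route(countries, 0, 2)

# Each joiner joins its own buckets and sorts its partial result locally.
partials = [
    sorted(hash_join(l, r, 1, 0), key=lambda row: -row[2])
    for l, r in zip(left_buckets, right_buckets)
]

# The gateway merges the sorted partial streams, preserving the order.
result = list(heapq.merge(*partials, key=lambda row: -row[2]))
```

Because both inputs are routed with the same hash on the join key, matching rows always land on the same joiner, so no joiner needs to see the other's data.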

Distributed SQL planning and execution is carried out by DistSQLPlanner. Once connExecutor has obtained the logical plan and determined to what extent it should be distributed (it may not always make sense to fully distribute execution), it passes the plan to DistSQLPlanner.PlanAndRun() along with the SQL session's KV transaction for accessing the KV service.

The DistSQL planner begins by creating a PhysicalPlan that specifies the query flow, including specifications for processors and streams. Planning mainly involves recursing into the planNodes of the logical plan and building PhysicalPlan nodes for them (any physical plan node is also a valid plan). Let's consider how it builds the example plan above, starting from the deepest node:

The plan is then finalized by adding a final NoopCoreSpec stage on the gateway node which merges the results from the parent processors (preserving order) and adds a projection to generate the final result.

The physical plan is now ready, and DistSQLPlanner starts executing it. It first generates a FlowSpec for each node based on the physical plan by collecting the relevant ProcessorSpecs, sets them up by sending parallel SetupFlowRequest RPCs, and waits until the nodes are ready. It also sets up and returns the local end of the flow on the gateway node. Let's have a look at what happens on the remote nodes.

distsql.ServerImpl implements the DistSQL gRPC service. The SetupFlow() method ends up dispatching to colflow.NewVectorizedFlow() which sets up the flow using the vectorized execution engine (the only engine we'll discuss here). This engine represents processors and stream infrastructure as colexecop.Operator, where the Next() method returns a set of processed results as a Batch of column vectors — this is typically called recursively on any child operators to obtain input data.
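The pull-based operator model is easy to sketch. In the Python below, `ScanOp`, `FilterOp`, and `drain` are invented names and a batch is simply a dictionary of column lists, so this shows the shape of the idea rather than the colexecop.Operator interface. As an assumption of this sketch, a zero-length batch signals the end of the input.

```python
class ScanOp:
    """Leaf operator: hands out slices of in-memory column vectors."""

    def __init__(self, columns, batch_size):
        self.columns, self.batch_size, self.pos = columns, batch_size, 0

    def next(self):
        lo, hi = self.pos, self.pos + self.batch_size
        self.pos = hi
        return {name: vec[lo:hi] for name, vec in self.columns.items()}


class FilterOp:
    """Pulls batches from its child and keeps the rows that pass the predicate."""

    def __init__(self, child, column, predicate):
        self.child, self.column, self.predicate = child, column, predicate

    def next(self):
        while True:
            batch = self.child.next()
            n = len(batch[self.column])
            if n == 0:
                return batch  # zero-length batch: end of input
            keep = [i for i in range(n) if self.predicate(batch[self.column][i])]
            if keep:
                return {name: [vec[i] for i in keep] for name, vec in batch.items()}


def drain(op):
    """Call next() on the root operator until a zero-length batch arrives."""
    rows = []
    while True:
        batch = op.next()
        columns = list(batch.values())
        if not columns or len(columns[0]) == 0:
            return rows
        rows.extend(zip(*columns))


scan = ScanOp(
    {
        "name": ["McDonalds", "Apple", "State Grid", "Sinopec", "Volkswagen"],
        "employees": [1900000, 150000, 1500000, 580000, 670000],
    },
    batch_size=2,
)
rows = drain(FilterOp(scan, "employees", lambda n: n > 1000000))
```

Working on whole column vectors per call amortizes the per-call overhead over many rows, which is the main point of the vectorized design.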

The vectorized flow setup iterates over the flow processor specs, first setting up any processor inputs. For local inputs it simply returns a reference to the operator, but for remote inputs it creates a colrpc.Inbox operator which receives data through a gRPC ProducerMessage stream set up with a FlowStream() call. The processor setup then creates an operator based on the processor spec type (we'll get back to these shortly when we run the flow). Finally, it sets up any processor outputs, by either linking them to local operators (as we saw for inputs) or creating a colrpc.Outbox that sends batches to a remote Inbox operator serialized as Apache Arrow. In the case of multiple outputs (as seen with the HashJoiners in our example), it sets up a HashRouter which hashes the input data and routes it to the appropriate output operator.

The gateway node has now set up a flow of processors and streams, implemented as vectorized operators exchanging column batches over gRPC streams. What happens when the DistSQL processor runs the flow?

At the root of the vectorized flow sits a BatchFlowCoordinator. When it's run it continually pulls a batch from its input operator by calling Next() and pushes it to its output BatchReceiver — a DistSQLReceiver which was connected to the client session way back in connExecutor.execWithDistSQLEngine(). This triggers a cascade of recursive Next() calls on child operators all the way down to the flow leaves. Using the example from earlier (see diagram), these operators are:

An interesting side note is that in order to avoid the overhead of runtime type assertions and reflection normally needed because of Go's lack of generics, the vectorized execution engine instead uses "execgen templates" to generate typed operators for all combinations of input datatypes. See for example sort_tmpl.go which is the input template for the type-specific sort operators in sort.eg.go.

We've now gotten to where the rubber meets the road — or rather, where the SQL engine meets the KV service: the cFetcher. We won't explore the mapping of KV data to SQL data here (see the encoding tech note for details), but for our purposes we can say that, in broad terms, each row in a table is stored as a serialized binary value with a key consisting of a table identifier followed by the row's primary key value — something like /companies/Apple. All rows for a table are therefore stored as a contiguous key span ordered by primary key under a common prefix.
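To see why a table scan is a single contiguous span, consider this Python sketch. The string keys follow the simplified /table/primary-key form used above, the values are placeholders rather than real encoded rows, and `prefix_end` and `scan` are hypothetical helpers.

```python
import bisect

# Simplified keys; the values stand in for serialized rows.
store = {
    "/companies/McDonalds": "row",
    "/companies/Apple": "row",
    "/companies/State Grid": "row",
    "/companies/Sinopec": "row",
    "/companies/Volkswagen": "row",
    "/countries/us": "row",
    "/countries/cn": "row",
    "/countries/de": "row",
}
keys = sorted(store)  # the KV map is ordered by key


def prefix_end(prefix):
    """Smallest key greater than every key that starts with prefix."""
    return prefix[:-1] + chr(ord(prefix[-1]) + 1)


def scan(start, end):
    """Return the keys in [start, end) using two binary searches."""
    lo = bisect.bisect_left(keys, start)
    hi = bisect.bisect_left(keys, end)
    return keys[lo:hi]


companies_span = ("/companies/", prefix_end("/companies/"))
company_keys = scan(*companies_span)
```

The scan returns the five company rows in primary key order and never touches the countries keys, because everything under the /companies/ prefix sorts together.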

Recall that the TableReaderSpec already includes the key span of the table scan from back when the DistSQLPlanner was figuring out which nodes contained the table. The ColBatchScan operator passes this span to the cFetcher via StartScan(). The cFetcher itself is primarily concerned with decoding KV data into SQL data structures, and delegates to KVFetcher. It, in turn, is also primarily concerned with decoding — this time of MVCC metadata — and delegates further to txnKVFetcher where the actual fetching happens. txnKVFetcher, as the name implies, uses the kv.Txn transaction that was originally set up for the SQL session back in the connExecutor to send KV gRPC requests. Specifically, it submits roachpb.ScanRequests for the requested key span and returns the roachpb.ScanResponse results.

We'll now leave the SQL engine behind, and dive into the key/value service.

KV: Key/Value Storage Service

The KV layer of CockroachDB is a transactional, distributed key/value storage service. A full description of its architecture is outside the scope of this article; see the architecture overview for details.

Briefly, the KV service maintains a single ordered key/value map, split into multiple contiguous ranges with a target size of 512 MB. Each range corresponds to a separate Raft consensus cluster, where each Raft node is called a replica. A single CockroachDB node contains many replicas belonging to many ranges, using a common underlying on-disk data store. Cross-range transactions are achieved through a distributed 2-phase commit protocol coordinated by the KV client.

The KV service is request-oriented, with a Protocol Buffers-based gRPC API defined in api.proto. In practice, the KV client always sends roachpb.BatchRequest, a generic request containing a collection of other requests. All requests have a Header which contains routing information (e.g. which replica a request is destined for) and transaction information (which transaction's context to execute the request in). The corresponding response is, unsurprisingly, roachpb.BatchResponse.

The KV Client Interface

Clients send KV requests via the internal kv.DB client interface, typically by obtaining a kv.Txn transaction handle from DB.NewTxn() and calling methods on it such as Get() and Put(). Recall the SQL engine using this to execute statements in the context of a transaction. KV transactions are ACID-compliant with serializable isolation, so their writes only take effect after calling Commit(), and they can be discarded with Rollback().

Since we often want to send multiple operations in the same request (for performance), they are grouped in a kv.Batch which can be obtained via Txn.NewBatch(). Calling e.g. Batch.Get() and Batch.Put() adds those operations to the batch, and Txn.Run() sends the batch to the server as a BatchRequest, populating Batch.Results with the results. Convenience methods such as Txn.Get() simply create an internal batch with a single request and run it.

Let's see what happens inside Txn.Run(). Tracing the code, we eventually get to txn.db.sendUsingSender(ctx, batchRequest, txn.mu.sender) which calls sender.Send(ctx, batchRequest) on the passed sender. At this point, the request starts percolating through a hierarchy of Senders — objects that perform various peripheral tasks before routing the request to a replica for execution.

Senders have a single method, Send(), which ultimately passes the request to the next sender. Let's go down this "sending" rabbit hole:

TxnCoordSender → DistSender → Node → Stores → Store → Replica

The first two run on the node that received the SQL query and is doing the SQL processing (the "gateway node"); the others run on the nodes responsible for the data being accessed (the "range nodes").
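The sender hierarchy is a chain of objects that share one method and delegate downwards. The Python sketch below only models that shape: the `Sender` class, `build_chain`, and the `trace` list are made up for this example, and each real layer of course does substantial work before passing the request on.

```python
class Sender:
    """One layer in the chain: record a visit, then delegate to the next layer."""

    def __init__(self, name, next_sender=None):
        self.name = name
        self.next_sender = next_sender

    def send(self, request, trace):
        trace.append(self.name)
        if self.next_sender is None:
            # The last layer is the one that actually serves the request.
            return {"request": request, "served_by": self.name}
        return self.next_sender.send(request, trace)


def build_chain(names):
    """Link senders so that the first name is the top-most layer."""
    sender = None
    for name in reversed(names):
        sender = Sender(name, sender)
    return sender


layers = ["TxnCoordSender", "DistSender", "Node", "Stores", "Store", "Replica"]
trace = []
response = build_chain(layers).send("scan /companies/", trace)
```

After the call, `trace` lists the layers in the order the request passed through them, and the response comes back up the same call stack.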

Note that in the case of a DistSQL flow (see earlier section), additional "leaf" transactions (of type LeafTxn) are set up on the remote processing nodes for interacting with KV, using their own local TxnCoordSenders. However, writes always go via the "root" transaction on the gateway node. We won't dwell on leaf transactions here, and only consider a root transaction executing on the gateway node.

TxnCoordSender

The top-most sender is the TxnCoordSender, which is responsible for managing transaction state (see the "Transaction Management" section of the design doc). All operations for a transaction go through a TxnCoordSender instance, and if it crashes then the transaction is aborted.

Internally it is subdivided into an interceptor stack consisting of lockedSenders — basically senders that share the TxnCoordSender.mu mutex and make up a single critical section. These interceptors are, from top to bottom:

  • txnHeartbeater: creates and periodically heartbeats the transaction record to mark the transaction as alive, and detects external aborts of the transaction (e.g. by a conflicting transaction).

  • txnSeqNumAllocator: assigns sequence numbers to the operations within a transaction, for ordering and idempotency.

  • txnPipeliner: pipelines writes by asynchronously submitting them to consensus and coordinating them with later operations that depend on their results.

  • txnSpanRefresher: keeps track of transactional reads, and in the case of serialization errors checks if its past reads are still valid at a higher transaction timestamp. If they are, the transaction can continue at the higher timestamp without propagating the error to the client and retrying the entire transaction.

  • txnCommitter: manages commits and rollbacks and, in particular, implements the parallel commit protocol.

  • txnMetricRecorder: records various transaction metrics.

  • txnLockGatekeeper: unlocks the TxnCoordSender.mu while requests are in flight, and enforces a synchronous client protocol for relevant requests.
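The check behind txnSpanRefresher can be stated compactly: past reads remain valid at a higher timestamp if nothing the transaction read was written in between. The Python sketch below captures only that condition. The `can_refresh` function and the flat `write_log` are hypothetical; the real interceptor tracks key spans and asks the ranges to verify them.

```python
def can_refresh(read_keys, write_log, old_ts, new_ts):
    """Return True if moving the reads from old_ts to new_ts is safe.

    write_log is a hypothetical list of (key, timestamp) pairs for writes
    committed by other transactions.
    """
    return not any(
        key in read_keys and old_ts < ts <= new_ts
        for key, ts in write_log
    )
```

For example, with reads on keys a and b at timestamp 10, a write to c at 15 or to a at 5 does not block a refresh to timestamp 20, but a write to b at 12 does, and the transaction then has to retry.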

After traversing the TxnCoordSender stack, the request is passed on to the DistSender.

DistSender

The DistSender is truly a workhorse: it handles the communication between the gateway node and the (possibly many) range nodes, putting the "distributed" in "distributed database". It receives BatchRequests, looks at the requests inside the batch, figures out what range each command needs to go to, finds the nodes/replicas responsible for that range, routes the requests there and then collects and reassembles the results.

Let's go through the code a bit:

  1. The request is subdivided into ranges: DistSender.Send() calls DistSender.divideAndSendBatchToRanges() which iterates over the constituent ranges of the request by using a RangeIterator. Recall that a single request, such as a ScanRequest, can refer to a key span that might straddle many ranges.

    A lot of things hide behind this innocent-looking iteration: the cluster's range metadata needs to be accessed in order to find the mapping of keys to ranges (info on this metadata can be found in the Range Metadata section of the design doc). Range metadata is stored as regular data in the cluster, in a two-level index mapping range end keys to descriptors about the replicas of the respective range (the ranges storing this index are called "meta-ranges"). The RangeIterator logically iterates over these descriptors, in range key order.

    Brace yourself: when moving from one range to the next, the iterator calls back into the DistSender, which knows how to find the descriptor of the range responsible for one particular key. The DistSender delegates range descriptor lookup to the RangeCache (an LRU tree cache indexed by range end key). This cache desynchronizes with reality as ranges in a cluster split or move around; when an entry is discovered to be stale, we'll see below that the DistSender removes it from the cache.

    In the happy case, the cache has information about a descriptor covering the key we're interested in and it returns it. In the unhappy case, it needs to look up the range descriptor in the range database RangeCache.db. This is an interface which hides the DistSender itself, which will recursively look up the range's meta-key by calling into DistSender.Send(), which calls into the range cache, and so on.

    This recursion cannot go on forever. The descriptor of a regular range is in a meta2-range (a second-level index range), and the descriptors for meta2-ranges are present in the (one and only) meta1-range, which is provided by Gossip.GetFirstRangeDescriptor().

  2. Each sub-request (partial batch) is sent to its range. This is done through a call to DistSender.sendPartialBatchAsync() which truncates all the requests in the batch to the current range and then sends the truncated batch to it. All these partial batches are sent concurrently.

    sendPartialBatch() is where errors stemming from stale RangeCache information are handled: the stale range descriptor is evicted from the cache and the partial batch is retried.

  3. Sending a partial batch to a single range involves selecting the appropriate replica for that range and making an RPC call to it. By default, each range is replicated three ways, but only one of the three replicas is the "leaseholder": the temporarily designated owner of that range, in charge of coordinating all reads and writes to it (see the Range Leases section in the design doc). The leaseholder is also cached in RangeCache, and the information can get stale.

    The DistSender method dealing with this is sendToReplicas. It will use the cache to send the request to the leaseholder, but is also prepared to try the other replicas, in order of "proximity". The replica that the cache says is the leaseholder is simply moved to the front of the list of replicas to be tried, and then RPCs are sent to them, in order, until one succeeds. If the cached leaseholder fails, we assume the cache entry is stale and evict it.

  4. Actually sending the RPCs is hidden behind the Transport interface. Concretely, grpcTransport.SendNext() makes gRPC calls to the nodes containing the destination replicas, via the Internal service.

  5. The (async) responses from the different replicas are combined into a single BatchResponse, which is ultimately returned from DistSender.Send().
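The replica selection in step 3 can be sketched in a few lines of Go. This is a toy model, not the DistSender code: `replicaID`, `leaseCache`, `orderReplicas`, and `errUnavailable` are made-up names, and the input slice is assumed to be sorted by proximity already.

```go
package main

import (
	"errors"
	"fmt"
)

// replicaID and leaseCache are hypothetical stand-ins for the real replica
// descriptors and the leaseholder entry kept in RangeCache.
type replicaID int

type leaseCache struct {
	leaseholder replicaID
	known       bool
}

var errUnavailable = errors.New("replica unavailable")

// orderReplicas moves the cached leaseholder (if any) to the front and keeps
// the remaining replicas in their given order, assumed sorted by proximity.
func orderReplicas(replicas []replicaID, c *leaseCache) []replicaID {
	ordered := make([]replicaID, 0, len(replicas))
	for _, r := range replicas {
		if c.known && r == c.leaseholder {
			ordered = append(ordered, r)
		}
	}
	for _, r := range replicas {
		if !c.known || r != c.leaseholder {
			ordered = append(ordered, r)
		}
	}
	return ordered
}

// sendToReplicas tries the replicas one at a time until an RPC succeeds.
// If the cached leaseholder fails, its cache entry is evicted.
func sendToReplicas(replicas []replicaID, c *leaseCache, send func(replicaID) error) (replicaID, error) {
	for _, r := range orderReplicas(replicas, c) {
		if err := send(r); err != nil {
			if c.known && r == c.leaseholder {
				c.known = false
			}
			continue
		}
		return r, nil
	}
	return 0, errUnavailable
}

func main() {
	cache := &leaseCache{leaseholder: 2, known: true}
	// In this toy run only replica 1 answers.
	send := func(r replicaID) error {
		if r == 1 {
			return nil
		}
		return errUnavailable
	}
	served, err := sendToReplicas([]replicaID{1, 2, 3}, cache, send)
	fmt.Println(served, err, cache.known) // 1 <nil> false
}
```

The sketch leaves out everything that makes the real method interesting (redirects from NotLeaseHolderError responses, retries, transport management); it only shows the "leaseholder first, then the rest in order" idea.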

We've now gone through the relevant things that happen on the gateway node. Next, we'll have a look at what happens on the "remote" side — on each of the ranges.

RPC Server: Node

We've seen how the DistSender splits BatchRequest into partial batches, each containing commands local to a single replica, and how these commands are sent to the lease holders of their ranges through RPCs. We're now moving to the server side of these RPCs.

The Internal RPC service is implemented by Node. It doesn't do anything of great relevance itself, but delegates the request to Stores — a collection of on-disk data stores (see the Architecture section of the design doc). Stores implements the Sender interface, just like the layers we've seen before, and identifies which particular store contains the destination replica (based on request routing info filled in by the DistSender) before routing the request there.

Store

A Store typically represents one physical disk device. For our purposes, it mostly delegates the request to a Replica, but one other interesting thing it does is to update the upper bound on the uncertainty interval for the current request in case other requests from this transaction have already been processed on this node.

The uncertainty interval dictates which values are ambiguous due to clock skew between nodes, i.e. values for which we can't tell whether they were written before or after the serialization point of the current transaction. This code realizes that, if a request from the current transaction has already been processed on this node, then no value written after the node's clock reading at the time that earlier request was processed is ambiguous. For more details on uncertainty intervals, see the "Choosing a Timestamp" section of the design doc.
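To make the arithmetic concrete, here is a minimal sketch of how such an upper bound might be tightened. It assumes integer timestamps, a hypothetical `maxOffset` for the maximum tolerated clock offset, and an `observed` value holding the node's clock reading from an earlier request of the same transaction (zero meaning "none"); none of these names come from the actual code.

```go
package main

import "fmt"

// timestamp is a simplified stand-in for a hybrid logical clock timestamp.
type timestamp int64

// uncertaintyLimit returns the upper bound of the uncertainty interval.
// Without an observation it is readTS + maxOffset; an earlier observation
// of this node's clock can only lower it.
func uncertaintyLimit(readTS, maxOffset, observed timestamp) timestamp {
	limit := readTS + maxOffset
	if observed != 0 && observed < limit {
		limit = observed
	}
	return limit
}

// isUncertain reports whether a value written at valueTS is ambiguous for a
// reader at readTS, given the upper bound of its uncertainty interval.
func isUncertain(valueTS, readTS, limit timestamp) bool {
	return valueTS > readTS && valueTS <= limit
}

func main() {
	const readTS, maxOffset = timestamp(100), timestamp(50)
	first := uncertaintyLimit(readTS, maxOffset, 0)   // no earlier visit: 150
	later := uncertaintyLimit(readTS, maxOffset, 120) // node clock was 120 on an earlier visit
	fmt.Println(isUncertain(130, readTS, first)) // true
	fmt.Println(isUncertain(130, readTS, later)) // false
}
```

A value at timestamp 130 is ambiguous on the transaction's first visit to the node, but not once we know the node's clock read 120 during an earlier request: the value must have been written after that request.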

Replica: Executing Requests

Recall that a range is a contiguous slice of the KV keyspace managed by one instance of the Raft consensus algorithm. A Replica is one node in the range's Raft cluster, maintaining one copy of the range state.

The Replica is the final Sender in our hierarchy. The role of all the other Senders was, mostly, to route requests to the Replica currently acting as the lease holder for the range (a primus inter pares Replica that takes on a bunch of coordination responsibilities that we'll explore below).

A replica deals with read requests differently than write requests: reads are evaluated directly, whereas writes will enter another big chapter in their life and go through the Raft consensus protocol. This is readily apparent as Replica.Send() quickly branches off based on the request type. We'll talk about each of these paths in turn.

Before that, however, both paths are wrapped in executeBatchWithConcurrencyRetries() which handles concurrency control for the request batch: in a big retry loop it will attempt to acquire latches (internal locks) via concurrency.Manager.SequenceReq() and then execute the batch. Any concurrency-related errors will be handled here, if possible, before retrying the batch, such as write intent resolution, transaction conflicts, lease errors, and so on.

Concurrency Control

Our request will most likely not arrive at the replica by itself, but along with a variety of other requests sent by many different nodes, clients, and transactions. The concurrency.Manager is responsible for deciding who goes when and in what order. Broadly, it will allow readers to run concurrently while writers run alone, in FIFO order and taking into account e.g. key spans, MVCC timestamps, and transaction isolation. It supports "latches", single-request internal locks taken out for the duration of request evaluation and replication, and "locks", cross-request transactional locks (e.g. for an uncommitted write) that persist until the transaction is finalized — both with read-only and read/write access modes.

First, the replica figures out which latches and locks need to be taken out for the requests. To do this, it looks up the command for each request, and calls its DeclareKeys method with the request header. This method is provided during command registration (see e.g. the Put command), and will often delegate to either DefaultDeclareKeys (taking out latches) or DefaultDeclareIsolatedKeys (taking out locks and latches) for the keys listed in the header.

Once the necessary latches and locks are known, the request will pass these to the concurrency manager and wait for its turn by calling Manager.SequenceReq(). This will first attempt to acquire latches in the latch manager, which may require it to wait for them to be released by other requests first. It then goes on to acquire locks in the lock table, waiting for any conflicting transactions to complete — this will also attempt to push the lock holder's timestamp forward, which may allow our transaction to run "before it", or even abort it depending on transaction priorities.
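The "readers run concurrently, writers run alone" rule boils down to a span conflict check. The following is a rough sketch under heavy simplification: it ignores MVCC timestamps and the lock table entirely, and the `latch`, `conflicts`, and `blockers` names are invented for the example.

```go
package main

import "fmt"

type access int

const (
	readOnly access = iota
	readWrite
)

// latch covers the key span [start, end) with the given access mode.
type latch struct {
	start, end string
	mode       access
}

// overlaps reports whether two half-open key spans intersect.
func overlaps(a, b latch) bool {
	return a.start < b.end && b.start < a.end
}

// conflicts reports whether two latches cannot be held at the same time:
// their spans overlap and at least one of them is a write.
func conflicts(a, b latch) bool {
	return overlaps(a, b) && (a.mode == readWrite || b.mode == readWrite)
}

// blockers returns the held latches that req has to wait for.
func blockers(held []latch, req latch) []latch {
	var out []latch
	for _, h := range held {
		if conflicts(h, req) {
			out = append(out, h)
		}
	}
	return out
}

func main() {
	held := []latch{
		{"a", "c", readOnly},
		{"m", "p", readWrite},
	}
	fmt.Println(len(blockers(held, latch{"b", "d", readOnly})))  // 0: two reads may overlap
	fmt.Println(len(blockers(held, latch{"b", "d", readWrite}))) // 1: write vs. held read
	fmt.Println(len(blockers(held, latch{"n", "o", readOnly})))  // 1: read vs. held write
}
```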

Once the request has acquired its latches and locks, it is ready to execute. At this point, the request no longer has to worry about concurrent requests, as it is fully isolated. There is one notable exception, however: write intents (new values) written by concurrent transactions, which have not yet been fully migrated into the concurrency manager, and must be handled as they are encountered. We'll get back to these later.

Lease Acquisition and Redirection

The first thing both Replica.executeReadOnlyBatch() and Replica.executeWriteBatch() do is call Replica.checkExecutionCanProceed() to perform a few pre-flight checks. Most importantly, it checks that the request got to the right place, i.e. that the replica is the current lease holder (remember that a lot of the routing was done based on caches or outright guesses).

The leaseholder check is performed by Replica.leaseGoodToGoRLocked(), a rabbit hole in its own right. Let's just say that, if the current replica isn't the lease holder, it returns either InvalidLeaseError, which will cause the replica to try acquiring the lease, or NotLeaseHolderError, which is returned to the DistSender and redirects the request to the real leaseholder.

Requesting a lease is done through the pendingLeaseRequest helper struct, which coalesces multiple requests for the same lease and eventually constructs a RequestLeaseRequest that is sent for execution directly to the replica (as we've seen in other cases, bypassing all the senders to avoid recursing infinitely). In case a lease is requested, redirectOnOrAcquireLease will wait for that request to complete and check if it was successful. If it was, the batch will be retried. If not, the request is redirected to the actual leaseholder.
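The coalescing idea can be illustrated with a small, self-contained sketch. This is not how pendingLeaseRequest is written; `pendingLease`, `join`, and `finish` are hypothetical names, and the actual sending of the lease request is reduced to a counter.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingLease is a toy model of request coalescing: many callers wait on
// one in-flight lease request.
type pendingLease struct {
	mu      sync.Mutex
	waiters []chan error
	sent    int // number of lease requests actually issued
}

// join registers a caller. Only the first caller since the last completion
// triggers a new request; later callers just wait for its outcome.
func (p *pendingLease) join() <-chan error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.waiters) == 0 {
		p.sent++ // the real code would construct and send the lease request here
	}
	ch := make(chan error, 1) // buffered so finish never blocks
	p.waiters = append(p.waiters, ch)
	return ch
}

// finish delivers the outcome of the in-flight request to every waiter.
func (p *pendingLease) finish(err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, ch := range p.waiters {
		ch <- err
	}
	p.waiters = nil
}

func main() {
	var p pendingLease
	a, b, c := p.join(), p.join(), p.join()
	p.finish(nil)
	fmt.Println(p.sent, <-a, <-b, <-c) // 1 <nil> <nil> <nil>
}
```

Three callers join, but only one request is issued; all three observe the same result and can then retry their batch or be redirected.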

Read Request Path

Once the lease situation has been settled, Replica.executeReadOnlyBatch() is ready to actually start reading. It grabs a storage engine handle and executes the batch on it, eventually moving control to Replica.evaluateBatch() which calls Replica.evaluateCommand() for each request in the batch. This looks up the command for the request method in a command registry, and passes execution to it.
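The registry lookup follows a familiar dispatch pattern, sketched below. The types are simplified stand-ins (a map plays the role of the storage engine handle, and `method`, `request`, `command`, and `register` are illustrative names rather than the actual batcheval API).

```go
package main

import (
	"errors"
	"fmt"
)

type method string

type request struct {
	method method
	key    string
}

// command evaluates one request against a reader (here a plain map).
type command func(data map[string]string, req request) (string, error)

var errUnknownMethod = errors.New("unknown method")

// registry maps each request method to the command that evaluates it.
var registry = map[method]command{}

func register(m method, c command) { registry[m] = c }

// evaluateCommand looks up the command for the request's method and runs it.
func evaluateCommand(data map[string]string, req request) (string, error) {
	cmd, ok := registry[req.method]
	if !ok {
		return "", errUnknownMethod
	}
	return cmd(data, req)
}

// evaluateBatch evaluates every request in order, stopping at the first error.
func evaluateBatch(data map[string]string, reqs []request) ([]string, error) {
	results := make([]string, 0, len(reqs))
	for _, req := range reqs {
		res, err := evaluateCommand(data, req)
		if err != nil {
			return nil, err
		}
		results = append(results, res)
	}
	return results, nil
}

func init() {
	register("Get", func(data map[string]string, req request) (string, error) {
		return data[req.key], nil
	})
}

func main() {
	data := map[string]string{"a": "1", "b": "2"}
	res, err := evaluateBatch(data, []request{{"Get", "a"}, {"Get", "b"}})
	fmt.Println(res, err) // [1 2] <nil>
}
```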

One typical read request is a ScanRequest, which is evaluated by batcheval.Scan. The code is rather brief, and mostly calls a corresponding function for the underlying storage engine.

Engine: On-Disk Storage

We're getting to the bottom of the CockroachDB stack. The Engine is an interface abstracting away different on-disk data stores. From version 21.1 onwards we only support Pebble, an embedded key/value database developed in-house based on RocksDB/LevelDB. RocksDB was used previously, but got replaced in pursuit of performance, stability, and tighter integration. Although the storage engine is a crucial part of servicing a request, for the sake of brevity we won't go into any details on Pebble's internals here.

For reads, we typically make use of the storage.Reader interface, a subset of Engine. The Scan command from the previous section passes this Reader to storage.MVCCScanToBytes(), which is a helper function that fetches a list of key/value pairs in a given key span from the storage engine.

The CockroachDB storage layer uses Multi-Version Concurrency Control (MVCC), storing multiple versions of Protobuf-encoded Values for a given key. Each version is identified by the value's Timestamp, and the latest version visible to a given transaction is given by its read timestamp. As a special case, new values (called write intents) are written without a timestamp, and once the transaction commits these will have to be cleaned up by rewriting them with the transaction's final write timestamp — more on this later.

The workhorse of MVCC reads is the storage.MVCCIterator, which iterates over MVCC key/value pairs in order. MVCCScanToBytes makes use of it with the help of pebbleMVCCScanner to scan over the key/value versions in the storage engine, collect the appropriate version of each key, and return them to the caller.
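As an illustration of "collect the appropriate version of each key", here is a toy MVCC scan over an in-memory map. It assumes integer timestamps and versions stored newest first, and it ignores intents, tombstones, and uncertainty; `mvccStore`, `visibleVersion`, and `mvccScan` are names made up for the sketch, not the storage package's API.

```go
package main

import (
	"fmt"
	"sort"
)

type version struct {
	ts    int64
	value string
}

// mvccStore maps each key to its versions, newest first.
type mvccStore map[string][]version

type keyValue struct {
	key, value string
}

// visibleVersion returns the newest version written at or below readTS.
func visibleVersion(versions []version, readTS int64) (string, bool) {
	for _, v := range versions {
		if v.ts <= readTS {
			return v.value, true
		}
	}
	return "", false
}

// mvccScan returns, in key order, the visible version of every key in
// [start, end) as of readTS.
func mvccScan(s mvccStore, start, end string, readTS int64) []keyValue {
	keys := make([]string, 0, len(s))
	for k := range s {
		if k >= start && k < end {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	var out []keyValue
	for _, k := range keys {
		if v, ok := visibleVersion(s[k], readTS); ok {
			out = append(out, keyValue{k, v})
		}
	}
	return out
}

func main() {
	s := mvccStore{
		"a": {{30, "a3"}, {10, "a1"}},
		"b": {{40, "b4"}},
		"c": {{20, "c2"}},
	}
	fmt.Println(mvccScan(s, "a", "c", 25)) // [{a a1}]
}
```

At read timestamp 25, key "a" resolves to its older version, key "b" has no visible version yet, and key "c" falls outside the scanned span.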

However, if the scan encounters conflicting write intents from a different transaction, these will have to be dealt with first.

Intent Resolution

Whenever a request runs into write intents (i.e. uncommitted values) that weren't written by its own transaction, it's facing a possible conflict with another transaction. To resolve this conflict, the MVCC storage operation returns a WriteIntentError containing the intents. This is caught further up the stack by Replica.executeBatchWithConcurrencyRetries, which delegates conflict resolution to concurrency.Manager.HandleWriterIntentError. This will first add the intents to the lock table, since it clearly did not know about them yet (or the request wouldn't have failed), and then wait for the conflicting transaction to complete. As part of this waiting, it will try to push the lock holder using the IntentResolver to resolve the conflicting intents.

To resolve an intent, we first need to figure out if the transaction that owns it is still pending — it might already have committed or aborted. If it is, then we can try to push it such that it restarts at a higher timestamp or gets aborted. If successful then the intents can be resolved by either replacing them with a committed value or discarding them, depending on the final transaction state. Otherwise, the current transaction must wait for it to complete.

Figuring out the other transaction's status and pushing it happens in IntentResolver.MaybePushTransaction, where a series of PushTxnRequests are batched and sent to the cluster (using the hierarchy of Senders to route them to the various transaction records). If the conflicting transaction is still pending, the decision about whether or not it gets pushed is made deep in the PushTxnRequest processing based on the relative priorities of the transactions. Resolving the intents themselves is done through calls to IntentResolver.ResolveIntent.

Once the conflicting intents are resolved, either by pushing the other transaction or waiting for it to complete, we can continue where we left off back in Replica.executeBatchWithConcurrencyRetries and retry the request.
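The decision tree described in this section can be condensed into a small sketch. It is deliberately naive: the rule "the pusher wins only with a strictly higher priority" is an assumption made for the example (the real PushTxnRequest logic considers more than that), and all type and function names are hypothetical.

```go
package main

import "fmt"

type txnStatus int

const (
	pending txnStatus = iota
	committed
	aborted
)

type txn struct {
	status   txnStatus
	priority int
}

type action string

const (
	commitIntent  action = "rewrite intent as committed value"
	discardIntent action = "discard intent"
	pushHolder    action = "push lock holder"
	waitForHolder action = "wait for lock holder"
)

// handleIntent decides what a request does after running into an intent
// owned by holder. Finalized transactions can be resolved right away; a
// pending one is pushed only if the pusher has strictly higher priority
// (a simplification assumed for this sketch).
func handleIntent(pusher, holder txn) action {
	switch holder.status {
	case committed:
		return commitIntent
	case aborted:
		return discardIntent
	}
	if pusher.priority > holder.priority {
		return pushHolder
	}
	return waitForHolder
}

func main() {
	me := txn{status: pending, priority: 5}
	fmt.Println(handleIntent(me, txn{status: committed}))
	fmt.Println(handleIntent(me, txn{status: pending, priority: 1}))
	fmt.Println(handleIntent(me, txn{status: pending, priority: 9}))
}
```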

Write Request Path

Write requests are conceptually more interesting than reads because they're not simply serviced by one node/replica. Instead, they go through the Raft consensus protocol which eventually commits them to a replicated ordered command log that gets applied by all of the range's replicas (see the Raft section of the design doc for details). The replica that initiates this process is, just like in the read case, the lease holder. Execution on the lease holder is thus broken into two stages: before ("upstream of") Raft and after ("downstream of") Raft. The upstream stage will eventually block until the submitted Raft command is applied locally, at which point future reads are guaranteed to see its effects.

Write requests go through much of the same preparation as reads, such as concurrency control and lease acquisition/redirection. However, they have one additional step that is closely linked to the read path: they consult the timestamp cache, and move the transaction's timestamp forward if necessary. This structure is a bounded in-memory cache containing the latest timestamp at which a given key span was read, and serves to protect against violations of snapshot isolation: a key write at a lower timestamp than a previous read must not succeed (see the "Read-Write Conflicts – Read Timestamp Cache" section in Matt's blog post). This cache is updated during the read path epilogue and write path proposal epilogue.
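A stripped-down model of the timestamp cache shows the forwarding step. The sketch tracks single keys rather than key spans, is unbounded, and uses integer timestamps; bumping a write to `lastRead + 1` (including when the timestamps are equal) is an assumption of the example, and `tsCache`, `recordRead`, and `forwardWrite` are made-up names.

```go
package main

import "fmt"

// tsCache maps a key to the latest timestamp at which it was read.
type tsCache map[string]int64

// recordRead remembers the read if it is the latest one seen for the key.
func (c tsCache) recordRead(key string, ts int64) {
	if ts > c[key] {
		c[key] = ts
	}
}

// forwardWrite returns the timestamp a write to key may use: the requested
// one, or one just above the latest read if the key was read at or after it.
func (c tsCache) forwardWrite(key string, writeTS int64) int64 {
	if lastRead, ok := c[key]; ok && writeTS <= lastRead {
		return lastRead + 1
	}
	return writeTS
}

func main() {
	c := tsCache{}
	c.recordRead("a", 50)
	fmt.Println(c.forwardWrite("a", 40)) // 51: the write is moved above the read
	fmt.Println(c.forwardWrite("a", 60)) // 60: already above the read
	fmt.Println(c.forwardWrite("b", 40)) // 40: key was never read
}
```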

With that out of the way, we can evaluate the request and propose resulting Raft commands. We'll describe the process below (it will be fun), but before we do, let's see what the current method does afterwards. The call returns a channel that we'll wait on. This is the decoupling point that we've anticipated above — the point where we cede control to the Raft machinery. The Replica submitting the proposals accepts its role as merely one of many replicas and waits for the consensus group to make progress in lock-step. The channel will receive a result when the (local) replica has applied the respective commands, which can happen only after the commands have been committed to the shared Raft log (a global operation).

Request Evaluation

As promised, let's see what happens inside Replica.evalAndPropose(). We first evaluate the KV request, which turns it into a Raft command. This is done by calling requestToProposal(), which quickly calls evaluateProposal(), which in turn calls evaluateWriteBatch(). This last method "simulates" the execution of the request, if you will, and records all the would-be changes to the Engine into a "batch" (these batches are how Pebble models transactions). This batch will be serialized into a Raft command. If we were to commit this batch now, the changes would be live, but just on this one replica — a potential data consistency violation. Instead, we abort it. It will be resurrected when the command "comes out of Raft", as we'll see.

The simulation part takes place inside evaluateWriteBatchWrapper(). This takes in the roachpb.BatchRequest (the KV request we've been dealing with all along), allocates a storage.Batch, and delegates to evaluateBatch(). This fellow finally iterates over the individual requests in the batch and, for each one, calls evaluateCommand(). We've seen evaluateCommand before, on the read path: it looks up the corresponding command for each request's method and calls it. One such command would be batcheval.Put, which writes a value for a key. Inside it we see a call to the engine to perform the write, but remember that it's all performed inside a Pebble transaction, the storage.Batch.
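The "record now, apply later" behavior of a write batch can be mimicked with a map. This is only a sketch of the idea: `engine`, `batch`, and `evaluatePuts` are invented for the example and do not mirror the storage.Batch interface.

```go
package main

import "fmt"

// engine is a toy stand-in for the on-disk store.
type engine map[string]string

// batch records writes without making them visible in the engine.
type batch struct {
	writes map[string]string
}

func newBatch() *batch { return &batch{writes: map[string]string{}} }

func (b *batch) put(key, value string) { b.writes[key] = value }

// commit applies the recorded writes to the engine.
func (b *batch) commit(e engine) {
	for k, v := range b.writes {
		e[k] = v
	}
}

// evaluatePuts "simulates" a list of key/value writes by recording them in a
// fresh batch; the engine is not touched.
func evaluatePuts(puts [][2]string) *batch {
	b := newBatch()
	for _, p := range puts {
		b.put(p[0], p[1])
	}
	return b
}

func main() {
	e := engine{}
	b := evaluatePuts([][2]string{{"a", "1"}})
	fmt.Println(len(e)) // 0: nothing is visible after evaluation
	b.commit(e)         // what happens once the command has gone through Raft
	fmt.Println(e["a"]) // 1
}
```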

This was all for the purposes of recording the engine changes that need to be proposed to Raft. Let's unwind the stack to evalAndPropose() (where we started this section), and see what happens with the result of requestToProposal().

For one, we grab proposalCh, the channel which will be returned to the caller to wait for the proposal's local application. This gets inserted into the pending proposals map — a structure that will make the connection between a command being applied and executeWriteBatch() which will be blocked on a channel waiting for the local application. More importantly, the proposal is passed to Replica.propose which inserts it into the proposal buffer where it eventually ends up being proposed to raftGroup.

This raftGroup is a handle for the Raft consensus cluster, implemented by the Etcd Raft library. We treat this as a black box that we submit command proposals to, and it deals with running them through Raft consensus, committing them to the replicated command log, and applying them onto every replica.

One notable exception here is requests with the AsyncConsensus flag set. These are used by pipelined writes and the parallel commit protocol to avoid waiting for writes to go through Raft, which significantly improves write latency. Instead, as we briefly touched on earlier, the TxnCoordSender will check that these writes went through as necessary, in particular when committing the transaction. For these requests, evalAndPropose() immediately sends a result via proposalCh, causing executeWriteBatch() to return instead of waiting for Raft application.

Raft Command Application

We've seen how commands are "put into" Raft, but how do they "come out"? To integrate with the Etcd Raft library we implement a state machine that it applies log commands to. A complete description of this is beyond the scope of this article, but suffice it to say that this involves the raftProcessor interface, which is implemented by our old friend Store.

The important method is Store.processReady(), which eventually calls back into the Replica being modified via handleRaftReadyRaftMuLocked(). This applies committed entries by taking in the serialized storage.Batch and applying it to the state machine by committing the batch to Pebble. The command has now been applied on this particular replica, but keep in mind that this process happens on every replica.

Let's not forget about the waiting proposer, who is still blocked on the proposal channel back in executeWriteBatch(). After applying the command, on the proposing replica (i.e. typically the lease holder), we need to return the result back through the channel. This happens at the end of command application, where AckOutcomeAndFinish() gets called on every command: if the command was local, it will eventually signal the proposer by sending the result.
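The handshake between the waiting proposer and command application can be sketched with a pending-proposals map and a channel. The Raft log itself is replaced by a plain loop over replicas, and `command`, `replica`, `propose`, and `apply` are hypothetical names rather than the kvserver types.

```go
package main

import "fmt"

type command struct {
	id     int
	writes map[string]string
}

type replica struct {
	state   map[string]string
	pending map[int]chan string // local proposals awaiting application
}

func newReplica() *replica {
	return &replica{state: map[string]string{}, pending: map[int]chan string{}}
}

// propose registers cmd as pending on the proposing replica and returns the
// channel the proposer will block on.
func (r *replica) propose(cmd command) <-chan string {
	ch := make(chan string, 1) // buffered so apply never blocks
	r.pending[cmd.id] = ch
	return ch
}

// apply runs on every replica once the command is committed. Only the
// replica that proposed the command finds it in its pending map and
// signals the waiting proposer.
func (r *replica) apply(cmd command) {
	for k, v := range cmd.writes {
		r.state[k] = v
	}
	if ch, ok := r.pending[cmd.id]; ok {
		delete(r.pending, cmd.id)
		ch <- "applied"
	}
}

func main() {
	leaseholder, follower := newReplica(), newReplica()
	cmd := command{id: 1, writes: map[string]string{"a": "1"}}
	done := leaseholder.propose(cmd)
	// Stand-in for Raft: the committed command reaches every replica.
	for _, r := range []*replica{leaseholder, follower} {
		r.apply(cmd)
	}
	fmt.Println(<-done, follower.state["a"]) // applied 1
}
```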

We've now come full circle: the proposer unblocks and receives its result, unwinding the stack to let the client know that the request is complete. The response travels back through the hierarchy of Senders, from the lease holder node to the gateway node, through the DistSQL flow and transaction coordinator, via the session manager and pgwire implementation, before finally reaching the SQL client.