Back to Materialize

Life of a Query

doc/developer/life-of-a-query.md

12323.6 KB
Original Source

Life of a Query

Inspired by CRDB: Life of a SQL Query.

Introduction

This document aims to provide an overview of the components involved in processing a SQL query, following the code paths through the various layers: network protocol, session management, the adapter, query parsing, planning, and optimization, interaction of the adapter layer and clusters, the storage layer, and delivery of results from clusters, back through the adapter layer to the client.

We don't discuss design decisions and historical evolution but focus on the current code. We also only cover the common and most significant parts and omit discussing details and special cases. In the following, we will discuss how SELECT queries are processed, but other types of queries will use the same components, if in slightly different ways.

PostgreSQL Client Protocol

Queries arrive at Materialize through the PostgreSQL wire protocol.

The protocol is implemented in the pgwire crate. Incoming connections are handled in handle_connection, which passes on to run, and ultimately we create a StateMachine whose run handles the connection until completion.

The state machine has an adapter client (SessionClient) that it needs to use to accomplish most "interesting" things. It's a client to the adapter layer, with its main component, the Coordinator. The work of processing queries and maintaining session state is split between the "front end" state machine that runs the pgwire protocol and the "back end" adapter layer that handles talking to the other components.

[!NOTE] We will keep using the terms adapter front end and adapter back end below to mean, respectively, the state machine that is responsible for a single connection and can do work concurrently with other connections, and the adapter/Coordinator which has to sequentialize work and is therefore more limited in the amount of work it can do concurrently.

If it is clear from context, we will drop the adapter prefix.

Adapter / Coordinator

The adapter is named such because it translates user commands into commands that the other internal systems understand. It currently understands SQL but is intended as a generic component that isolates the other components from needing to know details about the front-end commands (SQL).

A core component is the Coordinator. It mediates access to durable environment state, kept in a durable catalog and represented by the Catalog in memory, and to the controllers for the storage and compute components (more on that later).

It holds on to mutable state and clients to other components. Access and changes to these are mediated by a "single-threaded" event loop that listens to internal and external command channels. Commands are worked of sequentially. Other parts can put in commands, for example, the front end calling into the adapter is implemented as sending a command which then eventually causes a response to be sent back, but the Coordinator will also periodically put in commands for itself.

[!NOTE] Most of the time, when talking of the controller or controllers, we are talking about the compute and storage controller, which are driving around computation that is happening on clusters or cluster replicas, to be more precise. And you will sometimes hear people say just controller when they mean the ensemble of the different controllers.

[!NOTE] There is active work in progress on changing this design because it doesn't lend itself well to scaling the amount of work that the adapter can do. We want to move more work "out to the edges", that is towards the front end (which can do work for different connections/sessions concurrently) and the controllers for the other components. See Design Doc: A Small Coordinator for details.

Query Processing

Query processing follows these steps:

parsing & describing → query planning → timestamp selection → optimization → execution

Parsing & Describing

The Materialize parser is a hand-written recursive-descent parser that we forked from sqlparser-rs. The main entry point is parser.rs.

Parsing happens completely in the front end, it produces an AST of Statement nodes. The AST only represents the syntax of the query and says nothing about how or if it can be executed.

Describing is the process of figuring out the result type of a statement. For this, the front end needs access to the Catalog, for which it needs to call into the adapter.

Both parsing and describing happen in the front end, with calls into the adapter as needed. All further steps are orchestrated by the adapter/Coordinator: the front end passes the AST as a command and will become involved again when sending results back to the client.

Query Planning

Query planning generates a Plan from the AST. Glossing over some details, this first binds referenced names based on the Catalog as of planning time. Then plan determines an execution plan from the resolved AST.

One of the differences between the user (SQL) commands that are input to the adapter and commands for the rest of the system is the use of user-defined and reusable names in SQL statements rather than the use of immutable, non-reusable IDs. The binding mentioned above turns those user-defined names into IDs.

As mentioned in the previous section, query planning happens inside the Coordinator.

Timestamp Selection

Another difference between user commands and internal commands is the absence of explicit timestamps in user (SQL) commands, quoting from formalism.md:

A SELECT statement does not indicate when it should be run, or against which version of its input data. The Adapter layer introduces timestamps to these commands, in a way that provides the appearance of sequential execution.

The internal interface that we use for determining timestamps is TimestampOracle. The production implementation is an oracle that uses CRDB as the source of truth. See Design Doc: Distributed Timestamp Oracle for more context.

Optimization & Physical Planning

There are multiple stages to optimization and different internal representations, and different types of queries or created objects will instantiate different optimizer pipelines. The optimization pipeline for SELECT is this snippet.

The final result of these stages will depend on the type of query we're optimizing, but for certain types of SELECT and permanent objects it will include a DataflowDescription. Which is a physical execution plan that can be given to the compute layer, to execute on a cluster.

For more details on query compilation and optimization, see 101-query-compilation.md.

Execution on a Cluster

For SELECT, which is internally called PEEK, there are three different execution scenarios:

  • Fast-Path Peek: there is an existing arrangement (think index) in the cluster that we're targeting for the query. We can read the result from memory with minimal massaging and return it to the client (through the adapter).
  • Slow-Path Peek: there is no existing arrangement that we can query. We have to create a dataflow in the targeted cluster that will ultimately fill an arrangement that we can read the result out of.
  • Persist Fast-Path Peek: there is no existing arrangement but the query has a shape that allows us to read a result right out of a storage collection (in persist).

The result of optimization will indicate which of these scenarios we're in, and the adapter will now have to talk to the compute controller to implement execution of the query.

Ultimately, for all of these scenarios the adapter will make a call into the compute controller to read out a peek result. For slow-path peeks it will first create the dataflow, but the functionality for reading our the result is the same for fast path and slow path after that. The entry point for setting of the actual peek is peek. And the entry point for creating a dataflow is create_dataflow.

The adapter will pass the sending endpoint of a channel to the compute controller, for sending back the results, and then setup up an async task that reads from that channel, massages results, and sends them out as another stream. The receiving end of that second stream is what the adapter returns in a ExecutionResponse::SendingRowsStreaming, to the adapter front end, which handles sending results back out to the pgwire client.

Dataflows & Arrangements

Clusters run dataflow computations, and within those dataflows, arrangements are the structure that data is being kept in. Arrangements are used both for keeping data internal to the dataflow but also for incrementally maintained results. For example, an index is a dataflow where at the end we have an arrangement that a peek can read out of.

On the more technical side, arrangements are multi-versioned indexes. The arrangements documentation describes them as an indexed representation of a stream of update triples (data, time, diff), organized by key for efficient lookups. For more background, see also the Shared Arrangements paper.

Controller <-> Cluster Protocol

The adapter talks to the controllers, and the controllers in turn use a protocol for talking to the clusters and getting responses back. For historic reasons, compute and storage have separate protocols even though today they can have computation running on the same cluster. The compute protocol commands are defined in ComputeCommand, and responses are defined in ComputeResponse. Anything from creating dataflows, setting up peeks, sending back peek responses, dropping dataflows, etc. happens using this protocol. For storage, the commands and responses are defined in StorageCommand and StorageResponse, respectively.

For the purposes of PEEK (aka. SELECT, remember) processing, the interesting commands/responses are:

  • ComputeCommand::CreateDataflow for creating a dataflow in case of a slow-path peek
  • ComputeCommand::Peek for initiating the actual peek
  • ComputeResponse::PeekResponse with the peek result

Between the controller and the cluster sits a "consolidation layer" that handles the details of multi-process cluster replicas. For example, PeekResponses from all replica processes get combined and get surfaced to the controller as only one PeekResponse. For peeks, this gets handled in absorb_peek_response.

Peek "Extraction"

In some way or other, a peek requires that a result be extracted from an arrangement or straight from persist. In handle_peek we set up the machinery for processing a peek and the method that drives peeks to completion is process_peeks.

From the perspective of the cluster, there are two ways of extracting peek results. Both fast-path and slow-path peeks ultimately look the same to the cluster: we extract a result from an arrangement. It's just that for slow-path peeks the dataflow and arrangement are created just for that peek. This kind of peek is set up in here in handle_peek.

A persist fast-path peek does not require an arrangement for extracting a result. Instead a result is extracted straight from persist with potential filters and projections applied. The entry point for that is this code, in handle_peek.

Results can be sent back in two ways: inline in the protocol via ComputeResponse, or using the peek result stash. The latter allows for sending back larger results but there are certain restrictions on what shapes of queries are eligible to use the stash. This has to do with post processing, which we will describe below. Roughly, queries that don't require post processing can use the peek stash, so queries that have OFFSET, LIMIT, or ORDER BY cannot use the large result stash.

For all queries, we start out trying to use the inline result protocol but then fall back to the stash when we notice the result size getting too large and a query is eligible. This code inside process_peek is where that decision happens.

Delivering Peek Results and Post Processing

Peek results get sent back to the controller from the cluster, inside a ComputeResponse::PeekResponse (code), either inline or using the large result stash. From there they have to get routed through the adapter and out through the pgwire protocol to the client.

For this, the adapter sets up an async task that waits for the peek response from the controller, applies post processing when required and then yields it to a stream. This stream in turn is what gets returned to the adapter front end (which drives the pgwire state machine, remember) to return the result back to the client. This is somewhat reminiscent of continuation-passing style, where the components return "results" early and the whole pipeline for returning the result is established, but actual data will only flow once the result comes back from the cluster.

The code that sets up the async task in the adapter is create_peek_response_stream. Which in turn gets wrapped in a ExecuteResponse::SendResponseStreaming that makes its way back to the front end code.

As a last step, we then have this code in the front end that sets up the last piece of the pipeline for sending results back out to the client once they start flowing.

One final piece of the pipeline for delivering query results is what this document calls post processing but which the code calls "finishing" (see RowSetFinishing). We apply any ORDER BY, LIMIT, or OFFSET clauses only after results get back from the clusters, before sending the result out to the client. This happens within the async task that routes results from the controller to the front end, in this code. As mentioned above, the shape of the Finishing determines the eligibility of a query for the large result stash feature. Queries with ORDER BY or non-identity projections cannot use the stash (see RowSetFinishing::is_streamable).

Query Processing: A Flow Chart

Query Processing: CPU Work & Network Hops

The above outlined the components involved in processing a SELECT query and the interactions between them. We didn't explicitly mention how expensive the different steps are (in terms of CPU cycles) and which steps involve network hops rather than function calls within a process.

The CPU intensive parts of query processing are:

  • Optimization
  • Dataflow computation

A peek only requires specific dataflow computation when it is a slow-path peek. Otherwise, reading out of the arrangement is considered cheap.

Of the interactions outlined above in the flow chart, the network hops are:

  • The query from client to the adapter front end, via pgwire
  • Getting a timestamp from the timestamp oracle: this involves doing a read query against CRDB
  • The ComputeCommand::CreateDataflow command sent from controller to the cluster, optional and only required for slow-path peeks
  • The ComputeCommand::Peek command sent from controller to the cluster
  • The ComputeResponse::PeekResponse sent from the cluster to the controller
  • The result sent back from the adapter front end to the client

Details

Details on components and concepts that we didn't explain above.

Durable Storage

At the coarsest level, a Materialize Environment has two types of durable state:

  • The Catalog, which is the metadata about what objects exist, settings, roles, clusters, replicas, etc.
  • Collection data, which is both builtin collections and user collections, such as Tables, Sources, Materialized Views, etc.

Both of these types of data are stored in persist. The Catalog is stored in a single persist shard and collection data is also stored in persist shards.

Persist

Persist is Materialize's durable storage component as described in the (now maybe slightly outdated) persist design document.

The core abstraction is a "shard" - a durable TVC that can be written to and read from concurrently. Persist uses a rich client model where readers and writers interact directly with the underlying blob storage (typically S3) while coordinating through a consensus system for metadata operations.

Persist is built on two key primitives: Blob (a durable key-value store) and Consensus (a linearizable log). The blob storage holds the actual data in immutable batches, while consensus maintains a state machine that tracks metadata like shard frontiers, active readers/writers, and batch locations.

Clusters and Replicas

Clusters are containers for certain SQL-level objects. They represent compute resources and the objects that they contain are said to "run on them". Objects that need to run on a cluster are, among others: indexes, materialized views, sources, and sinks. Most queries also have to be executed on a cluster, in which case they use that clusters compute resources.

Clusters represent compute resources, but by themselves don't have any resources. A cluster must be backed by one or more replicas for actual computation to happen. Replicas come in multiple sizes that represent different allocations of CPU cores and amount of memory. Finally, replicas are backed by one or more clusterd processes/pods, depending on their size.

Common scenarios are:

  • a cluster with no replicas, for saving on resources while not needed
  • a cluster that permanently has multiple replicas, for high availability
  • a cluster that temporarily has multiple replicas, when reconfiguring from one replica size to another replica size without downtime

Compute & Storage Controllers

The adapter interacts with clusters and storage collections through two main controllers: the ComputeController and the StorageController. These controllers act as intermediaries that translate adapter commands into cluster commands and manage the life cycle of compute and storage resources.

The ComputeController manages "compute flavored" dataflows running on clusters (or rather cluster replicas, to be precise). It handles the creation and maintenance of indexes, materialized views, and other dataflows, talking to cluster replicas via the compute protocol.

The StorageController manages storage collections including sources, tables, and sinks. For ingestion from external systems, it needs to install computation on a cluster. Similarly to the compute controller, communication with the storage parts on a cluster replica happens via the storage protocol.

Both controllers maintain read and write capabilities for their respective resources, coordinate compaction policies, and ensure that data remains accessible as long as needed.