docs/tech-notes/query_shutdown.md
Author: Yahor Yuzefovich
This document aims to explain the protocol for shutting down distributed
query execution in CockroachDB. It covers both the case when the query completes
successfully (it has returned all necessary rows to the client) and the case
when the query is canceled for any reason (an error is encountered, a node
crashes, the statement timeout is reached, etc.). CockroachDB has two execution
engines that use different components for distributed execution, and this
document describes both in some detail; however, both engines utilize the same
FlowStream RPC to communicate between different CockroachDB nodes, and this
RPC is covered first. Additionally, this document points out how all goroutines
that power the necessary infrastructure are accounted for.
A quick note: the common FlowStream RPC section doesn't contain many code
pointers since the two execution engines use different components; instead,
that section should be read as an abstract in order to get a general
understanding of the control flow. Once the general idea is obtained, the
reader is encouraged to dive into the corresponding section for the execution
engine of their interest, where many code pointers can serve as a guide to the
details.
In order to communicate the intermediate results of the query evaluation between
different CockroachDB nodes, the FlowStream RPC
is used. The RPC is initiated by the producer (the outbox, the "client" from the
perspective of the RPC) that wants to push those intermediate results to the
consumer (the inbox, the "server" from the perspective of the RPC). Thus, the
FlowStream RPC sets up a bi-directional gRPC stream between two nodes.
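To make the rest of this section concrete, here is a hedged sketch, in plain
Go, of the client-side stream interface that gRPC generates for such a
bi-directional RPC. The real message types are execinfrapb's ProducerMessage
and ConsumerSignal; they are trimmed down to stand-ins here, and
flowStreamClient is just an illustrative name.

```go
// Simplified stand-ins for the real message types.
type ProducerMessage struct{} // carries rows/batches and metadata

type ConsumerSignal struct {
	// DrainRequest is set when the consumer asks the producer to drain;
	// a handshake field is omitted from this sketch.
	DrainRequest *struct{}
}

// flowStreamClient approximates the generated client-side stream interface.
type flowStreamClient interface {
	Send(*ProducerMessage) error    // the outbox's main goroutine pushes data
	Recv() (*ConsumerSignal, error) // the outbox's "watchdog" goroutine listens
	CloseSend() error               // "all data has already been pushed"
}
```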
Each outbox runs in a separate goroutine that is instantiated during flow setup.
After getting the client-side of the gRPC stream by calling FlowStream, the
outbox spins up another "watchdog" goroutine to "watch" the stream.
The main goroutine is responsible for Sending data to the consumer until there
is no more data to send (i.e. the outbox tree has been fully exhausted) or until
the drain is requested by the consumer. In the end, the outbox calls CloseSend
on the stream to notify the consumer that all data has already been pushed.
Additionally, any error encountered when performing Send is examined:

- if the error is io.EOF, it indicates that the FlowStream RPC was completed
  gracefully;
- if the error is not io.EOF, then the RPC was completed ungracefully.

The "graceful" termination means that the consumer doesn't need any more data
from the outbox (because it has satisfied its LIMIT, etc.), and the query as a
whole is still executing successfully. The "ungraceful" termination indicates
that some unrecoverable error has been encountered, so the whole query should
be shut down as quickly as possible, and this is achieved by the outbox
canceling the context of the whole flow.
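A minimal sketch of that examination, assuming a stream value with the
interface above and a flowCtxCancel function that cancels the flow context
(both names are illustrative, not the real CockroachDB identifiers):

```go
// Fragment of the outbox's send path.
if err := stream.Send(msg); err != nil {
	if err == io.EOF {
		// Graceful termination: the consumer needs no more data, so we
		// simply stop sending and let the RPC complete.
		return nil
	}
	// Ungraceful termination: shut the whole query down as quickly as
	// possible by canceling the context of the whole flow.
	flowCtxCancel()
	return err
}
```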
The "watchdog" goroutine is responsible for Recving signals from the consumer.
The consumer sends a handshake message (which is basically ignored by the
outbox) and can also ask the outbox to drain (by sending the corresponding
message). Once drain is requested, the outbox collects all available metadata,
pushes it to the inbox, and then performs CloseSend.
Note that the "watchdog" goroutine exits only after receiving an error from
the stream. Because the outbox is the client of the RPC, it cannot unilaterally
close it, and by calling CloseSend it advises the server to complete the RPC
call. The error encountered on Recving is examined in the similar manner as
when Sending.
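An illustrative shape of that loop, under the same assumptions as the sketches
above (handleRecvError and notifyMainGoroutineToDrain are hypothetical
helpers, and the drain notification mechanism differs between the two
engines):

```go
// The "watchdog" goroutine: Recv until the stream returns an error.
for {
	sig, err := stream.Recv()
	if err != nil {
		// io.EOF means the RPC completed gracefully; any other error
		// means it completed ungracefully.
		handleRecvError(err) // hypothetical helper
		return
	}
	if sig.DrainRequest != nil {
		// The consumer asked the outbox to drain: collect all available
		// metadata, push it, and then CloseSend.
		notifyMainGoroutineToDrain() // hypothetical helper
	}
	// Handshake messages are basically ignored by the outbox.
}
```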
On the inbox side, the stream is also utilized by two goroutines: the "stream handler" and the "reader".
The stream handler goroutine is spun up by gRPC to serve the FlowStream RPC
that was issued by the outbox side. All stream handler goroutines are tracked by
the wait group of the flow (the setup is done in StartInternal).
These goroutines first wait for the flow (that contains their reader) to arrive
in ConnectInboundStream
(with the timeout of sql.distsql.flow_stream_timeout, 10 seconds by default),
and then block until either the flow context is canceled or their reader tells
the stream handler to exit.
Importantly, the error (or nil) returned by
flowinfra.InboundStreamHandler.Run
becomes the result of the FlowStream RPC call overall. In other words, this is
what the "watchdog" goroutine of the outbox side will Recv last. Each stream
handler goroutine is released from the wait group of the flow when it exits from
the Run method.
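Put together, the handler's behavior looks roughly like the following fragment
(flowArrived, flowStreamTimeout, readerDone, and the error value are all
illustrative names, not the actual flowinfra identifiers):

```go
// Stage 1: wait for the local flow (which contains the reader) to arrive,
// bounded by sql.distsql.flow_stream_timeout (10 seconds by default).
select {
case <-flowArrived:
case <-time.After(flowStreamTimeout):
	return errNoInboundStreamConnection // hypothetical error value
}
// Stage 2: block until the flow context is canceled or the reader tells
// the stream handler to exit; the returned error (or nil) becomes the
// overall result of the FlowStream RPC.
select {
case <-flowCtx.Done():
	return flowCtx.Err()
case err := <-readerDone:
	return err
}
```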
The reader goroutine is responsible for both Recving the data produced by the
outbox and for Sending signals (like a drain request) to the producer. See
the corresponding section for details on when this goroutine is created since it
differs between the execution engines.
The reader goroutine keeps on running until it Recves an error, which is then
examined in the same way as on the outbox side:

- io.EOF is received when the outbox called CloseSend to indicate that
  everything has already been sent across the stream. This results in the
  stream handler returning nil to indicate the graceful completion of the
  FlowStream RPC.
- A non-io.EOF error indicates the ungraceful termination of the gRPC stream
  (most likely either because the context of the stream is canceled on the
  outbox side or because the connection is broken, possibly because a node
  crashed).

In the vectorized execution engine, the distributed query is shut down in the
same manner regardless of the reason (i.e. both graceful and ungraceful
completions). The shutdown here relies on the assumption that all components of
the flow use a context.Context that is a descendant of the "flow context";
thus, canceling the flow context causes all infrastructure on that node that
is part of the distributed query to exit.
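This assumption is easy to demonstrate with plain Go (this is not CockroachDB
code; it only shows that cancellation propagates down the context tree):

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	// The flow context and a context derived from it by some component.
	flowCtx, cancelFlow := context.WithCancel(context.Background())
	opCtx, cancelOp := context.WithCancel(flowCtx)
	defer cancelOp()

	cancelFlow()             // what the coordinator does at the end of execution
	<-opCtx.Done()           // fires immediately: cancellation propagated down
	fmt.Println(opCtx.Err()) // context.Canceled
}
```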
The shutdown protocol is as follows:

1. At the end of the query execution, a special component on the gateway node
   (either the FlowCoordinator or the BatchFlowCoordinator) cancels the flow
   context (1, 2). It doesn't matter whether the query resulted in an error or
   not because we know that this flow context cancellation will not be
   propagated to the client who issued the query, so it's totally fine that
   flows on other nodes can think of this shutdown as ungraceful.
2. The flow context cancellation ungracefully terminates all open FlowStream
   RPCs for which the gateway node is the inbox host (the consumer).
3. The outboxes on the remote nodes observe the ungraceful termination of
   their FlowStream RPCs and, in turn, cancel the flow contexts of their host
   nodes, so the cancellation propagates across the whole cluster.

We also have to consider the case when all outboxes on a remote node have
already exited gracefully before the flow context cancellation is performed on
the gateway node. In such a case, each outbox cancels its own "outbox" context,
which terminates all of the components (including each input goroutine of the
parallel unordered synchronizer) that are part of the tree with this outbox at
the root. The fact that the outbox exited indicates that the stream handler
goroutine on the inbox side of the same gRPC stream has also exited.
Another source of concurrency in the vectorized flows is the hash router, which runs in a separate goroutine and listens on the flow context for cancellation. Thus, we separately have to consider how the hash router is shut down.
In the graceful case, the routerOutputOps
(outputs of the hash router) will get a zero-length batch iff the hash router's
input returned a zero-length batch too, which means that the hash router has
already exited as well.

The main goroutine of the outbox is tracked by the wait group of the flow.
It first establishes the connection to the target node and issues the
FlowStream RPC, importantly, using the flow context.
The usage of the flow context when instantiating the RPC allows the outbox to
cancel its own context without shutting down the gRPC stream prematurely.
Once the gRPC stream is established, another "watchdog" goroutine is spun up. This goroutine is not tracked by any wait group, but the main goroutine blocks before exiting until the watchdog routine exits. TODO(yuzefovich): we should probably track the watchdog goroutine with the flow's wait group as well as use the stopper to spin it up.
Then the main goroutine goes into the sendBatches
loop where it reads batches from the input and sends them over the stream to
the consumer. That loop terminates in one of several ways:

- the input tree has been fully exhausted, in which case the outbox drains its
  metadata sources and exits gracefully;
- the consumer has requested draining, which results in the same draining
  behavior;
- an error is encountered when Sending on the stream and is examined as
  described in the FlowStream RPC section;
- the flow context is canceled, in which case the draining is short-circuited
  (DrainMeta is not called on the sources of metadata), and the outbox
  terminates.

See Metadata handling for some more information about draining and metadata.
The stream handler goroutine non-blockingly passes the gRPC stream to the reader goroutine and then performs two blocking actions: it first waits for the reader goroutine to arrive (i.e. for the inbox to be initialized), and then it waits for the reader to signal that it is done with the stream.
If - while waiting during either of the above two blocking actions - the stream handler observes that the stream context or the flow context is canceled, then it exits with the corresponding error (here and here). TODO(yuzefovich): rearrange blocks to be in the same order for both blocking actions. If the reader context is canceled, then in order to protect against a possible race between the reader context and the flow context (which must be the ancestor of the reader context) being canceled, the flow context is explicitly checked for cancellation as well.
Although we do listen for the stream context cancellation, that doesn't seem
to be strictly necessary (because when the stream context is canceled, an error
will be observed when Sending/Recving on the stream). Still, it seems cleaner
to explicitly listen on the stream context too.
The reader goroutine is not spun up explicitly by the stream handler; instead,
it is "fused" with the inbox's consumer. If the inbox is pushing into a
synchronizer, then it'll be a separate input goroutine created by
the parallel unordered synchronizer or the only goroutine of the serial (either
ordered or unordered) synchronizer. If the inbox is feeding into a
colexecop.Operator, then it'll be that operator's goroutine.
The reader goroutine can exit in several ways:

- An error is Recved from the stream. It is examined as usual, with
  non-io.EOF errors marked as "inbox communication errors". Such errors are
  propagated both to the consumer of the inbox and to the stream handler
  goroutine (so they become the result of the FlowStream RPC).
- The consumer of the inbox transitions to draining, and the inbox sends the
  corresponding drain signal to the outbox. Note that in this case we don't
  want to complete the FlowStream RPC ungracefully since that would trigger
  the immediate cancellation of all other RPC calls, and we are still
  interested in draining the metadata.
- The context of the reader is canceled, in which case the inbox is closed
  explicitly. TODO(yuzefovich): I think it's not necessary to close the inbox
  explicitly in this case - we won't be able to drain the metadata from the
  outbox since the stream handler has exited.

### statement_timeout implementation

statement_timeout in CockroachDB is implemented by setting up a timer
which upon firing cancels the query. The query cancellation is done by
canceling
the context of the transaction. The flow context on the gateway node is a
descendant of the transaction context, so canceling the latter results in
exactly the shutdown protocol described above.
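A minimal, self-contained sketch of this mechanism (plain Go, not the actual
CockroachDB code; the names and the timeout value are illustrative): a timer
cancels the transaction context, and the flow context, being a descendant, is
canceled transitively.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	txnCtx, cancelTxn := context.WithCancel(context.Background())
	// The flow context on the gateway descends from the transaction context.
	flowCtx, cancelFlow := context.WithCancel(txnCtx)
	defer cancelFlow()

	// statement_timeout: cancel the *transaction* context when the timer fires.
	const statementTimeout = 50 * time.Millisecond
	timer := time.AfterFunc(statementTimeout, cancelTxn)
	defer timer.Stop()

	<-flowCtx.Done() // the flow observes the shutdown protocol kicking in
	fmt.Println("flow canceled:", flowCtx.Err())
}
```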
Before we can talk about the shutdown protocol in the row-by-row engine, we need
to mention the abstractions that the engine is using, namely RowSource
and RowReceiver
interfaces. The RowSource, on each Next call, returns a row or a metadata
object, which is then Pushed into the RowReceiver. In return, the RowReceiver
shares its possibly updated ConsumerStatus.
Essentially all Processors
implement the RowSource interface, and we use different components to connect
the processors together (routers, RowChannels,
synchronizers), possibly on different nodes. Such chaining allows for
propagation of the ConsumerStatus to all processors of the distributed plan.
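Trimmed-down versions of these interfaces might look as follows (the real
definitions live in pkg/sql/execinfra and carry more methods; Row and
ProducerMetadata are placeholders for the actual types):

```go
type Row []interface{}                            // placeholder row type
type ProducerMetadata struct{ Err error }         // trimmed-down metadata

type ConsumerStatus int

const (
	NeedMoreRows ConsumerStatus = iota
	DrainRequested
	ConsumerClosed
)

type RowSource interface {
	// Next returns a row or a metadata object (never both); (nil, nil)
	// means the source is fully exhausted.
	Next() (Row, *ProducerMetadata)
	ConsumerDone()   // ask the source to transition to draining
	ConsumerClosed() // the consumer will not call Next anymore
}

type RowReceiver interface {
	// Push hands over a row or metadata and returns the consumer's
	// possibly updated status.
	Push(Row, *ProducerMetadata) ConsumerStatus
	ProducerDone() // nothing else will be pushed
}
```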
Most processors delegate the implementation of Processor.Run
method to the embedded ProcessorBase
which Starts itself and then goes into the Run
loop. The loop continuously gets the next row or metadata object from the
RowSource (the processor embedding the helper), pushes that to the
RowReceiver, and then examines the status to see whether any changes of the
control flow are requested by the consumer.
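A condensed sketch of that loop, using the trimmed interfaces above
(drainAndForwardMetadata is a hypothetical stand-in for the actual draining
logic):

```go
// drainAndForwardMetadata asks the source to drain and then forwards only
// metadata objects until the source is exhausted.
func drainAndForwardMetadata(src RowSource, dst RowReceiver) {
	src.ConsumerDone()
	for {
		row, meta := src.Next()
		if row == nil && meta == nil {
			return
		}
		if meta != nil {
			dst.Push(nil, meta)
		}
	}
}

// run mimics the Run loop: get the next row or metadata, push it, and
// react to the consumer's updated status.
func run(src RowSource, dst RowReceiver) {
	for {
		row, meta := src.Next()
		if row == nil && meta == nil {
			break // fully exhausted
		}
		status := dst.Push(row, meta)
		if status == NeedMoreRows {
			continue
		}
		if status == DrainRequested {
			drainAndForwardMetadata(src, dst)
		} else { // ConsumerClosed
			src.ConsumerClosed()
		}
		break
	}
	// Right before Run exits, the producer signals completion.
	dst.ProducerDone()
}
```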
At the root of the flow on the gateway we have a special component, the
DistSQLReceiver, that implements the RowReceiver interface, and it is where
the results of the query are ultimately Pushed. The gateway flow is Run,
which means that all processors except for the last one are started in
separate goroutines while the "root" processor runs in the flow's goroutine.
(There is a caveat that we try to "fuse" components - that feed directly into
each other without using concurrency - to run in the same goroutine, but we'll
ignore that for this discussion.) The output of the root processor is always the
DistSQLReceiver.
Let's first discuss the way the distributed query is shutdown gracefully when
all processors have been exhausted fully. In such a scenario each processor goes
from StateRunning
to StateDraining
by calling MoveToDraining.
That function notifies the processor's input that it should be drained (by
calling ConsumerDone), and then the processor uses DrainHelper
to first fetch all of the metadata from its input and then return any trailing
metadata for the processor itself. Once all trailing metadata
has been propagated, the processor is closed automatically (by calling
InternalClose).
At this point the processor's input must have already been closed too since it
was fully exhausted when the metadata was drained.
The same transitions apply to the DistSQLReceiver's input - the root processor
on the gateway: the processor is drained first, and then it is closed. These
transitions are noticed by the other processors running on the gateway node
because the RowReceivers that those processors push into report the updated
ConsumerStatus in their Run loops. Right before Run exits, ProducerDone is
called on the RowReceiver.
Apart from the processors, there are two other sources of concurrency in the
row-based flows: routers and outbox/inbox streams. The routers get notified
about the closure when ProducerDone is called, so that case is already
handled; the outbox/inbox streams deserve a bit more attention and are
discussed below, but for now we'll just assume that they also get the
information about these transitions. Thus, all goroutines of all flows of the
distributed query first transition into draining and then exit.
Another way of gracefully completing the query but without fully exhausting the
processors is satisfying the LIMIT clause. This is achieved by the root
processor moving to draining once the limit is reached (this is done in
ProcessorBase.ProcessRowHelper).
Whenever the query execution encounters an error, we can think of the query
as being completed "ungracefully". In such a case, regardless of where the error
occurred, it is propagated as a metadata object and eventually reaches the
DistSQLReceiver.
The DistSQLReceiver then transitions
into draining, and the shutdown is performed in the same manner as described
above.
The same process is followed in case of the gRPC infrastructure failure because
an error will eventually be pushed into the DistSQLReceiver. Notably, even if
the flow context on the gateway node is canceled (because an outbox on the
gateway observed ungraceful termination of the FlowStream RPC), the
DistSQLReceiver will still transition to draining first in SetError
because the DistSQLReceiver's ctx is the transaction context, and it hasn't
been canceled.
The main goroutine of the outbox is tracked by the wait group of the flow.
Similar to the vectorized case, it first establishes the connection to the
target node and performs the FlowStream RPC using the flow context.
Next, a separate "watchdog" goroutine is spun up using the stopper
which is responsible for Recving from the gRPC stream (mainly to listen for
drain requests as well as to any errors). This goroutine is not tracked
explicitly, and it will exit once an error is Recved from the gRPC stream.
TODO(yuzefovich): track this goroutine with the wait group.
The main goroutine goes into the loop
where it reads from the input (either a processor or a router) and writes to the
gRPC stream. Rows and metadata are encoded in AddRow
into messages which are flushed
to the consumer once 16 rows
have been accumulated or 100 microseconds
have passed since the last flush; the only exception is errors,
which are flushed immediately.
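An illustrative sketch of that flush policy (only the two constants mirror the
text above; the type, field, and helper names - outbox, rowEncoder, flush - are
made up for this sketch):

```go
const (
	outboxBufRows     = 16                     // flush after this many rows
	outboxFlushPeriod = 100 * time.Microsecond // ... or after this much time
)

type outbox struct {
	encoder   rowEncoder // hypothetical encoder accumulating the message
	numRows   int
	lastFlush time.Time
}

func (o *outbox) addRow(row Row) error {
	if err := o.encoder.AddRow(row); err != nil {
		return err // an encoding error terminates the outbox's main loop
	}
	o.numRows++
	if o.numRows >= outboxBufRows || time.Since(o.lastFlush) >= outboxFlushPeriod {
		o.numRows = 0
		o.lastFlush = time.Now()
		return o.flush() // Sends the accumulated message on the gRPC stream.
	}
	return nil
}
```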
The main goroutine exits from the loop when one of the following events occurs:

- ProducerDone has been called on the outbox's input, which resulted in the
  closure of the embedded RowChannel.
  This includes the case when draining was requested (the "watchdog" goroutine
  notifies the main goroutine about it, and then the main goroutine only calls
  AddRow for metadata objects after calling ConsumerDone on its input).
  As a reminder, ProducerDone is always called at the end of the Run loop
  of the processor.
- An error is returned when AddRow is called
  (either a communication error because of some gRPC problem or an encoding
  error).
- The "watchdog" goroutine Recves an error from the gRPC stream:
  - if the error is io.EOF, then the main goroutine exits gracefully;
  - if the error is not io.EOF, then the main goroutine cancels the context of
    the flow and exits ungracefully.

Regardless of the way the main goroutine exited its loop, ConsumerClosed is
always called on the input to the outbox.
Unlike in the vectorized engine, the row-by-row engine doesn't have an explicit
component that is responsible for the consumer side of the FlowStream RPC;
however, it still involves two goroutines.
The "stream handler" goroutine (that is spun up by the gRPC framework) first creates the "reader" goroutine and then proceeds to block until either the flow context is canceled or the reader has communicated that it is done. TODO(yuzefovich): use the stopper to spin up the reader goroutine.
The "reader" goroutine is tracked by the wait group of the flow,
and it keeps on Recving
from the gRPC stream. Any error, if encountered upon Recving, is examined in
the same fashion as usual and is communicated to the stream handler.
Additionally, non-io.EOF errors are marked as "inbox communication errors".
The received messages are decoded
and then pushed
to the consumer of this inbound stream. If that consumer asks for draining, then
the drain signal is sent
on the gRPC stream.
### statement_timeout implementation

As described in the same section for the vectorized engine above,
statement_timeout is implemented by canceling the context of the transaction.
Notably, this is an ancestor of the context that the DistSQLReceiver
operates with.
Once the transaction context is canceled, the DistSQLReceiver does not itself
immediately notify the query infrastructure about the cancellation; however, the
next time anything is pushed into the DistSQLReceiver, it notices the
cancellation.
The ConsumerStatus is changed
to ConsumerClosed which triggers the shutdown protocol described above.
Additionally, since the flow context of the gateway node is canceled (because it is a descendant of the transaction context), all open gRPC streams for which the gateway is the inbox host terminate ungracefully. This, in turn, makes the corresponding outboxes cancel the flow contexts of their hosts and exit too. This context cancellation propagation speeds up the query shutdown.
All metadata objects (parts of the "control" plane of the query execution) can be divided into two types:

- errors, which need to reach the gateway as quickly as possible;
- everything else, for example, LeafTxnFinalState,
  which contains all KV spans that have been read on a node and that need to be
  propagated to the gateway to perform a refresh.

A quick note on the way metadata is handled is worthwhile: unlike in the
row-by-row engine where the metadata is propagated alongside the actual rows of
data (i.e. both "data" and "control" planes operate through the same Next
method
although not simultaneously), in the vectorized engine the metadata that
originates during the query execution is buffered in the "metadata sources".
Those "sources" are then drained after all the actual data has been processed,
at the end of the query execution. The only exception is made for errors
propagated as metadata, which are not buffered and are thrown
immediately.
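A hedged sketch of this convention (the interface is a stand-in for the real
"metadata source" abstraction; in the actual vectorized code the immediate
error propagation is done by throwing a panic that the engine catches higher
up, and throwError below only mimics that shape):

```go
// Non-error metadata accumulates in sources like this and is collected
// only once, after all of the actual data has been processed.
type metadataSource interface {
	DrainMeta() []ProducerMetadata
}

// Errors, in contrast, are thrown immediately instead of being buffered.
func throwError(err error) {
	panic(err)
}
```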