docs/RFCS/20220129_sqlproxy_connection_migration.md

Summary

Migrate SQL connections from one SQL pod to another within the SQL proxy.

Motivation

Tenants running on CockroachDB Serverless are subjected to scale-up and scale-down events by the autoscaler due to a variation in SQL traffic. Scale-up events are often triggered when the CPU loads on existing pods are too high. When that happens, new SQL pods are created for that given tenant. However, CPU loads on existing pods may still be high since connections remain in their initial pods, leading to an imbalance of load usage across all instances. This imbalance may also occur during a steady state when existing connections fluctuate in their CPU usages. On the other hand, during a scale-down event from N + 1 to N pods, where N > 0<sup>1</sup>, pods transition into the draining phase, and the proxy stops routing new connections to them. This draining phase lasts for at most 10 minutes, and if connections are still active at the end of the phase, they will be terminated abruptly, which can lead to poor user experiences.

The connection migration mechanism builds on top of the SQL session migration work by the SQL Experience team, and solves the aforementioned issues by enabling us to transfer connections from one pod to another. In particular, connections can be transferred from draining pods to running ones during scale-down events, and from busy SQL pods to lightly loaded ones during scale-up events and steady state. The success criteria of this mechanism are minimal rude disconnects during Serverless scale-down events, and fewer hot spots through better tenant load rebalancing. From the user's perspective, the connection remains usable without interruption, and there will be better resource utilization across all provisioned SQL pods.

Background

In multi-tenant CockroachDB, many tenant clusters may exist on a single shared host cluster. In the context of CockroachDB Serverless, these tenant clusters may have zero or more SQL pods, depending on their loads. The SQL proxy component is a reverse proxy that is used to route incoming traffic for a given tenant to one of its active SQL pods. This SQL proxy component is what end users connect to in the Serverless offering. For more details about the Serverless architecture, see this blog post: How we built a forever-free serverless SQL database.

For every client connection, the proxy selects a backend based on some routing algorithm, performs authentication with it, and does a transparent forwarding of all packets from the client to the server without intercepting messages after the authentication phase.

Requirements

This RFC is only scoped to how to perform the migration from one SQL pod to another within the SQL proxy. The details of when to perform the migration will be discussed in a follow-up RFC. Note that the main use-case here is for CockroachDB Serverless, and we should optimize for that.

The design is constrained by the following requirements:

  1. Correctness of Postgres protocol flow and data transfer:
    • Pgwire messages should not be corrupted during transmission.
    • Messages received by the client must be correct, i.e. the client should only receive responses for requests that it initiates.
  2. Zero allocations and copying during connection proxying in steady state.
  3. Connection migrations are on a best-effort basis as it is known that not every connection can be transferred (e.g. sessions with temporary tables or active transactions).
  4. Proxy can support up to 50K active connections across all tenants, so per-connection resource overhead (e.g. memory) should be taken into account where appropriate.

Technical Design

The design is broken down into three parts. The first is message forwarding, which describes how io.Copy can be replaced. The second is connection establishment, which means selecting a new SQL pod, and authenticating in a way that the connection can be used by the existing SQL session. Finally, the third is the actual connection migration mechanism. Throughout this design, three new components will be introduced: connection interceptors, a connector, and a forwarder.

Message Forwarding

The proxy's main purpose is to forward Postgres messages from the client to the server (also known as SQL pod), and vice-versa. In the case of client to server, we will need to be able to:

  • pause the forwarding of client messages,
  • send a proxy-crafted Postgres message to the server to retrieve the connection's state,
  • connect to a different SQL pod, and
  • resume forwarding of client messages after the connection's state has been reloaded.

Note that connection authentication details have been omitted here, and will be discussed in a later section. All these operations require that some component is aware of Postgres message boundaries.

We propose to make the proxy aware of pgwire message boundaries. This implies that the current forwarding approach through io.Copy will no longer work. We will introduce a new connection interceptor component that provides us a convenient way to read and forward Postgres messages, while minimizing IO reads and memory allocations. At a steady state, no allocations should occur, and the proxy is only expected to parse the message headers, which is a byte for message type, and an unsigned int32 for length.
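As a minimal sketch of the header parsing described above: the helper name `parseHeader` and its error messages are illustrative assumptions, not part of the proposed interceptor API. It relies only on the pgwire framing rule that the length field counts itself but not the type byte.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// pgHeaderSize is the size of a pgwire message header: one byte for the
// message type followed by a 4-byte big-endian length.
const pgHeaderSize = 5

// parseHeader is a hypothetical helper. It returns the message type and the
// total message size in bytes, including the type byte. The length field on
// the wire counts itself (4 bytes) but not the type byte.
func parseHeader(buf []byte) (typ byte, size int, err error) {
	if len(buf) < pgHeaderSize {
		return 0, 0, errors.New("incomplete header")
	}
	length := binary.BigEndian.Uint32(buf[1:pgHeaderSize])
	if length < 4 {
		return 0, 0, errors.New("invalid length field")
	}
	return buf[0], int(length) + 1, nil
}

func main() {
	// A Sync message has type 'S' and a length of 4 (the length field only).
	typ, size, err := parseHeader([]byte{'S', 0, 0, 0, 4})
	fmt.Println(string(typ), size, err)
}
```

Because only these five bytes are inspected, the message body never needs to be decoded while forwarding.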

Since we already know the message length from the header, we could just use io.CopyN to forward the message body, after forwarding the headers to the server. One immediate drawback to this direct approach is that more system calls will be incurred in the case where workloads have extremely small messages (e.g. kv). To solve that, we will introduce an internal buffer within the interceptor to read messages in batches where possible. This internal buffer will have a default size of 8KB, which is the same as Postgres' send and receive buffers of 8KB each. Each connection uses two interceptors: one for client to server, and the other for server to client, so at 50K connections, we are looking at a memory usage of 800MB (8KB x 2 x 50,000), which seems reasonable.

If a Postgres message fits within the buffer, only one IO Write call to the server will be incurred. Otherwise, we would invoke a single IO Write call for the partial message, followed by io.CopyN for the remaining bytes. Using io.CopyN directly on the remaining bytes reduces the overall number of IO Write calls in the case where the entire message is at least double the buffer's size. Even with an internal buffer, we do not restrict the maximum number of bytes per message. This means that the sql.conn.max_read_buffer_message_size cluster setting that sets the read buffer size in the SQL pod will still work on a per-tenant basis.
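The buffering strategy above can be sketched roughly as follows. This is an illustration under stated assumptions, not the proposed implementation: the `interceptor` type, its fields, `ensure`, and `forwardAll` are hypothetical names, and error recovery is deliberately minimal.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

const headerSize = 5 // 1 type byte + 4-byte big-endian length

// interceptor is an illustrative stand-in for the RFC's pgInterceptor.
type interceptor struct {
	src        io.Reader
	buf        []byte // fixed-size internal buffer (8KB in the RFC)
	start, end int    // unread bytes live in buf[start:end]
}

func newInterceptor(src io.Reader, bufSize int) *interceptor {
	return &interceptor{src: src, buf: make([]byte, bufSize)}
}

// ensure reads from src until at least n unread bytes are buffered.
// n must not exceed len(p.buf).
func (p *interceptor) ensure(n int) error {
	if p.end-p.start >= n {
		return nil
	}
	// Move unread bytes to the front so the rest of the buffer can be filled.
	p.end = copy(p.buf, p.buf[p.start:p.end])
	p.start = 0
	read, err := io.ReadAtLeast(p.src, p.buf[p.end:], n-p.end)
	p.end += read
	return err
}

// ForwardMsg writes the current message to dst and advances past it.
func (p *interceptor) ForwardMsg(dst io.Writer) (int, error) {
	if err := p.ensure(headerSize); err != nil {
		return 0, err
	}
	length := binary.BigEndian.Uint32(p.buf[p.start+1 : p.start+headerSize])
	if length < 4 {
		return 0, errors.New("invalid pgwire length")
	}
	size := int(length) + 1 // total size, including the type byte
	if size <= len(p.buf) {
		// The message fits in the buffer: a single Write call.
		if err := p.ensure(size); err != nil {
			return 0, err
		}
		n, err := dst.Write(p.buf[p.start : p.start+size])
		p.start += n
		return n, err
	}
	// Oversized message: flush what is buffered, then stream the remainder
	// without staging it in the internal buffer.
	n, err := dst.Write(p.buf[p.start:p.end])
	p.start, p.end = 0, 0
	if err != nil {
		return n, err
	}
	m, err := io.CopyN(dst, p.src, int64(size-n))
	return n + int(m), err
}

// forwardAll forwards every message in input using a buffer of bufSize bytes
// and returns the per-message byte counts plus everything written to dst.
func forwardAll(input []byte, bufSize int) ([]int, []byte, error) {
	p := newInterceptor(bytes.NewReader(input), bufSize)
	var dst bytes.Buffer
	var sizes []int
	for dst.Len() < len(input) {
		n, err := p.ForwardMsg(&dst)
		if err != nil {
			return sizes, dst.Bytes(), err
		}
		sizes = append(sizes, n)
	}
	return sizes, dst.Bytes(), nil
}

func main() {
	sync := []byte{'S', 0, 0, 0, 4}
	query := append([]byte{'Q', 0, 0, 0, 24}, bytes.Repeat([]byte{'x'}, 20)...)
	// An 8-byte buffer forces the second message down the oversized path.
	sizes, out, err := forwardAll(append(sync, query...), 8)
	fmt.Println(sizes, len(out), err)
}
```

The sketch uses a tiny buffer only to exercise both paths; with the proposed 8KB default, most messages in the measured workloads would take the single-write path.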

On a high-level overview, the interceptor will have the following API:

```go
// PeekMsg returns the header of the current pgwire message without advancing
// the interceptor. On return, err == nil if and only if the entire header can
// be read. The returned size corresponds to the entire message size, which
// includes the header type and body length.
func (p *pgInterceptor) PeekMsg() (typ byte, size int, err error) {
    ...
}

// ReadMsg returns the current pgwire message in bytes. It also advances the
// interceptor to the next message. On return, the msg field is valid if and
// only if err == nil.
//
// The interceptor retains ownership of all the memory returned by ReadMsg; the
// caller is allowed to hold on to this memory *until* the next moment other
// methods on the interceptor are called. The data will only be valid until then
// as well. This may allocate if the message does not fit into the internal
// buffer, so use with care. If we are using this with the intention of sending
// it to another connection, we should use ForwardMsg, which does not allocate.
func (p *pgInterceptor) ReadMsg() (msg []byte, err error) {
    ...
}

// ForwardMsg sends the current pgwire message to dst, and advances the
// interceptor to the next message. On return, n == pgwire message size if
// and only if err == nil.
func (p *pgInterceptor) ForwardMsg(dst io.Writer) (n int, err error) {
    ...
}
```

The generic interceptor above will then be used by interceptors which are aware of the frontend and backend Postgres protocol. For example, the following describes the interceptor for frontend (i.e. server to client), and the backend version will be similar:

```go
func (c *FrontendConn) PeekMsg() (typ pgwirebase.ServerMessageType, size int, err error) {
	byteType, size, err := c.interceptor.PeekMsg()
	return pgwirebase.ServerMessageType(byteType), size, err
}

func (c *FrontendConn) ReadMsg() (msg pgproto3.BackendMessage, err error) {
	msgBytes, err := c.interceptor.ReadMsg()
	if err != nil {
		return nil, err
	}
	// errWriter is used here because Receive must not Write.
	return pgproto3.NewFrontend(newChunkReader(msgBytes), &errWriter{}).Receive()
}

func (c *FrontendConn) ForwardMsg(dst io.Writer) (n int, err error) {
	return c.interceptor.ForwardMsg(dst)
}
```

The caller calls PeekMsg to determine the type of message. It could then decide if it wants to forward the message (through ForwardMsg), or parse the message for further processing (through ReadMsg). Note that the interceptors will only be used when reading or forwarding messages. Proxy-crafted messages are written to the connection directly.

Connection Establishment

Connection establishment has two parts: SQL pod selection, and authentication. We will reuse the existing SQL pod selection algorithm: weighted CPU load balancing. The probability of a pod being selected is inversely proportional to the pod's load. For security concerns, we will not store the original authentication information entered by the user within the proxy, and will make use of the token-based authentication added here. This token will be retrieved by the proxy during the migration process through a proxy-crafted Postgres message, and passed in as a custom crdb:session_revival_token_base64 parameter as part of the Postgres StartupMessage after connecting to the SQL pod. To ensure that this feature isn't abused by clients, the proxy will block all StartupMessage messages with that custom status parameter.
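To make "inversely proportional to the pod's load" concrete, here is a small sketch of one way such a weighted pick could work. The `pod` type, the `pickPod` function, and the `minLoad` floor are assumptions made for illustration; the existing selection algorithm in the proxy may differ in its details.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pod is a hypothetical view of a SQL pod for load balancing purposes.
type pod struct {
	addr string
	load float64 // CPU load, where 1.0 means fully loaded
}

// pickPod returns the address of a pod chosen with probability inversely
// proportional to its load. r must be a uniform random number in [0, 1);
// it is a parameter so that the choice is reproducible.
func pickPod(pods []pod, r float64) string {
	if len(pods) == 0 {
		return ""
	}
	const minLoad = 0.01 // assumed floor to avoid dividing by zero for idle pods
	weights := make([]float64, len(pods))
	total := 0.0
	for i, p := range pods {
		load := p.load
		if load < minLoad {
			load = minLoad
		}
		weights[i] = 1 / load
		total += weights[i]
	}
	target := r * total
	for i, w := range weights {
		if target < w {
			return pods[i].addr
		}
		target -= w
	}
	return pods[len(pods)-1].addr // guards against floating-point rounding
}

func main() {
	pods := []pod{{"pod-a", 0.25}, {"pod-b", 0.5}, {"pod-c", 1.0}}
	// With weights 4, 2, and 1, pod-a is picked about 4/7 of the time.
	counts := map[string]int{}
	for i := 0; i < 7000; i++ {
		counts[pickPod(pods, rand.Float64())]++
	}
	fmt.Println(counts)
}
```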

Currently, the logic that connects to a new SQL pod for a given tenant is coupled with the proxy's handler. We propose to add a new connector component that will be used to establish new connections, and authenticate with the SQL pod. This connector component will support all existing authentication methods, in addition to the newly added token-based authentication that was mentioned above. Connections established through the token-based authentication will not be throttled since that is meant to be used during connection migration.

Connection Migration

All message forwarding is currently handled by the ConnectionCopy function, which uses io.Copy for bi-directional copying of Postgres messages between the client and server. This function will be replaced with a new per-connection forwarder component. The forwarder uses two interceptors within two separate processors: a request processor and a response processor. The former handles packets between the client and the proxy, whereas the latter handles packets between the proxy and the connected SQL pod. Callers can attempt to suspend and resume these processors at any point in time, but they will only terminate at a pgwire message boundary.

When the forwarder is first created, all request messages are forwarded from the client to the server, and vice-versa for response messages. The forwarder exposes a RequestTransfer API that can be invoked, and this attempts to start the transfer process. The transfer process begins if the forwarder is in a safe transfer point which is defined by any of the following:

  1. The last message sent to the SQL pod was a Sync(S) or SimpleQuery(Q), and a ReadyForQuery(Z) has already been received at the time of evaluation.
  2. The last message sent to the SQL pod was a CopyDone(c), and a ReadyForQuery(Z) has already been received at the time of evaluation.
  3. The last message sent to the SQL pod was a CopyFail(f), and a ReadyForQuery(Z) has already been received at the time of evaluation.

Note that sending a new message to the SQL pod invalidates the fact that a ReadyForQuery has already been received. (2) and (3) handle the case where the COPY operation was requested through a SimpleQuery. CRDB does not currently support the COPY operation under the extended protocol, but if it does get implemented in the future, (1) will handle that case as well.
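The three conditions above reduce to two pieces of per-connection state: the type of the last message sent to the SQL pod, and whether a ReadyForQuery has been seen since. A minimal sketch follows; the `transferTracker` type and its method names are hypothetical, and a real forwarder would need synchronization because the request and response processors run concurrently.

```go
package main

import "fmt"

// Message type bytes from the Postgres wire protocol.
const (
	msgSync          byte = 'S' // client to server
	msgSimpleQuery   byte = 'Q' // client to server
	msgCopyDone      byte = 'c' // client to server
	msgCopyFail      byte = 'f' // client to server
	msgReadyForQuery byte = 'Z' // server to client
)

// transferTracker is an illustrative helper, not part of the RFC's API.
type transferTracker struct {
	lastSent byte // type of the last message sent to the SQL pod
	ready    bool // ReadyForQuery received since lastSent was sent
}

// onClientMsg records a message forwarded to the SQL pod. Sending anything
// invalidates a previously received ReadyForQuery.
func (t *transferTracker) onClientMsg(typ byte) {
	t.lastSent = typ
	t.ready = false
}

// onServerMsg records a message received from the SQL pod.
func (t *transferTracker) onServerMsg(typ byte) {
	if typ == msgReadyForQuery {
		t.ready = true
	}
}

// isSafeTransferPoint reports whether any of conditions (1) to (3) holds.
func (t *transferTracker) isSafeTransferPoint() bool {
	if !t.ready {
		return false
	}
	switch t.lastSent {
	case msgSync, msgSimpleQuery, msgCopyDone, msgCopyFail:
		return true
	}
	return false
}

func main() {
	var tr transferTracker
	tr.onClientMsg(msgSimpleQuery)
	fmt.Println(tr.isSafeTransferPoint()) // query still in flight
	tr.onServerMsg(msgReadyForQuery)
	fmt.Println(tr.isSafeTransferPoint()) // condition (1) holds
}
```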

If no conditions are satisfied, the transfer process is not started, and forwarding continues. If any one of the above conditions holds true, the forwarder begins the transfer process by suspending both the request and response processors. If the processors are blocked waiting for I/O, they will be unblocked immediately when we set a past deadline through SetReadDeadline on the connections. Once the processors have been suspended, the forwarder sends a transfer request message with a randomly generated UUID that identifies the transfer request. The transfer request message to the SQL pod is a SimpleQuery message:

SHOW TRANSFER STATE [ WITH '<transfer_key>' ]

This query is an observer statement, allowing messages to go through if the session is in a failed transaction state. The goal of this statement is to return all the necessary information the caller needs in order to transfer a connection from one SQL pod to another. The query will always return a single row with 3 or 4 columns, depending on whether the transfer key was specified:

  • error: The transfer error if the transfer state cannot be retrieved (e.g. session_state cannot be constructed).
  • session_state_base64: The base64 serialized session state. This is equivalent to base64-encoding the result of crdb_internal.serialize_session.
  • session_revival_token_base64: The base64 token used for authenticating a new session through the token-based authentication described earlier. This is equivalent to base64-encoding the result of crdb_internal.create_session_revival_token.
  • transfer_key: The transfer key passed to the statement. If no key was given, this column will be omitted.
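For illustration, the transfer request could be assembled along these lines. The function names are hypothetical, and the UUID formatting is a hand-rolled stand-in for whatever UUID library the proxy actually uses. The key is generated by the proxy itself, so the sketch does not escape it before embedding it in the statement.

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
)

// newTransferKey returns a random version-4 UUID string. This is a stand-in
// for a real UUID library.
func newTransferKey() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// transferStateQuery builds the observer statement for a given key.
func transferStateQuery(key string) string {
	return "SHOW TRANSFER STATE WITH '" + key + "'"
}

// buildSimpleQuery encodes sql as a pgwire Query message: the type byte 'Q',
// a 4-byte length that counts itself, the SQL text, and a NUL terminator.
func buildSimpleQuery(sql string) []byte {
	msg := make([]byte, 5, 5+len(sql)+1)
	msg[0] = 'Q'
	binary.BigEndian.PutUint32(msg[1:5], uint32(4+len(sql)+1))
	msg = append(msg, sql...)
	return append(msg, 0)
}

func main() {
	key, err := newTransferKey()
	if err != nil {
		panic(err)
	}
	msg := buildSimpleQuery(transferStateQuery(key))
	fmt.Printf("%d bytes: %q\n", len(msg), msg[5:len(msg)-1])
}
```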

One unique aspect to the observer statement above is that all transfer-related errors are returned as a SQL field, rather than a pgError that gets converted to an ErrorResponse. The rationale behind this decision is that whenever an ErrorResponse gets generated during a transfer, there is ambiguity in detecting whether the response is for the client or the proxy, and the only way to ensure protocol correctness is to terminate the connection. Note that the design does not eliminate the possibility of an ErrorResponse since that may still occur before the query gets processed. In terms of implementation, the SHOW TRANSFER STATE statement will invoke internal methods directly (e.g. planner.CreateSessionRevivalToken()). See implementation here.

In the ideal scenario, this SimpleQuery generates four Postgres messages in the following order: RowDescription(T), DataRow(D), CommandComplete(C), ReadyForQuery(Z). The response processor will detect that we're in the transfer phase, and start parsing all messages if the types match one of the above.

If we receive an ErrorResponse message from the server at any point during the transfer phase before receiving the transfer state, the transfer is aborted, and the connection is terminated. Messages will always be forwarded back to the client until we receive the desired response. The desired response is said to match if we find the transfer key in the DataRow message. If we do not get a response within the timeout period, the transfer is aborted, and the connection is terminated. This situation may occur if previously pipelined queries are long-running, or the server went into a state where all messages are ignored until a Sync has been received.
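A rough sketch of how the response processor might recognize the desired response: decode each DataRow and compare its fourth column against the transfer key. The function names are assumptions, `buildDataRow` exists only to produce sample input, and the column order follows the list given earlier (error, session_state_base64, session_revival_token_base64, transfer_key).

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// parseDataRow decodes a pgwire DataRow message ('D', int32 length, int16
// column count, then an int32 size and value per column). NULL columns,
// which have size -1, are returned as nil slices.
func parseDataRow(msg []byte) ([][]byte, error) {
	if len(msg) < 7 || msg[0] != 'D' {
		return nil, errors.New("not a DataRow message")
	}
	if int(binary.BigEndian.Uint32(msg[1:5]))+1 != len(msg) {
		return nil, errors.New("length mismatch")
	}
	n := int(binary.BigEndian.Uint16(msg[5:7]))
	cols := make([][]byte, 0, n)
	pos := 7
	for i := 0; i < n; i++ {
		if pos+4 > len(msg) {
			return nil, errors.New("truncated column size")
		}
		size := int(int32(binary.BigEndian.Uint32(msg[pos : pos+4])))
		pos += 4
		if size < 0 {
			cols = append(cols, nil) // NULL value
			continue
		}
		if pos+size > len(msg) {
			return nil, errors.New("truncated column value")
		}
		cols = append(cols, msg[pos:pos+size])
		pos += size
	}
	return cols, nil
}

// matchesTransferKey reports whether a row is the response to our request:
// it must have four columns, and the last one must equal the transfer key.
func matchesTransferKey(cols [][]byte, key string) bool {
	return len(cols) == 4 && string(cols[3]) == key
}

// buildDataRow encodes columns as a DataRow message (sample input only).
func buildDataRow(cols [][]byte) []byte {
	body := make([]byte, 2)
	binary.BigEndian.PutUint16(body, uint16(len(cols)))
	for _, c := range cols {
		var size [4]byte
		if c == nil {
			binary.BigEndian.PutUint32(size[:], 0xFFFFFFFF) // -1 marks NULL
		} else {
			binary.BigEndian.PutUint32(size[:], uint32(len(c)))
		}
		body = append(body, size[:]...)
		body = append(body, c...)
	}
	msg := make([]byte, 5, 5+len(body))
	msg[0] = 'D'
	binary.BigEndian.PutUint32(msg[1:5], uint32(4+len(body)))
	return append(msg, body...)
}

func main() {
	row := buildDataRow([][]byte{nil, []byte("c3RhdGU="), []byte("dG9rZW4="), []byte("key-1")})
	cols, err := parseDataRow(row)
	fmt.Println(len(cols), err, matchesTransferKey(cols, "key-1"))
}
```

Rows that do not match, such as results of previously pipelined client queries, would simply be forwarded to the client as usual.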

The forwarder then uses the connector with the session revival token to establish a connection with a new SQL pod. In this new connection, the forwarder deserializes the session by sending a SimpleQuery with crdb_internal.deserialize_session(decode('<session_state>', 'base64')), since the session state was returned base64-encoded in the session_state_base64 column.

Once deserialization succeeds, the transfer process completes, and the processors are resumed. Note that we will try to resume a connection in non-ambiguous cases such as failing to establish a connection, or getting an ErrorResponse when trying to deserialize the session. If the transfer fails and the connection is resumed, it is the responsibility of the caller to retry the transfer process where appropriate. The entire transfer process must complete within 15 seconds, or a timeout handler will be triggered.
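Suspending a processor that is blocked on I/O relies on the read-deadline technique mentioned earlier: setting a deadline in the past makes the pending read return immediately, and clearing the deadline afterwards lets forwarding resume. The sketch below demonstrates just that mechanism on an in-memory `net.Pipe`; the function names are hypothetical and no forwarder logic is included.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// readUntilInterrupted blocks a goroutine in Read on one end of an in-memory
// pipe, interrupts it with a past read deadline, and returns the Read error.
func readUntilInterrupted() error {
	clientSide, proxySide := net.Pipe()
	defer clientSide.Close()
	defer proxySide.Close()

	errCh := make(chan error, 1)
	go func() {
		header := make([]byte, 5)
		_, err := proxySide.Read(header) // blocks: the peer never writes
		errCh <- err
	}()

	// Any non-zero time in the past unblocks the pending Read immediately.
	if err := proxySide.SetReadDeadline(time.Unix(1, 0)); err != nil {
		return err
	}
	err := <-errCh

	// A zero time clears the deadline so that later reads block normally.
	if resetErr := proxySide.SetReadDeadline(time.Time{}); resetErr != nil {
		return resetErr
	}
	return err
}

// isDeadlineExceeded reports whether err was caused by a read deadline.
func isDeadlineExceeded(err error) bool {
	return errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	err := readUntilInterrupted()
	fmt.Println(isDeadlineExceeded(err))
}
```

A processor interrupted this way would treat the deadline error as a signal to suspend rather than as a connection failure, provided it is at a message boundary.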

Rationale and Alternatives

Making the proxy protocol-aware instead of leaving it to the server

When forwarding pgwire messages, the current forwarding approach through io.Copy does not allow us to detect message boundaries easily. The SQL server is already protocol-aware, and could detect message boundaries easily. However, we'd need a way for the proxy to stop sending messages to the SQL pod. If we just stop the forwarding abruptly, some packets may have already been sent to the server. The only way to get this to work is for the SQL pod to send back the partial packets, which is too complex to get right. Furthermore, we need to signal the server that the connection is about to be transferred, and all incoming packets should not be processed. This may require a separate connection to the server. To avoid all that complexity, the simplest approach here is to make the proxy protocol-aware since it has control over what gets sent to the SQL pod.

Creating a custom interceptor with internal buffer instead of reusing existing ones

Since we have decided to split io.Copy so that the proxy is protocol-aware, we'd need some sort of buffer to read the message types and lengths into.

The existing implementation of pgwirebase.ReadBuffer has a maximum message size limitation, which is configurable through the sql.conn.max_read_buffer_message_size cluster setting (default: 16MB). Using this within the proxy requires us to impose the same maximum message size limitation across all tenants, and this would render the existing cluster setting not very useful. In addition, pgwirebase.ReadBuffer allocates every time the message type is read, which is what we'd like to avoid during the forwarding process.

The other option would be to make use of pgproto3's Receive methods on the Frontend and Backend instances. These instances come with an additional decoding overhead for every message. For the proxy, there is no need to decode the messages in a steady state.

From the above, it is clear that existing buffers would not work for our use case, so we chose to implement a custom interceptor.

Using 8KB as the default internal buffer size within the interceptor

In the original design, 16KB was proposed, and there were concerns about the proxy taking up too much memory for idle connections since these buffers are fixed in size, and have to be allocated upfront. This is a classic tradeoff between memory and time. With a smaller buffer size, down to a minimum of 5 bytes for the header, we incur more syscalls for small messages. On the other hand, we use more memory within the proxy and copy more bytes into the userspace if we use a larger buffer size. Note that the latter part is a disadvantage because io.CopyN with a TCP connection uses the splice system call to avoid copying bytes through the userspace.

We ran a couple of workloads to measure the median message sizes across a 30-second run. The results in bytes are as follows: kv (~10), tpcc (~15), tpch (~350). Considering the fact that small messages are very common, we would allocate a small buffer to avoid the performance hit due to excessive syscalls during proxying. Since one of the proxy's requirements would be to support up to 50K connections, an 8KB buffer is reasonable as that would total up to 8KB x 2 x 50,000 = 800MB. 8KB was also picked as it matches the lengths of Postgres' send and receive buffers.
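The memory figures can be checked with a few lines of arithmetic. The helper name below is illustrative. Taking 1KB as 1024 bytes, the 8KB configuration comes to 781.25 MiB, which is the quantity quoted above as roughly 800MB, and the originally proposed 16KB buffers would double it.

```go
package main

import "fmt"

// bufferMemoryBytes returns the total memory reserved for interceptor buffers
// given the buffer size in bytes, interceptors per connection, and the number
// of connections.
func bufferMemoryBytes(bufSize, interceptorsPerConn, conns int) int {
	return bufSize * interceptorsPerConn * conns
}

func main() {
	const conns = 50000
	for _, kb := range []int{8, 16} {
		total := bufferMemoryBytes(kb*1024, 2, conns)
		fmt.Printf("%dKB buffers: %.2f MiB\n", kb, float64(total)/(1<<20))
	}
}
```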

Using a custom pgwire message for the connection migration

In the design above, the forwarder checks for specific conditions before sending the transfer session message. The original design of the connection migrator did not have this step, and without this step, we have a problem when sending a transfer session message in the middle of a COPY operation because this will result in an unrecognized message type error, aborting the transfer, and finally closing the connection. One approach that can be used to solve this is to introduce a new pgwire message type that will return a custom response that the proxy could detect, and continue the COPY operation. As noted in various discussions, including the token-based authentication RFC, this is a more fundamental divergence from the standard protocol. This approach is risky since message types are denoted by a single byte, and there's a possibility that there will be conflicts in the future.

Detecting a safe transfer point through counting pgwire messages

This design proposes a best-effort migration strategy by checking for specific conditions before sending the transfer request. If by any chance none of the conditions hold, the forwarder just does not perform the transfer, and it is the responsibility of the caller to retry. This approach comes with several drawbacks:

  1. The ReadyForQuery in condition (1) may not correlate with the Sync or SimpleQuery that was sent earlier since there could be in-flight queries in the pipelined case.
  2. If there was an in-flight query that puts the SQL server in the copy-in or copy-out state, the transfer session query will fail with an error, and the connection will be closed.
  3. Previous in-flight queries may be long-running, and are blocking.
  4. The server could end up in a state where the transfer session request got ignored in the extended protocol because of a previous error.

An alternative design that was considered is to count the number of pgwire messages to detect a safe transfer point, which should solve all the drawbacks above. A safe transfer point is said to occur when the number of Sync and SimpleQuery messages is the same as the number of ReadyForQuery messages, with some caveats. We would need to handle edge cases for (4). Similarly, some state bookkeeping needs to be done within the proxy to handle COPY operations. All this could get complex, and can be difficult to get right.
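For comparison, the core of the counting alternative might look like the sketch below. The `pendingCounter` type is hypothetical, and the sketch implements only the basic counting rule: it does not handle the caveats mentioned above, such as extended-protocol messages sent without a Sync yet, error states, or COPY bookkeeping.

```go
package main

import "fmt"

// pendingCounter tracks how many Sync or SimpleQuery messages are still
// awaiting their ReadyForQuery. Illustrative only.
type pendingCounter struct {
	pending int
}

// onClientMsg counts messages that will eventually produce a ReadyForQuery.
func (c *pendingCounter) onClientMsg(typ byte) {
	if typ == 'S' || typ == 'Q' { // Sync or SimpleQuery
		c.pending++
	}
}

// onServerMsg matches a ReadyForQuery against an outstanding request. The
// guard ignores the ReadyForQuery sent right after authentication.
func (c *pendingCounter) onServerMsg(typ byte) {
	if typ == 'Z' && c.pending > 0 {
		c.pending--
	}
}

// isSafe reports whether every counted request has been answered.
func (c *pendingCounter) isSafe() bool {
	return c.pending == 0
}

func main() {
	var c pendingCounter
	// Two pipelined queries: the first ReadyForQuery is not yet a safe point,
	// which is the case that drawback (1) describes.
	c.onClientMsg('Q')
	c.onClientMsg('Q')
	c.onServerMsg('Z')
	fmt.Println(c.isSafe())
	c.onServerMsg('Z')
	fmt.Println(c.isSafe())
}
```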

In the best-effort migration approach, if scenario (1) occurs, the forwarder will continue to send messages to the client. The only case where this matters is if an error occurs through an ErrorResponse message, in which case the connection will be closed. Based on telemetry data, the number of Serverless clusters using the COPY operation is extremely low, so scenario (2) is really rare. Scenarios (3) and (4) will be handled by the timeout handler, and the connection will be closed (if we're in a non-recoverable state) when that is triggered.

Using normal statements instead of observer statements

In condition (1), we will send the transfer session message whenever the last message sent to the SQL pod was Sync or SimpleQuery, and ReadyForQuery has already been received during evaluation time. This protocol flow has no notion of transaction status. If the transfer session message is a normal statement instead of an observer statement, we may fail with an ErrorResponse if the message was sent in a state where the session is in a failed transaction state, resulting in a connection termination due to ambiguity. An observer statement is simple, and it appears that there are no major drawbacks to this.

Possible Future Extensions

  1. Implement an adaptive system for the internal buffer within the interceptor. The buffer size can be adapted to a multiple of the median size of pgwire messages, up to a maximum limit. This may address the situation where idle connections are taking up 16KB of memory each for the internal buffers.
  2. Under memory pressure of SQL proxy, a hard limit can be imposed on the number of connections per tenant, and the proxy could return a “server busy” error.

Follow-Up Work

The connection migration feature as outlined in this document will not be immediately used by the proxy. A follow-up RFC will be written to outline when the proxy will invoke the RequestTransfer API on the active connection forwarders. That RFC will cover distributed tenant load balancing for fewer hot spots, and graceful connection draining to minimize rude disconnects during scale-down events. The former requires some analysis, whereas the latter is straightforward, and can occur whenever the pod watcher detects that a pod has transitioned into the DRAINING state. This, however, will require us to keep track of per-pod connection forwarders.

Unresolved questions

N/A.

Footnotes

<a name="footnote1">1</a>: In the case of a single SQL pod remaining for a given tenant, the autoscaler will invoke a scale-down event if and only if there are no active connections on it.