docs/RFCS/20210604_distributed_token_bucket.md
Design of a subsystem relevant in the multi-tenant setting ("serverless") which rate limits tenant KV operations in conformance to a budget target.
Serverless users will pay per usage and need to be able to set an upper bound on how much they are willing to pay in a billing cycle. Free tier users must be rate limited in order to curtail resource usage.
To encompass the various resources used, we have created an abstract unit called a "request unit". There will be a direct conversion from RUs to a dollar amount that is part of a tenant's bill. Request units are determined based on CPU usage in the SQL pods and on the KV requests from the tenant. For more details, see the "Managing Serverless Resource Usage" RFC (internal-only).
Each tenant will have a number of RUs available to use as needed, plus a "baseline" rate of replenishment. These map directly to the concept of a token bucket. The values are determined by a higher-level billing algorithm that is beyond the scope of this RFC and need to be configurable on-the-fly.
The enforcement of RU consumption will happen inside the SQL pods, at the point where KV requests are sent. The SQL pods communicate with centralized state (for that tenant) which lives in a system table inside the system tenant. The centralized state also tracks and reports the consumption of RUs.
The new subsystem will only be enabled on our multi-tenant clusters. The subsystem involves overhead related to communicating with and updating the centralized state for each tenant.
Note that admission control for preventing overload in the host cluster, or for reducing the variability in user-visible performance (depending on other load on the host cluster), is not in the scope of this RFC.
The higher-level motivation is covered by the "Managing Serverless Resource Usage" RFC (internal-only). This RFC covers the implementation details of a subsystem necessary to enforce resource usage.
We implement a "global" (distributed) token bucket for each tenant; the metadata for all these buckets lives in a system table on the system tenant. The global bucket is configured with an available amount of RUs, a refill rate, and a maximum burst amount (mirroring the reconfiguration API described later).
Each SQL pod implements a local token bucket and uses it for admission control at the KV client level (i.e. before sending out KV requests to the host cluster). Each KV request consumes tokens and might need to wait until there are enough tokens available. CPU usage on the pod also consumes tokens, but there is no mechanism to limit CPU usage; we account for it after-the-fact, by periodically (e.g. every second) retrieving the recent CPU usage and removing tokens from the bucket. This can put the bucket in debt, which will need to be paid before we can issue more KV requests.
The local bucket is periodically filled with tokens from the global bucket, or
assigned a fraction of the global fill rate. This happens via API calls through
the KV connector (see kvtenant.Connector). Each call is implemented as a
transaction involving the system table.
The global bucket keeps track of cumulative RU usage, which is reported (on a per-tenant basis) to Prometheus. All nodes on the host cluster expose per-tenant RU usage metrics which reflect the total usage since the beginning of time. Whichever node services a global bucket request (through the KV connector) updates its instance of the metrics for that tenant. The current usage is the maximum value across all host cluster nodes.
We describe the algorithm in terms of "nodes" (a tenant's SQL pods) communicating with a centralized ("global") token bucket. The tokens are RUs. Note that we describe a single instance of the algorithm; in reality there is a separate instance for each tenant.
Each node requests tokens from the global bucket as needed:
- when the node starts up, it immediately requests a small initial amount. The node doesn't need to block on this request; it can start using tokens (up to this small initial amount) right away.
- when the node is getting close to running out of tokens (e.g. 1 second at the current usage rate). The amount of tokens that the node requests is calculated (based on the recent usage) so that it lasts for a target time period (e.g. 10 seconds). This target request period is the main configuration knob for the algorithm, reflecting the trade-off between a) how frequently each node makes requests to the global bucket and b) how fast we can respond to changing workload distribution among multiple nodes.
The global bucket behavior corresponds to the desired functionality: we want to allow customers to freely use resources without limitation up to a given fraction of their budget ("burst tokens"), after which we want to rate-limit resource consumption to a given rate ("refill rate"). So the global bucket operates in two modes:
Once a request is completed, the node adjusts its local token bucket accordingly, either adding burst tokens or setting up a refill rate.
The global refill rate is distributed among the nodes using a shares-based system. Each node calculates a share value and the global bucket maintains the sum of the current share values. The fraction of rate granted to a node is the ratio between that node's shares and the total shares. The shares reflect the load at each node (more details in the shares calculation section).
Note that the global bucket can transition between the two modes arbitrarily; even if it ran out of burst tokens, it can accumulate tokens enough to where it can start granting them for immediate use again. The global bucket state does not affect the shares tracking sub-system; we always update and keep track of shares even if we are not currently using them.
At any point in time, the global bucket maintains these values: the current amount of RUs, the refill rate, the burst limit, the sum of the current share values, and cumulative usage statistics (mirroring the system table schema described later).
We describe the input and output values of a request to the global bucket.
type TenantConsumption struct {
	// RU is the total number of request units consumed.
	RU float64
	// Counts and sizes of the KV read and write requests.
	ReadRequests  uint64
	ReadBytes     uint64
	WriteRequests uint64
	WriteBytes    uint64
}
type TokenBucketRequest struct {
	InstanceID uint32
	// Uniquely identifies this instance (in light of instance ID reuse).
	InstanceLease string
	// Sequence number used to detect duplicate requests.
	InstanceSeq uint64
	// Amount of requested tokens.
	RequestedTokens float64
	// New shares value for the node.
	NewShares float64
	// TargetRequestPeriod configuration knob.
	TargetRequestPeriod time.Duration
	// Consumption that occurred since this node's last request.
	ConsumptionSinceLastRequest TenantConsumption
}
type TokenBucketResponse struct {
	// Amount of tokens granted. In most cases, this is equal to the requested
	// tokens; the only exception is when TrickleTime would otherwise exceed
	// TargetRequestPeriod, in which case the number of tokens is reduced.
	GrantedTokens float64
	// If zero, the granted tokens can be used immediately, without restriction.
	// If set, the granted tokens become available at a constant rate over this
	// time period. E.g. if we are granted 1000 tokens and the trickle duration
	// is 10 seconds, the tokens become available at a rate of 100 tokens/sec
	// for the next 10 seconds.
	// TrickleTime is at most the TargetRequestPeriod.
	TrickleTime time.Duration
	// Maximum number of unused tokens that we can accumulate over the trickle
	// time; once we reach the limit, we have to start discarding tokens. Used
	// to limit burst usage for the free tier. Only set if TrickleTime is set.
	MaxBurstTokens float64
}
The shares have two terms:
- a load estimation term: an exponentially weighted moving average of the load at the node, in RUs requested per second.
- a backlog term, which ensures that nodes that have accumulated a longer backlog will "catch up" to the other nodes. It is a sum to which each queued operation contributes its RUs, exponentially scaled with the age of the operation; the older operations thus become dominant in the calculation of the shares. Without this age-dependent scaling, head-of-queue age differences can persist between nodes that receive workloads at different rates.

For example, say node 1 has 10s worth of backlog of a constant 100 RU/s workload and node 2 has 5s worth of backlog of a constant 200 RU/s workload. An ideal token bucket would block the second node altogether for 5s and give the first node everything until it catches up. If we just sum up the RUs, the two nodes get the same share and the head-of-queue discrepancy can persist for a long time. The exponential scaling is a compromise between the two: the further a node is behind the other nodes, the larger its share.
Note that there can be periods where shares consistently grow (or shrink) for a period of time, e.g. when we keep accumulating backlog. Whenever we calculate a refill rate for a node, we are using somewhat stale values for the shares of the other nodes. This can lead to systematic errors where we give out too much or too little rate in aggregation. When we give out too little, the bucket will automatically compensate by accumulating tokens and using them as "burst". When we give out too much, the bucket will accumulate debt.
The local buckets can be "in debt", due to operations that can only be accounted for after the fact (e.g. size of reads, CPU usage). The debt is accounted for in the number of tokens that a node requests from the global bucket. When a local bucket is in debt, KV operations are blocked until the debt is paid. This includes reads which have a size-independent cost component that is requested up-front. An improvement here is to allow the debt to be paid over a time period (e.g. 1 second) from the refill rate, to avoid "stop" and "start" behavior (especially w.r.t periodic collection of CPU usage).
The global bucket can also accumulate debt (as explained in the previous section). To correct for this, debt causes a reduction in the effective rate (which is apportioned among the nodes according to the shares) so that the debt is paid off over the next target request period.
Note that because we are pre-distributing tokens over the target request period,
it is expected for the global bucket to be in debt of up to
<refill rate> * <target request period>. For the purpose of reducing the
effective rate, we only treat anything above this threshold as systematic debt.
We list the configuration parameters for the algorithm, in decreasing order of importance:
- the target request period; candidate values are 10s and 30s.
- the smoothing factor for the load EWMA: 0.5.
- the backlog time scale; each queued operation's contribution to the backlog term is scaled by e^(<age>/<backlog_time_scale>). Default value: 10s.
- the minimum shares value for a node: 0.01.

In the proposed system, all these configuration knobs "live" on the tenant side.
We estimate the number and size of KVs generated over time and the number of transactions per second to the token bucket. We assume the serverless cluster has at most 5000 SQL pods running at any time.
| | 10 sec target duration | 30 sec target duration |
|---|---|---|
| Transactions / second | 500 | 167 |
| KVs written / second | 1000 | 333 |
| KVs written / hour | 3.6M | 1.2M |
| KVs written / day | 86M | 29M |
| KV bytes written / second | 100 KB | 33 KB |
| KV bytes written / hour | 360 MB | 120 MB |
| KV bytes written / day | 8.6 GB | 2.8 GB |
Notes:
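The table's values follow from simple arithmetic; a sketch for the 10 sec column, assuming each bucket transaction writes two KVs (the instance row and the instance 0 row) of roughly 100 bytes each (the per-transaction figures are our assumptions, not stated above):

```go
package main

import "fmt"

func main() {
	const (
		pods         = 5000 // max SQL pods across all tenants
		targetPeriod = 10   // seconds between bucket requests per pod
		kvsPerTxn    = 2    // e.g. the instance row and the instance 0 row
		bytesPerKV   = 100  // rough per-KV size estimate
	)
	txnPerSec := pods / targetPeriod
	kvPerSec := txnPerSec * kvsPerTxn
	bytesPerSec := kvPerSec * bytesPerKV
	fmt.Println(txnPerSec)       // 500
	fmt.Println(kvPerSec)        // 1000
	fmt.Println(bytesPerSec)     // 100000 (100 KB)
	fmt.Println(kvPerSec * 3600) // 3600000 (3.6M KVs per hour)
}
```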
We created a simulation and visualization framework in order to prototype this algorithm. The input is a workload, which is a set of requested RU/s graphs (one per node). The output is a set of granted RU/s graphs. The simulation implements this algorithm and compares against an ideal distributed token bucket.
The simulation can be accessed at: https://raduberinde.github.io/distbucket-v21-06-14/?workload=steps.
The page shows the granted workload over time for both the prototyped algorithm and an ideal (instantly synchronized) token bucket. We also show the graphs of total consumption between the two, to see if there is any systematic (long-term) divergence.
The workload can be changed from the dropdown or the yaml can be manually edited. Each workload function is a sum of terms; the supported terms can be inferred from the code.
The code is available here: https://github.com/RaduBerinde/raduberinde.github.io/blob/master/distbucket-v21-06-14/lib/dist_token_bucket_3.go.
The system table stores two kinds of rows: a per-tenant state row (with instance_id = 0) for each tenant, and a per-instance state row for each SQL instance (pod).
Note that we could use two tables for these different purposes, but we prefer to "interleave" the data into a single table for performance reasons: for most tenants, the rows for that tenant will fit in a single range.
Schema for the system table:
CREATE TABLE system.tenant_usage (
tenant_id INT NOT NULL,
-- For each tenant, there is a special row with instance_id = 0 which contains
  -- per-tenant state. Each SQL instance (pod) also has its own row with
-- per-instance state.
instance_id INT NOT NULL,
  -- next_instance_id is the smallest live instance ID larger than this row's
  -- instance_id (or 0 if there is no such ID).
-- We are overlaying a circular linked list of all live instances, with
-- instance 0 acting as a sentinel (always the head of the list).
next_instance_id INT NOT NULL,
-- -------------------------------------------------------------------
-- The following fields are used only for the per-tenant state, when
-- instance_id = 0.
-- -------------------------------------------------------------------
-- Bucket configuration.
ru_burst_limit FLOAT,
ru_refill_rate FLOAT,
-- Current amount of RUs in the bucket.
ru_current FLOAT,
-- Current sum of the shares values for all instances.
current_share_sum FLOAT,
-- Cumulative usage statistics.
total_ru_usage FLOAT,
total_read_requests INT,
total_read_bytes INT,
total_write_requests INT,
total_write_bytes INT,
total_sql_pod_cpu_seconds FLOAT, -- TODO: Maybe milliseconds and INT8?
-- -------------------------------------------------------------
-- The following fields are used for per-instance state, when
-- instance_id != 0.
-- --------------------------------------------------------------
-- The lease is a unique identifier for this instance, necessary because
-- instance IDs can be reused.
instance_lease BYTES,
-- Last request sequence number. These numbers are provided by the
-- instance and are monotonically increasing; used to detect duplicate
-- requests and provide idempotency.
instance_seq INT,
-- Current shares value for this instance.
instance_shares FLOAT,
-- Time when we last heard from this instance.
instance_last_update TIMESTAMP,
FAMILY "primary" (
tenant_id, instance_id, next_instance_id,
ru_burst_limit, ru_refill_rate, ru_current, current_share_sum,
total_ru_usage, total_read_requests, total_read_bytes, total_write_requests,
total_write_bytes, total_sql_pod_cpu_seconds,
instance_lease, instance_seq, instance_shares, instance_last_update
),
PRIMARY KEY (tenant_id, instance_id)
)
An operation to the bucket is implemented with a transaction; the most common path would look like:
BEGIN;
SELECT ... FROM system.tenant_usage
WHERE tenant_id=$1 AND instance_id IN (0, $2)
FOR UPDATE;
-- Calculate new state..
UPSERT INTO system.tenant_usage (...) VALUES
( ... row for instance_id = 0 ... ),
( ... row for the instance ... )
COMMIT;
In general, we may need to add a new instance to the "list". For that, we will need to find the previous instance ID, with a query like:
SELECT ... FROM system.tenant_usage
WHERE tenant_id=$1 AND instance_id > 0 AND instance_id < $2
ORDER BY instance_id DESC
LIMIT 1;
Subsequently, the previous instance's next_instance_id will need to be updated
(with an UPSERT).
Any failed request is retried with the same instance_seq. If the previous
attempt went through, we will notice that the instance_seq in our table is the
same and allow special handling of this case.
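The duplicate detection on the server side might look like this (a sketch; the stored state and the handling of the duplicate case are simplified):

```go
package main

import "fmt"

// instanceState models the per-instance state read from the system table.
type instanceState struct {
	seq uint64 // last request sequence number we processed (instance_seq)
}

// isDuplicate reports whether a request with the given sequence number was
// already processed. Failed requests are retried with the same instance_seq,
// so an equal (or smaller) stored value means a previous attempt went through.
func isDuplicate(stored *instanceState, reqSeq uint64) bool {
	return reqSeq <= stored.seq
}

func main() {
	st := &instanceState{seq: 41}
	fmt.Println(isDuplicate(st, 42)) // new request: false
	st.seq = 42
	fmt.Println(isDuplicate(st, 42)) // retry of the same request: true
}
```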
To perform garbage collection of dead instances, we will use extra information
from the instance side. We assume that each instance has a recent list of "live"
instance IDs (this will be implemented by a separate work-in-progress
subsystem). Whenever an instance makes a token bucket request, it includes the
next instance ID in order. On the server side, we can cross-check this value
against the next_instance_id in the table; if they differ, we can trigger an
asynchronous cleanup step that deletes instances from the table if the last
update happened a long time ago.
The KV Connector API is used by the SQL pods to make requests to the distributed
bucket. This API is implemented on the host cluster side by performing a
transaction on tenant_usage. The request/response structs are described above.
The instance lease and sequence number identify each request, allowing us to retry calls that may have gone through on the host cluster side without losing tokens or double-counting consumption.
type TenantTokenBucketProvider interface {
	Request(
		ctx context.Context,
		req TokenBucketRequest,
	) (TokenBucketResponse, error)
}
The limits for a tenant are periodically reconfigured by a higher-level usage controller running in our CockroachCloud infrastructure, as outlined in the "Managing Serverless Resource Usage" RFC (internal-only).
The proposed configuration interface is via an internal SQL function; we can easily implement other endpoints on the system tenant as necessary.
SELECT crdb_internal.update_tenant_resource_limits(
tenant_id, -- INT
available_request_units, -- FLOAT
refill_rate, -- FLOAT
  max_burst_request_units, -- FLOAT
  as_of, -- TIMESTAMP
  as_of_consumed_request_units -- FLOAT
)
The statement completes after a new row is inserted in the system table.
The configuration parameters are available_request_units, refill_rate, and max_burst_request_units.
The rest of the parameters exist for technical purposes:
- operation_uuid allows us to guarantee idempotency of the operation, if a reconfiguration request is retried. This is not a strong requirement (the consequences of repeating a reconfiguration request are small) but our schema makes it easy to provide.
- as_of is the time at which the consumption state was consulted by the usage controller, and as_of_consumed_request_units is the total consumption value at that time. The as_of timestamp originates from the host cluster (it corresponds to the time when that total consumption value was emitted as a statistic). These parameters allow us to accurately deal with the reality that the usage controller's operation is not instant: consumption that happened after as_of can be accounted for by comparing the current cumulative consumption with as_of_consumed_request_units.

This API should be initially called as part of the creation of a new tenant (with as_of_consumed_request_units = 0). The system can start with some reasonable defaults (e.g. free tier settings) until that happens.
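Our reading of this compensation can be sketched as follows (the adjustment formula is our interpretation, not spelled out above):

```go
package main

import "fmt"

// adjustedAvailable compensates the configured available RUs for consumption
// that occurred between the as_of timestamp and the time the reconfiguration
// is actually applied.
func adjustedAvailable(availableRUs, currentTotalConsumption, asOfConsumedRUs float64) float64 {
	return availableRUs - (currentTotalConsumption - asOfConsumedRUs)
}

func main() {
	// The controller saw 10000 RUs consumed and granted 5000 available RUs;
	// by the time the call executes, total consumption has reached 10300.
	fmt.Println(adjustedAvailable(5000, 10300, 10000)) // 4700
}
```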
In the initial implementation, this function will be called separately for each tenant. We will investigate batching opportunities at a later point.
The system must have reasonable behavior if the bucket range becomes temporarily inaccessible. To achieve this, in the short term each node continues operating at the previous rate of consumption. Longer term, the rate can decay over time to 1/N of the total refill rate (where N is the number of SQL pods).
The system table could get a high amount of aggregate traffic. If we have 5,000 SQL pods across all tenants, we expect 500 operations per second (for a 10s target request period).
Load-based splitting of ranges would reduce the range sizes; we could also force split points between the tenants.
The minimum refill amount can be tuned to reduce the frequency of bucket operations during periods of very low activity on a single node.
The main drawbacks of this proposal are:
- complexity;
- periodic transactions on the host cluster.
We have explored a number of existing systems for cloud rate limiting. Most of these use a gossip-style algorithm to disseminate relative usage information. This is not necessary in our system which provides consistent access to shared state.
An alternative to using transactions as the only centralization point would be to elect one "leader" pod which implements the distributed bucket logic and which all other pods communicate with (perhaps even in a streaming fashion). This SQL pod would update the persistent state (at a reduced frequency), using the same mechanism (KV connector). The difficulty with this approach is implementing election reliably; this could be done by keeping track of a "lease" in the bucket state (in the system table). There will likely be other use cases for having a pod leader; if that happens, we can implement this idea as an improvement on top of the proposed system (most parts would not need to change).