design/hotshard.md
FoundationDB partitions the keyspace into shards (contiguous key ranges) and assigns each shard to a storage-team replica set.
Shards are just key ranges. Their size adapts continuously as the Data Distributor (DD) monitors load and splits or merges ranges to stay within configured bounds (DDShardTracker:875, DDShardTracker:884). Splits are triggered when a shard's size (bytes) or write bandwidth exceeds its thresholds (e.g., splitMetrics.bytes = shardBounds.max.bytes / 2, splitMetrics.bytesWrittenPerKSecond); in other words, shard size is workload-driven rather than fixed (DDShardTracker:875, DDShardTracker:879). Storage servers and commit proxies provide ongoing measurements so DD can resize ranges dynamically to relieve load hotspots and maintain balance (StorageMetrics:472, CommitProxyServer:742).
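For intuition, here is a minimal sketch of how split detection and split targets relate to the shard bounds. Types and names are illustrative stand-ins, not the actual DDShardTracker code:

```cpp
#include <cstdint>

// Illustrative stand-ins for the tracker's state; names are hypothetical.
struct ShardMetrics {
    int64_t bytes;                  // current shard size
    int64_t bytesWrittenPerKSecond; // sampled write bandwidth
};

struct SplitTarget {
    int64_t bytes;                  // desired size of each resulting piece
    int64_t bytesWrittenPerKSecond; // desired write bandwidth per piece
};

// DD asks for a split when either the size or the write-bandwidth bound
// is exceeded.
bool needsSplit(const ShardMetrics& m, int64_t maxShardBytes, int64_t maxBytesPerKSec) {
    return m.bytes > maxShardBytes || m.bytesWrittenPerKSecond > maxBytesPerKSec;
}

// Pieces are targeted at half the maximum size so freshly split shards
// have headroom before the next split is needed.
SplitTarget makeSplitTarget(int64_t maxShardBytes, int64_t splitBytesPerKSec) {
    return SplitTarget{ maxShardBytes / 2, splitBytesPerKSec };
}
```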
The maximum shard size is capped at 500 MB (MAX_SHARD_BYTES) and is calculated as follows:

```cpp
int64_t getMaxShardSize(double dbSizeEstimate) {
    return std::min((SERVER_KNOBS->MIN_SHARD_BYTES +
                     (int64_t)std::sqrt(std::max<double>(dbSizeEstimate, 0)) *
                         SERVER_KNOBS->SHARD_BYTES_PER_SQRT_BYTES) *
                        SERVER_KNOBS->SHARD_BYTES_RATIO,
                    (int64_t)SERVER_KNOBS->MAX_SHARD_BYTES);
}
```
| dbSizeEstimate | Calculation | Result (approx.) |
|---|---|---|
| 0 | (10 MB + 0) × 4 | 40 MB |
| 1 GB | (10 MB + √1 GB × 45) × 4 | 45.7 MB |
| 100 GB | (10 MB + √100 GB × 45) × 4 | 96.9 MB |
| 1 TB | (10 MB + √1 TB × 45) × 4 | 220 MB |
| ≈6.5 TB | (10 MB + √6.5 TB × 45) × 4 → hits cap | 500 MB |
| ≥10 TB | Any larger value → capped by MAX_SHARD_BYTES | 500 MB |
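The table can be sanity-checked with a small standalone program that hard-codes the knob values the table assumes (MIN_SHARD_BYTES = 10 MB, SHARD_BYTES_PER_SQRT_BYTES = 45, SHARD_BYTES_RATIO = 4, MAX_SHARD_BYTES = 500 MB; the actual defaults live in ServerKnobs):

```cpp
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstdio>

// Same formula as above, with the knob values inlined for a quick check.
int64_t getMaxShardSize(double dbSizeEstimate) {
    const int64_t MIN_SHARD_BYTES = 10'000'000;
    const int64_t SHARD_BYTES_PER_SQRT_BYTES = 45;
    const int64_t SHARD_BYTES_RATIO = 4;
    const int64_t MAX_SHARD_BYTES = 500'000'000;
    return std::min((MIN_SHARD_BYTES +
                     (int64_t)std::sqrt(std::max<double>(dbSizeEstimate, 0)) *
                         SHARD_BYTES_PER_SQRT_BYTES) *
                        SHARD_BYTES_RATIO,
                    MAX_SHARD_BYTES);
}

int main() {
    // The database-size estimates from the table, in bytes.
    const double sizes[] = { 0, 1e9, 1e11, 1e12, 6.5e12, 1e13 };
    for (double s : sizes)
        std::printf("db = %.1e bytes -> max shard = %.1f MB\n",
                    s, getMaxShardSize(s) / 1e6);
}
```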
A shard is "write-hot" when it absorbs a disproportionate share of write traffic, triggering shard splitting. Read-hot shards also exist and trigger rebalancing (moving shards between teams), but they do not trigger splits (dd-internals:L129).
Storage servers continually sample bytes-read, ops-read, and shard sizes. A shard whose read bandwidth density exceeds configured thresholds is tagged as read-hot so the data distributor process can identify the offending range (StorageMetrics:472). This internal state isn't directly visible to users, though "ReadHotRangeLog" trace events in the server logs can help surface diagnostic information about detected hot ranges.
The data distributor decides a shard should be split and calls splitStorageMetrics, which in turn asks each storage server holding the shard for split points via SplitMetricsRequest. The tracker sets targets such as half of the max shard size and minimum bytes/throughput (DDShardTracker:875).
The storage server handles that request in StorageServerMetrics::splitMetrics, pulling the live samples it already maintains for bytes, write bandwidth, and IOs (byteSample, bytesWriteSample, iopsSample) to compute balanced cut points (storageserver:1973, StorageMetrics:286).
Inside splitMetrics, the helper getSplitKey converts the desired metric offset into an actual key, i.e., the key that will become the new shard boundary, using the sampled histogram. Jitter is added so repeated splits don't choose exactly the same boundary, and MIN_SHARD_BYTES plus optional write-traffic thresholds are enforced before accepting the split (StorageMetrics:286, StorageMetrics:302).
It loops until the remaining range falls under all limits, emitting each chosen key into the reply. The tracker then uses those keys to call executeShardSplit, which updates the shard map and kicks off the relocation (StorageMetrics:332, DDShardTracker:890).
StorageServerMetrics::splitMetrics only emits a split point when it can carve the shard into two chunks that both satisfy the size/traffic limits: the loop stops attempting further splits when the remaining range would be smaller than 2 × MIN_SHARD_BYTES and write-based splitting is either disabled or below SHARD_SPLIT_BYTES_PER_KSEC, and getSplitKey() returns the shard end (so no split is emitted) if the byte/IO samples can't find a valid interior key (StorageMetrics:286, StorageMetrics:302).
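To make the loop's shape concrete, here is a toy sketch with a simplified byte sample. Names and types are illustrative, not the actual StorageServerMetrics code:

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Toy stand-in for the storage server's byte sample.
struct Sample { std::string key; int64_t bytes; };

// Return the first sampled key at least offsetBytes into the range, or
// `end` if no valid interior key exists (mirroring the behavior above).
std::string getSplitKey(const std::vector<Sample>& samples,
                        const std::string& begin, const std::string& end,
                        int64_t offsetBytes) {
    int64_t acc = 0;
    for (const auto& s : samples) {
        if (s.key <= begin || s.key >= end) continue;
        acc += s.bytes;
        if (acc >= offsetBytes) return s.key;
    }
    return end; // samples found no usable interior key
}

std::vector<std::string> splitMetricsSketch(const std::vector<Sample>& samples,
                                            std::string begin, const std::string& end,
                                            int64_t remainingBytes,
                                            int64_t targetBytes,
                                            int64_t minShardBytes) {
    std::vector<std::string> splitKeys;
    // Keep carving pieces of roughly targetBytes while the leftover could
    // still form two shards of at least minShardBytes each.
    while (remainingBytes >= 2 * minShardBytes) {
        std::string k = getSplitKey(samples, begin, end, targetBytes);
        if (k == end) break; // no valid interior key: stop splitting
        splitKeys.push_back(k);
        begin = k;
        remainingBytes -= targetBytes;
    }
    return splitKeys;
}
```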
Hot shards typically cannot be split because they're already very small—often just a single hot key or small range. Such tiny shards hit two barriers: (1) they violate minimum size constraints (MIN_SHARD_BYTES), or (2) they don't contain enough keys for the storage server's sampling mechanism to identify a split point. Worse, even if a split were possible, the hot key would remain assigned to the same storage team, providing no relief from saturation.
When a hot shard cannot be split (due to minimum size constraints or lack of sample resolution), the cluster experiences cascading effects:
- Ratekeeper tracks each storage server's write queue and durability lag; a hot shard drives both metrics up for the servers hosting it (Ratekeeper:894, Ratekeeper:910).
- When the write queue grows faster than the configured target, Ratekeeper sets the limit reason to storage_server_write_queue_size and lowers the global transaction rate accordingly (Ratekeeper:835).
- If durability lag remains high even after queue-based throttling, Ratekeeper applies the durability lag limiter to avoid violating MVCC guarantees (Ratekeeper:845).
- Either limiter throttles all transactions across the cluster, even those not touching the hot shard, because the cluster cannot commit faster than its slowest storage server can durably persist data.
- The rate limit is calculated as limitTps = min(actualTps * maxBytesPerSecond / max(1e-8, inputRate), maxBytesPerSecond * MAX_TRANSACTIONS_PER_BYTE), ensuring the cluster doesn't overwhelm the saturated storage server even when the measured input rate is near zero (Ratekeeper:795); see the sketch after this list.
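Transcribing that formula directly (illustrative helper function; the real computation lives in Ratekeeper, and MAX_TRANSACTIONS_PER_BYTE is a server knob):

```cpp
#include <algorithm>

// Direct transcription of the limitTps formula above.
double computeLimitTps(double actualTps, double inputRate,
                       double maxBytesPerSecond,
                       double maxTransactionsPerByte) {
    // Scale the observed transaction rate by how far the input byte rate
    // exceeds what the slowest storage server can absorb. The
    // max(1e-8, ...) guard keeps the division sane when the measured
    // input rate is near zero.
    return std::min(actualTps * maxBytesPerSecond / std::max(1e-8, inputRate),
                    maxBytesPerSecond * maxTransactionsPerByte);
}
```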
Detection Signs:
Verification Process:
1. Run fdbcli (without arguments) and note the cluster file path shown (e.g., /etc/foundationdb/fdb.cluster).
2. SSH to one of the clients.
3. Copy the connection string into a file (e.g., connections.txt).
4. Run the transaction profiling analyzer:

   ```
   contrib/transaction_profiling_analyzer/transaction_profiling_analyzer.py -s "2023-09-14 16:40 UTC" -e "2023-09-14 16:50 UTC" -C connections.txt
   ```

   Note: use the UTC time format shown above; the script uses the dateparser library for time parsing.
5. Analyze the output:
   - Decode the hot key to get application-level details.
   - Identify the hot range using logs: ReadHotRangeLog events provide detailed hot range information.

Resolution:
⚠️ Important: FoundationDB has no internal mechanism enabled by default to resolve hot shards; application-level throttling is required.
Note: an experimental hot-shard throttling feature exists, but it is disabled by default and guarded by the HOT_SHARD_THROTTLING_ENABLED server knob (ServerKnobs:872).
When enabled, write-hot shards are tracked by the commit proxies, which maintain a hot-shard table and reject incoming mutations against those ranges with the transaction_throttled_hot_shard error to keep them from overwhelming a single team (CommitProxyServer:742).
This targeted throttling penalizes only transactions writing to the hot shard, rather than the entire cluster via global rate limits (CommitProxyServer:742); a conceptual sketch follows.
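Conceptually, the proxy-side check looks something like this sketch (hypothetical names and types; the real hot-shard table and error path live in CommitProxyServer):

```cpp
#include <map>
#include <string>

// A tracked hot range: its end key plus when the entry lapses
// (after HOT_SHARD_THROTTLING_EXPIRE_AFTER seconds).
struct HotShardEntry { std::string end; double expiresAt; };

// Map from range begin key to its entry.
using HotShardTable = std::map<std::string, HotShardEntry>;

// A mutation whose key falls inside an unexpired tracked range would be
// rejected with transaction_throttled_hot_shard instead of committed.
bool shouldThrottleMutation(const HotShardTable& hotShards,
                            const std::string& mutationKey, double now) {
    // Find the last tracked range starting at or before the key.
    auto it = hotShards.upper_bound(mutationKey);
    if (it == hotShards.begin()) return false;
    --it;
    return mutationKey < it->second.end && now < it->second.expiresAt;
}
```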
Once the data distributor flags a hot shard, it can split the range into smaller pieces and/or relocate portions to other teams to spread the traffic while respecting health constraints and load targets (dd-internals:146).
FoundationDB uses several server knobs to control hot shard detection, splitting, and throttling behavior.
These knobs control the experimental hot shard throttling feature (PR #10970); an example of overriding them follows the list:

- HOT_SHARD_THROTTLING_ENABLED (bool, default: false)
- HOT_SHARD_THROTTLING_EXPIRE_AFTER (double, default: 3.0 seconds)
- HOT_SHARD_THROTTLING_TRACKED (int64_t, default: 1)
- HOT_SHARD_MONITOR_FREQUENCY (double, default: 5.0 seconds)
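To experiment with the feature, knobs can typically be overridden per-process, for example via the [fdbserver] section of foundationdb.conf. This is a sketch under the assumption that your build exposes these knobs; knob names are lowercased, and accepted boolean syntax may vary by version:

```
[fdbserver]
# Enable experimental hot-shard throttling (disabled by default).
knob_hot_shard_throttling_enabled = true
# Check for hot shards every 5 seconds (the default).
knob_hot_shard_monitor_frequency = 5.0
```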
These knobs control when Data Distribution splits shards based on write bandwidth:
- SHARD_MAX_BYTES_PER_KSEC (int64_t, default: 1,000,000,000 bytes/ksec = 1 GB/sec): shards whose write bandwidth exceeds this are split
- SHARD_MIN_BYTES_PER_KSEC (int64_t, default: 100,000,000 bytes/ksec = 100 MB/sec): shards above this bandwidth will not be merged; must be significantly less than SHARD_MAX_BYTES_PER_KSEC to avoid merge/split cycles
- SHARD_SPLIT_BYTES_PER_KSEC (int64_t, default: 250,000,000 bytes/ksec = 250 MB/sec): target bandwidth for each piece of a split; must be less than half of SHARD_MAX_BYTES_PER_KSEC, and setting it below SHARD_MIN_BYTES_PER_KSEC generates immediate re-merging work

These knobs control read-hot shard detection (primarily for read load balancing):
- SHARD_MAX_READ_OPS_PER_KSEC (int64_t, default: 45,000,000 ops/ksec = 45k ops/sec)
- SHARD_READ_OPS_CHANGE_THRESHOLD (int64_t, default: SHARD_MAX_READ_OPS_PER_KSEC / 4)
- READ_SAMPLING_ENABLED (bool, default: false)
- ENABLE_WRITE_BASED_SHARD_SPLIT (bool)
- SHARD_MAX_READ_DENSITY_RATIO (double)
- SHARD_READ_HOT_BANDWIDTH_MIN_PER_KSECONDS (int64_t)
- SHARD_MAX_BYTES_READ_PER_KSEC_JITTER (int64_t)
These knobs control automatic shard movement when storage queues become unbalanced:
- ENABLE_REBALANCE_STORAGE_QUEUE (bool, default: false)
- REBALANCE_STORAGE_QUEUE_SHARD_PER_KSEC_MIN (int64_t, default: SHARD_MIN_BYTES_PER_KSEC)
- DD_ENABLE_REBALANCE_STORAGE_QUEUE_WITH_LIGHT_WRITE_SHARD (bool, default: true)
These knobs control Data Distribution's ability to move shards based on read hotspots:
- READ_REBALANCE_SHARD_TOPK (int)
- READ_REBALANCE_MAX_SHARD_FRAC (double)
These knobs control the priority of different Data Distribution operations:

- PRIORITY_SPLIT_SHARD (int)
- PRIORITY_MERGE_SHARD (int)

Related wrong-shard retry knobs:

- WAIT_METRICS_WRONG_SHARD_CHANCE (double)
- WRONG_SHARD_SERVER_DELAY (double, client-side)
Write bandwidth knob relationships:

```
SHARD_MIN_BYTES_PER_KSEC (100 MB/s)
  ↓ (must be significantly less than)
SHARD_SPLIT_BYTES_PER_KSEC (250 MB/s)
  ↓ (must be less than half of)
SHARD_MAX_BYTES_PER_KSEC (1 GB/s)
```
When a shard's write bandwidth exceeds SHARD_MAX_BYTES_PER_KSEC, it is split into pieces each with bandwidth < SHARD_SPLIT_BYTES_PER_KSEC. Shards with bandwidth > SHARD_MIN_BYTES_PER_KSEC will not be merged back together.
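As a toy illustration of that hysteresis (hypothetical helper, not FDB code), a shard's write bandwidth falls into one of three bands:

```cpp
#include <cstdint>

enum class BandwidthState { CanMerge, Steady, NeedsSplit };

// Classify a shard's sampled write bandwidth (bytes per ksec) against
// the knobs above.
BandwidthState classify(int64_t bytesWrittenPerKSec,
                        int64_t minPerKSec,   // SHARD_MIN_BYTES_PER_KSEC
                        int64_t maxPerKSec) { // SHARD_MAX_BYTES_PER_KSEC
    if (bytesWrittenPerKSec > maxPerKSec)
        return BandwidthState::NeedsSplit; // split into pieces, each below
                                           // SHARD_SPLIT_BYTES_PER_KSEC
    if (bytesWrittenPerKSec > minPerKSec)
        return BandwidthState::Steady;     // too busy to merge, not hot
                                           // enough to split
    return BandwidthState::CanMerge;       // quiet enough to merge
}
```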
Read Hot Shard Detection Flow:
1. When READ_SAMPLING_ENABLED is set, storage servers sample read operations.
2. A shard is flagged read-hot when its read ops exceed SHARD_MAX_READ_OPS_PER_KSEC or its read density exceeds SHARD_MAX_READ_DENSITY_RATIO, subject to the SHARD_READ_HOT_BANDWIDTH_MIN_PER_KSECONDS floor (sketched after this list).
3. The data distributor selects the top K read-hot shards (READ_REBALANCE_SHARD_TOPK) for rebalancing.
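A sketch of the read-hot test in that flow (illustrative helper; the real check lives in StorageMetrics):

```cpp
#include <cstdint>

// Flag a shard as read-hot when its sampled read load is both dense
// relative to its size and above an absolute bandwidth floor.
bool isReadHot(int64_t bytesReadPerKSec,  // sampled read bandwidth
               int64_t shardBytes,        // shard size
               double maxReadDensityRatio, // SHARD_MAX_READ_DENSITY_RATIO
               int64_t minReadBandwidth) { // SHARD_READ_HOT_BANDWIDTH_MIN_PER_KSECONDS
    if (bytesReadPerKSec < minReadBandwidth)
        return false; // too little absolute traffic to matter
    double density =
        (double)bytesReadPerKSec / (double)(shardBytes > 0 ? shardBytes : 1);
    return density > maxReadDensityRatio;
}
```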