docs/RFCS/20191108_closed_timestamps_v2.md
This RFC discusses changing the way in which closed timestamps are published (i.e. communicated from leaseholders to followers). It argues for replacing the current system, where closed timestamps are tracked per store and communicated out of band from replication, and migrating to a hybrid scheme where, for “active” ranges, closed timestamps are piggy-backed on Raft commands, and "inactive" ranges continue to use a separate (but simpler) out-of-band/coalesced mechanism.
This RFC builds on, and contrasts itself with, the current system. Familiarity with the status quo is assumed.
The Non-blocking Transactions project requires us to have at least two categories of ranges with different policies for closing timestamps. The current system does not allow this flexibility. Even apart from this new need, a per-range closed timestamp is something we've repeatedly wanted. The existing system has also proven to be subtle and hard to understand; this proposal seems significantly simpler, which would be a big win in itself.
Another benefit relates to splits: currently, preventing closed timestamp regressions on the right-hand side (RHS) of a split requires an awkward below-Raft mechanism that computes an initialMaxClosed value suitable for the new range (see pkg/kv/kvserver/closedts/storage). With this proposal, the RHS's initial closed timestamp will simply be the closed ts carried by the command corresponding to the EndTxn with the SplitTrigger.

A concept fundamental to this proposal is that closed timestamps become associated with Raft commands and their LAIs (Lease Applied Indexes). Each command carries a closed timestamp that says: whoever applies this command c1 with closed ts ts1 is free to serve reads at timestamps <= ts1 on the respective range because, given that c1 applied, all further applying commands perform writes at timestamps > ts1.
Closed timestamps carried by commands that ultimately are rejected are inconsequential. This is a subtle, but important difference from the existing infrastructure. Currently, an update says something like "if a follower gets this update (epoch,MLAI), and if, at the time when it gets it, the sender is still the leaseholder, then the receiver will be free to use this closed timestamp as soon as replication catches up to MLAI". The current proposal does away with the lease part - the receiver no longer concerns itself with the lease status. Referencing leases in the current system is necessary because, at the time when the provider sends a closed timestamp update, that update references a command that may or may not ultimately apply. If it doesn't apply, then the epoch is used by the receiver to discern that the update is not actionable. In this proposal, leases no longer play a role because updates refer to committed commands, not to uncertain commands.
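To make the invariant concrete, here is a minimal Go sketch of the follower-side state it implies. The names are illustrative, not the actual CockroachDB code; only hlc.Timestamp is a real type:

```go
package closedts

import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// appliedCmd is a stand-in for a replicated command as seen at application
// time; commands that are ultimately rejected (e.g. because they fail the
// LAI check) never get here, so whatever they carry is inconsequential.
type appliedCmd struct {
	LeaseAppliedIndex uint64
	ClosedTimestamp   hlc.Timestamp
}

// replicaState is the small amount of state a replica needs in order to
// serve follower reads under this proposal.
type replicaState struct {
	appliedLAI      uint64        // LAI of the last applied command
	closedTimestamp hlc.Timestamp // highest closed timestamp learned so far
}

// canServeFollowerRead reports whether a read at ts can be served locally:
// every command applying from now on writes at timestamps > closedTimestamp.
func (r *replicaState) canServeFollowerRead(ts hlc.Timestamp) bool {
	return !r.closedTimestamp.Less(ts) // i.e. ts <= closedTimestamp
}
```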
The Right-Hand Side (RHS) of a split will start with the closed timestamp carried
by the command that creates the range (i.e. the EndTxn with the SplitTrigger).
We've considered choosing an initial closed timestamp for the RHS during
evaluation and putting it in the SplitTrigger. The problem with that is that
it'd allow the closed timestamp to regress for the RHS once the split is
applied, which would allow follower reads to use inconsistent snapshots across
ranges. Consider:
1. Say we're splitting range a-c into a-b and b-c. The AdminSplit is evaluated. The SplitTrigger is computed to contain a closed ts = 10.
2. The AdminSplit is evaluated, but before the corresponding command (c1) is sequenced by the proposal buffer, another request evaluates and its proposal gets sequenced with a closed ts of 20.
3. c1 applies. Then the split applies on some, but not all, replicas. So now, the replicas that have applied the split have a closed ts of 20 for a-b and a closed ts of 10 for b-c. The replicas that have not applied it have a closed ts of 20 for a-c.
4. A write comes in at ts 15 to keys b and, say, x. Say that the follower that hasn't yet applied the split continues to not apply it for a while, and thus also doesn't apply the write to b.
5. A read comes in for b and x @ 16. The read on b can be served by a non-split follower because it's below (what it believes to be) the relevant closed timestamp. This means that the read might fail to see the write in question, which is no good - in particular because the read could see the write on x.

For merges, the closed timestamp after the merge is applied will be the one of the LHS. This technically allows for a regression in the RHS's closed timestamp (i.e. the RHS might have been frozen at a time when its closed timestamp was 20 and the merge completes when the closed ts of the LHS is 10), but this is inconsequential because writes to the old RHS keyrange between 10 and 20 will be disallowed by the timestamp cache: we'll initialize the timestamp cache of the RHS keyrange to the freeze time (currently we only muck with the RHS's timestamp cache if the leases for the LHS and RHS were owned by different stores).
Like the current implementation, this proposal would operate in terms of LAIs, not Raft log indexes. Each command will carry a closed timestamp, guaranteeing that further applied commands write at higher timestamps. We can't perfectly control the order in which commands are appended to the log: proposing a command at index i does not mean the entry cannot end up at a higher position when the leaseholder (i.e. the proposer) is different from the Raft leader. This kind of reordering of commands in the log means that, even though the proposer increases the closed timestamps monotonically, the log entries aren't always monotonic. However, applying commands still have monotonic closed timestamps because reordered commands don't apply (courtesy of the LAI protection).
The fact that a log entry's closed timestamp is only valid if the entry passes the LAI check indicates that closed timestamps conceptually live in the lease domain, not in the log domain, and so we think of closed timestamps being associated with a LAI, not with a log index position. Another consideration is the fact that there are Raft entries that don't correspond to evaluated requests (things like leader elections and quiescence messages). These entries won't carry a closed timestamp.
The LAI vs log index distinction is inconsequential in the case of entries carrying their own closed timestamp, but plays a role in the design of the side-transport: that transport needs to identify commands somehow, and it will do so by LAI.
We'll now first describe how closed timestamps are propagated, and then we'll discuss how timestamps are closed.
For "active" ranges, closed timestamps will be carried by Raft commands. Each proposal carries the range's closed timestamp (at the time of the proposal). If the proposal is accepted, it will apply on each replica. At application time, each replica will thus become aware of a new closed timestamp. The one exception are lease request (including lease transfers): these proposals will not carry a closed timestamp explicitly. Instead, the lease start time will act as the closed timestamp associated with that proposal. This allows us to get rid of the reset of the tscache that we currently perform when a new lease is applied; writes can simply check against the closed timestamp.
One thing to note is that closed ts are monotonic below Raft, so when applying
commands a replica can blindly overwrite its known closed timestamp. Another
thing to note is that, while a range is active, the close-frequency parameter of
closed timestamps (i.e. the kv.closed_timestamp.close_fraction cluster
setting) doesn't matter. Closed timestamp publishing is more continuous, rather
than the discrete increments dictated by the current, periodic closing. The
periodic closing will still happen for inactive ranges, though.
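Continuing the sketch from above, the application-time handling could look like the following. The lease-related parameters are assumptions of this sketch; the point is the blind, check-free forwarding:

```go
// onApply is invoked when this replica applies a command. For lease requests
// and lease transfers (which carry no explicit closed timestamp), the caller
// passes the new lease's start time as leaseStart.
func (r *replicaState) onApply(cmd appliedCmd, isLeaseRequest bool, leaseStart hlc.Timestamp) {
	closedTS := cmd.ClosedTimestamp
	if isLeaseRequest {
		// The lease start time acts as the closed timestamp of the proposal.
		closedTS = leaseStart
	}
	r.appliedLAI = cmd.LeaseAppliedIndex
	// Applying commands carry monotonically increasing closed timestamps
	// (reordered proposals fail the LAI check and never apply), so a blind
	// forward is safe; no lease or epoch check is needed on the follower.
	r.closedTimestamp.Forward(closedTS)
}
```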
Each replica will write the closed timestamp it knows about in a new field of the RangeAppliedState - the value stored under the range-local RangeAppliedStateKey, which also maintains the applied LAI and the range stats, among others. This key is already written on every command application (because of the stats). Snapshots naturally include the RangeAppliedStateKey, so new replicas will be initialized with a closed timestamp. Splits will initialize the RHS with the closed timestamp carried by the command containing the SplitTrigger (which, incidentally, is a lot cleaner than the way in which we currently prevent closed ts regressions on the RHS - a below-Raft mechanism).
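For illustration, the persisted state conceptually gains one field. The Go struct below merely mirrors the shape of the real RangeAppliedState proto (the actual change would be a new proto field):

```go
// (additional import: "github.com/cockroachdb/cockroach/pkg/storage/enginepb")

// rangeAppliedState mirrors what is stored under the RangeAppliedStateKey.
// Since this key is already rewritten on every command application (for the
// stats and the applied indexes), the new field rides along for free and is
// naturally included in snapshots.
type rangeAppliedState struct {
	RaftAppliedIndex  uint64
	LeaseAppliedIndex uint64
	RangeStats        enginepb.MVCCStats
	ClosedTimestamp   hlc.Timestamp // new field proposed by this RFC
}
```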
We've talked about "active/inactive" ranges, but haven't defined them. A range is inactive at a point in time if there are no in-flight write requests for it (i.e. no writes being evaluated at the moment and no proposals in flight). The leaseholder can verify this condition. If there's no lease, then no replica can verify the condition by itself, which will result in the range not advancing its closed timestamp (that's also the current situation); the next request will cause a lease acquisition, at which point timestamps will start being closed again. When a leaseholder detects a range to be inactive, it can choose to bump its closed timestamp however it wants, as long as the timestamp stays below the lease expiration and the leaseholder promises not to accept new writes below it. Each node will periodically scan its ranges for inactive ones (in reality, it will only consider ranges that have been inactive for a little while) and, when detecting a new one, will move it over to the set of inactive ranges for which closed timestamps are published periodically through a side-transport (see below). An exception is a subsumed range, which cannot have its closed timestamp advanced (because that might cause an effective closed ts regression once the LHS of the respective merge takes over the key space).
When the leaseholder for an inactive range starts evaluating a write request, the range becomes active again, and so goes back to having its closed timestamps transported by Raft proposals.
The inactivity notion is similar to quiescence, but otherwise distinct from it (there's no reason to tie the two). Quiescence is initiated on each range's Raft ticker (ticking every 200ms); we may or may not reuse this for detecting inactivity. 200ms might prove too long a duration for non-blocking ranges, where this duration needs to be factored in when deciding how far in the future a non-blocking txn writes.
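As a rough illustration of the inactivity check (all names hypothetical; the real check would hook into the proposal buffer and the merge/subsumption state):

```go
// rangeActivity captures what the leaseholder needs in order to decide
// whether a range is inactive at this moment.
type rangeActivity struct {
	evaluatingWrites  int  // write requests currently being evaluated
	inflightProposals int  // proposals not yet applied
	subsumed          bool // frozen as the RHS of an in-progress merge
}

// canUseSideTransport reports whether the range may be moved to the set of
// inactive ranges whose closed timestamps are published via the
// side-transport. Subsumed ranges are excluded because advancing their
// closed timestamp could cause an effective regression after the merge.
func (a rangeActivity) canUseSideTransport() bool {
	return a.evaluatingWrites == 0 && a.inflightProposals == 0 && !a.subsumed
}
```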
As hinted above, each node will maintain a set of inactive ranges for which
timestamps are to be closed periodically and communicated to followers through a
dedicated streaming RPC. Each node will have a background goroutine scanning for
newly-inactive ranges to add to the set, and ranges for which the lease is not
owned any more to remove from the set (updates for a range are only sent for
timestamps at which the sender owns a valid lease). Writes remove ranges from
the set, as do lease expirations. Periodically, the node closes new timestamps
for all the inactive ranges. The exact timestamp that is closed depends on each
individual range's policy, but all ranges with the same policy are bumped to the
same timestamp (for compression purposes, see below). For "normal" ranges, that
timestamp will be now() - kv.closed_timestamp.target_duration.
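A hedged sketch of the sender-side bookkeeping follows; closedTSPolicy and the function names are made up for this sketch, while roachpb.RangeID and hlc.Timestamp are the real types:

```go
// (additional imports: "time", "github.com/cockroachdb/cockroach/pkg/roachpb")

// closedTSPolicy stands in for a range's closed timestamp policy; this RFC
// needs at least a "normal" and a "non-blocking" flavor.
type closedTSPolicy int

const (
	policyNormal closedTSPolicy = iota
	policyNonBlocking
)

// targetClosedTS computes the timestamp to close for a given policy. Normal
// ranges trail the present by kv.closed_timestamp.target_duration;
// non-blocking ranges would instead lead the present (not sketched here).
func targetClosedTS(now hlc.Timestamp, policy closedTSPolicy, target time.Duration) hlc.Timestamp {
	switch policy {
	case policyNormal:
		return now.Add(-target.Nanoseconds(), 0)
	default:
		return now // placeholder for other policies
	}
}

// closeInactive runs periodically in the side-transport sender. All inactive
// ranges with the same policy are bumped to the same timestamp, so a single
// timestamp per group (rather than per range) needs to go out on the wire.
func closeInactive(
	now hlc.Timestamp, byPolicy map[closedTSPolicy][]roachpb.RangeID, target time.Duration,
) map[closedTSPolicy]hlc.Timestamp {
	closed := make(map[closedTSPolicy]hlc.Timestamp, len(byPolicy))
	for policy := range byPolicy {
		closed[policy] = targetClosedTS(now, policy, target)
	}
	return closed
}
```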
Once closed, these timestamps need to be communicated to the followers of the respective ranges, and associated with Raft entries. We don't want to communicate them using the normal mechanism (Raft messages) because we don't want to send messages on otherwise inactive ranges - in particular, such messages would unquiesce the range - thus defeating the point of quiescence. So, we'll use another mechanism: a streaming RPC between each provider node and subscriber node. This stream would carry compressed closed timestamp updates for the sender's set of inactive ranges, grouped by the ranges' closed ts policy. In effect, each node will broadcast information about all of its inactive ranges to every other node (thus every recipient will only be interested in some of the updates - namely the ones for which it has a replica). The sender doesn't try to customize messages for each recipient.
The first message on each stream will contain full information about the groups:
```
groups: [
  {
    group id: g1,
    members: [{range id: 1, log index: 1}, {range id: 2, log index: 2}],
    timestamp: 100,
  },
  {
    group id: g2,
    members: [{range id: 3, log index: 3}, {range id: 4, log index: 4}],
    timestamp: 200,
  },
  ...
]
```
Inactive ranges are grouped by their closed ts policy and the members of each group are specified. For each range, the last LAI (as of the time when the update is sent) is specified - that's the entry that the closed timestamp refers to. The receiver of this update will bump the closed timestamp associated with that LAI to the respective group's new closed timestamp.
After the initial message on a stream, following messages are deltas of the form:
```
groups: [
  {
    group id: g1,
    removals: [{range id: 1, log index: 1}],
    additions: [{range id: 5, log index: 5}],
    timestamp: 200,
  },
  ...
]
```
The protocol takes advantage of the fact that the streaming transport doesn't lose messages.
It may seem that this side-transport is pretty similar to the existing
transport. The fact that it only applies to inactive ranges results in a big
simplification for the receiver: the receiver doesn't need to be particularly
concerned that it hasn't yet applied the log position referenced by the update -
and thus doesn't need to worry that overwriting the <lai,closed ts> pair that it
is currently using for the range with this newer info will result in the
follower not being able to serve follower reads at any timestamp until replication
catches up. The current implementation has to maintain a history of closed ts
updates per range for this reason, and all that can go away.
The recipient of an update will iterate through all the update's ranges and bump the respective replica's (if any) closed timestamp. A case to consider is when the follower hasn't yet applied the LAI referenced by the update; this should be uncommon, since updates refer to inactive ranges, for which replication should generally be complete. When an update with a not-yet-applied LAI is received, the replica's closed timestamp cannot simply be bumped, so we'll just ignore such an update (beyond still processing the respective range's addition to an inactive group). An alternative that was considered is to have the replica store this prospective update in a dedicated structure, consulted on command application; when the expected LAI finally applies, the command application could also bump the closed timestamp and clear the provisional update structure. We opted against this complication as it would only provide a momentary improvement: without it, bumping the closed timestamp simply takes one extra message from the side transport after the range's replication catches up.
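Continuing the illustrative replicaState sketch from earlier, the per-range handling on the receiver might look like this (a hypothetical method; the real code would run under the replica's lock):

```go
// applySideTransportUpdate processes one (LAI, closed timestamp) pair
// received for this range over the side-transport stream.
func (r *replicaState) applySideTransportUpdate(lai uint64, closedTS hlc.Timestamp) {
	if r.appliedLAI < lai {
		// The update refers to a command we haven't applied yet. This should
		// be rare for inactive ranges; just drop the update and wait for a
		// later one (sent after our replication has caught up).
		return
	}
	r.closedTimestamp.Forward(closedTS)
}
```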
Note that this scheme switches the tradeoff from the current implementation: currently, closed timestamp updates on the follower side are lazy; replicas are not directly updated in any way, instead letting reads consult a separate structure for figuring out what's closed. The current proposal makes updates more expensive but reads cheaper.
We've talked about how closed timestamps are communicated by a leaseholder to followers. Now we'll describe how a timestamp becomes closed. The scheme is pretty similar to the existing one, but simpler because LAIs don't need to be tracked anymore, and because all the tracking of requests can be done per-range (instead of per-store). Another difference is that the existing scheme relies on a timer closing out timestamps, whereas this proposal relies just on the flow of requests to do it (for active ranges).
We can close a timestamp as soon as there's no in-flight evaluation of a request at a lower timestamp. As soon as we close a timestamp, any write at a lower timestamp needs to be bumped. So, we don't want to be too aggressive in closing timestamps; we want to always trail current time by some margin (leaving aside the leading closed timestamps used for non-blocking ranges) so that recent transactions have some slack to come in and perform their writes without interference.
The scheme that directly encodes what we just said would be to track in-flight evaluations in a min-heap, ordered by their evaluation timestamp. As soon as an evaluation is done, we could take it out of the heap and close everything below the remaining minimum (subject to the desired slack). Actually maintaining this heap seems pretty expensive, though. To make it cheaper, we could start compressing the requests into fewer heap entries: we could discretize the timestamps into windows of, say, 10ms. Each window could be represented in the heap by one element, with a ref count. If we maintained these buckets in the simplest way possible - a circular buffer - then their count would still be unbounded, as it depends on the evaluation time of the slowest request. Going a step further would be placing a limit on the number of buckets and, once the limit is reached, just having new requests join the last bucket. What should this limit be? Well, one doesn't really work, and the next smallest number is two. So, at the limit, we could maintain just two buckets.
The current implementation uses two buckets, and the narrative above is a post-rationalization of how we got it. The current proposal keeps the two-bucket scheme in place.
Let's analyze how the two-bucket scheme behaves vs. the optimal approach of a
min-heap. What we want to know is 1) the rate at which the tracker advances,
and 2) the lag that it imposes. Let's say that each write takes time L
to evaluate. The worst case is when a new request joins
cur immediately before it shifts: prev then takes L to shift again, as it
needs to wait for the entire duration of the newly added request's evaluation.
So the rate of the tracker is 1/L.
Then there's the lag. We pick the timestamp that we want to close when
initializing cur. We then need to wait for one shift (up to L) before it starts
being attached to log entries as the closed timestamp, and it then remains the
attached closed timestamp for another L, so the worst-case lag is 2L.
If we compare that to the min-heap, we get a rate of ∞ (period of 0) - the closed timestamp can advance as fast as proposals happen. However, we do get a lag of L, because we need to wait for the earliest entry to finish evaluating before the closed timestamp can progress.
So the two-bucket approach leads to twice the lag of the "optimal" min-heap approach.
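To put rough numbers on this: if evaluations take L = 50ms, the two-bucket tracker advances the closed timestamp at most once every 50ms and its worst-case lag is 2L = 100ms, versus L = 50ms for the min-heap. That extra lag is presumably negligible next to a kv.closed_timestamp.target_duration measured in seconds, though it is worth keeping in mind for policies where the closed timestamp is supposed to lead present time.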
Each range will have a tracker which keeps track of what requests are currently evaluating - which indirectly means tracking what requests have to finish evaluating for various timestamps to be closed. The tracker maintains an ordered set of buckets of requests in the process of evaluating. Each bucket has a timestamp and a reference count associated with it; every request in the bucket evaluated (or is evaluating) at a higher timestamp than the bucket's timestamp. The timestamp of bucket i is larger than that of bucket i-1. The buckets group requests into "epochs", each epoch being defined by the smallest timestamp allowed for a request to evaluate at (a request's write timestamp needs to be strictly greater than its bucket's timestamp). At any point in time only the last two buckets can be non-empty (and so the implementation will just maintain two buckets and continuously swap them) - we'll call the last bucket cur and the previous one prev. We'll say that we "increment" prev and cur to mean that we move the prev pointer to the next bucket, and similarly create a new bucket and move the cur pointer to it.
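A minimal Go sketch of this structure (illustrative only):

```go
// tracker tracks the requests currently evaluating on a range, grouped into
// (at most) two buckets, plus the highest closed timestamp attached to any
// proposal so far.
type tracker struct {
	// prev and cur are the two buckets; prev.ts <= cur.ts whenever both are set.
	prev, cur bucket
	// closed is the highest closed timestamp carried by a flushed proposal.
	closed hlc.Timestamp
}

// bucket groups requests that are evaluating with write timestamps strictly
// greater than ts.
type bucket struct {
	ts       hlc.Timestamp // zero value means "not set yet"
	refCount int           // number of requests currently evaluating in this bucket
}
```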
The tracker will logically be part of the ProposalBuffer - it will be
protected by the buffer's peculiar rwlock. Insertions into the buckets will need
the read lock (similar to insertions into the buffer), and the flushing of the
buffer (done under a write lock) removes the flushed requests from their
buckets. Integrating the tracker with the ProposalBuffer helps to ensure that
the closed timestamps carried by successive commands are monotonic. Requests
need to fix the closed timestamp that their proposal will carry either before,
or otherwise atomically with, exiting their bucket. It'd be incorrect to allow a
request to exit its bucket and fix its proposal's timestamp later, because we'd
run the risk of assigning a closed ts that's too high (as it possibly wouldn't
take into account other requests with higher timestamps that have also exited
the bucket after it).
As opposed to the current implementation, the buckets and their timestamps aren't directly related to the current closed timestamp. The range's closed timestamp (meaning, the highest timestamp that was attached to a proposal) will be maintained separately by the tracker. This allows the command corresponding to the last request that leaves the buckets (i.e. a command that leaves both buckets empty when it exits) to carry a timestamp relative to the flush time, not one relative to when the request started evaluating. Any new request exiting the buckets, though, will carry a closed ts that's higher than or equal to prev's timestamp.
When a request arrives, it enters cur. If cur's timestamp is not set, it is
set to now() - kv.closed_timestamp.target_duration (or according to
whatever the range's policy is). If prev is found to be empty, both prev and
cur are incremented - after the increment, the request finds itself in prev,
and cur is a new, empty bucket.
When creating a new bucket (by incrementing cur), that bucket's timestamp is not set immediately. Instead, it is set by the first request to enter it. Notice that a bucket's timestamp does not depend on the request creating the bucket, so timestamps for buckets are monotonically increasing.
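Continuing the tracker sketch, the arrival path could look like this (a hypothetical track method; locking, which in reality piggy-backs on the proposal buffer's lock, is omitted):

```go
// track registers a request that is about to start evaluating and returns
// the timestamp above which the request must write (its bucket's timestamp).
func (t *tracker) track(now hlc.Timestamp, target time.Duration) hlc.Timestamp {
	if t.cur.ts.IsEmpty() {
		// The first request to enter a bucket sets its timestamp according to
		// the range's policy (here: trail the present by the target duration).
		t.cur.ts = now.Add(-target.Nanoseconds(), 0)
	}
	t.cur.refCount++
	lowerBound := t.cur.ts
	if t.prev.refCount == 0 {
		// prev is empty: increment prev and cur. The request now finds itself
		// in prev, and cur becomes a new, empty bucket with no timestamp yet.
		t.prev = t.cur
		t.cur = bucket{}
	}
	return lowerBound
}
```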
When a request exits its bucket (i.e. at buffer flush time), the logic depends on what bucket it's exiting. A request that's not the last one in its bucket doesn't do anything special. When the request is the last one:
If exiting prev, then prev and cur are incremented. This way, a new bucket is created and the next arriving request will get to set its timestamp.
If exiting cur, then cur's timestamp is reset to the uninitialized state. This lets the next request set a new, higher one. Note that we can't necessarily increment cur and prev because prev might not be empty.
Again, the closed ts that a proposal carries is determined at flush time - after all the requests sequenced in the log before it have exited the tracker, and before any request sequenced after it has exited. It is determined as follows:
1. If both buckets are empty, the timestamp is now() - kv.closed_timestamp.target_duration. Notice how the timestamps of the buckets don't matter, and the request's write timestamp (i.e. its evaluation timestamp) also doesn't matter. Indeed, the closed timestamp carried by a proposal can be higher than its write timestamp (such a case simply means that a proposal is writing at, say, 10, and is also promising that future proposals will only write above, say, 20).
2. If prev is empty, then cur's ts is used. cur's timestamp must be set, otherwise we would have been in case 1.
3. Otherwise (i.e. if prev is not empty), prev's timestamp is used.
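Continuing the same sketch, the flush-time computation of the closed timestamp to attach to the flushed proposals covers exactly these three cases (the bucket maintenance performed when requests exit their buckets is omitted):

```go
// closedTSForFlush returns the closed timestamp to attach to the proposals
// currently being flushed from the proposal buffer, assuming the flushed
// requests have already been removed from their buckets.
func (t *tracker) closedTSForFlush(now hlc.Timestamp, target time.Duration) hlc.Timestamp {
	var ts hlc.Timestamp
	switch {
	case t.prev.refCount == 0 && t.cur.refCount == 0:
		// Case 1: nobody is evaluating; close as far as the policy allows.
		ts = now.Add(-target.Nanoseconds(), 0)
	case t.prev.refCount == 0:
		// Case 2: only cur has evaluating requests; they all write above cur.ts.
		ts = t.cur.ts
	default:
		// Case 3: requests are still evaluating in prev; they write above prev.ts.
		ts = t.prev.ts
	}
	// The range's closed timestamp (the highest one attached to a proposal)
	// is maintained separately and never regresses.
	t.closed.Forward(ts)
	return t.closed
}
```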
Lease requests are special as they don't carry a closed timestamp. As explained before, the lease start time acts as the proposal's closed ts. Such requests are not tracked by the tracker.
In the beginning, there are two empty buckets:
```
|                |                |
|prev;ts=<nil>   |cur;ts=<nil>    |
-----------------------------------
```
The first request (r1) comes and enters cur. Since cur's ts is not set,
it gets set to now()-5s = 10, say (assume the range's target is a trail of 5s).
Once the bucket timestamp is set, the request bumps its own write timestamp
above the bucket's timestamp. The request then notices that prev is empty, so
it shifts cur over (thus now finding itself in prev). Requests from now on
will carry a closed ts >= 10.
```
|refcnt: 1 (r1)  |                |
|prev;ts=10      |cur;ts=<nil>    |
-----------------------------------
```
While r1 is evaluating, 3 other requests come. They all enter cur. The first
one among them sets cur's timestamp to, say, 15.
```
|refcnt: 1 (r1)  |refcnt: 3       |
|prev;ts=10      |cur;ts=15       |
-----------------------------------
```
Let's say two of these requests finish evaluating while r1's eval is still in
progress. They exit cur and carry prev's ts (=10). Then let's say r1
finishes evaluating and exits (also carrying 10). Nothing more happens, even
though prev is now empty; if cur were empty, prev and cur would now be
incremented.
```
|refcnt: 0       |refcnt: 1 (r2)  |
|prev;ts=10      |cur;ts=15       |
-----------------------------------
```
Now say the last request out of cur (call it r2) exits the bucket. Since
both buckets are empty, it will carry now - 5s. We know that this is >= 15
because 15 was now - 5s at some point in the past. Since cur is empty, its
timestamp gets reset.
```
|refcnt: 0       |refcnt: 0       |
|prev;ts=10      |cur;ts=<nil>    |
-----------------------------------
```
At this point the range is technically inactive. If enough time passes, the background process in charge of publishing updates for inactive ranges via the side transport will pick it up. But let's say another request comes in quickly; then the same thing happens as when the very first request came: it enters cur, finds prev to be empty, shifts cur over to prev, and creates a new bucket.
The existing closed timestamp tracker and transport will be left in place for a release, working in parallel with the new mechanisms. Followers will only activate closed timestamps communicated by the new mechanism after a cluster version check.