docs/RFCS/20160420_proposer_evaluated_kv.md
Pursue an idea brought up by @spencerkimball in the context of #5985: move the bulk of the logic involved in applying commands to a Replica upstream of Raft.
The first exploration of migrations highlighted the complexity concomitant with migrations on top of the current Raft execution model.
As of writing, the leading replica is the only replica that proposes commands to Raft. However, each replica applies commands individually, which leads to large swaths of code living "downstream" of Raft. Since all of this code needs to produce identical output even during migrations, a satisfactory migration story is almost unachievable, a point made by several reviewers (and the author).
By having the proposing replica compute the effects on the key-value state before proposing the command to Raft, much of this downstream code moves upstream of Raft; naively, what's left are simple writes to key-value pairs (i.e. all commands move upstream except for a simplified MVCCPut operation). While some complexity remains, this promises a much simpler migration story, as well as de-triplicating the work previously done by each individual replica.
The main change in the execution path will be in proposeRaftCommand, which
takes a BatchRequest and creates a pendingCmd, which currently has the form

```go
type pendingCmd struct {
	ctx     context.Context
	idKey   storagebase.CmdIDKey
	done    chan roachpb.ResponseWithError // Used to signal waiting RPC handler
	raftCmd struct {
		RangeID       RangeID
		OriginReplica ReplicaDescriptor
		Cmd           BatchRequest
	}
}
```
The main change here is to the type of Cmd: it changes from BatchRequest to a
new type which carries the effects of applying the batch, as computed (by the
lease holder) earlier in proposeRaftCommand. For simplicity, in this draft we
will assume that the raftCmd struct is modified according to these
requirements.
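For concreteness, a minimal sketch of that assumption (field layout as above; RaftCmd is the candidate struct developed below):

```go
type pendingCmd struct {
	ctx     context.Context
	idKey   storagebase.CmdIDKey
	done    chan roachpb.ResponseWithError // Used to signal waiting RPC handler
	raftCmd struct {
		RangeID       RangeID
		OriginReplica ReplicaDescriptor
		// Was: Cmd BatchRequest. Now carries the pre-computed effects of the
		// batch instead of the request itself.
		Cmd RaftCmd
	}
}
```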
The "effects" contain
Put, DeleteRange, CPut, BeginTransaction, or RequestLease.
Some alternatives exist: writes could be encoded as []MVCCKeyValue, or even
as post-MVCC raw (i.e. opaque) key-value pairs. We'll assume the former for
now.EndTransaction with a commit
trigger, or when the lease holder changes (to apply the new lease).
There are more of them, but this already shows that these triggers (which are
downstream of Raft) will require various pieces of data to be passed around
with them.Note that for the "triggers" above, a lot of the work they currently do can
move pre-Raft as well. For example, for RequestLease below we must change some
in-memory state on *Replica (lease proto and timestamp cache) plus call out
to Gossip. Still, the bulk of the code in (*Replica).RequestLease (i.e. the
actual logic) will move pre-Raft, and the remaining trigger is conditional on
being the lease holder (so that one could even remove it completely from the
"effects" and tie its execution to the lease holder role itself).
Thus, we arrive at the following (proto-backed) candidate struct:

```
# Pseudo-Go-Protobuf, all naming ad-hoc and TBD.

message RaftCmd {
  optional SomeMetadata # tbd (proposing replica, enginepb.MVCCStats diff, ...)
  repeated oneof effect {
    # Applied in field and slice order.
    optional Writes writes = ...; # see MVCC section for Writes type
    # Carries out the following:
    # * sanity checks
    # * compute stats for new left hand side (LHS) [move pre-raft]
    # * copy some data to new range (abort cache) [move pre-raft]
    # * make right-hand side `*Replica`
    # * copy tsCache (in-mem) [lease holder only]
    # * r.store.SplitRange
    # * maybe trigger Raft election
    optional Split ...;
    # Carries out the following:
    # * sanity checks
    # * stats computations [move pre-raft]
    # * copy some data to new range (abort cache) [move pre-raft]
    # * remove some metadata [move pre-raft]
    #   careful: this is a DeleteRange; could benefit from a ClearRange
    #   Raft effect (all inline values; no versioning needed)
    # * clear tsCache (not really necessary)
    optional Merge ...;
    # Carries out the following:
    # * `(*Replica).setDesc`
    # * maybe add to GC queue [lease holder only]
    optional ChangeReplicas ...;
    # Carries out the following:
    # * maybe gossips system config [lease holder only]
    optional ModifiedSpan ...;
    # Carries out the following:
    # * sanity checks (in particular FindReplica)
    # * update in-memory lease
    # * on Raft(!) leader: maybe try to elect new leader
    # * on (new) lease holder: update tsCache low water mark
    #   (might be easier to do this unconditionally on all nodes)
    # * on (new) lease holder: maybe gossip system configs
    optional Lease ...;
    # Those below may not be required, but something to think about:
    optional GCRange ...;    # allow optimized GC (some logic below Raft?)
    optional ClearRange ...; # allow erasing key range without huge proposal
  }
}
```
Since this simple structure is easy to reason about, we should strive for the minimum amount of interaction between the various effects (though in some cases the data required for the triggers may benefit from more interaction between them).
Before a lease holder proposes a command, it has the job of translating a
BatchRequest into a RaftCmd. Note that we assume that writes wait for
both reads and writes in the command queue (so that at the point at which
the RaftCmd is constructed, all prior mutations have been carried out and
the timestamp cache updated).
This largely takes over the work which was done in

```
applyRaftCommandInBatch -> executeWriteBatch -> executeBatch
  -> (ReplicaCommands) -> (MVCC)
```
This is hopefully largely mechanical, with the bulk of work in the MVCC layer, outlined in the next section.
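Sketched in Go under the assumptions above (evaluateProposal, the Triggers field, and this executeBatch signature are hypothetical; GetWrites belongs to the Batch interface proposed in the MVCC section below):

```go
// evaluateProposal runs on the lease holder, upstream of Raft: it evaluates
// the batch against a write-tracking engine batch and packages the tracked
// writes (plus any trigger payloads) into the RaftCmd that gets proposed.
func evaluateProposal(
	ctx context.Context, r *Replica, ba roachpb.BatchRequest,
) (*RaftCmd, error) {
	batch := r.store.Engine().NewBatch() // write-tracking, see MVCC section
	defer batch.Close()

	// Evaluate as before, but collect the side effects as explicit trigger
	// payloads instead of executing them downstream of Raft.
	triggers, err := executeBatch(ctx, batch, ba)
	if err != nil {
		return nil, err
	}
	writes, err := batch.GetWrites()
	if err != nil {
		return nil, err
	}
	return &RaftCmd{Writes: writes, Triggers: triggers}, nil
}
```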
The work to be carried out in ReplicaCommands is mostly isolating the various
side effects (most prominently the commit and range lease triggers). In the
process, it should be possible to remove these methods from (*Replica)
(though that is not a stated goal of this RFC).
Operations in the MVCC layer (MVCC{Get,Put,Scan,DeleteRange,...}) currently
return some variation of ([]roachpb.KeyValue, []roachpb.Intent, error) and
operate on the underlying engine.Engine (which in practice is a RocksDB
batch) with heavy use of iterators.
With the new code, we want mostly the same, but we additionally need a
Writes field, which (in some way or another) tracks the key-value pairs
written (i.e. created, changed or deleted) over the lifetime of the batch.

It's also desirable to avoid invasive changes to the MVCC layer. The following
extension to the Engine interface seems adequate (mod changes to improve
buffer reuse):
```go
type Engine interface {
	// ...
	NewBatch() Batch
}

var ErrNotTracking = errors.New("batch is not tracking writes")

type Writes []MVCCKeyValue // actual type see below

type Batch interface {
	Engine
	// GetWrites returns a slice of Write updates which were carried out
	// during the batch, or ErrNotTracking if the batch does not support this
	// feature.
	// The ordering of the writes is not guaranteed to be stable and may not
	// correspond to the actual order in which the writes were carried out.
	// However, the result of applying the updates in the returned order *is*
	// stable, i.e. non-commutative (overlapping) writes can't rearrange
	// themselves freely. [i.e. can use a map under the hood]
	// The returned Writes are read-only and must not be used after their
	// Batch is finalized. [i.e. can reclaim the memory at that point, or we
	// add an explicit method to finalize the writes if that is more
	// convenient]
	GetWrites() (Writes, error)
}
```
With this strategy, the MVCC changes are fairly locally scoped and not too invasive.
Naively, this can be achieved by wrapping a "regular" batch with an appropriate in-memory map which is populated on writes.
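A minimal sketch of that naive wrapper (hypothetical; deletions and buffer reuse are omitted, and both Put on the wrapped batch and encodeMVCCKey are assumed helpers): writes are mirrored into a map keyed on the encoded MVCC key, so a later write to the same key simply replaces the earlier one.

```go
// trackingBatch wraps a regular Batch and records the latest write to each
// key; GetWrites flattens the map. Clear (deletion) and Merge would need to
// be wrapped analogously.
type trackingBatch struct {
	Batch                          // the wrapped "regular" batch
	writes map[string]MVCCKeyValue // encoded MVCC key -> last write
}

func (b *trackingBatch) Put(key MVCCKey, value []byte) error {
	if err := b.Batch.Put(key, value); err != nil {
		return err
	}
	b.writes[string(encodeMVCCKey(key))] = MVCCKeyValue{Key: key, Value: value}
	return nil
}

func (b *trackingBatch) GetWrites() (Writes, error) {
	// Map iteration order is unstable, but since only the last write per key
	// survives, applying the result in any returned order yields the same
	// state (matching the contract documented on GetWrites above).
	writes := make(Writes, 0, len(b.writes))
	for _, kv := range b.writes {
		writes = append(writes, kv)
	}
	return writes, nil
}
```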
However, using RocksDB's WriteBatch format seems opportune:
```
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// record :=
//    kTypeValue varstring varstring
//    kTypeDeletion varstring
//    kTypeSingleDeletion varstring
//    kTypeMerge varstring varstring
//    kTypeColumnFamilyValue varint32 varstring varstring
//    kTypeColumnFamilyDeletion varint32 varstring varstring
//    kTypeColumnFamilySingleDeletion varint32 varstring varstring
//    kTypeColumnFamilyMerge varint32 varstring varstring
// varstring :=
//    len: varint32
//    data: uint8[len]
```
In particular:

* a WriteBatch can be constructed directly from this representation.
* serialization is a key point in performance[1].
* it should be possible to combine WriteBatches (which would be needed to
  remove write-write blocking).
* the format should be relatively stable (likely only additions), but there
  might be future migrations coming up as RocksDB changes their underlying
  assumptions. We should consider checking in with them about this idea.
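For illustration, the fixed 12-byte header of this representation decodes trivially (a sketch assuming RocksDB's little-endian fixed-width encoding; decodeWriteBatchHeader is a hypothetical name):

```go
import (
	"encoding/binary"
	"fmt"
)

// decodeWriteBatchHeader splits a serialized WriteBatch into its sequence
// number, record count, and the remaining record data.
func decodeWriteBatchHeader(rep []byte) (seq uint64, count uint32, records []byte, err error) {
	const headerSize = 12 // sequence: fixed64, count: fixed32
	if len(rep) < headerSize {
		return 0, 0, nil, fmt.Errorf("WriteBatch too short: %d bytes", len(rep))
	}
	seq = binary.LittleEndian.Uint64(rep[:8])
	count = binary.LittleEndian.Uint32(rep[8:12])
	return seq, count, rep[headerSize:], nil
}
```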
Using WriteBatch also means that the writes are completely opaque to the
stack (at least unless deserialized early, loaded, and then accessed through
MVCC operations), so the followers essentially can't act based on them. That
should not be a problem (but could become one if we want the side effects to
use information from the write set in a performant way).
This will be a new, straightforward facility which applies a RaftCmd against
the underlying Engine in a batch. The point of this whole change is that this
part of the stack becomes simple and free of side effects.
(*Replica).executeCmd is likely a good place for most of this code to live.
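A sketch of that facility (hypothetical names throughout; Close and Commit are assumed batch methods, and the shape of the iteration depends on the final RaftCmd proto):

```go
// applyRaftCmd applies the pre-computed effects to the local engine, in
// field and slice order. No command logic runs here; trigger payloads are
// handled separately by the caller for the small remaining in-memory updates
// (tsCache, gossip, ...).
func applyRaftCmd(eng Engine, cmd *RaftCmd) error {
	batch := eng.NewBatch()
	defer batch.Close()
	for _, kv := range cmd.Writes { // deletions TBD, e.g. via a nil Value
		if err := batch.Put(kv.Key, kv.Value); err != nil {
			return err
		}
	}
	return batch.Commit()
}
```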
Replay protection is naively an issue because applying a logic-less batch of writes is at odds with our current method, which largely relies on write/timestamp conflicts. However, as @bdarnell pointed out, as of recently (#5973) we (mostly) colocate the Raft leader and the range lease holder on the same node, and so we have more options for dealing with this (i.e. making the application of a command conditional on the Raft log position at which it was originally proposed).
More concretely (via @bdarnell):

> The RaftCmd would also include a term and log index, to be populated with
> the lease holder's latest log information when it is proposed. The writes
> would only be applied if the command committed at that index; they would be
> dropped (and reproposed) if it committed at any other index. This would
> only be a possibility when the lease holder is not the raft leader (and
> even then it's unlikely in stable lease situations). We may be able to be
> even stricter about colocating the two roles (in which case we could
> perhaps remove raft-level replays completely), but I'm wary of forcing too
> many raft elections.
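A sketch of the resulting check at apply time (ProposedTerm, ProposedIndex, and the sentinel error are hypothetical):

```go
// checkReplay rejects a command that Raft committed at a position other
// than the one at which the lease holder proposed it; the caller then drops
// the writes and reproposes the command.
func checkReplay(cmd *RaftCmd, committedTerm, committedIndex uint64) error {
	if cmd.ProposedTerm != committedTerm || cmd.ProposedIndex != committedIndex {
		return errMustRepropose // hypothetical sentinel: drop and repropose
	}
	return nil
}
```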
Changes to the tests should be limited to those that exercise triggers and/or
use lower-level Raft internals inside of the storage package (plus some
surprises which are certain to occur with a change of this magnitude).
We implement the Freeze/Unfreeze Raft commands suggested by @bdarnell in
#5985: everything proposed between Freeze and Unfreeze applies as a
non-retryable error (and in particular, does not change state). Assuming a
working implementation of this (which will need to be hashed out separately and
may require its own migration considerations), a migration entails the
following:
* propose Freeze on all Replicas,
* replace the binary on all nodes, and
* propose Unfreeze on all Replicas.

Since the replaced binary will not have to apply Raft log entries proposed by the previous binary, all is well.
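A sketch of how the frozen state could be enforced at apply time (hypothetical throughout; the actual design of Freeze/Unfreeze is left to its own discussion):

```go
// While frozen, every applying command except Unfreeze itself returns a
// non-retryable error and leaves the state machine untouched.
func maybeRejectFrozen(r *Replica, cmd *RaftCmd) error {
	if r.isFrozen() && !cmd.IsUnfreeze() { // hypothetical accessors
		return errReplicaFrozen // non-retryable; no state was changed
	}
	return nil
}
```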
This work is nicely isolated from the remainder of this RFC, and can (should) be tackled separately as soon as consensus is reached.
This migration strategy also has other upshots:
* There were concerns that this change would make it harder or impossible to get rid of write-write blocking in the future. However, that concern was based on the possibility of Raft reordering, which we can eliminate at this point (see the section on replay protection). The problem then boils down to having available all the intermediate states which would result from applying prefixes of the existing overlapping writes (including side effects), so that new writes can execute on top of them; that happens at a layer above the changes in this RFC.
Tackle the following in parallel (and roughly in that order):

* Implement {F,Unf}reeze; these are immediately useful for all upcoming
  migrations.
* Audit the side effects of the commands on *Replica and clearly separate the
  parts which can be moved pre-proposal as well as the parameters required
  for the post-proposal part. This informs the actual design of the Raft
  "effects" in this proposal. Most of these should come up naturally when
  trying to remove the (*Replica) receiver on most of the ReplicaCommands
  (RequestLease, Put, ...) and manually going through the rest of the call
  stack up to processRaftCommand.
* Settle the discussion around the RocksDB-specific WriteBatch implementation
  and guard against upstream changes. We can trust RocksDB to be
  backwards-compatible in any changes they make, but we require bidirectional
  compatibility: we create a batch on the lease holder and ship it off to the
  follower to be applied, and the follower might be running either an older
  or a newer version of the code. If they add a new record type, we either
  need to be able to guarantee that the new record types won't be used in our
  case, or translate them back to something the old version can understand.
* Implement the write-tracking Batch interface. This could be done after the
  WriteBatch encoding is done or, if required to unblock the implementation
  of this RFC, start with a less performant version based on an auxiliary map
  and protobuf serialization.

The goal here should be to identify all the necessary refactors, to keep them out of the remaining "forklift" portion of the change.
It's scary: as I understand it, the change is very ambitious, but not as vast in scope as, for example, the plans we have for distributed SQL (which, of course, are not on such a tight deadline).
Other than leaving the current system in place and pushing complexity onto the migration story, none.
None currently.
[1]: via @petermattis:
There is another nice effect from using the WriteBatch: instead of marshaling and unmarshaling individual keys and values, we'll be passing around a blob. While investigating batch insert performance I added some instrumentation around RaftCommand marshaling and noticed:
```
raft command marshal:   20002: 6.2ms
raft command unmarshal: 20002: 29.1ms
raft command marshal:   20002: 6.0ms
raft command unmarshal: 20002: 12.4ms
raft command marshal:   20002: 6.0ms
raft command unmarshal: 20002: 42.6ms
```
The first number (20002) is the number of requests in the BatchRequest and the second is the time it took to marshal/unmarshal. The marshaled data size is almost exactly 1MB.