docs/rfcs/035-safekeeper-dynamic-membership-change.md
To quickly recover from safekeeper node failures and to do rebalancing we need to be able to change the set of safekeepers a timeline resides on. The procedure must be safe (never lose committed log) regardless of the state of safekeepers and computes. It should be able to progress if any majority of the old safekeeper set, any majority of the new safekeeper set and the compute are up and connected. This is known as a consensus membership change. It always involves two phases: 1) switch the old majority to the joint old + new configuration, preventing commits without acknowledgment from the new set; 2) bootstrap the new set by ensuring a majority of the new set has all data which could ever have been committed before the first phase completed; after that it is safe to finish the switch. Without two phases, a direct switch to the new set is unsafe because its quorum might not intersect with a quorum of the old set (the typical case of an ABC -> ABD switch is an example: quorums AC and BD don't intersect). Furthermore, the procedure is typically carried out by the consensus leader, and so the enumeration of configurations which establishes order between them is done through the consensus log.
In our case the consensus leader is the compute (walproposer), and we don't want to wake up all computes for the change. Nor do we want to fully reimplement the leader logic a second time outside compute. Because of that, the proposed algorithm relies on an external fault-tolerant (distributed) strongly consistent storage for issuing configurations, with a simple API: CAS (compare-and-swap) on a single key. Properly configured postgres suits this.
In the system, consensus is implemented at the timeline level, so the algorithm below applies to a single timeline.
A configuration is

```rust
struct Configuration {
    generation: SafekeeperGeneration, // a number uniquely identifying the configuration
    sk_set: Vec<NodeId>,              // current safekeeper set
    new_sk_set: Option<Vec<NodeId>>,
}
```
A configuration with new_sk_set present is used for the intermediate step during the change and is called a joint configuration. Generations establish order between configurations: we say c1 is higher than c2 if c1.generation > c2.generation.
Safekeeper starts storing its current configuration in the control file. The update is atomic, so the in-memory value always matches the persistent one.
External CAS-providing storage (let's call it configuration storage here) also stores the configuration for each timeline. It is initialized with generation 1 and the initial set of safekeepers during timeline creation. An executed CAS on it must never be lost.
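To make the contract concrete, here is a minimal sketch of what the configuration storage interface could look like; the `ConfigStore` trait, `MemStore` and error names are illustrative, not a proposed API.

```rust
use std::collections::HashMap;

type TimelineId = String;
type Generation = u32;

#[derive(Clone, Debug)]
struct Configuration {
    generation: Generation,
    sk_set: Vec<u64>,
    new_sk_set: Option<Vec<u64>>,
}

#[derive(Debug)]
enum CasError {
    /// The stored generation didn't match the expected one: another change
    /// raced us and our attempt must be aborted.
    Conflict { stored: Generation },
    NotFound,
}

trait ConfigStore {
    /// Replace the timeline's configuration only if the stored generation
    /// still equals `expected`; in postgres this maps to a single
    /// `UPDATE ... WHERE generation = <expected>` statement.
    fn cas(
        &mut self,
        tli: &TimelineId,
        expected: Generation,
        new: Configuration,
    ) -> Result<(), CasError>;
}

/// In-memory stand-in for the real storage, for illustration only.
struct MemStore(HashMap<TimelineId, Configuration>);

impl ConfigStore for MemStore {
    fn cas(
        &mut self,
        tli: &TimelineId,
        expected: Generation,
        new: Configuration,
    ) -> Result<(), CasError> {
        match self.0.get_mut(tli) {
            None => Err(CasError::NotFound),
            Some(cur) if cur.generation != expected => Err(CasError::Conflict {
                stored: cur.generation,
            }),
            Some(cur) => {
                *cur = new;
                Ok(())
            }
        }
    }
}
```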
The ProposerGreeting message carries walproposer's configuration if it is already established (see below), else null. The AcceptorGreeting message carries the safekeeper's current Configuration. All further messages (VoteRequest, VoteResponse, ProposerElected, AppendRequest, AppendResponse) carry a generation number: the walproposer's in wp->sk messages and the safekeeper's in sk->wp messages.
Basic rule: once a safekeeper observes a configuration higher than its own, it immediately switches to it. It must refuse all messages with a generation lower than its own. It also refuses messages if it is not a member of the current configuration (that is, of either sk_set or new_sk_set), though it is likely not unsafe to process them (walproposer should ignore the result anyway).
If there is a non-null configuration in ProposerGreeting and it is higher than the safekeeper's current one, the safekeeper switches to it.
Safekeeper sends its current configuration in its first message to walproposer, AcceptorGreeting. It refuses all other walproposer messages if the configuration generation in them is less than its current one. Namely, it refuses to vote, to truncate WAL in handle_elected, and to accept WAL. In the response it sends its current configuration generation to let walproposer know.
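A sketch of these safekeeper-side rules (types and function names are illustrative, not the actual safekeeper code):

```rust
type NodeId = u64;
type Generation = u32;

#[derive(Clone)]
struct Configuration {
    generation: Generation,
    sk_set: Vec<NodeId>,
    new_sk_set: Option<Vec<NodeId>>,
}

impl Configuration {
    fn is_member(&self, id: NodeId) -> bool {
        self.sk_set.contains(&id)
            || self.new_sk_set.as_ref().map_or(false, |s| s.contains(&id))
    }
}

/// On ProposerGreeting, which carries a full configuration: adopt it if higher.
fn on_greeting(own: &mut Configuration, offered: Option<&Configuration>) {
    if let Some(c) = offered {
        if c.generation > own.generation {
            *own = c.clone(); // persisted atomically to the control file
        }
    }
}

/// On any further message carrying a generation number; Err means "refuse and
/// report our current generation back to walproposer".
fn check_message(
    own: &Configuration,
    own_id: NodeId,
    msg_gen: Generation,
) -> Result<(), Generation> {
    if msg_gen < own.generation || !own.is_member(own_id) {
        return Err(own.generation);
    }
    Ok(())
}
```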
Safekeeper gets a new endpoint PUT /v1/tenants/{tenant_id}/timelines/{timeline_id}/membership accepting a Configuration. Safekeeper switches to the given conf if it is higher than its current one and ignores it otherwise. In any case it replies with

```rust
struct TimelineMembershipSwitchResponse {
    conf: Configuration,
    term: Term,
    last_log_term: Term,
    flush_lsn: Lsn,
}
```
The basic rule is that a joint configuration requires votes from majorities in both sk_set and new_sk_set.
Compute receives the list of safekeepers to connect to from the control plane as it does currently and tries to communicate with all of them. However, the list does not define consensus members. Instead, on start walproposer tracks the highest configuration it receives from AcceptorGreetings. Once it assembles greetings from a majority of sk_set and a majority of new_sk_set (if it is present), it establishes this configuration as its own and moves to voting.
It should stop talking to safekeepers not listed in the configuration at this point, though it is not unsafe to continue doing so. To be elected it must receive votes from both majorities if new_sk_set is present. Similarly, to commit WAL it must receive flush acknowledgments from both majorities. If walproposer hears from a safekeeper a configuration higher than its own (i.e. a refusal to accept due to a configuration change), it simply restarts.
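The quorum rule above is small enough to sketch directly; a minimal illustrative version, not the actual walproposer code:

```rust
use std::collections::HashSet;

type NodeId = u64;

/// True if `acked` covers a majority of `set`.
fn majority(set: &[NodeId], acked: &HashSet<NodeId>) -> bool {
    set.iter().filter(|id| acked.contains(id)).count() > set.len() / 2
}

/// Joint configurations need majorities in both sets; the same predicate
/// gates both election (votes) and commit (flush acknowledgments).
fn have_quorum(sk_set: &[NodeId], new_sk_set: Option<&[NodeId]>, acked: &HashSet<NodeId>) -> bool {
    majority(sk_set, acked) && new_sk_set.map_or(true, |s| majority(s, acked))
}

fn main() {
    // ABC -> ABD migration: nodes 1..3 are ABC, node 4 is D.
    let acked: HashSet<NodeId> = HashSet::from([1, 3]); // A and C answered
    // Enough for the old set alone...
    assert!(have_quorum(&[1, 2, 3], None, &acked));
    // ...but not for the joint configuration, which also needs {A, B, D}.
    let new_set: &[NodeId] = &[1, 2, 4];
    assert!(!have_quorum(&[1, 2, 3], Some(new_set), &acked));
}
```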
The following algorithm can be executed anywhere having access to the configuration storage and safekeepers. It is safe to interrupt / restart it and to run multiple instances of it concurrently, though likely one of them won't make progress then. It accepts desired_set: Vec<NodeId> as input. The algorithm will refuse to make the change if it encounters a previous interrupted change attempt, but in this case it will try to finish it. It will eventually converge if the old majority, the new majority and the configuration storage are reachable.
1) Fetch the current timeline configuration from the configuration storage. If it is already joint and its new_sk_set is different from desired_set, refuse to make the change. However, assign the joint conf to the (in memory) var joint_conf and proceed to step 4 to finish the ongoing change.
2) Create joint_conf: Configuration: increment the current conf generation n and put desired_set into new_sk_set. Persist it in the configuration storage by doing CAS on the current generation: the change happens only if the current generation is still n. Apart from guaranteeing uniqueness of configurations, CAS linearizes them, ensuring that a new configuration is created only following the previous one, when we know that the transition is safe. A failed CAS aborts the procedure.
3) Call PUT configuration on safekeepers from the current set, delivering them joint_conf. Collecting responses from a majority is required to proceed. If any response returned a generation higher than joint_conf.generation, abort (another switch raced us). Otherwise, choose the max <last_log_term, flush_lsn> among the responses and establish it as the (in memory) sync_position; also choose the max term and establish it as the (in memory) sync_term (sketched below). We can't finish the switch until a majority of the new set catches up to this sync_position, because data before it could be committed without acks from the new set. Similarly, we'll bump the term on the new majority to sync_term so that two computes with the same term are never elected.
4) Initialize the timeline on safekeepers from new_sk_set where it doesn't exist yet by doing pull_timeline from the majority of the current set. Doing that on a majority of new_sk_set is enough to proceed, but it is reasonable to ensure that all new_sk_set members are initialized -- if some of them are down, why are we migrating there?
5) Call POST bump_term(sync_term) on safekeepers from the new set. Success on a majority is enough.
6) Call PUT configuration on safekeepers from the new set, delivering them joint_conf and collecting their positions. This will switch them to joint_conf, which generally won't be needed because pull_timeline already includes it, and additionally it would be broadcast by compute. More importantly, we may proceed to the next step only when <last_log_term, flush_lsn> on a majority of the new set has reached sync_position. Similarly, on the happy path no waiting is needed because pull_timeline already includes it. However, we should double check to be safe. For example, the timeline could have been created earlier, e.g. manually or after a try-to-migrate, abort, try-to-migrate-again sequence.
7) Create new_conf: Configuration, incrementing joint_conf's generation and having the new safekeeper set as sk_set and None new_sk_set. Write it to the configuration storage under one more CAS.
8) Call PUT configuration on safekeepers from the new set, delivering them new_conf. It is enough to deliver it to the majority of the new set; the rest can be updated by compute.

I haven't put huge effort into making the description above very precise, because natural language is prone to interpretation anyway. Instead I'd like to make a TLA+ spec of it.
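The safety-critical selection in step 3 can be sketched as follows, with assumed scalar types for Term and Lsn (the real response carries the full conf rather than a bare generation field):

```rust
type Term = u64;
type Lsn = u64;

/// Simplified view of TimelineMembershipSwitchResponse: the safekeeper's
/// generation is flattened into a field here for brevity.
struct SwitchResponse {
    generation: u32,
    term: Term,
    last_log_term: Term,
    flush_lsn: Lsn,
}

/// Compute (sync_term, sync_position) from the responses of a majority of the
/// old set, or None if a higher generation was seen (another switch raced us).
fn analyze(joint_gen: u32, resps: &[SwitchResponse]) -> Option<(Term, (Term, Lsn))> {
    if resps.iter().any(|r| r.generation > joint_gen) {
        return None; // abort the procedure
    }
    let sync_term = resps.iter().map(|r| r.term).max()?;
    // Log positions compare lexicographically by (last_log_term, flush_lsn).
    let sync_position = resps.iter().map(|r| (r.last_log_term, r.flush_lsn)).max()?;
    Some((sync_term, sync_position))
}
```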
The description above focuses on safety. To make the flow practical and live, here are a few more considerations.
- Until step 6 is completed, it is safe to roll back to the old conf with one more CAS.
- On step 4, deleting an already existing timeline and redoing pull_timeline is generally unsafe without involving generations, so it seems simpler to treat an existing timeline as success. However, this also has a disadvantage: you might imagine a surpassingly unlikely schedule where the catch-up condition in step 6 is never reached until compute is (re)awakened to synchronize the new member(s). I don't think we'll observe this in practice, but we can add waking up compute if needed.
- If the configuration fetched on step 1 is not joint and its sk_set already equals desired_set, jump to step 7, using it as new_conf.

The procedure ought to be driven from somewhere. Obvious candidates are control plane and storage_controller; and as each of them already has a db, we don't want yet another storage. I propose to manage safekeepers in storage_controller because 1) since it is in rust it simplifies simulation testing (more on this below) 2) it already manages pageservers.
This assumes that migration will be fully usable only after we migrate all tenants/timelines to storage_controller. It is debatable whether we also want it to manage pageserver attachments for all of these, but likely we do.
This requires us to define storcon <-> cplane interface and changes.
First of all, control plane should switch to storing safekeepers per timeline instead of per tenant, because we can't migrate tenants atomically.
The important question is how updated configuration is delivered from
storage_controller to control plane to provide it to computes. As always, there
are two options, pull and push. Let's do it the same push as with pageserver
/notify-attach because 1) it keeps storage_controller out of critical compute
start path 2) uniformity. It makes storage_controller responsible for retrying
notifying control plane until it succeeds.
It is not needed for the control plane to fully know the Configuration. It is enough for it to be aware only of the list of safekeepers in the latest configuration to supply it to compute, plus the associated generation number to protect from stale update requests and to also pass it to compute.
So, cplane /notify-safekeepers for the timeline can accept JSON like

```
{
    tenant_id: String,
    timeline_id: String,
    generation: u32,
    safekeepers: Vec<SafekeeperId>,
}
```

where SafekeeperId is

```
{
    node_id: u64,
    host: String
}
```
In principle host is redundant, but may be useful for observability.
The request updates the list of safekeepers in the db if the provided conf generation is higher (the cplane db should also store generations for this). Similarly to /notify-attach, it should update the db, which makes the call successful, and then try to schedule apply_config if possible; it is ok if that doesn't happen. storage_controller should rate limit calling the endpoint, but likely this won't be needed, as migration throughput is limited by pull_timeline.
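A sketch of the generation guard this endpoint needs (types are illustrative; the real handler would express this as a conditional UPDATE in the cplane db):

```rust
struct NotifyRequest {
    generation: u32,
    safekeepers: Vec<u64>, // node ids; hosts omitted for brevity
}

struct StoredSafekeepers {
    generation: u32,
    safekeepers: Vec<u64>,
}

/// Apply a notification only if it is newer; a delayed stale request must
/// never overwrite a newer list, but the call still returns success.
fn apply_notify(stored: &mut StoredSafekeepers, req: NotifyRequest) {
    if req.generation > stored.generation {
        *stored = StoredSafekeepers {
            generation: req.generation,
            safekeepers: req.safekeepers,
        };
    }
}
```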
Timeline (branch) creation in cplane should call storage_controller POST tenant/:tenant_id/timeline like it currently does for sharded tenants. The response should be augmented with safekeepers_generation and safekeepers fields as described in /notify-safekeepers above. Initially (currently) these fields may be absent; in this case cplane chooses safekeepers on its own like it currently does. The call should be retried until it succeeds.
Timeline deletion and tenant deletion in cplane should call appropriate storage_controller endpoints like it currently does for sharded tenants. The calls should be retried until they succeed.
When compute receives the safekeeper list from control plane it needs to know the generation to check whether the list should be updated (note that compute may get the safekeeper list from either cplane or safekeepers). Currently the neon.safekeepers GUC is just a comma-separated list of host:port. Let's prefix it with g#<generation>: to this end, so it will look like

```
g#42:safekeeper-0.eu-central-1.aws.neon.tech:6401,safekeeper-2.eu-central-1.aws.neon.tech:6401,safekeeper-1.eu-central-1.aws.neon.tech:6401
```
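A sketch of how compute could parse the prefixed GUC (the function name is illustrative; the 0-means-disabled convention follows the rollout section below):

```rust
/// Split a `g#<generation>:host:port,...` value into the generation and the
/// safekeeper list. A missing prefix (or g#0:) means generations are disabled.
fn parse_safekeepers_guc(raw: &str) -> (u32, Vec<&str>) {
    let (gen, list) = match raw.strip_prefix("g#") {
        Some(rest) => match rest.split_once(':') {
            Some((g, list)) => (g.parse().unwrap_or(0), list),
            None => (0, rest),
        },
        None => (0, raw),
    };
    (gen, list.split(',').filter(|s| !s.is_empty()).collect())
}

fn main() {
    let (gen, sks) = parse_safekeepers_guc(
        "g#42:safekeeper-0.eu-central-1.aws.neon.tech:6401,safekeeper-1.eu-central-1.aws.neon.tech:6401",
    );
    assert_eq!(gen, 42);
    assert_eq!(sks.len(), 2);
}
```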
To summarize, the list of cplane changes:
- Store safekeepers per timeline instead of per tenant, with the associated safekeeper_generation field.
- Implement the /notify-safekeepers endpoint.
- The neon.safekeepers GUC should be prefixed with g#<generation>:.

If desired, we may continue using the current 'load everything on startup and keep in memory' approach: a single timeline shouldn't take more than 100 bytes (it's a 16 byte tenant_id, 16 byte timeline_id, int generation, vec of ~3 safekeeper ids plus some flags), so 10^6 timelines shouldn't take more than 100MB.
Similar to pageserver attachment Intents, storage_controller would have an in-memory MigrationRequest (or its absence) for each timeline and a pool of tasks trying to make these requests reality; this ensures one instance of storage_controller won't do several migrations on the same timeline concurrently. In the first version it is simpler to have more manual control and no retries, i.e. a migration failure removes the request. Later we can build retries and automatic scheduling/migration around this. MigrationRequest is

```rust
enum MigrationRequest {
    To(Vec<NodeId>),
    FinishPending,
}
```
FinishPending requests to run the procedure to ensure the state is clean: the current configuration is not joint and the majority of safekeepers are aware of it, but it does not attempt to migrate anywhere. If the current configuration fetched on step 1 is not joint, it jumps to step 7. It should be run at startup for all timelines (but similarly, in the first version it is ok to trigger it manually).
A safekeepers table mirroring the current nodes should be added. As for scheduling_policy, it is enough to have, at least in the beginning, only 3 values: 1) active, 2) paused (initially meaning only that new timelines are not assigned there), 3) decommissioned (node is removed).
timelines table:

```rust
table! {
    // (tenant_id, timeline_id) is the primary key
    timelines (tenant_id, timeline_id) {
        timeline_id -> Varchar,
        tenant_id -> Varchar,
        start_lsn -> pg_lsn,
        generation -> Int4,
        sk_set -> Array<Int8>, // list of safekeeper ids
        new_sk_set -> Nullable<Array<Int8>>, // list of safekeeper ids, null if not joint conf
        cplane_notified_generation -> Int4,
        sk_set_notified_generation -> Int4, // the generation a quorum of sk_set knows about
        deleted_at -> Nullable<Timestamptz>,
    }
}
```
start_lsn is needed to create timeline on safekeepers properly, see below. We
might also want to add ancestor_timeline_id to preserve the hierarchy, but for
this RFC it is not needed.
cplane_notified_generation and sk_set_notified_generation fields are used to
track the last stage of the algorithm, when we need to notify safekeeper set and cplane
with the final configuration after it's already committed to DB.
The timeline is up-to-date (no migration in progress) if new_sk_set is null and the *_notified_generation fields are up to date with generation. It's possible to replace the *_notified_generation fields with one boolean field migration_completed, but for better observability it's nice to have them separately.
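As a sketch, the predicate over a timelines row (an illustrative struct mirroring the schema above):

```rust
struct TimelineRow {
    generation: i32,
    new_sk_set: Option<Vec<i64>>, // null means not a joint conf
    cplane_notified_generation: i32,
    sk_set_notified_generation: i32,
}

/// No migration in progress and everyone has been told the final conf.
fn migration_completed(t: &TimelineRow) -> bool {
    t.new_sk_set.is_none()
        && t.cplane_notified_generation == t.generation
        && t.sk_set_notified_generation == t.generation
}
```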
Node management is similar to pageserver:
- POST /control/v1/safekeeper inserts a safekeeper.
- GET /control/v1/safekeeper lists safekeepers.
- GET /control/v1/safekeeper/:node_id gets a safekeeper.
- PUT /control/v1/safekeeper/:node_id/scheduling_policy changes status to e.g. offline or decommissioned. Initially it is simpler not to schedule any migrations here.

Safekeeper deploy scripts should register the safekeeper at storage_controller as they currently do with cplane, under the same id.
Timeline creation/deletion will work through already existing POST and DELETE
tenant/:tenant_id/timeline. Cplane is expected to retry both until they
succeed. See next section on the implementation details.
We don't want to block timeline creation/deletion when one safekeeper is down. Currently this is worked around by compute implicitly creating the timeline on any safekeeper it is connected to. This creates an ugly timeline state on the safekeeper where the timeline is created but the start LSN is not defined yet. The next section describes dealing with this.
Tenant deletion repeats timeline deletion for all timelines.
Migration API: the first version is the simplest and the most imperative:
/control/v1/safekeepers/migrate schedules MigrationRequests to move all timelines from one safekeeper to another. It accepts JSON

```
{
    "src_sk": NodeId,
    "dst_sk": NodeId,
    "limit": Optional<u32>,
}
```
Returns list of scheduled requests.
/control/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate schedules a MigrationRequest to move a single timeline to the given set of safekeepers:

```rust
struct TimelineSafekeeperMigrateRequest {
    new_sk_set: Vec<NodeId>,
}
```
In the first version the handler migrates the timeline to new_sk_set synchronously.
Should be retried until success.
In the future we might change it to asynchronous API and return scheduled request.
Similar call should be added for the tenant.
It would be great to have some way of subscribing to the results (apart from looking at logs/metrics).
GET /control/v1/tenant/:tenant_id/timeline/:timeline_id/ should return
current in memory state of the timeline and pending MigrationRequest,
if any.
PUT /control/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate_abort tries to abort the migration by switching the configuration from the joint one to the one with the (previous) sk_set under CAS (incrementing the generation as always).
For timeline creation/deletion we want to preserve the basic assumption that unreachable minority (1 sk of 3) doesn't block their completion, but eventually we want to finish creation/deletion on nodes which missed it (unless they are removed). Similarly for migration; it may and should finish even though excluded members missed their exclusion. And of course e.g. such pending exclusion on node C after migration ABC -> ABD must not prevent next migration ABD -> ABE. As another example, if some node missed timeline creation it clearly must not block migration from it. Hence it is natural to have per safekeeper background reconciler which retries these ops until they succeed. There are 3 possible operation types, and the type is defined by timeline state (membership configuration and whether it is deleted) and safekeeper id: we may need to create timeline on sk (node added), locally delete it (node excluded, somewhat similar to detach) or globally delete it (timeline is deleted).
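The mapping from timeline state to the needed operation can be sketched as follows (names are illustrative; whether an op row exists at all for a given safekeeper is decided by the events above, this only picks its type):

```rust
type NodeId = i64;

enum OpType {
    Include, // node added: seed via pull_timeline, deliver current conf
    Exclude, // node not a member anymore: delete the timeline locally
    Delete,  // timeline deleted: delete it globally
}

/// Given that a pending op exists for (sk, timeline), derive its type from
/// the membership configuration and the deletion flag.
fn op_type(sk: NodeId, deleted: bool, sk_set: &[NodeId], new_sk_set: Option<&[NodeId]>) -> OpType {
    if deleted {
        return OpType::Delete;
    }
    let member = sk_set.contains(&sk) || new_sk_set.map_or(false, |s| s.contains(&sk));
    if member {
        OpType::Include
    } else {
        OpType::Exclude
    }
}
```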
Next, on storage controller restart these pending operations can in principle be figured out by comparing safekeepers' state against storcon state. But it seems better to me to materialize them in the database; it is not expensive, avoids these startup scans which themselves can fail etc., and makes it very easy to see outstanding work directly at the source of truth -- the db. So we can add a table safekeeper_timeline_pending_ops

```rust
table! {
    // (sk_id, tenant_id, timeline_id) is the primary key
    safekeeper_timeline_pending_ops (sk_id, tenant_id, timeline_id) {
        sk_id -> Int8,
        tenant_id -> Varchar,
        timeline_id -> Varchar,
        generation -> Int4,
        op_type -> Varchar,
    }
}
```
We load all pending ops from the table on startup into the memory. The table is needed only to preserve the state between restarts.
op_type can be include (seed from peers and ensure generation is up to
date), exclude (remove locally) and delete. Field is actually not strictly
needed as it can be computed from current configuration, but gives more explicit
observability.
generation is necessary there because after an op is done the reconciler must remove it without removing another row with a higher gen which in theory might appear. Any insert of a row should overwrite (remove) all rows with the same sk and timeline id but lower generation, as the next op makes the previous one obsolete. Insertion of op_type delete overwrites all rows.
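In SQL terms this overwrite-on-insert rule could look like the following sketch against the schema above (for a delete op the generation condition would be dropped so it overwrites unconditionally):

```rust
/// Inserting a pending op replaces an existing row for the same (sk, timeline)
/// only if ours is not older.
const UPSERT_PENDING_OP: &str = "
    INSERT INTO safekeeper_timeline_pending_ops
        (sk_id, tenant_id, timeline_id, generation, op_type)
    VALUES ($1, $2, $3, $4, $5)
    ON CONFLICT (sk_id, tenant_id, timeline_id) DO UPDATE
       SET generation = EXCLUDED.generation, op_type = EXCLUDED.op_type
     WHERE safekeeper_timeline_pending_ops.generation <= EXCLUDED.generation;
";
```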
About exclude: rather than adding explicit safekeeper http endpoint, it is
reasonable to reuse membership switch endpoint: if safekeeper is not member
of the configuration it locally removes the timeline on the switch. In this case
404 should also be considered an 'ok' answer by the caller.
So, the main loop of the per-sk reconciler reads safekeeper_timeline_pending_ops joined with the timeline configuration to get the current conf (with generation n) for the safekeeper and does the jobs, infinitely retrying failures:
1) (include): seed the timeline with pull_timeline if needed and deliver the current configuration to the safekeeper.
2) (exclude): call the membership switch endpoint; not being a member of the configuration, the safekeeper removes the timeline locally.
3) (delete): call delete.
In cases 1 and 2, remove the safekeeper_timeline_pending_ops rows for the sk and timeline with generation <= n if op_type is not delete. In case 3, also remove the safekeeper_timeline_pending_ops entry + remove the timelines entry if there is nothing left in safekeeper_timeline_pending_ops for the timeline.
Let's consider in detail how the APIs can be implemented from this angle.
Timeline creation. It is assumed that cplane retries it until success, so all actions must be idempotent. Now, a tricky point here is the timeline start LSN. For the initial (tenant creation) call cplane doesn't know it. However, setting start_lsn on safekeepers during creation is a good thing -- it provides a guarantee that walproposer can always find a common point in the WAL histories of a safekeeper and its own, and so its absence would be a clear sign of corruption. The following sequence works:
1) Create the timeline (or observe that it exists) on the pageserver, figuring out last_record_lsn from the response.
2) Choose safekeepers and insert (ON CONFLICT DO NOTHING) the timeline row into the db (see the sketch after this list). Note that last_record_lsn returned on the previous step is movable as it changes once ingestion starts, so the insert must not overwrite it (as well as other fields like the membership conf). On the contrary, start_lsn used in the next step must be set to the value in the db. cplane_notified_generation can be set to 1 (the initial generation) in the insert to avoid notifying cplane about the initial conf, as cplane will receive it in the timeline creation response anyway.
3) Issue timeline creation calls to at least a majority of safekeepers. Using a majority here is not necessary but handy because it guarantees that any live majority will have at least one sk with the created timeline, and so the reconciliation task can use pull_timeline shared with migration instead of a create-timeline special init case. Of course, if the timeline already exists the call is ignored.
4) For the minority of safekeepers which could have missed the creation, insert entries into safekeeper_timeline_pending_ops. We won't miss this insertion because the response to cplane is sent only after it has happened, and cplane retries the call until a 200 response.
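Step 2's insert, sketched in SQL against the timelines schema above (the exact defaults, e.g. sk_set_notified_generation starting at 0, are assumptions):

```rust
/// Idempotent creation: a retry must not overwrite start_lsn, the membership
/// conf or anything else chosen by the first successful attempt.
const INSERT_TIMELINE: &str = "
    INSERT INTO timelines
        (tenant_id, timeline_id, start_lsn, generation, sk_set,
         cplane_notified_generation, sk_set_notified_generation)
    VALUES ($1, $2, $3, 1, $4, 1, 0)
    ON CONFLICT (tenant_id, timeline_id) DO NOTHING;
";
```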
There is a small question of how the request handler (timeline creation in this case) interacts with the per-sk reconciler. In the current implementation we first persist the request in the DB, and then send an in-memory request to each safekeeper reconciler to process it.
For pg version / wal segment size: while we may persist them in timelines
table, it is not necessary as initial creation at step 3 can take them from
pageserver or cplane creation call and later pull_timeline will carry them
around.
Timeline migration.
1) Run the migration algorithm described above: pull_timeline onto the new_sk_set, update the membership configuration on all safekeepers, notify cplane, etc. All operations are idempotent, so we don't need to persist anything in the database at this stage. If any errors occur, it's safe to retry or abort the migration.
2) Commit the final configuration with CAS and insert exclude entries into safekeeper_timeline_pending_ops in the same DB transaction. Adding exclude entries atomically is necessary because after the CAS we don't have the list of excluded safekeepers in the timelines table anymore, but we need to have them persisted somewhere in case the migration is interrupted right after the CAS.
3) Send exclude requests to the in-memory queue for the safekeeper reconciler. If the algorithm is retried, it's possible that we have already committed exclude requests to the DB but didn't send them to the in-memory queue. In this case we need to read them from safekeeper_timeline_pending_ops because it's the only place where they are persisted.
The fields sk_set_notified_generation and cplane_notified_generation are updated after each step. The migration is considered fully completed when they match the generation field. In practice, we can report "success" after stage 3 and do the "finish" step in a per-timeline reconciler (if we implement it). But it's wise to at least try to finish them synchronously, so the timeline is always in a "good state" and doesn't require an old quorum to commit WAL after the migration reported "success".
Timeline deletion: just set deleted_at on the timeline row and insert safekeeper_timeline_pending_ops entries in the same xact; the rest is done by per-sk reconcilers.
When a node is removed (set to decommissioned), safekeeper_timeline_pending_ops for it must be cleared in the same transaction.
The operations described above, when executed concurrently, might create some errors but do not prevent progress, so while we normally don't want to run multiple instances of storage_controller, it is fine to have that temporarily, e.g. during redeploy.
To harden against some controller instance creating some work in safekeeper_timeline_pending_ops and then disappearing without anyone picking up the job, per-sk reconcilers should, apart from explicit wakeups, scan for work periodically. It is possible to remove that, though, if all db updates are protected with a leadership token/term -- then such scans are needed only after leadership is acquired.
Any interaction with the db updates in-memory controller state, e.g. if a migration request failed because a different one is in progress, the controller remembers that and tries to finish it.
neon_local should be switched to use storage_controller, playing role of
control plane.
There should be following layers of tests:
Model checked TLA+ spec specifies the algorithm and verifies its basic safety.
To cover real code and at the same time test many schedules we should have simulation tests. For that, the configuration storage, storage_controller <-> safekeeper communication and pull_timeline need to be mocked, and the main switch procedure wrapped as a node (thread) in simulation tests, using these mocks. The test would inject migrations like it currently injects safekeeper/walproposer restarts. The main assert is the same -- committed WAL must not be lost.
Since simulation testing injects at relatively high level points (not syscalls), it omits some code, in particular pull_timeline. Thus it is better to have basic tests covering the whole system as well. An extended version of test_restarts_under_load would do: start background load and do a migration under it, then restart the endpoint and check that no reported commits have been lost. I'd also add one more creating the classic network split scenario, with one compute talking to AC and another to BD while migration from nodes ABC to ABD happens.
Simple e2e test should ensure that full flow including cplane notification works.
Let's have the following implementation bits for gradual rollout:
- A neon.safekeepers_proto_version flag. Initially both compute and safekeepers will be able to talk both versions so that we can delay their forced restart and for simplicity of rollback in case it is needed.
- A --set-safekeepers config option, disabled by default. The timeline creation request chooses safekeepers (and returns them in the response to cplane) only when it is set to true.
- Prefixing the neon.safekeepers GUC with the generation number. When it is 0 (or the prefix is not present at all), walproposer behaves as currently, committing on the provided safekeeper list -- generations are disabled. If it is non 0, it follows the rules of this RFC.
- cplane support for /notify-safekeepers.

Then the rollout for a region would be:
- Enable --set-safekeepers so that all new timelines are on the storage controller.
- Until all timelines are managed by storcon we'd need to use the current ad hoc script to migrate if needed. To keep the state clean, all storage controller managed timelines must be migrated before that, or the controller db and the configuration state of safekeepers dropped manually.
Very rough implementation order:
Currently, pull_timeline doesn't work correctly with evicted timelines because the copy would point to the original partial file. To fix this, let's just do an s3 copy of the file. It is a bit wasteful as generally unnecessary work, but it makes sense to implement proper migration before doing smarter timeline archival. Issue
The steps above assume walproposer restart (with re-election) and thus reconnection to safekeepers. Since by bumping the term on the new majority we ensure that leader terms are unique even across generation switches, it is possible to preserve connections. However, it is more complicated; reconnection is very fast, and it is much more important to avoid compute restart than a millisecond-order write stall.
Multiple joint consensus: the algorithm above rejects an attempt to change membership while another attempt is in progress. It is possible to overlap them, and AFAIK Aurora does this, but similarly I don't think this is needed.
We should use Compute <-> safekeeper protocol change to include other (long yearned) modifications: