docs/rfcs/018-storage-messaging-2.md
Safekeepers need to communicate with each other to learn about each other's progress, e.g. to decide which of them offloads WAL. Pageservers need to communicate with safekeepers to decide which safekeeper should provide WAL to the pageserver.
This is an iteration on 015-storage-messaging, describing the current situation, a potential performance issue, and ways to address it.
What we have currently is very close to the etcd variant described in
015-storage-messaging. Basically, each safekeeper periodically sends a single
SkTimelineInfo message to etcd for each timeline.
Safekeepers also use the etcd elections API to make sure only a single safekeeper offloads WAL.
It works, and callmemaybe is gone. However, there is a performance
hazard. The currently deployed etcd can do about 6k puts per second (using its own
benchmark tool); on my 6-core laptop, running on tmpfs, this gets to
35k. With a benchmark closer to our usage (etcd watch bench),
I get ~10k received messages per second with various numbers of publishers/subscribers
(laptop, tmpfs). Dividing this by 12 (3 safekeepers generate a message, 1 pageserver + 3 safekeepers consume each of them), we
get about 800 active timelines, if a message is sent each second. Not extremely
low, but quite reachable.
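The arithmetic behind the ~800 figure, spelled out (a sketch; the constants simply restate the 3-safekeeper, 1-pageserver assumptions from the text):

```rust
fn main() {
    // ~10k received messages per second, per the etcd watch benchmark above.
    let broker_msgs_per_sec: u32 = 10_000;
    // Assumed setup: 3 safekeepers each publish one message per timeline per second...
    let publishers: u32 = 3;
    // ...and each message is received by 1 pageserver + 3 safekeepers.
    let consumers: u32 = 4;
    let received_per_timeline = publishers * consumers; // 12 msgs/s per timeline
    let active_timeline_budget = broker_msgs_per_sec / received_per_timeline;
    println!("~{active_timeline_budget} active timelines"); // ~833
    assert_eq!(active_timeline_budget, 833);
}
```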
A lot of idle watches seem to be ok though, which is good, as the pageserver subscribes to all its timelines regardless of their activity.
Also, running etcd with fsyncs disabled is messy: the data dir must be wiped on each restart, or there is a risk of corruption errors.
The reason is that etcd does much more than we need: it is a fault-tolerant store with strong consistency, but I claim all we need here is the simplest pub/sub with best-effort delivery, because:
- We already have a centralized source of truth for long-running data, like which timelines are on which nodes -- the console.
- Momentary data (safekeeper/pageserver progress) doesn't make sense to persist. Instead of putting each change into the broker and expecting it to be delivered reliably, it is better to have a constant flow of data for active timelines: 1) the messages serve as natural heartbeats -- if a node can't send, we shouldn't pull WAL from it; 2) it is simpler -- no need to track delivery to/from the broker. Moreover, latency is important here: the faster we obtain fresh data, the faster we can switch to the proper safekeeper after a failure.
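A liveness check over such a constant flow can be as simple as a staleness threshold (a sketch only; the three-second timeout and the `Heartbeats` shape are assumptions, not decisions from this RFC):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Last time we received a status message from each safekeeper node.
struct Heartbeats {
    last_seen: HashMap<u64, Instant>,
    /// Consider a node dead after this long without a message.
    timeout: Duration,
}

impl Heartbeats {
    fn record(&mut self, node_id: u64, now: Instant) {
        self.last_seen.insert(node_id, now);
    }
    /// Nodes we are still willing to pull WAL from.
    fn alive(&self, now: Instant) -> Vec<u64> {
        let mut ids: Vec<u64> = self
            .last_seen
            .iter()
            .filter(|(_, &t)| now.duration_since(t) < self.timeout)
            .map(|(&id, _)| id)
            .collect();
        ids.sort();
        ids
    }
}

fn main() {
    let now = Instant::now();
    let mut hb = Heartbeats {
        last_seen: HashMap::new(),
        timeout: Duration::from_secs(3), // e.g. 3 missed one-second messages
    };
    hb.record(1, now);
    hb.record(2, now);
    // Node 2 goes silent for 5 seconds while node 1 keeps sending:
    let later = now + Duration::from_secs(5);
    hb.record(1, later);
    assert_eq!(hb.alive(later), vec![1]);
}
```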
As for WAL offloading leader election, it is trivial to achieve through these heartbeats: just take a suitable node by a deterministic rule (e.g. min node id). Once the network is stable, this is a converging process (well, except under complicated failure topologies, but even then making it converge is not hard). Such elections bear some risk of several offloaders running concurrently for a short period of time, but that's harmless.
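The deterministic rule is a one-liner over the set of nodes with recent heartbeats (a sketch; each node evaluates it locally over its own view and offloads only if it elects itself):

```rust
/// Elect the WAL offloader among live peers: lowest node id wins.
/// Views may briefly disagree across nodes, so two offloaders can run
/// concurrently for a short time -- which, as argued above, is harmless here.
fn elect_offloader(live_node_ids: &[u64]) -> Option<u64> {
    live_node_ids.iter().copied().min()
}

fn main() {
    assert_eq!(elect_offloader(&[3, 1, 2]), Some(1));
    // Node 1's heartbeats stopped: leadership converges to node 2.
    assert_eq!(elect_offloader(&[3, 2]), Some(2));
    assert_eq!(elect_offloader(&[]), None);
}
```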
Generally, if one needs strong consistency, electing a leader per se is not enough; it must be accompanied by a number (a logical clock timestamp), checked at every action, to track causality. s3 doesn't provide CAS, so it can't differentiate an old leader from a new one; this must be solved differently.
We could use etcd CAS (its most powerful/useful primitive, actually) to issue these leader numbers (and e.g. prefix files in s3 with them), but currently I don't see a need for that.
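To illustrate the fencing idea (purely a sketch of the mechanism mentioned above; the `term-<n>/` key format is made up for illustration): a leader number issued via CAS lets readers ignore writes from a deposed leader by comparing terms embedded in object names.

```rust
/// Parse a hypothetical "term-<n>/<name>" s3 key into (term, name).
fn parse_key(key: &str) -> Option<(u64, &str)> {
    let rest = key.strip_prefix("term-")?;
    let (term, name) = rest.split_once('/')?;
    Some((term.parse().ok()?, name))
}

/// Among duplicate uploads of the same object, trust the highest term:
/// an old leader that kept writing after being deposed loses.
fn pick_newest<'a>(keys: &[&'a str], name: &str) -> Option<&'a str> {
    keys.iter()
        .copied()
        .filter(|k| parse_key(k).map_or(false, |(_, n)| n == name))
        .max_by_key(|k| parse_key(k).map(|(t, _)| t))
}

fn main() {
    let keys = ["term-3/seg1", "term-5/seg1", "term-5/seg2"];
    assert_eq!(pick_newest(&keys, "seg1"), Some("term-5/seg1"));
}
```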
Obviously, a best-effort pub/sub is much simpler and more performant; the proposed one follows.
I took tonic and prototyped a replacement for the functionality we currently use, based on gRPC streams and tokio mpsc channels. The implementation description is in the file header.
It is just 500 lines of code, and the core functionality is complete. 1-1 pub/sub
gives about 120k received messages per second; having multiple subscribers on
different connections quickly scales to 1 million received messages per second.
I had concerns about many concurrent streams in a single connection, but 2^20
subscribers still work (though they eat memory: with 10 publishers, 20GB are consumed;
in this implementation each publisher holds a full copy of all subscribers). There
is bench.rs nearby, which I used for testing.
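The core fan-out can be sketched with standard-library channels (an illustration only; the real prototype uses tonic gRPC streams and tokio mpsc, and, as noted above, each publisher holds its own copy of the subscriber list):

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Best-effort fan-out: a publisher clones each message to every subscriber.
/// No persistence, no delivery tracking -- a gone subscriber is simply dropped.
struct Publisher<T: Clone> {
    // Full copy of all subscribers, as in the prototype.
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Publisher<T> {
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }
    fn publish(&mut self, msg: T) {
        // Prune subscribers whose receiving end is gone; no retries, no acks.
        self.subscribers.retain(|tx| tx.send(msg.clone()).is_ok());
    }
}

fn main() {
    // The message type is generic, so e.g. SkTimelineInfo or another type fits.
    let mut p = Publisher::<String> { subscribers: Vec::new() };
    let a = p.subscribe();
    let b = p.subscribe();
    p.publish("timeline status".to_string());
    assert_eq!(a.recv().unwrap(), "timeline status");
    assert_eq!(b.recv().unwrap(), "timeline status");
    drop(b);
    p.publish("next".to_string()); // b's sender is pruned on this publish
    assert_eq!(p.subscribers.len(), 1);
}
```

Making `Publisher` generic over the message type mirrors the templating point below: wiring in another message besides SkTimelineInfo is a type parameter, not a redesign.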
SkTimelineInfo is wired in here, but another message type can be added (e.g. if
pageservers want to communicate with each other) through templating.
Since such a broker is stateless, we can run it under k8s. Alternatively, we can add proxying to other members; with best-effort delivery this is simple.
Communication happens in a private network that is not exposed to users; additionally, we can add auth to the broker.
We could take an existing pub/sub solution, e.g. RabbitMQ or Redis. But in this case, IMV, the simplicity of our own implementation outweighs the costs of an external dependency (RabbitMQ is much more complicated and needs a VM; Redis Rust client maintenance is not ideal...). Also note that projects like CockroachDB and TiDB are based on gRPC as well.
Apart from being a transport, the broker solves one more task: discovery, i.e. letting safekeepers and pageservers find each other. We can let safekeepers know, for each timeline, both the other safekeepers for this timeline and the pageservers serving it. In that case direct communication becomes possible.
This was mostly described in 014-safekeeper-gossip, but I want to recap on that.
The main pro is one less dependency: fewer moving parts, easier to run Neon locally/manually, fewer places to monitor. Fault tolerance for the broker disappears as a concern; no kubernetes or the like. To me this is a big thing.
Also (though not a big thing), idle watches for inactive timelines disappear: safekeepers naturally learn about the compute connection first and start pushing status to the pageserver(s), notifying it that it should pull.
Importantly, I think that eventually knowing and persisting peers and pageservers on safekeepers is inevitable.
I suppose this membership data should be passed to safekeepers directly from the console, since the console is already the centralized source of truth for such long-running data.
The cons of direct communication are:
I'd use gRPC for direct communication, and in this sense a gRPC-based broker is a step towards it.