src/ray/pubsub/README.md
This doc was last updated on Aug 19, 2025. It should be updated as the implementation changes.
Ray built a lightweight, generalized pubsub module to fix the problem described in https://github.com/ray-project/ray/issues/14762.
Several Ray protocols have relied on long-polling gRPC requests. Take the
reference counting protocol as an example. The owner needs to "wait" until the
borrower reports that the reference has gone out of scope. This used to be done
by sending a single long-polling request "per object". For example, the borrower
replies to the owner's WaitForRefRemoved(objectID) RPC when the ref has gone out of scope.
This causes excessive memory usage when many objects need to be tracked, because gRPC uses a lot of memory while requests remain unanswered.
The pubsub module was developed to mitigate this problem. It has a publisher (borrower) and a subscriber (owner). Instead of keeping individual RPCs open for a long time, the publisher batches messages and publishes them back to the subscriber. This reduces the number of connections from O(#objects) to O(#subscribers). Since the number of subscribers is less than 10,000 in Ray most of the time, this can drastically reduce memory usage in this situation.
If messages are published before a subscription, they're lost.
In this pubsub implementation, publishers send messages directly to subscribers. There are no intermediary brokers. Performance, especially throughput, was not a requirement when the module was developed, so it is not designed for high throughput.
A command is an operation sent from a subscriber to a publisher. Subscribe and
Unsubscribe are the only commands. Commands are served by PubsubCommandBatch,
which batches them in FIFO order. We limit it to one in-flight PubsubCommandBatchRequest
at a time to prevent out-of-order subscribes / unsubscribes. Because of this,
commands issued while a request is in flight have to be queued, and they are batched when sent.
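
The queue-then-batch behavior can be sketched as follows. This is a minimal illustration in Python, not the module's actual code: the names `CommandBatcher`, `Command`, `send_batch`, and `max_batch_size` are hypothetical, and the transport is reduced to a callback.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, List, Optional


@dataclass(frozen=True)
class Command:
    kind: str                      # "subscribe" or "unsubscribe"
    channel_type: str
    key_id: Optional[str] = None   # None stands for "all keys in the channel"


class CommandBatcher:
    """Queues commands and keeps at most one batch request in flight."""

    def __init__(self, send_batch: Callable[[List[Command]], None],
                 max_batch_size: int = 100):
        self._send_batch = send_batch          # hypothetical transport hook
        self._max_batch_size = max_batch_size  # hypothetical limit
        self._queue: Deque[Command] = deque()
        self._in_flight = False

    def submit(self, command: Command) -> None:
        """Enqueue a command; it is sent immediately only if nothing is in flight."""
        self._queue.append(command)
        self._maybe_send()

    def on_reply(self) -> None:
        """Call when the reply to the in-flight batch arrives."""
        self._in_flight = False
        self._maybe_send()

    def _maybe_send(self) -> None:
        if self._in_flight or not self._queue:
            return
        batch: List[Command] = []
        # Drain in FIFO order so subscribes and unsubscribes keep their order.
        while self._queue and len(batch) < self._max_batch_size:
            batch.append(self._queue.popleft())
        self._in_flight = True
        self._send_batch(batch)
```

Because a second request is never sent before the first one is answered, the order in which the publisher sees commands matches the order in which they were submitted.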
Between the publisher and subscriber, there's only 1 long-polling connection
(only one in-flight request), no matter how many separate channels / keys the
subscriber is subscribed to. The subscriber will always have an in-flight
PubsubLongPollingRequest as long as it's subscribed to something. Whenever a
publisher publishes messages to that subscriber, they're batched to the reply
of the long polling request in FIFO order.
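
A subscriber-side sketch of this single long-polling loop, under stated assumptions: `LongPollSubscriber`, `send_long_poll`, and the `(channel_type, key_id, payload)` message tuple are hypothetical names chosen for illustration, and the network call is reduced to a callback.

```python
from typing import Callable, Dict, List, Tuple

Message = Tuple[str, str, str]   # (channel_type, key_id, payload), hypothetical shape
Callback = Callable[[str], None]


class LongPollSubscriber:
    """Keeps exactly one long-poll request in flight while subscriptions exist."""

    def __init__(self, send_long_poll: Callable[[], None]):
        self._send_long_poll = send_long_poll  # hypothetical transport hook
        self._callbacks: Dict[Tuple[str, str], Callback] = {}
        self._poll_in_flight = False

    def subscribe(self, channel_type: str, key_id: str, callback: Callback) -> None:
        self._callbacks[(channel_type, key_id)] = callback
        self._ensure_poll()

    def unsubscribe(self, channel_type: str, key_id: str) -> None:
        self._callbacks.pop((channel_type, key_id), None)

    def on_long_poll_reply(self, messages: List[Message]) -> None:
        """One reply can carry messages for many channels / keys, in FIFO order."""
        self._poll_in_flight = False
        for channel_type, key_id, payload in messages:
            callback = self._callbacks.get((channel_type, key_id))
            if callback is not None:  # skip messages for dropped subscriptions
                callback(payload)
        self._ensure_poll()

    def _ensure_poll(self) -> None:
        # Re-issue the request only if something is still subscribed.
        if self._callbacks and not self._poll_in_flight:
            self._poll_in_flight = True
            self._send_long_poll()
```

Subscribing to a second channel or key does not open a second request; it only adds an entry to the callback table that the one reply is dispatched against.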
Breakdown of the pubsub flow from the subscriber and the publisher. Note that this section ignores fault tolerance.

Subscriber actions

1. On a Subscribe call
   - Sends a PubsubCommandBatchRequest with its own subscriber_id and a SubMessage Command containing channel_type and optionally key_id
   - Sends a PubsubLongPollingRequest with its own subscriber_id
2. Subscribe done
   - Receives the PubsubCommandBatchReply and runs a callback if one was provided on subscribe
   - Allows only one in-flight PubsubCommandBatchRequest to ensure command ordering
3. Message Processing
   - Receives the reply to the PubsubLongPollingRequest and processes published messages
   - Sends another PubsubLongPollingRequest if a subscription still exists
4. Unsubscribe
   - Sends a PubsubCommandBatchRequest with an UnsubscribeMessage when unsubscribing

Publisher actions

1. Subscribe Handling
   - Receives the PubsubCommandBatchRequest and creates a SubscriberState for the subscriber if it doesn't exist
   - Registers the subscription in an EntityState and a SubscriberState
   - The publisher keeps a SubscriptionIndex for each channel, and each SubscriptionIndex holds EntityState objects for each key. Each EntityState holds SubscriberState pointers to send / queue up messages to send. There's a special EntityState in every SubscriptionIndex for "subscribing to all"
2. Initial Long Polling Request
   - Receives the PubsubLongPollingRequest and creates a SubscriberState if it doesn't exist. Note that the SubscriberState might not exist because the initial PubsubLongPollingRequest could arrive before the associated PubsubCommandBatchRequest.
   - Creates a LongPollConnection in the SubscriberState to store the reply + reply callback
3. Subsequent Long Polling
   - Receives a subsequent PubsubLongPollingRequest from the subscriber and checks the mailbox
4. Unsubscribe
   - Unregisters the SubscriberState from the appropriate EntityState
   - Erases the EntityState if it no longer contains any SubscriberState pointers
   - Periodically cleans up dead SubscriberState's

Both pubsub RPCs will be retried by the client on transient network failures, using the same retryable gRPC client that other RPCs use.
Subscribing and unsubscribing are idempotent so the PubsubCommandBatchRequest can be resent.
Since we restrict it to one in-flight request, the commands will be ordered even with retries.
The subscriber's PubsubLongPollingRequest can also be retried since it comes with a
max_processed_sequence_id. The retry will be sent with the same max_processed_sequence_id
and therefore the publisher will send back all the messages from max_processed_sequence_id
to max_sequence_id in that subscriber's mailbox. Messages will not be removed from a subscriber's
mailbox until the subscriber sends a request with max_processed_sequence_id > sequence id of message.
Sequence id increments on every publish on a publisher, regardless of channel or entity.
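
The publisher-side bookkeeping and the retry-safe mailbox can be sketched together. This is a simplified Python model rather than the real implementation: `SketchPublisher` and its methods are hypothetical, the channel-to-key-to-subscribers dictionary stands in for SubscriptionIndex / EntityState, and `long_poll` returns immediately instead of holding the request open. The sketch drops a message once max_processed_sequence_id reaches its sequence id; that exact boundary comparison is an assumption.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Optional, Set, Tuple

ALL_KEYS = None  # stands in for the special "subscribing to all" entity


class SketchPublisher:
    def __init__(self) -> None:
        self._next_sequence_id = 1
        # channel_type -> key_id (or ALL_KEYS) -> subscriber ids
        self._index: Dict[str, Dict[Optional[str], Set[str]]] = defaultdict(
            lambda: defaultdict(set))
        # subscriber_id -> FIFO mailbox of (sequence_id, payload)
        self._mailboxes: Dict[str, Deque[Tuple[int, str]]] = defaultdict(deque)

    def subscribe(self, subscriber_id: str, channel_type: str,
                  key_id: Optional[str] = ALL_KEYS) -> None:
        # Adding to a set is idempotent, so a resent command is harmless.
        self._index[channel_type][key_id].add(subscriber_id)

    def unsubscribe(self, subscriber_id: str, channel_type: str,
                    key_id: Optional[str] = ALL_KEYS) -> None:
        entities = self._index[channel_type]
        entities[key_id].discard(subscriber_id)
        if not entities[key_id]:
            del entities[key_id]  # erase the entity once nobody subscribes to it

    def publish(self, channel_type: str, key_id: str, payload: str) -> int:
        # One counter per publisher, regardless of channel or entity.
        sequence_id = self._next_sequence_id
        self._next_sequence_id += 1
        entities = self._index[channel_type]
        targets = entities.get(key_id, set()) | entities.get(ALL_KEYS, set())
        for subscriber_id in targets:
            self._mailboxes[subscriber_id].append((sequence_id, payload))
        return sequence_id

    def long_poll(self, subscriber_id: str,
                  max_processed_sequence_id: int) -> List[Tuple[int, str]]:
        mailbox = self._mailboxes[subscriber_id]
        # Assumption: a message counts as acknowledged once its id is <= the
        # reported max_processed_sequence_id.
        while mailbox and mailbox[0][0] <= max_processed_sequence_id:
            mailbox.popleft()
        # Unacknowledged messages stay queued, so a retry sees the same batch.
        return list(mailbox)
```

A retried request carries the same max_processed_sequence_id, so calling `long_poll` twice with the same value returns the same messages; nothing is lost if the first reply never reached the subscriber.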
A publisher keeps receiving long-polling requests from a subscriber as long as that subscriber has active subscriptions. A "dead" subscriber stops sending long-polling requests. Every 30 seconds, the publisher checks for active long-polling requests to determine whether each subscriber is still alive. If there has been no activity on a subscriber's long-polling request for subscriber_timeout_ms (300s by default), the publisher flushes the request (that is, replies to it) and waits to see if the subscriber sends another one. If there hasn't been an active long-polling request for over subscriber_timeout_ms, the subscriber is considered dead and all of its metadata is cleaned up.
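
The flush-then-declare-dead check can be sketched as below. This is an illustrative Python model only: `SubscriberLiveness` and its method names are hypothetical, and timestamps are passed in explicitly so the logic can be followed without a real clock. `check` is meant to be called from the periodic (30 second) timer.

```python
from typing import Dict, List, Tuple


class SubscriberLiveness:
    def __init__(self, subscriber_timeout_ms: int = 300_000):
        self._timeout_ms = subscriber_timeout_ms
        self._last_activity_ms: Dict[str, int] = {}
        self._has_open_poll: Dict[str, bool] = {}

    def on_long_poll_received(self, subscriber_id: str, now_ms: int) -> None:
        self._last_activity_ms[subscriber_id] = now_ms
        self._has_open_poll[subscriber_id] = True

    def on_reply_sent(self, subscriber_id: str, now_ms: int) -> None:
        self._last_activity_ms[subscriber_id] = now_ms
        self._has_open_poll[subscriber_id] = False

    def check(self, now_ms: int) -> Tuple[List[str], List[str]]:
        """Periodic check. Returns (flushed, dead) subscriber ids."""
        flushed: List[str] = []
        dead: List[str] = []
        for subscriber_id, last in list(self._last_activity_ms.items()):
            if now_ms - last < self._timeout_ms:
                continue  # recent activity, nothing to do
            if self._has_open_poll[subscriber_id]:
                # Idle open request: reply to it and wait for a new one.
                self.on_reply_sent(subscriber_id, now_ms)
                flushed.append(subscriber_id)
            else:
                # No new request arrived after the flush: treat as dead
                # and drop all metadata kept for this subscriber.
                del self._last_activity_ms[subscriber_id]
                del self._has_open_poll[subscriber_id]
                dead.append(subscriber_id)
        return flushed, dead
```

A live subscriber answers the flush with a fresh long-polling request and resets its timer; a dead one stays silent and is removed on a later check.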