pip/pip-468.md
Sub-PIP of PIP-460: Scalable Topics
PIP-460 introduces scalable topics as a new topic type in Pulsar, built on a DAG of range segments that can be split and merged to scale independently of the initial configuration. PIP-460 defines the overall vision but defers the detailed design of each component to dedicated sub-PIPs.
This PIP specifies the Scalable Topic Controller, the broker-side component that owns the segment layout, executes split and merge operations, and assigns segments to consumers.
Without a well-defined controller, split and merge operations would be unsafe: producers could write to segments that have no subscription cursors, consumers could miss messages during topology changes, and concurrent layout modifications could corrupt the segment DAG.
The controller subsystem consists of four layers:

    ┌──────────────────────────────────────────────────────────┐
    │                     Admin API Layer                      │
    │           ScalableTopics REST + Segments REST            │
    └──────────────┬───────────────────────┬───────────────────┘
                   │                       │
    ┌──────────────▼───────────────────────▼───────────────────┐
    │                   ScalableTopicService                   │
    │        Per-broker singleton; manages controllers         │
    └──────────────────────────┬───────────────────────────────┘
                               │
    ┌──────────────────────────▼───────────────────────────────┐
    │           ScalableTopicController (per topic)            │
    │   Leader-elected; owns layout, split/merge, consumers    │
    │                                                          │
    │  ┌─────────────────┐  ┌───────────────────────────────┐  │
    │  │  SegmentLayout  │  │ SubscriptionCoordinator (×N)  │  │
    │  │ (immutable DAG) │  │ per-subscription assignments  │  │
    │  └─────────────────┘  └───────────────────────────────┘  │
    └──────────────────────────┬───────────────────────────────┘
                               │
    ┌──────────────────────────▼───────────────────────────────┐
    │                  ScalableTopicResources                  │
    │         Metadata store access (read/write/watch)         │
    └──────────────────────────────────────────────────────────┘

A scalable topic involves three distinct broker roles. Understanding which broker handles which operation is critical to the design.
A DAG watch session (scalable topic lookup) can be served by any broker in the cluster. The session reads segment metadata from the shared metadata store and registers a watch for changes. No state is held on the broker beyond the watch registration. This means producers and consumers can connect to any broker for topic discovery — the same way regular topic lookups work today.
When the metadata changes (due to a split or merge performed by the controller leader), the metadata store notification fires on whichever broker is serving the watch session, and that broker pushes the updated DAG to the client.
Each scalable topic has exactly one controller leader across the cluster, elected via the metadata store. The controller leader is the only broker that can modify the segment layout (split and merge) and assign segments to consumers.
Clients discover the controller leader's broker URL through the DAG watch session response (controller_broker_url field). StreamConsumer and CheckpointConsumer clients then connect directly to the controller leader to register and receive assignments.
Admin API requests for split and merge are routed to the ScalableTopicService on any broker, which obtains (or creates) a local ScalableTopicController that participates in leader election. If the local controller is the leader, it executes the operation directly. If not, the request must be redirected to the leader (or the controller must first win the election). The split and merge admin endpoints do not require the request to land on the leader broker — the ScalableTopicService handles this transparently.
Individual segment topics (segment://) are regular persistent topics from the broker's perspective. They are distributed across the cluster by the standard Pulsar load manager and namespace bundle assignment — the same mechanism used for persistent:// topics. Each segment topic is owned by exactly one broker at any time, determined by its namespace bundle hash.
The controller leader does not need to own any of the segment topics. When the controller needs to operate on a segment (create, terminate, delete, discover subscriptions), it uses the Segments admin API, which routes the request to whichever broker currently owns that segment's namespace bundle. This decoupling means the controller can coordinate segments spread across many brokers without requiring co-location.

    ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
    │ Broker A    │   │ Broker B    │   │ Broker C    │
    │             │   │ (controller │   │             │
    │ segment-0   │   │  leader)    │   │ segment-2   │
    │ segment-1   │   │             │   │             │
    │             │   │ DAG watch   │   │ DAG watch   │
    │ DAG watch   │   │ sessions    │   │ sessions    │
    │ sessions    │   │             │   │             │
    └─────────────┘   └─────────────┘   └─────────────┘
                             │
                      Split/merge ops
                      use admin API to
                      reach segments on
                      Broker A and C

In this example, Broker B is the controller leader but owns no segments. When it executes a split of segment-0, it calls the Segments admin API which routes to Broker A (the owner of segment-0).
ScalableTopicService is a per-broker singleton, created and owned by BrokerService. It manages the lifecycle of all ScalableTopicController instances on the broker.
Responsibilities:
- Creates or returns controllers via `getOrCreateController()` and releases them on topic unload or broker shutdown.
- Handles the `createScalableTopic()` and `deleteScalableTopic()` admin operations, including creating/deleting the underlying segment topics via the Segments admin API.
- Routes `splitSegment()` and `mergeSegments()` requests to the appropriate controller.

ScalableTopicController is a per-topic coordinator. Only one instance across the cluster is the leader for a given scalable topic, ensured by leader election through the metadata store. The leader is the only instance that modifies the segment layout or assigns consumers.
State:
| Field | Type | Description |
|---|---|---|
| `topicName` | `TopicName` | The topic:// name of the scalable topic |
| `currentLayout` | `SegmentLayout` | The current immutable snapshot of the segment DAG |
| `subscriptions` | `Map<String, SubscriptionCoordinator>` | Per-subscription consumer coordinators |
| `leaderState` | `LeaderElectionState` | Current leader election state |
| `leaderElection` | `LeaderElection<String>` | Metadata store leader election handle; value is the broker URL |
Key operations:
- `initialize()`: loads the current metadata from the store, then attempts leader election. The elected leader stores its broker service URL so that clients can discover and connect to it.
- `splitSegment(segmentId)`: splits an active segment at its midpoint (see Split Protocol).
- `mergeSegments(segmentId1, segmentId2)`: merges two adjacent active segments (see Merge Protocol).
- `registerConsumer(subscription, consumer)`: registers a consumer and returns its initial segment assignment.
- `unregisterConsumer(subscription, consumer)`: removes a consumer and triggers rebalancing.
- `getLeaderBrokerUrl()`: returns the current leader's broker URL, used by clients to connect to the controller.

SegmentLayout is an immutable, versioned snapshot of the segment DAG. All mutation methods (splitSegment, mergeSegments, pruneSegment) return a new SegmentLayout instance; the original is never modified. This ensures safe concurrent reads without locking.
Fields:
| Field | Type | Description |
|---|---|---|
| `epoch` | `long` | Monotonically increasing version; incremented on every layout change |
| `nextSegmentId` | `long` | Counter for assigning IDs to new segments |
| `allSegments` | `Map<Long, SegmentInfo>` | Complete DAG: all segments (active + sealed) |
| `activeSegments` | `Map<Long, SegmentInfo>` | Filtered view: only ACTIVE segments |
Key operations:
- `findActiveSegment(hash)`: returns the active segment whose hash range contains the given hash value. Used by producers for message routing.
- `splitSegment(segmentId)`: validates the segment is active, computes the midpoint of its hash range, creates two child SegmentInfo records covering [start, mid] and [mid+1, end], seals the parent, and returns a new layout with incremented epoch.
- `mergeSegments(id1, id2)`: validates both segments are active and adjacent (the end of one equals start - 1 of the other), creates a merged SegmentInfo covering the combined range, seals both parents, and returns a new layout.
- `getLineage(segmentId)`: returns the full ancestor + descendant chain for a segment, used for DAG traversal during catch-up reads.
- `toMetadata()` / `fromMetadata()`: converts to/from the persisted ScalableTopicMetadata format.

SegmentInfo is a record representing a single segment in the DAG:
| Field | Type | Description |
|---|---|---|
| `segmentId` | `long` | Unique, monotonically increasing identifier |
| `hashRange` | `HashRange` | The [start, end] inclusive hash range this segment covers |
| `state` | `SegmentState` | ACTIVE or SEALED |
| `parentIds` | `List<Long>` | Parent segments (empty for root segments) |
| `childIds` | `List<Long>` | Child segments (empty for leaf/active segments) |
| `createdAtEpoch` | `long` | Layout epoch in which this segment was created (0 for the initial segments) |
| `sealedAtEpoch` | `long` | Layout epoch in which this segment was sealed; only meaningful when state == SEALED. 0 when the segment is ACTIVE; this sentinel is unambiguous because sealing always happens as part of a split or merge, which increments the epoch, so a segment can never be sealed at epoch 0. |
Factory methods: SegmentInfo.active(id, range, epoch) and SegmentInfo.sealed(...).
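The range arithmetic behind splitSegment and mergeSegments can be sketched as follows. This is a minimal illustration, not the proposed implementation: the class name HashRangeSketch and the floor-midpoint formula are assumptions, chosen because they reproduce the ranges shown in the metadata example (0..65535 splits into 0..32767 and 32768..65535).

```java
final class HashRangeSketch {
    /** Inclusive hash range [start, end]; a local stand-in for the PIP's HashRange. */
    record HashRange(int start, int end) {
        HashRange {
            if (start > end) {
                throw new IllegalArgumentException("start must be <= end");
            }
        }

        boolean contains(int hash) {
            return hash >= start && hash <= end;
        }
    }

    /** Splits [start, end] into [start, mid] and [mid+1, end] (floor midpoint: an assumption). */
    static HashRange[] split(HashRange parent) {
        if (parent.start() == parent.end()) {
            throw new IllegalArgumentException("a single-value range cannot be split");
        }
        int mid = parent.start() + (parent.end() - parent.start()) / 2;
        return new HashRange[] {
            new HashRange(parent.start(), mid),
            new HashRange(mid + 1, parent.end())
        };
    }

    /** Two ranges are adjacent when the end of one equals start - 1 of the other. */
    static boolean adjacent(HashRange a, HashRange b) {
        return a.end() == b.start() - 1 || b.end() == a.start() - 1;
    }

    /** Returns the combined range of two adjacent ranges. */
    static HashRange merge(HashRange a, HashRange b) {
        if (!adjacent(a, b)) {
            throw new IllegalArgumentException("ranges are not adjacent");
        }
        return new HashRange(Math.min(a.start(), b.start()), Math.max(a.end(), b.end()));
    }
}
```

Because the record is immutable, split and merge return new values and never touch their inputs, which mirrors the copy-on-write behavior described for SegmentLayout.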
SubscriptionCoordinator manages segment-to-consumer assignments for a single subscription within a scalable topic.
Assignment strategy: round-robin. Active segments are sorted by hash range start; consumers are sorted by name. Segments are distributed evenly across consumers. When a consumer is added or removed, or when the layout changes (split/merge), the coordinator recomputes assignments and pushes updates to affected consumers.
Key operations:
- `addConsumer(consumer)`: adds a consumer, rebalances, and returns the full assignment map.
- `removeConsumer(consumer)`: removes a consumer and redistributes its segments.
- `onLayoutChange(newLayout)`: called after a split or merge; recomputes all assignments against the new set of active segments.

ConsumerRegistration is a record identifying a connected consumer:
| Field | Type | Description |
|---|---|---|
| `consumerId` | `long` | Protocol-level consumer ID |
| `consumerName` | `String` | Stable name chosen by the client |
| `cnx` | `TransportCnx` | The connection to push assignment updates |
ConsumerAssignment is the assignment sent to a consumer:
| Field | Type | Description |
|---|---|---|
| `layoutEpoch` | `long` | The layout epoch this assignment is based on |
| `assignedSegments` | `List<AssignedSegment>` | The segments assigned to this consumer |
Each AssignedSegment contains: segmentId, hashRange, and underlyingTopicName (the segment:// topic name the consumer should connect to).
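The round-robin strategy described for SubscriptionCoordinator might look like the sketch below. The class RoundRobinAssignmentSketch, its Segment record, and the choice to return segment IDs keyed by consumer name are all illustrative assumptions; only the sort orders (segments by hash range start, consumers by name) come from the text.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RoundRobinAssignmentSketch {
    /** Minimal stand-in for an active segment: id plus inclusive hash range. */
    record Segment(long segmentId, int hashStart, int hashEnd) {}

    /** Returns consumerName -> assigned segment IDs, dealt out round-robin. */
    static Map<String, List<Long>> assign(List<Segment> activeSegments, List<String> consumerNames) {
        // Sort copies so the caller's lists are left untouched.
        List<String> consumers = new ArrayList<>(consumerNames);
        consumers.sort(Comparator.naturalOrder());
        List<Segment> segments = new ArrayList<>(activeSegments);
        segments.sort(Comparator.comparingInt(Segment::hashStart));

        Map<String, List<Long>> assignment = new TreeMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nothing to assign to
        }
        for (int i = 0; i < segments.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            assignment.get(owner).add(segments.get(i).segmentId());
        }
        return assignment;
    }
}
```

Sorting both inputs first makes the result deterministic, so a recomputation with unchanged membership yields the same assignment regardless of arrival order.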
Scalable topic metadata is stored in the metadata store under a well-known path structure:
/topics/{tenant}/{namespace}/{encodedTopicName}
The value at this path is a JSON-serialized ScalableTopicMetadata:

    {
      "epoch": 3,
      "nextSegmentId": 5,
      "segments": {
        "0": { "segmentId": 0, "hashRange": {"start": 0, "end": 65535}, "state": "SEALED", "parentIds": [], "childIds": [1, 2], "createdAtEpoch": 0, "sealedAtEpoch": 1 },
        "1": { "segmentId": 1, "hashRange": {"start": 0, "end": 32767}, "state": "SEALED", "parentIds": [0], "childIds": [3, 4], "createdAtEpoch": 1, "sealedAtEpoch": 3 },
        "2": { "segmentId": 2, "hashRange": {"start": 32768, "end": 65535}, "state": "ACTIVE", "parentIds": [0], "childIds": [], "createdAtEpoch": 1, "sealedAtEpoch": 0 },
        "3": { "segmentId": 3, "hashRange": {"start": 0, "end": 16383}, "state": "ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch": 3, "sealedAtEpoch": 0 },
        "4": { "segmentId": 4, "hashRange": {"start": 16384, "end": 32767}, "state": "ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch": 3, "sealedAtEpoch": 0 }
      },
      "properties": {}
    }

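In the example above the three ACTIVE segments (2, 3 and 4) tile the full hash space with no gaps or overlaps. The sketch below shows one way a lookup such as findActiveSegment(hash) and a tiling check could be written over those ranges. The class ActiveSegmentLookupSketch, the TreeMap index, and the -1 return for "not found" are assumptions made for illustration; the 16-bit hash space [0x0000, 0xFFFF] is taken from the topic creation flow.

```java
import java.util.Map;
import java.util.TreeMap;

final class ActiveSegmentLookupSketch {
    /** Inclusive range owned by one active segment. */
    record Range(int start, int end, long segmentId) {}

    // Active ranges indexed by their start value.
    private final TreeMap<Integer, Range> byStart = new TreeMap<>();

    void add(int start, int end, long segmentId) {
        byStart.put(start, new Range(start, end, segmentId));
    }

    /** Returns the id of the active segment whose range contains hash, or -1 if none does. */
    long findActiveSegment(int hash) {
        Map.Entry<Integer, Range> candidate = byStart.floorEntry(hash);
        if (candidate != null && hash <= candidate.getValue().end()) {
            return candidate.getValue().segmentId();
        }
        return -1L;
    }

    /** True when the active ranges cover [0, 65535] exactly once. */
    boolean coversFullRange() {
        int expectedStart = 0;
        for (Range range : byStart.values()) {
            if (range.start() != expectedStart) {
                return false; // gap or overlap
            }
            expectedStart = range.end() + 1;
        }
        return expectedStart == 0x10000;
    }
}
```

Loading the three active ranges from the example and calling findActiveSegment(20000) selects segment 4, since 20000 falls in 16384..32767.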
The controller leader lock is stored at:
/topics/{tenant}/{namespace}/{encodedTopicName}/controller
The value is the broker service URL of the elected leader.
As described in Broker Roles, any broker can serve a DAG watch session because the segment metadata lives in the shared metadata store — no controller leader involvement is required. The session is established via the binary protocol:
1. The client sends `CommandScalableTopicLookup` with a client-assigned sessionId and the topic:// topic name.
2. The broker creates a `DagWatchSession` that:
   - Loads the `ScalableTopicMetadata` from the store.
   - Resolves the owning broker of each active segment:// topic (via topic lookup).
3. The broker replies with `CommandScalableTopicUpdate` with the initial ScalableTopicDAG containing the full segment DAG, broker addresses, and controller URL.
4. When the metadata changes, the broker pushes a new `CommandScalableTopicUpdate` to the client.
5. The client sends `CommandScalableTopicClose` to tear down the session.

The ScalableTopicDAG protocol message contains:
| Field | Type | Description |
|---|---|---|
| `epoch` | `uint64` | Layout epoch |
| `segments` | `repeated SegmentInfoProto` | Full segment DAG |
| `segment_brokers` | `repeated SegmentBrokerAddress` | Broker URL for each active segment |
| `controller_broker_url` | `string` | Controller leader's broker URL |
| `controller_broker_url_tls` | `string` | TLS variant |
Splitting a segment is the core scaling-out operation. The protocol is carefully ordered to guarantee that no messages are lost and all subscription cursors exist before producers are redirected.

                Controller                   Metadata Store           Broker(s)
                     │                              │                     │
    1. Discover      │──── getSubscriptions ───────>│                     │
       subscriptions │<─── [sub-A, sub-B] ──────────│                     │
                     │                              │                     │
    2. Create        │──── createSegment(child1, ──>│ ─── route ────────> │
       children      │     [sub-A, sub-B])          │                     │ create topic
                     │──── createSegment(child2, ──>│ ─── route ────────> │ + cursors
                     │     [sub-A, sub-B])          │                     │
                     │                              │                     │
    3. Terminate     │──── terminateSegment ───────>│ ─── route ────────> │
       parent        │     (parent)                 │                     │ seal ML
                     │                              │                     │
    4. Update        │──── CAS update ─────────────>│                     │
       metadata      │     (atomic)                 │                     │
                     │                              │                     │
    5. Notify        │──── push to consumers ────────────────────────────>│
       consumers     │     (rebalance)              │                     │

Step 1: Discover subscriptions. The controller queries the parent segment topic for its list of subscriptions. It first checks locally (if the segment is on this broker), falling back to the admin API for remote segments.
Step 2: Create child segment topics. Two new segment:// topics are created via the Segments admin API. Each creation request includes the list of subscriptions discovered in step 1. The Segments API handler creates the persistent topic and initializes subscription cursors at the Earliest position. The admin API routes to whichever broker owns the namespace bundle for each segment, so child segments may be created on different brokers.
Step 3: Terminate parent segment. The parent segment topic is terminated via the Segments admin API, which writes a termination marker to the managed ledger. After termination, producers writing to the parent receive TopicTerminated and must re-route to child segments.
Step 4: Atomic metadata update. The controller issues a compare-and-swap (CAS) update to the metadata store. The update transitions the parent to SEALED state with child pointers, and adds the two new ACTIVE child segments. The CAS guarantees atomicity — if another operation modified the metadata concurrently, the update fails and must be retried.
Step 5: Notify consumers. All SubscriptionCoordinator instances are notified of the layout change. They recompute segment-to-consumer assignments and push updates to connected consumers.
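The compare-and-swap in step 4 can be illustrated with a versioned in-memory node. This is only a sketch under stated assumptions: CasUpdateSketch, its Node class, and the retry budget are hypothetical stand-ins, and a real controller would re-validate the layout against the freshly read metadata before retrying rather than blindly reapplying the change.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

final class CasUpdateSketch {
    /** A stored value together with the version it was written at. */
    record Versioned(String value, long version) {}

    /** In-memory stand-in for a single metadata store path. */
    static final class Node {
        private final AtomicReference<Versioned> ref;

        Node(String initialValue) {
            this.ref = new AtomicReference<>(new Versioned(initialValue, 0));
        }

        Versioned get() {
            return ref.get();
        }

        /** Writes newValue only if the node is still at expectedVersion. */
        boolean compareAndSet(long expectedVersion, String newValue) {
            Versioned current = ref.get();
            if (current.version() != expectedVersion) {
                return false; // someone else updated the node first
            }
            return ref.compareAndSet(current, new Versioned(newValue, expectedVersion + 1));
        }
    }

    /** Read, apply the change, CAS; retry on conflict up to maxAttempts times. */
    static Versioned updateWithRetry(Node node, UnaryOperator<String> change, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Versioned seen = node.get();
            String next = change.apply(seen.value());
            if (node.compareAndSet(seen.version(), next)) {
                return new Versioned(next, seen.version() + 1);
            }
        }
        throw new IllegalStateException("metadata changed concurrently; gave up after "
                + maxAttempts + " attempts");
    }
}
```

A writer holding a stale version is rejected without modifying the node, which is what keeps two concurrent layout changes from overwriting each other.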
Child subscription cursors are initialized at Earliest: this is safe because the child topics are brand new (empty). The cursor starts at the beginning, and the first message written by a redirected producer will be the first message consumed.

Merging two adjacent segments is the core scaling-in operation. The protocol follows the same ordering principle: create first, update metadata last.
Step 1: Discover subscriptions. The controller queries both parent segments for their subscriptions and takes the union, so no subscription is lost.
Step 2: Create merged segment topic. A single new segment:// topic is created with all discovered subscriptions.
Step 3: Terminate both parent segments. Both parents are terminated via the admin API. This is done sequentially to avoid a race where producers of one parent are still writing while the other is already sealed.
Step 4: Atomic metadata update. The CAS update seals both parents with child pointers to the merged segment, and adds the new ACTIVE merged segment.
Step 5: Notify consumers. Same as split — rebalance and push.
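The ordering of the five merge steps can be made concrete with a small synchronous sketch. Everything here is hypothetical scaffolding: the SegmentOps interface stands in for the Segments admin API and the metadata store, RecordingOps merely logs calls, and the real controller would presumably run these steps asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class MergeOrderingSketch {
    /** Hypothetical facade over the Segments admin API and the metadata store. */
    interface SegmentOps {
        Set<String> getSubscriptions(long segmentId);
        void createSegment(long segmentId, Set<String> subscriptions);
        void terminateSegment(long segmentId);
        void casUpdateLayout(long parent1, long parent2, long merged);
        void notifyConsumers();
    }

    /** Runs the merge steps in protocol order: create first, update metadata last. */
    static void merge(SegmentOps ops, long parent1, long parent2, long mergedId) {
        // Step 1: union of both parents' subscriptions, so none is lost.
        Set<String> subscriptions = new TreeSet<>(ops.getSubscriptions(parent1));
        subscriptions.addAll(ops.getSubscriptions(parent2));
        // Step 2: create the merged segment with every discovered subscription.
        ops.createSegment(mergedId, subscriptions);
        // Step 3: terminate the parents one after the other.
        ops.terminateSegment(parent1);
        ops.terminateSegment(parent2);
        // Step 4: atomic metadata update.
        ops.casUpdateLayout(parent1, parent2, mergedId);
        // Step 5: rebalance and push.
        ops.notifyConsumers();
    }

    /** Test double that records the order of calls. */
    static final class RecordingOps implements SegmentOps {
        final List<String> calls = new ArrayList<>();
        private final Map<Long, Set<String>> subscriptionsBySegment;

        RecordingOps(Map<Long, Set<String>> subscriptionsBySegment) {
            this.subscriptionsBySegment = subscriptionsBySegment;
        }

        public Set<String> getSubscriptions(long segmentId) {
            calls.add("getSubscriptions " + segmentId);
            return subscriptionsBySegment.getOrDefault(segmentId, Set.of());
        }

        public void createSegment(long segmentId, Set<String> subscriptions) {
            calls.add("createSegment " + segmentId + " " + subscriptions);
        }

        public void terminateSegment(long segmentId) {
            calls.add("terminateSegment " + segmentId);
        }

        public void casUpdateLayout(long parent1, long parent2, long merged) {
            calls.add("casUpdateLayout " + parent1 + " " + parent2 + " " + merged);
        }

        public void notifyConsumers() {
            calls.add("notifyConsumers");
        }
    }
}
```

Keeping the metadata update after creation and termination means a crash midway leaves the published layout unchanged, at the cost of possibly orphaned segment topics that need cleanup.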
Unlike splits, merges inherently involve multiple brokers because the two parent segments may be owned by different brokers (assigned by the load manager). The controller leader handles this transparently: all segment operations (create, terminate, delete) go through the Segments admin API, which routes each request to the broker that currently owns the target segment's namespace bundle. The controller does not need to know or care which broker owns which segment — the admin API routing handles it.
Segment topics are managed exclusively through a dedicated REST API at /admin/v2/segments. This is a deliberate design choice: segment topics use the segment:// domain and must not be confused with regular persistent:// topics.
| Method | Path | Description |
|---|---|---|
| PUT | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create a segment topic. Optional request body with subscription names to pre-create. |
| POST | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate (seal) a segment topic. |
| DELETE | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete a segment topic. |
The {descriptor} is the segment's hash range and ID in the format {hexStart}-{hexEnd}-{segmentId} (e.g., 0000-7fff-1).
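Formatting and parsing such a descriptor could look like the sketch below. The class SegmentDescriptorSketch and its Descriptor record are hypothetical, and the zero-padded four-digit lowercase hex is inferred from the single example 0000-7fff-1 rather than stated by the PIP.

```java
final class SegmentDescriptorSketch {
    /** Parsed descriptor: inclusive hash range plus segment id. */
    record Descriptor(int hashStart, int hashEnd, long segmentId) {}

    /** Formats as {hexStart}-{hexEnd}-{segmentId}; 4-digit lowercase hex is an assumption. */
    static String format(int hashStart, int hashEnd, long segmentId) {
        return String.format("%04x-%04x-%d", hashStart, hashEnd, segmentId);
    }

    /** Parses a descriptor produced by format(); rejects malformed input. */
    static Descriptor parse(String descriptor) {
        String[] parts = descriptor.split("-");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "expected {hexStart}-{hexEnd}-{segmentId} but got: " + descriptor);
        }
        int start = Integer.parseInt(parts[0], 16);
        int end = Integer.parseInt(parts[1], 16);
        long id = Long.parseLong(parts[2]);
        if (start > end) {
            throw new IllegalArgumentException("hash range start exceeds end: " + descriptor);
        }
        return new Descriptor(start, end, id);
    }
}
```

For instance, format(0x0000, 0x7fff, 1) yields the descriptor used in the example above, and parse reverses it.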
When creating a segment topic, the handler:
1. Constructs the segment:// topic name from the path components.
2. Creates the persistent topic via BrokerService.getOrCreateTopic().
3. Creates a cursor for each requested subscription at the Earliest position.

The terminate operation terminates the managed ledger backing the segment topic. After termination, the topic accepts no further writes, and producers receive TopicTerminated.

The delete operation deletes the segment topic and its managed ledger. It is used during scalable topic deletion to clean up all underlying storage.
Consumer assignment behavior depends on the consumer type:
- Queue consumers attach to the full set of segments, both ACTIVE and SEALED, and each segment-owning broker independently performs round-robin delivery across the queue consumers attached to that segment. New segments produced by a split are picked up transparently via the DAG watch session push; sealed segments keep serving messages until their backlog drains and are then dropped from the set. The controller leader is not in the data path for queue consumers and no SubscriptionCoordinator is involved for them.
- Stream consumers register with the controller leader and receive an explicit assignment:
  1. The StreamConsumer connects to the controller broker (discovered via the DAG watch session's controller_broker_url).
  2. It registers with its subscription name and consumerName.
  3. The controller's registerConsumer() method routes to the appropriate SubscriptionCoordinator (created on first use for that subscription).
  4. The coordinator returns a ConsumerAssignment: a list of (segmentId, hashRange, segmentTopicName) tuples.
  5. The consumer connects to each assigned segment:// topic and begins consuming.

Rebalancing occurs when a consumer is added or removed, or when the layout changes after a split or merge.

The rebalancing algorithm is round-robin: active segments are sorted by hash range start, consumers are sorted by name, segments are distributed evenly across consumers, and each affected consumer is sent an updated ConsumerAssignment.

A consumer's registration and its segment assignment are treated as a persistent session, not as a transient association tied to a TCP connection. This is a deliberate departure from the existing Pulsar consumer model, where consumer liveness is asserted purely by the TCP connection to the broker.
Rationale. Scalable topic consumer assignments have non-trivial cost: when a consumer disappears, its segments are redistributed among the remaining consumers, each of which may need to reconnect to different segment brokers and (for ordered consumers) drain in-flight messages before the new assignment takes effect. Treating a brief disconnection as a full consumer loss would cause unnecessary rebalancing in common scenarios such as a consumer process restart or a broker restart.
What is persisted vs. in-memory.
The session itself is persisted; the keep-alive state is not.
- Persisted in the metadata store: the ConsumerRegistration (consumer name and its current segment assignment), keyed by consumerName under the subscription path. Once a consumer successfully registers, this entry is durable and outlives TCP disconnects, client restarts, and controller leader failovers. The assignment survives failover without forcing consumers to re-register.
- In memory on the controller leader only: the keep-alive state (connected/disconnected flag and grace-period timer).

Session semantics (steady state).
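- When a consumer registers, the SubscriptionCoordinator writes its ConsumerRegistration and marks it connected in-memory.
- When the connection drops, the coordinator marks the consumer disconnected and starts the grace-period timer. If a consumer with the same consumerName reconnects within the grace period, the timer is cancelled and the consumer resumes with the same persisted assignment: no rebalance, no other consumer disturbed.
- If the grace period expires first, the coordinator deletes the ConsumerRegistration from the metadata store and triggers a rebalance for the remaining consumers.

Behavior on controller leader failover.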
Because liveness timers are in-memory only, they are lost when the leader broker crashes. The new leader:
- Loads the ConsumerRegistration entries for each subscription from the metadata store, restoring the prior segment assignment.
- Starts a fresh grace-period timer for every restored consumer that has not yet reconnected.

This means a leader failover always gives clients a full grace period to reconnect, regardless of how long they had already been disconnected under the old leader. The trade-off is explicit: we keep the metadata store writes proportional to real membership changes (register / unregister / rebalance) rather than to liveness ticks.
Covered scenarios.
- Consumer process restart: the consumer reconnects with the same consumerName well within the grace period; the same segments are reassigned, no redistribution.

The grace period and related tunables are configurable per broker and will be specified in a follow-up sub-PIP.
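The in-memory liveness tracking described in this section can be sketched with an explicit clock, which keeps the example deterministic. The class GracePeriodSketch, its method names, and the 30-second value used in the usage note are assumptions; the PIP defers the actual tunables to a follow-up sub-PIP, and a real coordinator would presumably use scheduled timers rather than polling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class GracePeriodSketch {
    private final long gracePeriodMillis;
    // consumerName -> time at which its connection dropped; in-memory only.
    private final Map<String, Long> disconnectedAt = new HashMap<>();

    GracePeriodSketch(long gracePeriodMillis) {
        this.gracePeriodMillis = gracePeriodMillis;
    }

    /** Connection dropped: start the grace period, keep the persisted assignment. */
    void onDisconnect(String consumerName, long nowMillis) {
        disconnectedAt.put(consumerName, nowMillis);
    }

    /** Same consumerName came back: cancel the pending expiry, no rebalance needed. */
    void onReconnect(String consumerName) {
        disconnectedAt.remove(consumerName);
    }

    /**
     * Returns the consumers whose grace period has elapsed. The caller would then
     * delete their ConsumerRegistration and rebalance the remaining consumers.
     */
    List<String> expire(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = disconnectedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > gracePeriodMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

With a 30-second grace period, a consumer that disconnects at t=0 and reconnects at t=5s never appears in expire(); one that stays away is reported once the clock passes 30 seconds. A new leader would start with an empty map, which is why failover hands every client a full grace period.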
When a split or merge completes and the metadata is updated:
1. The controller replaces its currentLayout with the new SegmentLayout.
2. notifySubscriptions() is called, which iterates all SubscriptionCoordinator instances.
3. Each coordinator runs onLayoutChange(newLayout), which recomputes assignments against the new active segments and pushes updates to affected consumers.

Each ScalableTopicController participates in leader election via the metadata store's LeaderElection API. The election path is:
/topics/{tenant}/{namespace}/{encodedTopicName}/controller
The elected leader writes its broker service URL as the election value. Clients discover the controller by reading this value (delivered as part of the DAG watch session response).
When the leader broker fails:
1. The ephemeral leader entry in the metadata store is removed.
2. The remaining controller instances receive a NoLeader state change notification.
3. The ScalableTopicService.onLeaderStateChange() handler re-invokes controller.initialize(), which reloads metadata and re-attempts election.

A broker that loses leadership, or never wins it, retains its controller instance, but every mutating operation (splitSegment, mergeSegments, registerConsumer) first runs a checkLeader() guard and throws IllegalStateException when the local controller is not the leader. The controller remains available for read operations like getLayout().
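The leader guard on mutating operations might be structured like this. It is a sketch only: LeaderGuardSketch and its local LeaderState enum are stand-ins defined here for illustration, and the split body is reduced to an epoch increment so the guard itself stays in focus.

```java
final class LeaderGuardSketch {
    /** Local stand-in for the leader election state tracked by the controller. */
    enum LeaderState { NoLeader, Following, Leading }

    private volatile LeaderState state = LeaderState.NoLeader;
    private long epoch = 0;

    /** Invoked when the election outcome changes. */
    void onLeaderStateChange(LeaderState newState) {
        this.state = newState;
    }

    /** Rejects the call unless this instance currently holds leadership. */
    private void checkLeader() {
        if (state != LeaderState.Leading) {
            throw new IllegalStateException("not the controller leader (state=" + state + ")");
        }
    }

    /** Mutating operation: guarded. Returns the new layout epoch. */
    long splitSegment(long segmentId) {
        checkLeader();
        return ++epoch; // placeholder for the real split protocol
    }

    /** Read operation: always allowed, even on a non-leader. */
    long getEpoch() {
        return epoch;
    }
}
```

Putting the check at the top of each mutating method keeps reads cheap on every broker while making it impossible for a demoted controller to change the layout by accident.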

    Admin API (PUT /admin/v2/scalable/{tenant}/{namespace}/{topic})
      └─> ScalableTopicService.createScalableTopic(topic, numInitialSegments)
            ├─> ScalableTopicController.createInitialMetadata(numInitialSegments)
            │     └─> Divides [0x0000, 0xFFFF] into N equal ranges
            │         Creates N SegmentInfo records (ACTIVE, no parents)
            │         Returns ScalableTopicMetadata with epoch=0
            ├─> ScalableTopicResources.createScalableTopicAsync(topic, metadata)
            │     └─> Writes metadata to store at /topics/{t}/{ns}/{topic}
            └─> For each segment: createSegmentAsync via Segments admin API
                  └─> Creates segment:// topic on owning broker


    Admin API (DELETE /admin/v2/scalable/{tenant}/{namespace}/{topic})
      └─> ScalableTopicService.deleteScalableTopic(topic)
            ├─> releaseController(topic)
            │     └─> Closes leader election, removes from controllers map
            ├─> Load metadata from store
            ├─> For each segment: deleteSegmentAsync via Segments admin API
            │     └─> Deletes segment:// topic on owning broker (best-effort)
            └─> ScalableTopicResources.deleteScalableTopicAsync(topic)
                  └─> Removes metadata from store

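The creation flow divides [0x0000, 0xFFFF] into N equal ranges. One possible way to compute them is sketched below; InitialRangesSketch is a hypothetical name, and how the remainder is distributed when N does not divide 65536 evenly is an assumption, since the PIP does not specify it.

```java
import java.util.ArrayList;
import java.util.List;

final class InitialRangesSketch {
    /** Inclusive hash range [start, end]. */
    record HashRange(int start, int end) {}

    /** Size of the hash space [0x0000, 0xFFFF]. */
    static final int HASH_SPACE = 0x10000;

    /** Divides the hash space into numInitialSegments contiguous, near-equal ranges. */
    static List<HashRange> divide(int numInitialSegments) {
        if (numInitialSegments < 1 || numInitialSegments > HASH_SPACE) {
            throw new IllegalArgumentException("numInitialSegments out of range: " + numInitialSegments);
        }
        List<HashRange> ranges = new ArrayList<>();
        for (int i = 0; i < numInitialSegments; i++) {
            // Use long arithmetic so i * HASH_SPACE cannot overflow an int.
            int start = (int) ((long) i * HASH_SPACE / numInitialSegments);
            int end = (int) ((long) (i + 1) * HASH_SPACE / numInitialSegments) - 1;
            ranges.add(new HashRange(start, end));
        }
        return ranges;
    }
}
```

Computing each boundary from the index, rather than adding a fixed width repeatedly, guarantees that the last range ends exactly at 0xFFFF and that consecutive ranges never leave a gap.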
Three new protocol commands (added to PulsarApi.proto):
| Command | Direction | Field ID | Description |
|---|---|---|---|
| `CommandScalableTopicLookup` | Client → Broker | 70 | Initiates a DAG watch session |
| `CommandScalableTopicUpdate` | Broker → Client | 71 | Initial response and subsequent pushed updates |
| `CommandScalableTopicClose` | Client → Broker | 72 | Closes the DAG watch session |
New protocol messages:
- `ScalableTopicDAG`: the full segment DAG with broker addresses and controller URL.
- `SegmentInfoProto`: per-segment record in the DAG.
- `SegmentBrokerAddress`: maps a segment ID to its owning broker.
- `SegmentState` enum: ACTIVE or SEALED.

Scalable Topics (/admin/v2/scalable):
| Method | Path | Description |
|---|---|---|
| GET | `/{tenant}/{namespace}` | List scalable topics |
| PUT | `/{tenant}/{namespace}/{topic}` | Create scalable topic |
| GET | `/{tenant}/{namespace}/{topic}` | Get topic metadata |
| DELETE | `/{tenant}/{namespace}/{topic}` | Delete scalable topic |
| GET | `/{tenant}/{namespace}/{topic}/stats` | Get aggregated stats for the scalable topic (segment counts, per-segment rates, per-subscription state, consumer counts) |
| PUT | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Create a subscription on the scalable topic. The controller propagates it to all active segments by issuing createSubscription on each segment:// topic via the Segments admin API. |
| DELETE | `/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | Delete a subscription. The controller unregisters all consumers and deletes the subscription from every segment. |
| POST | `/{tenant}/{namespace}/{topic}/split/{segmentId}` | Split a segment |
| POST | `/{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2}` | Merge two segments |
Segments (/admin/v2/segments):
| Method | Path | Description |
|---|---|---|
| PUT | `/{tenant}/{namespace}/{topic}/{descriptor}` | Create segment topic |
| POST | `/{tenant}/{namespace}/{topic}/{descriptor}/terminate` | Terminate segment |
| DELETE | `/{tenant}/{namespace}/{topic}/{descriptor}` | Delete segment topic |
New ScalableTopics interface on PulsarAdmin:
- `listScalableTopics(namespace)` / `listScalableTopicsAsync(namespace)`
- `createScalableTopic(topic, numSegments)` / `createScalableTopicAsync(...)`
- `getMetadata(topic)` / `getMetadataAsync(topic)`
- `getStats(topic)` / `getStatsAsync(topic)`
- `deleteScalableTopic(topic)` / `deleteScalableTopicAsync(topic)`
- `createSubscription(topic, subscription)` / `createSubscriptionAsync(...)`
- `deleteSubscription(topic, subscription)` / `deleteSubscriptionAsync(...)`
- `splitSegment(topic, segmentId)` / `splitSegmentAsync(...)`
- `mergeSegments(topic, segmentId1, segmentId2)` / `mergeSegmentsAsync(...)`
- `createSegment(segmentTopic, subscriptions)` / `createSegmentAsync(...)`
- `terminateSegment(segmentTopic)` / `terminateSegmentAsync(...)`
- `deleteSegment(segmentTopic, force)` / `deleteSegmentAsync(...)`

| Property | Default | Description |
|---|---|---|
| `scalableTopicEnabled` | `true` | Enable scalable topic support on the broker |
Additional configuration for auto-split thresholds and consumer controller session grace period will be specified in follow-up sub-PIPs.
| Path | Content |
|---|---|
| `/topics/{tenant}/{namespace}/{topic}` | ScalableTopicMetadata JSON: segment DAG and global topic state |
| `/topics/{tenant}/{namespace}/{topic}/controller` | Leader broker URL (ephemeral) |
| `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}` | SubscriptionMetadata: subscription-level config and the persisted set of consumer registrations |
| `/topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/consumers/{consumerName}` | ConsumerRegistration: the durable session, holding the consumer name and its current segment assignment. Keep-alive state (connected/disconnected, grace-period timer) is in-memory on the controller leader only and is not persisted here. |
The controller is a new component with no interaction with existing partitioned or non-partitioned topics. Enabling scalableTopicEnabled activates the new code paths. Existing topics are unaffected.
Scalable topic metadata and segment data remain in the metadata store and BookKeeper. Rolling back to a version without scalable topic support leaves this data intact but inaccessible. Upgrading again restores access.
The existing Pulsar client SDK rejects topic:// and segment:// domains with a clear error message directing users to the V5 client SDK. This check is enforced in PulsarClientImpl for producer, consumer, and reader creation paths.
The controller follows the same tenant/namespace/topic authorization model as existing Pulsar topics:
The controller leader lock in the metadata store is accessible only to authenticated brokers.