PIP-468: Scalable Topic Controller

Sub-PIP of PIP-460: Scalable Topics

Motivation

PIP-460 introduces scalable topics as a new topic type in Pulsar, built on a DAG of range segments that can be split and merged to scale independently of the initial configuration. PIP-460 defines the overall vision but defers the detailed design of each component to dedicated sub-PIPs.

This PIP specifies the Scalable Topic Controller — the broker-side component responsible for:

  1. Segment lifecycle management — creating, terminating, and deleting the underlying segment topics that back a scalable topic.
  2. Segment layout coordination — executing split and merge operations with correct ordering guarantees so that no messages are lost and all subscription cursors exist before producers are redirected.
  3. Consumer assignment — coordinating which consumers receive messages from which segments for a given subscription.
  4. Leader election — ensuring exactly one broker acts as the controller for each scalable topic, with automatic failover.
  5. DAG watch sessions — pushing topology updates to connected clients (producers and consumers) when the segment layout changes.

Without a well-defined controller, split and merge operations would be unsafe: producers could write to segments that have no subscription cursors, consumers could miss messages during topology changes, and concurrent layout modifications could corrupt the segment DAG.


Design

Architecture Overview

The controller subsystem consists of four layers:

┌──────────────────────────────────────────────────────────┐
│                    Admin API Layer                        │
│         ScalableTopics REST  +  Segments REST             │
└──────────────┬───────────────────────┬───────────────────┘
               │                       │
┌──────────────▼───────────────────────▼───────────────────┐
│                  ScalableTopicService                     │
│         Per-broker singleton; manages controllers         │
└──────────────────────────┬───────────────────────────────┘
                           │
┌──────────────────────────▼───────────────────────────────┐
│              ScalableTopicController (per topic)          │
│   Leader-elected; owns layout, split/merge, consumers     │
│                                                           │
│   ┌─────────────────┐  ┌───────────────────────────────┐ │
│   │  SegmentLayout   │  │  SubscriptionCoordinator (×N) │ │
│   │  (immutable DAG) │  │  per-subscription assignments │ │
│   └─────────────────┘  └───────────────────────────────┘ │
└──────────────────────────┬───────────────────────────────┘
                           │
┌──────────────────────────▼───────────────────────────────┐
│              ScalableTopicResources                       │
│         Metadata store access (read/write/watch)          │
└──────────────────────────────────────────────────────────┘

Broker Roles and Request Routing

A scalable topic involves three distinct broker roles. Understanding which broker handles which operation is critical to the design.

Any broker — DAG watch sessions

A DAG watch session (scalable topic lookup) can be served by any broker in the cluster. The session reads segment metadata from the shared metadata store and registers a watch for changes. No state is held on the broker beyond the watch registration. This means producers and consumers can connect to any broker for topic discovery — the same way regular topic lookups work today.

When the metadata changes (due to a split or merge performed by the controller leader), the metadata store notification fires on whichever broker is serving the watch session, and that broker pushes the updated DAG to the client.

Controller leader — layout mutations and consumer assignment

Each scalable topic has exactly one controller leader across the cluster, elected via the metadata store. The controller leader is the only broker that can:

  • Execute split and merge operations (the multi-step protocols described below).
  • Accept consumer registrations and compute segment-to-consumer assignments.
  • Notify consumers of assignment changes after topology updates.

Clients discover the controller leader's broker URL through the DAG watch session response (controller_broker_url field). StreamConsumer and CheckpointConsumer clients then connect directly to the controller leader to register and receive assignments.

Admin API requests for split and merge can land on any broker. The receiving broker's ScalableTopicService obtains (or creates) a local ScalableTopicController, which participates in leader election. If the local controller is the leader, it executes the operation directly. Otherwise the request is redirected to the leader, or the local controller must first win the election. Callers therefore do not need to target the leader broker: the ScalableTopicService handles the routing transparently.

Segment-owning brokers — produce and consume

Individual segment topics (segment://) are regular persistent topics from the broker's perspective. They are distributed across the cluster by the standard Pulsar load manager and namespace bundle assignment, the same mechanism used for persistent:// topics. Each segment topic is owned by exactly one broker at any time: the broker that currently owns the namespace bundle into which the segment's topic name hashes.

The controller leader does not need to own any of the segment topics. When the controller needs to operate on a segment (create, terminate, delete, discover subscriptions), it uses the Segments admin API, which routes the request to whichever broker currently owns that segment's namespace bundle. This decoupling means the controller can coordinate segments spread across many brokers without requiring co-location.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Broker A    │     │  Broker B    │     │  Broker C    │
│              │     │  (controller │     │              │
│  segment-0   │     │   leader)    │     │  segment-2   │
│  segment-1   │     │              │     │              │
│              │     │  DAG watch   │     │  DAG watch   │
│  DAG watch   │     │  sessions    │     │  sessions    │
│  sessions    │     │              │     │              │
└─────────────┘     └─────────────┘     └─────────────┘
                           │
                    Split/merge ops
                    use admin API to
                    reach segments on
                    Broker A and C

In this example, Broker B is the controller leader but owns no segments. When it executes a split of segment-0, it calls the Segments admin API which routes to Broker A (the owner of segment-0).

Components

ScalableTopicService

A per-broker singleton, created and owned by BrokerService. It manages the lifecycle of all ScalableTopicController instances on the broker.

Responsibilities:

  • Controller management: maintains a map of active controllers keyed by topic name. Creates controllers on demand via getOrCreateController() and releases them on topic unload or broker shutdown.
  • Topic lifecycle: handles createScalableTopic() and deleteScalableTopic() admin operations, including creating/deleting the underlying segment topics via the Segments admin API.
  • Delegation: routes splitSegment() and mergeSegments() requests to the appropriate controller.
  • Leader state monitoring: listens for leader election state changes and re-attempts election when the current leader is lost.

ScalableTopicController

A per-topic coordinator. Only one instance across the cluster is the leader for a given scalable topic, ensured by leader election through the metadata store. The leader is the only instance that modifies the segment layout or assigns consumers.

State:

| Field | Type | Description |
|---|---|---|
| topicName | TopicName | The topic:// name of the scalable topic |
| currentLayout | SegmentLayout | The current immutable snapshot of the segment DAG |
| subscriptions | Map<String, SubscriptionCoordinator> | Per-subscription consumer coordinators |
| leaderState | LeaderElectionState | Current leader election state |
| leaderElection | LeaderElection<String> | Metadata store leader election handle; value is the broker URL |

Key operations:

  • initialize() — loads the current metadata from the store, then attempts leader election. The elected leader stores its broker service URL so that clients can discover and connect to it.
  • splitSegment(segmentId) — splits an active segment at its midpoint (see Split Protocol).
  • mergeSegments(segmentId1, segmentId2) — merges two adjacent active segments (see Merge Protocol).
  • registerConsumer(subscription, consumer) — registers a consumer and returns its initial segment assignment.
  • unregisterConsumer(subscription, consumer) — removes a consumer and triggers rebalancing.
  • getLeaderBrokerUrl() — returns the current leader's broker URL, used by clients to connect to the controller.

SegmentLayout

An immutable, versioned snapshot of the segment DAG. All mutation methods (splitSegment, mergeSegments, pruneSegment) return a new SegmentLayout instance — the original is never modified. This ensures safe concurrent reads without locking.

Fields:

| Field | Type | Description |
|---|---|---|
| epoch | long | Monotonically increasing version; incremented on every layout change |
| nextSegmentId | long | Counter for assigning IDs to new segments |
| allSegments | Map<Long, SegmentInfo> | Complete DAG: all segments (active + sealed) |
| activeSegments | Map<Long, SegmentInfo> | Filtered view: only ACTIVE segments |

Key operations:

  • findActiveSegment(hash) — returns the active segment whose hash range contains the given hash value. Used by producers for message routing.
  • splitSegment(segmentId) — validates the segment is active, computes the midpoint of its hash range, creates two child SegmentInfo records covering [start, mid] and [mid+1, end], seals the parent, and returns a new layout with incremented epoch.
  • mergeSegments(id1, id2) — validates both segments are active and adjacent (the end of one equals start - 1 of the other), creates a merged SegmentInfo covering the combined range, seals both parents, and returns a new layout.
  • getLineage(segmentId) — returns the full ancestor + descendant chain for a segment, used for DAG traversal during catch-up reads.
  • toMetadata() / fromMetadata() — converts to/from the persisted ScalableTopicMetadata format.
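
To make the copy-on-write behaviour of splitSegment and the lookup in findActiveSegment concrete, the following is a minimal sketch rather than the proposed implementation. The class name SegmentLayoutSketch, the record shapes, and the guard against splitting a single-hash range are assumptions made for illustration; only the midpoint rule, the sealing of the parent, and the epoch increment come from the description above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch only; names mirror the PIP, bodies are assumptions. */
final class SegmentLayoutSketch {
    enum State { ACTIVE, SEALED }

    record HashRange(int start, int end) {
        boolean contains(int hash) { return hash >= start && hash <= end; }
    }

    record SegmentInfo(long segmentId, HashRange hashRange, State state,
                       List<Long> parentIds, List<Long> childIds,
                       long createdAtEpoch, long sealedAtEpoch) {}

    record Layout(long epoch, long nextSegmentId, Map<Long, SegmentInfo> allSegments) {

        /** Returns the ACTIVE segment whose inclusive range contains the hash. */
        Optional<SegmentInfo> findActiveSegment(int hash) {
            return allSegments.values().stream()
                    .filter(s -> s.state() == State.ACTIVE && s.hashRange().contains(hash))
                    .findFirst();
        }

        /** Returns a NEW layout; this instance is never modified. */
        Layout splitSegment(long segmentId) {
            SegmentInfo parent = allSegments.get(segmentId);
            if (parent == null || parent.state() != State.ACTIVE) {
                throw new IllegalArgumentException("segment " + segmentId + " is not active");
            }
            HashRange r = parent.hashRange();
            if (r.start() == r.end()) {
                // Assumption: a range holding a single hash value cannot be split.
                throw new IllegalArgumentException("range too small to split");
            }
            int mid = r.start() + (r.end() - r.start()) / 2;
            long newEpoch = epoch + 1;
            long id1 = nextSegmentId;
            long id2 = nextSegmentId + 1;

            Map<Long, SegmentInfo> next = new LinkedHashMap<>(allSegments);
            // Seal the parent and point it at its two children.
            next.put(segmentId, new SegmentInfo(segmentId, r, State.SEALED,
                    parent.parentIds(), List.of(id1, id2), parent.createdAtEpoch(), newEpoch));
            // Children cover [start, mid] and [mid + 1, end].
            next.put(id1, new SegmentInfo(id1, new HashRange(r.start(), mid), State.ACTIVE,
                    List.of(segmentId), List.of(), newEpoch, 0));
            next.put(id2, new SegmentInfo(id2, new HashRange(mid + 1, r.end()), State.ACTIVE,
                    List.of(segmentId), List.of(), newEpoch, 0));
            return new Layout(newEpoch, nextSegmentId + 2, Map.copyOf(next));
        }
    }

    /** A single root segment covering the full 16-bit hash space at epoch 0. */
    static Layout initial() {
        SegmentInfo root = new SegmentInfo(0, new HashRange(0, 0xFFFF), State.ACTIVE,
                List.of(), List.of(), 0, 0);
        return new Layout(0, 1, Map.of(0L, root));
    }
}
```

Splitting the root segment of initial() yields children covering [0, 32767] and [32768, 65535], while the original Layout instance still reports segment 0 as ACTIVE. Readers holding the old snapshot are therefore unaffected by a concurrent split.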

SegmentInfo

A record representing a single segment in the DAG:

| Field | Type | Description |
|---|---|---|
| segmentId | long | Unique, monotonically increasing identifier |
| hashRange | HashRange | The [start, end] inclusive hash range this segment covers |
| state | SegmentState | ACTIVE or SEALED |
| parentIds | List<Long> | Parent segments (empty for root segments) |
| childIds | List<Long> | Child segments (empty for leaf/active segments) |
| createdAtEpoch | long | Layout epoch in which this segment was created (0 for the initial segments) |
| sealedAtEpoch | long | Layout epoch in which this segment was sealed; only meaningful when state == SEALED. 0 when the segment is ACTIVE; this sentinel is unambiguous because sealing always happens as part of a split or merge, which increments the epoch, so a segment can never be sealed at epoch 0. |

Factory methods: SegmentInfo.active(id, range, epoch) and SegmentInfo.sealed(...).

SubscriptionCoordinator

Manages segment-to-consumer assignments for a single subscription within a scalable topic.

Assignment strategy: round-robin. Active segments are sorted by hash range start; consumers are sorted by name. Segments are distributed evenly across consumers. When a consumer is added or removed, or when the layout changes (split/merge), the coordinator recomputes assignments and pushes updates to affected consumers.

Key operations:

  • addConsumer(consumer) — adds a consumer, rebalances, and returns the full assignment map.
  • removeConsumer(consumer) — removes a consumer and redistributes its segments.
  • onLayoutChange(newLayout) — called after a split or merge; recomputes all assignments against the new set of active segments.

ConsumerRegistration and ConsumerAssignment

ConsumerRegistration is a record identifying a connected consumer:

| Field | Type | Description |
|---|---|---|
| consumerId | long | Protocol-level consumer ID |
| consumerName | String | Stable name chosen by the client |
| cnx | TransportCnx | The connection to push assignment updates |

ConsumerAssignment is the assignment sent to a consumer:

| Field | Type | Description |
|---|---|---|
| layoutEpoch | long | The layout epoch this assignment is based on |
| assignedSegments | List<AssignedSegment> | The segments assigned to this consumer |

Each AssignedSegment contains: segmentId, hashRange, and underlyingTopicName (the segment:// topic name the consumer should connect to).

Metadata Store Schema

Scalable topic metadata is stored in the metadata store under a well-known path structure:

/topics/{tenant}/{namespace}/{encodedTopicName}

The value at this path is a JSON-serialized ScalableTopicMetadata:

```json
{
  "epoch": 3,
  "nextSegmentId": 5,
  "segments": {
    "0": { "segmentId": 0, "hashRange": {"start": 0, "end": 65535}, "state": "SEALED", "parentIds": [], "childIds": [1, 2], "createdAtEpoch": 0, "sealedAtEpoch": 1 },
    "1": { "segmentId": 1, "hashRange": {"start": 0, "end": 32767}, "state": "SEALED", "parentIds": [0], "childIds": [3, 4], "createdAtEpoch": 1, "sealedAtEpoch": 3 },
    "2": { "segmentId": 2, "hashRange": {"start": 32768, "end": 65535}, "state": "ACTIVE", "parentIds": [0], "childIds": [], "createdAtEpoch": 1, "sealedAtEpoch": 0 },
    "3": { "segmentId": 3, "hashRange": {"start": 0, "end": 16383}, "state": "ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch": 3, "sealedAtEpoch": 0 },
    "4": { "segmentId": 4, "hashRange": {"start": 16384, "end": 32767}, "state": "ACTIVE", "parentIds": [1], "childIds": [], "createdAtEpoch": 3, "sealedAtEpoch": 0 }
  },
  "properties": {}
}
```

The controller leader lock is stored at:

/topics/{tenant}/{namespace}/{encodedTopicName}/controller

The value is the broker service URL of the elected leader.

DAG Watch Sessions

As described in Broker Roles, any broker can serve a DAG watch session because the segment metadata lives in the shared metadata store — no controller leader involvement is required. The session is established via the binary protocol:

  1. Client sends CommandScalableTopicLookup with a client-assigned sessionId and the topic:// topic name.
  2. Broker creates a DagWatchSession that:
    • Loads the current ScalableTopicMetadata from the store.
    • Resolves which broker owns each active segment's segment:// topic (via topic lookup).
    • Reads the controller broker URL from the leader lock path.
    • Registers a metadata store notification listener for changes to the topic's metadata path.
  3. Broker sends CommandScalableTopicUpdate with the initial ScalableTopicDAG containing the full segment DAG, broker addresses, and controller URL.
  4. On metadata change, the notification listener fires. The broker reloads metadata, re-resolves broker addresses, and pushes an updated CommandScalableTopicUpdate to the client.
  5. Client sends CommandScalableTopicClose to tear down the session.
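
The session flow above (initial snapshot, pushed updates on change, explicit close) can be sketched with an in-memory stand-in for the metadata store. Everything here is hypothetical scaffolding: the MetadataStore class, its watch/update methods, and the use of a bare epoch in place of the full ScalableTopicDAG are assumptions chosen to keep the example small. It does not model broker address resolution or the binary protocol.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative sketch only; not the broker's DagWatchSession. */
final class DagWatchSketch {

    /** Hypothetical in-memory metadata store with change notifications. */
    static final class MetadataStore {
        private long epoch;
        private final List<Runnable> listeners = new ArrayList<>();

        long readEpoch() { return epoch; }

        /** Registers a listener and returns a handle that unregisters it. */
        Runnable watch(Runnable listener) {
            listeners.add(listener);
            return () -> listeners.remove(listener);
        }

        /** Stores a new epoch and notifies a snapshot of the current listeners. */
        void update(long newEpoch) {
            epoch = newEpoch;
            List.copyOf(listeners).forEach(Runnable::run);
        }
    }

    /** One session per client lookup; pushes the epoch in place of the full DAG. */
    static final class DagWatchSession {
        private final Runnable unregister;

        DagWatchSession(MetadataStore store, Consumer<Long> pushToClient) {
            // Initial CommandScalableTopicUpdate: send the current state.
            pushToClient.accept(store.readEpoch());
            // Subsequent updates: reload and push whenever the metadata changes.
            this.unregister = store.watch(() -> pushToClient.accept(store.readEpoch()));
        }

        /** Corresponds to CommandScalableTopicClose. */
        void close() { unregister.run(); }
    }
}
```

Because the session holds nothing beyond the listener registration, closing it is a single unregister call, which is consistent with the claim that any broker can serve a session without controller involvement.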

The ScalableTopicDAG protocol message contains:

| Field | Type | Description |
|---|---|---|
| epoch | uint64 | Layout epoch |
| segments | repeated SegmentInfoProto | Full segment DAG |
| segment_brokers | repeated SegmentBrokerAddress | Broker URL for each active segment |
| controller_broker_url | string | Controller leader's broker URL |
| controller_broker_url_tls | string | TLS variant |

Split Protocol

Splitting a segment is the core scaling-out operation. The protocol is carefully ordered to guarantee that no messages are lost and all subscription cursors exist before producers are redirected.

Step-by-step

           Controller                   Metadata Store          Broker(s)
               │                             │                     │
  1. Discover  │──── getSubscriptions ───────>│                     │
  subscriptions│<─── [sub-A, sub-B] ─────────│                     │
               │                             │                     │
  2. Create    │──── createSegment(child1, ──>│ ─── route ────────> │
  children     │     [sub-A, sub-B])          │                     │ create topic
               │──── createSegment(child2, ──>│ ─── route ────────> │ + cursors
               │     [sub-A, sub-B])          │                     │
               │                             │                     │
  3. Terminate │──── terminateSegment ───────>│ ─── route ────────> │
  parent       │     (parent)                 │                     │ seal ML
               │                             │                     │
  4. Update    │──── CAS update ─────────────>│                     │
  metadata     │     (atomic)                 │                     │
               │                             │                     │
  5. Notify    │──── push to consumers ──────────────────────────> │
  consumers    │     (rebalance)              │                     │

Step 1: Discover subscriptions. The controller queries the parent segment topic for its list of subscriptions. It first checks locally (if the segment is on this broker), falling back to the admin API for remote segments.

Step 2: Create child segment topics. Two new segment:// topics are created via the Segments admin API. Each creation request includes the list of subscriptions discovered in step 1. The Segments API handler creates the persistent topic and initializes subscription cursors at the Earliest position. The admin API routes to whichever broker owns the namespace bundle for each segment, so child segments may be created on different brokers.

Step 3: Terminate parent segment. The parent segment topic is terminated via the Segments admin API, which writes a termination marker to the managed ledger. After termination, producers writing to the parent receive TopicTerminated and must re-route to child segments.

Step 4: Atomic metadata update. The controller issues a compare-and-swap (CAS) update to the metadata store. The update transitions the parent to SEALED state with child pointers, and adds the two new ACTIVE child segments. The CAS guarantees atomicity — if another operation modified the metadata concurrently, the update fails and must be retried.

Step 5: Notify consumers. All SubscriptionCoordinator instances are notified of the layout change. They recompute segment-to-consumer assignments and push updates to connected consumers.

Why this ordering matters

  • Steps 1-2 before step 4: if we updated metadata first, clients would discover the new segments immediately. Producers would start writing to child segments that have no subscription cursors yet, causing those messages to be missed by consumers.
  • Step 3 before step 4: terminating the parent before the metadata update ensures that by the time clients see the new layout, the parent is already sealed. There is no window where both parent and children are writable.
  • Step 2 creates cursors at Earliest: this is safe because the child topics are brand new (empty). The cursor starts at the beginning, and the first message written by a redirected producer will be the first message consumed.
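
The ordering argument above can be illustrated with a small synchronous sketch. The SegmentsAdmin interface, the Versioned record, and the in-memory MetadataStore are hypothetical stand-ins invented for this example; the real controller would use the Segments admin API and the metadata store asynchronously. The point of the sketch is only the call order: discover, create children with cursors, terminate the parent, CAS the metadata, then notify.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative sketch only; interfaces here are assumptions, not Pulsar APIs. */
final class SplitProtocolSketch {

    /** Hypothetical stand-in for the Segments admin API. */
    interface SegmentsAdmin {
        List<String> getSubscriptions(String segment);
        void createSegment(String segment, List<String> subscriptions);
        void terminateSegment(String segment);
    }

    /** A metadata value together with the version it was read at. */
    record Versioned(long version, String layout) {}

    /** Hypothetical in-memory store offering compare-and-swap on one key. */
    static final class MetadataStore {
        private final AtomicReference<Versioned> value;

        MetadataStore(String layout) {
            this.value = new AtomicReference<>(new Versioned(0, layout));
        }

        Versioned read() { return value.get(); }

        /** Succeeds only if nobody has written since 'expected' was read. */
        boolean compareAndSwap(Versioned expected, String newLayout) {
            return value.compareAndSet(expected,
                    new Versioned(expected.version() + 1, newLayout));
        }
    }

    /** Runs the five steps in order; returns false if the CAS lost a race. */
    static boolean split(SegmentsAdmin admin, MetadataStore store, Runnable notifyConsumers,
                         String parent, String child1, String child2) {
        Versioned before = store.read();
        List<String> subs = admin.getSubscriptions(parent);  // step 1: discover
        admin.createSegment(child1, subs);                   // step 2: children + cursors
        admin.createSegment(child2, subs);
        admin.terminateSegment(parent);                      // step 3: seal the parent
        String newLayout = parent + " -> [" + child1 + ", " + child2 + "]";
        if (!store.compareAndSwap(before, newLayout)) {      // step 4: atomic publish
            return false; // concurrent change: the caller must reload and retry
        }
        notifyConsumers.run();                               // step 5: rebalance
        return true;
    }
}
```

Clients only learn about the children once step 4 succeeds, so by construction every child they can see already has its cursors and the parent is already sealed.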

Merge Protocol

Merging two adjacent segments is the core scaling-in operation. The protocol follows the same ordering principle: create first, update metadata last.

Step-by-step

Step 1: Discover subscriptions. The controller queries both parent segments for their subscriptions and takes the union, so no subscription is lost.

Step 2: Create merged segment topic. A single new segment:// topic is created with all discovered subscriptions.

Step 3: Terminate both parent segments. Both parents are terminated via the admin API. This is done sequentially to avoid a race where producers of one parent are still writing while the other is already sealed.

Step 4: Atomic metadata update. The CAS update seals both parents with child pointers to the merged segment, and adds the new ACTIVE merged segment.

Step 5: Notify consumers. Same as split — rebalance and push.
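
Two pieces of the merge are pure computation and easy to show in isolation: the adjacency check on the parents' hash ranges and the union of their subscriptions. The sketch below assumes inclusive integer ranges as used elsewhere in this document; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Illustrative sketch only; names are assumptions. */
final class MergePlanSketch {
    record HashRange(int start, int end) {}

    /** True when the two inclusive ranges touch with no gap and no overlap. */
    static boolean adjacent(HashRange a, HashRange b) {
        return a.end() + 1 == b.start() || b.end() + 1 == a.start();
    }

    /** Combined range of two adjacent segments; rejects non-adjacent input. */
    static HashRange mergedRange(HashRange a, HashRange b) {
        if (!adjacent(a, b)) {
            throw new IllegalArgumentException("segments are not adjacent");
        }
        return new HashRange(Math.min(a.start(), b.start()), Math.max(a.end(), b.end()));
    }

    /** Union of both parents' subscriptions, so none is lost in the merge. */
    static Set<String> unionSubscriptions(List<String> subsA, List<String> subsB) {
        Set<String> union = new TreeSet<>(subsA);
        union.addAll(subsB);
        return union;
    }
}
```

For example, merging [0, 16383] with [16384, 32767] gives [0, 32767], and parents subscribed to {sub-A, sub-B} and {sub-B, sub-C} produce a merged segment created with {sub-A, sub-B, sub-C}.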

Cross-broker coordination

A merge often spans multiple brokers from its first step, because the two parent segments may be owned by different brokers (assigned by the load manager). The controller leader handles this transparently: all segment operations (create, terminate, delete) go through the Segments admin API, which routes each request to the broker that currently owns the target segment's namespace bundle. The controller does not need to know which broker owns which segment; the admin API routing handles it.


Segment Topic Management via Admin API

Segment topics are managed exclusively through a dedicated REST API at /admin/v2/segments. This is a deliberate design choice: segment topics use the segment:// domain and must not be confused with regular persistent:// topics.

Endpoints

| Method | Path | Description |
|---|---|---|
| PUT | /{tenant}/{namespace}/{topic}/{descriptor} | Create a segment topic. Optional request body with subscription names to pre-create. |
| POST | /{tenant}/{namespace}/{topic}/{descriptor}/terminate | Terminate (seal) a segment topic. |
| DELETE | /{tenant}/{namespace}/{topic}/{descriptor} | Delete a segment topic. |

The {descriptor} is the segment's hash range and ID in the format {hexStart}-{hexEnd}-{segmentId} (e.g., 0000-7fff-1).
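
A possible way to build and parse the descriptor is sketched below. The four-digit, lowercase, zero-padded hex form is inferred from the single example 0000-7fff-1 and should be treated as an assumption, as should the class and method names.

```java
/** Illustrative sketch only; padding and case are inferred from one example. */
final class SegmentDescriptorSketch {
    record Descriptor(int start, int end, long segmentId) {}

    /** Formats as {hexStart}-{hexEnd}-{segmentId}, e.g. 0000-7fff-1. */
    static String format(int start, int end, long segmentId) {
        return String.format("%04x-%04x-%d", start, end, segmentId);
    }

    /** Parses a descriptor back into its three components. */
    static Descriptor parse(String descriptor) {
        String[] parts = descriptor.split("-");
        if (parts.length != 3) {
            throw new IllegalArgumentException("bad descriptor: " + descriptor);
        }
        return new Descriptor(
                Integer.parseInt(parts[0], 16),
                Integer.parseInt(parts[1], 16),
                Long.parseLong(parts[2]));
    }
}
```

With these helpers, format(0, 32767, 1) produces 0000-7fff-1, and parsing that string returns the range [0, 32767] with segment ID 1.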

Create Segment

When creating a segment topic, the handler:

  1. Constructs the full segment:// topic name from the path components.
  2. Validates namespace bundle ownership (the request is routed to the owning broker).
  3. Creates the persistent topic via BrokerService.getOrCreateTopic().
  4. For each subscription in the request body, creates a cursor at the Earliest position.

Terminate Segment

Terminates the managed ledger backing the segment topic. After termination, the topic accepts no further writes, and producers receive TopicTerminated.

Delete Segment

Deletes the segment topic and its managed ledger. Used during scalable topic deletion to clean up all underlying storage.


Consumer Assignment

Consumer assignment behavior depends on the consumer type:

  • StreamConsumer (ordered, cumulative ack) — requires coordinated segment-to-consumer assignment from the controller leader. Each active segment is owned by exactly one consumer at any time to preserve per-segment ordering. This is the subject of the rest of this section.
  • CheckpointConsumer (unmanaged, for connectors) — same as StreamConsumer from a controller perspective: it registers with the controller leader and receives an explicit segment assignment.
  • QueueConsumer (unordered, individual ack) — does not require any controller-side assignment. Because there is no ordering requirement, every queue consumer attaches directly to all segments of the scalable topic — both ACTIVE and SEALED — and each segment-owning broker independently performs round-robin delivery across the queue consumers attached to that segment. New segments produced by a split are picked up transparently via the DAG watch session push; sealed segments keep serving messages until their backlog drains and are then dropped from the set. The controller leader is not in the data path for queue consumers and no SubscriptionCoordinator is involved for them.

Registration Flow (StreamConsumer / CheckpointConsumer)

  1. A StreamConsumer connects to the controller broker (discovered via the DAG watch session's controller_broker_url).
  2. The consumer sends a registration request with its subscription name and consumerName.
  3. The controller's registerConsumer() method routes to the appropriate SubscriptionCoordinator (created on first use for that subscription).
  4. The coordinator adds the consumer, recomputes assignments, and returns the consumer's ConsumerAssignment — a list of (segmentId, hashRange, segmentTopicName) tuples.
  5. The consumer connects to each assigned segment:// topic and begins consuming.

Rebalancing

Rebalancing occurs when:

  • A consumer is added or removed.
  • A split or merge changes the set of active segments.

The rebalancing algorithm is round-robin:

  1. Collect all active segments, sorted by hash range start.
  2. Collect all consumers, sorted by name (for deterministic ordering).
  3. Assign segments to consumers in round-robin order.
  4. For each consumer whose assignment changed, push the new ConsumerAssignment.
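
The first three steps of the algorithm are deterministic and can be sketched directly. The Segment record and the assign method are hypothetical names; the sketch returns the full assignment map and leaves the diffing and pushing of step 4 to the caller.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; not the SubscriptionCoordinator implementation. */
final class RoundRobinAssignmentSketch {
    record Segment(long segmentId, int rangeStart) {}

    /** Maps each consumer name to the segment IDs it owns. */
    static Map<String, List<Long>> assign(List<Segment> activeSegments,
                                          List<String> consumerNames) {
        // Step 1: segments sorted by hash range start.
        List<Segment> segments = new ArrayList<>(activeSegments);
        segments.sort(Comparator.comparingInt(Segment::rangeStart));
        // Step 2: consumers sorted by name for a deterministic result.
        List<String> consumers = new ArrayList<>(consumerNames);
        consumers.sort(Comparator.naturalOrder());

        Map<String, List<Long>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return result; // nothing to assign to
        }
        // Step 3: deal segments out in round-robin order.
        for (int i = 0; i < segments.size(); i++) {
            String owner = consumers.get(i % consumers.size());
            result.get(owner).add(segments.get(i).segmentId());
        }
        return result;
    }
}
```

Using the three active segments from the metadata example (IDs 3, 4, and 2 in hash order) with consumers named a and b, consumer a receives segments 3 and 2 and consumer b receives segment 4. Each segment has exactly one owner, which preserves per-segment ordering.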

Consumer Session Lifecycle

A consumer's registration and its segment assignment are treated as a persistent session, not as a transient association tied to a TCP connection. This is a deliberate departure from the existing Pulsar consumer model, where consumer liveness is asserted purely by the TCP connection to the broker.

Rationale. Scalable topic consumer assignments have non-trivial cost: when a consumer disappears, its segments are redistributed among the remaining consumers, each of which may need to reconnect to different segment brokers and (for ordered consumers) drain in-flight messages before the new assignment takes effect. Treating a brief disconnection as a full consumer loss would cause unnecessary rebalancing in common scenarios such as a consumer process restart or a broker restart.

What is persisted vs. in-memory.

The session itself is persisted — the keep-alive state is not.

  • Persisted in the metadata store (the session): the ConsumerRegistration — consumer name and its current segment assignment — keyed by consumerName under the subscription path. Once a consumer successfully registers, this entry is durable and outlives TCP disconnects, client restarts, and controller leader failovers. The assignment survives failover without forcing consumers to re-register.
  • In-memory on the controller leader (the keep-alive): whether each consumer is currently connected, and the grace-period timer that runs while it is disconnected. Keep-alive signals are not written to the metadata store; the leader observes the consumer's connection state directly and tracks the timer in RAM. This avoids a metadata store write on every liveness tick.

Session semantics (steady state).

  • When a consumer registers, the SubscriptionCoordinator writes its ConsumerRegistration and marks it connected in-memory.
  • If the consumer's connection drops, the leader does not immediately remove the consumer. It transitions the in-memory state to "disconnected" and starts a configurable session grace period timer for that consumer.
  • If the same consumerName reconnects within the grace period, the timer is cancelled and the consumer resumes with the same persisted assignment — no rebalance, no other consumer disturbed.
  • If the grace period expires, the leader deletes the ConsumerRegistration from the metadata store and triggers a rebalance for the remaining consumers.

Behavior on controller leader failover.

Because liveness timers are in-memory only, they are lost when the leader broker crashes. The new leader:

  1. Loads all ConsumerRegistration entries for each subscription from the metadata store, restoring the prior segment assignment.
  2. Treats every consumer as "just disconnected" and starts a fresh grace-period timer for each. No timestamps carry across from the previous leader.
  3. Consumers reconnecting to the new leader within the fresh grace period resume with the same assignment — seamless from their perspective.
  4. Consumers that fail to reconnect before the fresh timer expires are evicted and their segments are redistributed.

This means a leader failover always gives clients a full grace period to reconnect, regardless of how long they had already been disconnected under the old leader. The trade-off is explicit: we keep the metadata store writes proportional to real membership changes (register / unregister / rebalance) rather than to liveness ticks.
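
The split between persisted registrations and in-memory grace timers can be sketched as follows. This is a simplified model under stated assumptions: a plain Set stands in for the metadata store, time is passed in explicitly as milliseconds instead of using a scheduler, and the class and method names are hypothetical. Constructing a new instance over the same set models a new leader taking over.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch only; a Set stands in for the metadata store. */
final class ConsumerSessionSketch {
    /** Persisted registrations; this state survives leader failover. */
    private final Set<String> persistedRegistrations;
    /** In-memory only: consumerName -> eviction deadline while disconnected. */
    private final Map<String, Long> graceDeadlines = new HashMap<>();
    private final long gracePeriodMillis;

    /** Models a (new) leader starting up at nowMillis. */
    ConsumerSessionSketch(Set<String> persistedRegistrations,
                          long gracePeriodMillis, long nowMillis) {
        this.persistedRegistrations = persistedRegistrations;
        this.gracePeriodMillis = gracePeriodMillis;
        // Every restored consumer is treated as "just disconnected".
        for (String name : persistedRegistrations) {
            graceDeadlines.put(name, nowMillis + gracePeriodMillis);
        }
    }

    /** Register or reconnect: persist the session and cancel any timer. */
    void onConnect(String consumerName) {
        persistedRegistrations.add(consumerName);
        graceDeadlines.remove(consumerName);
    }

    /** Connection dropped: keep the registration, start the grace timer. */
    void onDisconnect(String consumerName, long nowMillis) {
        if (persistedRegistrations.contains(consumerName)) {
            graceDeadlines.put(consumerName, nowMillis + gracePeriodMillis);
        }
    }

    /** Evicts consumers whose grace period ended; a rebalance would follow. */
    List<String> expire(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        graceDeadlines.entrySet().removeIf(entry -> {
            if (entry.getValue() <= nowMillis) {
                evicted.add(entry.getKey());
                return true;
            }
            return false;
        });
        persistedRegistrations.removeAll(evicted);
        return evicted;
    }
}
```

Note that only onConnect and expire touch the persisted set, which mirrors the stated goal of keeping metadata store writes proportional to membership changes and not to liveness ticks.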

Covered scenarios.

  • Consumer process restart. A client process restarts and reconnects with the same consumerName well within the grace period. It keeps the same segments and no redistribution occurs.
  • Controller leader restart. The leader is re-elected on a different broker. Consumer entries and assignments are restored from the metadata store; the new leader starts a fresh grace-period timer for every consumer so reconnects are transparent.
  • Segment-owning broker restart. Segment topics are reassigned by the standard Pulsar load manager; the consumer's segment-to-consumer assignment is unaffected and reconnects to the new owning broker are transparent at the consumer-assignment level.

The grace period and related tunables are configurable per broker and will be specified in a follow-up sub-PIP.

Layout Change Propagation

When a split or merge completes and the metadata is updated:

  1. The controller reloads the metadata and creates a new SegmentLayout.
  2. notifySubscriptions() is called, which iterates all SubscriptionCoordinator instances.
  3. Each coordinator calls onLayoutChange(newLayout), which recomputes assignments against the new active segments and pushes updates to affected consumers.

Leader Election and Failover

Election

Each ScalableTopicController participates in leader election via the metadata store's LeaderElection API. The election path is:

/topics/{tenant}/{namespace}/{encodedTopicName}/controller

The elected leader writes its broker service URL as the election value. Clients discover the controller by reading this value (delivered as part of the DAG watch session response).

Failover

When the leader broker fails:

  1. The metadata store detects the session loss and clears the leader lock.
  2. Other brokers with active controllers for the same topic receive a NoLeader state change notification.
  3. The ScalableTopicService.onLeaderStateChange() handler re-invokes controller.initialize(), which reloads metadata and re-attempts election.
  4. The new leader restores in-memory state from the persisted metadata, including all subscriptions and their consumer registrations with current assignments. Consumers reconnecting within the session grace period find their sessions intact and resume with the same segment assignment without a rebalance (see Consumer Session Lifecycle).

Non-leader brokers

A broker that loses leadership, or never wins it, retains its controller instance, but all mutating operations (splitSegment, mergeSegments, registerConsumer) fail with IllegalStateException at the checkLeader() guard. The controller remains available for read operations such as getLayout().
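
The interaction between the leader lock and the checkLeader() guard can be sketched with an AtomicReference standing in for the lock path in the metadata store. This is an assumption-laden simplification: the real election goes through the metadata store's LeaderElection API, and the class name, tryElect, and the string return value of splitSegment are invented for the example.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative sketch only; an AtomicReference stands in for the lock path. */
final class LeaderGuardSketch {
    /** Holds the leader's broker URL, or null when there is no leader. */
    private final AtomicReference<String> leaderLock;
    private final String localBrokerUrl;

    LeaderGuardSketch(AtomicReference<String> leaderLock, String localBrokerUrl) {
        this.leaderLock = leaderLock;
        this.localBrokerUrl = localBrokerUrl;
    }

    /** Claims the lock if it is free; returns whether this broker is leader. */
    boolean tryElect() {
        return leaderLock.compareAndSet(null, localBrokerUrl) || isLeader();
    }

    boolean isLeader() {
        return localBrokerUrl.equals(leaderLock.get());
    }

    /** Read operation: allowed on any instance. */
    String getLeaderBrokerUrl() {
        return leaderLock.get();
    }

    /** Mutating operation: rejected unless this instance is the leader. */
    String splitSegment(long segmentId) {
        checkLeader();
        return "split " + segmentId;
    }

    private void checkLeader() {
        if (!isLeader()) {
            throw new IllegalStateException("not the controller leader: " + localBrokerUrl);
        }
    }
}
```

Clearing the reference models the metadata store dropping the lock on session loss. After that, a second instance's tryElect() succeeds and the former leader's mutating calls start failing, while getLeaderBrokerUrl() keeps working on both.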


Scalable Topic Lifecycle

Creation

Admin API (PUT /admin/v2/scalable/{tenant}/{namespace}/{topic})
  └─> ScalableTopicService.createScalableTopic(topic, numInitialSegments)
        ├─> ScalableTopicController.createInitialMetadata(numInitialSegments)
        │     └─> Divides [0x0000, 0xFFFF] into N equal ranges
        │         Creates N SegmentInfo records (ACTIVE, no parents)
        │         Returns ScalableTopicMetadata with epoch=0
        ├─> ScalableTopicResources.createScalableTopicAsync(topic, metadata)
        │     └─> Writes metadata to store at /topics/{t}/{ns}/{topic}
        └─> For each segment: createSegmentAsync via Segments admin API
              └─> Creates segment:// topic on owning broker
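
The range division in createInitialMetadata can be sketched as below. The PIP says the hash space is divided into N equal ranges; how the remainder is distributed when N does not divide 65536 evenly is not specified, so the integer-arithmetic rule used here is an assumption, as are the class and method names.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; remainder handling is an assumption. */
final class InitialRangesSketch {
    record HashRange(int start, int end) {}

    /** Size of the hash space [0x0000, 0xFFFF]. */
    static final int HASH_SPACE = 0x10000;

    /** Divides the hash space into contiguous, inclusive, near-equal ranges. */
    static List<HashRange> divide(int numInitialSegments) {
        if (numInitialSegments < 1 || numInitialSegments > HASH_SPACE) {
            throw new IllegalArgumentException("invalid segment count: " + numInitialSegments);
        }
        List<HashRange> ranges = new ArrayList<>();
        for (int i = 0; i < numInitialSegments; i++) {
            // long arithmetic avoids int overflow for large segment counts
            int start = (int) ((long) i * HASH_SPACE / numInitialSegments);
            int end = (int) ((long) (i + 1) * HASH_SPACE / numInitialSegments) - 1;
            ranges.add(new HashRange(start, end));
        }
        return ranges;
    }
}
```

For two initial segments this yields [0, 32767] and [32768, 65535]. For three it yields [0, 21844], [21845, 43689], and [43690, 65535], so the ranges stay contiguous and cover the whole space even when the sizes differ by one.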

Deletion

Admin API (DELETE /admin/v2/scalable/{tenant}/{namespace}/{topic})
  └─> ScalableTopicService.deleteScalableTopic(topic)
        ├─> releaseController(topic)
        │     └─> Closes leader election, removes from controllers map
        ├─> Load metadata from store
        ├─> For each segment: deleteSegmentAsync via Segments admin API
        │     └─> Deletes segment:// topic on owning broker (best-effort)
        └─> ScalableTopicResources.deleteScalableTopicAsync(topic)
              └─> Removes metadata from store

Public-Facing Changes

Binary Protocol

Three new protocol commands (added to PulsarApi.proto):

| Command | Direction | Field ID | Description |
|---|---|---|---|
| CommandScalableTopicLookup | Client → Broker | 70 | Initiates a DAG watch session |
| CommandScalableTopicUpdate | Broker → Client | 71 | Initial response and subsequent pushed updates |
| CommandScalableTopicClose | Client → Broker | 72 | Closes the DAG watch session |

New protocol messages:

  • ScalableTopicDAG — the full segment DAG with broker addresses and controller URL.
  • SegmentInfoProto — per-segment record in the DAG.
  • SegmentBrokerAddress — maps a segment ID to its owning broker.
  • SegmentState enum — ACTIVE or SEALED.

Admin API

Scalable Topics (/admin/v2/scalable):

| Method | Path | Description |
|---|---|---|
| GET | /{tenant}/{namespace} | List scalable topics |
| PUT | /{tenant}/{namespace}/{topic} | Create scalable topic |
| GET | /{tenant}/{namespace}/{topic} | Get topic metadata |
| DELETE | /{tenant}/{namespace}/{topic} | Delete scalable topic |
| GET | /{tenant}/{namespace}/{topic}/stats | Get aggregated stats for the scalable topic (segment counts, per-segment rates, per-subscription state, consumer counts) |
| PUT | /{tenant}/{namespace}/{topic}/subscriptions/{subscription} | Create a subscription on the scalable topic. The controller propagates it to all active segments by issuing createSubscription on each segment:// topic via the Segments admin API. |
| DELETE | /{tenant}/{namespace}/{topic}/subscriptions/{subscription} | Delete a subscription. The controller unregisters all consumers and deletes the subscription from every segment. |
| POST | /{tenant}/{namespace}/{topic}/split/{segmentId} | Split a segment |
| POST | /{tenant}/{namespace}/{topic}/merge/{segmentId1}/{segmentId2} | Merge two segments |

Segments (/admin/v2/segments):

| Method | Path | Description |
|---|---|---|
| PUT | /{tenant}/{namespace}/{topic}/{descriptor} | Create segment topic |
| POST | /{tenant}/{namespace}/{topic}/{descriptor}/terminate | Terminate segment |
| DELETE | /{tenant}/{namespace}/{topic}/{descriptor} | Delete segment topic |

Admin Client API

New ScalableTopics interface on PulsarAdmin:

  • listScalableTopics(namespace) / listScalableTopicsAsync(namespace)
  • createScalableTopic(topic, numSegments) / createScalableTopicAsync(...)
  • getMetadata(topic) / getMetadataAsync(topic)
  • getStats(topic) / getStatsAsync(topic)
  • deleteScalableTopic(topic) / deleteScalableTopicAsync(topic)
  • createSubscription(topic, subscription) / createSubscriptionAsync(...)
  • deleteSubscription(topic, subscription) / deleteSubscriptionAsync(...)
  • splitSegment(topic, segmentId) / splitSegmentAsync(...)
  • mergeSegments(topic, segmentId1, segmentId2) / mergeSegmentsAsync(...)
  • createSegment(segmentTopic, subscriptions) / createSegmentAsync(...)
  • terminateSegment(segmentTopic) / terminateSegmentAsync(...)
  • deleteSegment(segmentTopic, force) / deleteSegmentAsync(...)

Configuration

| Property | Default | Description |
|---|---|---|
| scalableTopicEnabled | true | Enable scalable topic support on the broker |

Additional configuration for auto-split thresholds and consumer controller session grace period will be specified in follow-up sub-PIPs.

Metadata Store Paths

| Path | Content |
|---|---|
| /topics/{tenant}/{namespace}/{topic} | ScalableTopicMetadata JSON: segment DAG and global topic state |
| /topics/{tenant}/{namespace}/{topic}/controller | Leader broker URL (ephemeral) |
| /topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription} | SubscriptionMetadata: subscription-level config and the persisted set of consumer registrations |
| /topics/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/consumers/{consumerName} | ConsumerRegistration: the durable session, consisting of the consumer name and its current segment assignment. Keep-alive state (connected/disconnected, grace-period timer) is in-memory on the controller leader only and is not persisted here. |
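
To make the nesting of these paths concrete, the following sketch builds each one from its components. The class ScalableTopicPaths and its method names are hypothetical. Only the path layout is taken from the list above, and the sketch performs no validation or escaping of the components.

```java
// Hypothetical path helper; mirrors the layout of the metadata store paths.
class ScalableTopicPaths {
    // Root node: holds the ScalableTopicMetadata JSON.
    static String topicPath(String tenant, String namespace, String topic) {
        return "/topics/" + tenant + "/" + namespace + "/" + topic;
    }

    // Ephemeral node: holds the leader broker URL.
    static String controllerPath(String tenant, String namespace, String topic) {
        return topicPath(tenant, namespace, topic) + "/controller";
    }

    // Holds the SubscriptionMetadata for one subscription.
    static String subscriptionPath(String tenant, String namespace, String topic,
                                   String subscription) {
        return topicPath(tenant, namespace, topic) + "/subscriptions/" + subscription;
    }

    // Holds the ConsumerRegistration for one consumer of a subscription.
    static String consumerPath(String tenant, String namespace, String topic,
                               String subscription, String consumerName) {
        return subscriptionPath(tenant, namespace, topic, subscription)
                + "/consumers/" + consumerName;
    }
}
```

Because every path is a child of the topic path, all state for one scalable topic sits under a single subtree of the metadata store.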

Backward & Forward Compatibility

Upgrade

The controller is a new component with no interaction with existing partitioned or non-partitioned topics. Enabling scalableTopicEnabled activates the new code paths. Existing topics are unaffected.

Downgrade / Rollback

Scalable topic metadata and segment data remain in the metadata store and BookKeeper. Rolling back to a version without scalable topic support leaves this data intact but inaccessible. Upgrading again restores access.

Client Compatibility

The existing Pulsar client SDK rejects topic:// and segment:// domains with a clear error message directing users to the V5 client SDK. This check is enforced in PulsarClientImpl for producer, consumer, and reader creation paths.
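
A rough sketch of such a domain check is shown below. It is not the code in PulsarClientImpl: the class name, method name, exception type, and error wording are all assumptions made for illustration. Only the rejected domains and the pointer to the V5 client SDK come from the text.

```java
// Hypothetical illustration of the legacy-client domain check.
class LegacyClientDomainCheck {
    // Domains reserved for scalable topics, as named in this PIP.
    private static final String[] REJECTED_PREFIXES = {"topic://", "segment://"};

    // Throws if the topic name uses a scalable-topic domain; otherwise returns normally.
    static void validateTopicDomain(String topicName) {
        for (String prefix : REJECTED_PREFIXES) {
            if (topicName.startsWith(prefix)) {
                // Exception type and message are assumptions, not the real client's.
                throw new IllegalArgumentException(
                        "Topic '" + topicName + "' uses the " + prefix
                        + " domain, which requires the V5 client SDK");
            }
        }
    }
}
```

A single check of this kind, called from the producer, consumer, and reader creation paths, would keep the error message identical across all three.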


Security Considerations

The controller follows the same tenant/namespace/topic authorization model as existing Pulsar topics:

  • Admin API operations require topic-level admin permissions.
  • DAG watch sessions require topic-level lookup permissions.
  • Consumer registration requires topic-level consume permissions.
  • Segment operations are internal and authenticated via the broker's internal admin client.

The controller leader lock in the metadata store is accessible only to authenticated brokers.