Pulsar's partitioned topic model has three fundamental limitations that make it unsuitable as the sole topic type for applications requiring elastic, transparent scaling.
Many applications rely on key-based message ordering: all messages for the same key must be processed in sequence, which deduplication, session aggregation, stateful stream processing, and similar use cases depend on for correctness. Key-to-partition routing uses modulo hashing: hash(key) % numPartitions. When the partition count changes (say from 4 to 8), the modulo mapping changes for a large share of keys. A key previously routed to partition 2 may now hash to partition 6. Messages produced after the change land on a different partition than those produced before it, silently breaking the ordering contract for in-flight messages of every remapped key. The only safe workaround is to fully drain the old topic before changing the partition count, which requires downtime.
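The remapping can be reproduced in a few lines. This is a minimal sketch, not Pulsar client code: CRC32 stands in for the client's real key hash, and partition_for is a hypothetical helper introduced only for illustration.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Modulo routing, hash(key) % numPartitions; CRC32 is an assumed stand-in hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"user-{i}" for i in range(10_000)]
# Keys whose partition differs between a 4-partition and an 8-partition topic.
moved = [k for k in keys if partition_for(k, 4) != partition_for(k, 8)]
print(f"{len(moved)} of {len(keys)} keys change partition when growing from 4 to 8")
```

With any hash function, growing from 4 to 8 partitions either keeps a key on partition p or moves it to p + 4, so every moved key can have earlier messages still unconsumed on its old partition.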
Pulsar does not support reducing the number of partitions. Once a topic is scaled up to 64 partitions, it remains at 64 partitions permanently — even if traffic drops to a level where 4 partitions would suffice. Decreasing partitions would require re-routing keys (breaking ordering), migrating unconsumed messages from removed partitions, and updating every active consumer's assignment. No safe mechanism exists for this today. The result is partition count drift: topics accumulate more partitions than needed, wasting broker resources, metadata store entries, and consumer connections.
Because partitions cannot shrink and growing them breaks ordering, operators must predict the right partition count at topic creation time. Over-provisioning wastes resources; under-provisioning leads to write-hot partitions that eventually require re-creating the topic with a higher partition count. Neither outcome is acceptable for a system that aims to be operationally simple at scale.
Apache Pulsar scales topics by pre-configuring a fixed number of partitions. Each partition is an independent persistent topic backed by a single managed ledger on a single broker. A managed ledger is Pulsar's abstraction over Apache BookKeeper, providing an append-only log with cursor-based consumption tracking.
For key-ordered messaging, producers route messages using hash(key) % numPartitions, guaranteeing that all messages with the same key land on the same partition and are consumed in order. Consumer groups are partition-aware: each consumer in a group is assigned one or more partitions, and the assignment is stable as long as the partition count does not change.
A managed ledger is a sequence of BookKeeper ledgers forming a single logical stream. Subscriptions track their position in this stream using cursors. The managed ledger layer handles ledger rollover (sealing a full ledger and opening a new one), cursor advancement, and interaction with BookKeeper for durable storage.
Pulsar's existing topic termination feature allows a managed ledger to be permanently sealed — a capability that is relevant to scalable topics when a range segment is split or merged.
In some of the above areas, the core design must accommodate features that may be deferred beyond the Pulsar 5.0 LTS timeline so that the implementation can be extended later without a redesign. Potentially deferred features include:
Northguard introduces a range-based data model. A topic is a named collection of ranges that together cover the full keyspace. Each range is a sequence of segments (the unit of replication), where each segment is assigned to a different set of brokers — a technique called log striping that keeps the cluster balanced by design.
Scaling is achieved through range splitting and merging using a buddy algorithm: a range can be split into exactly two child ranges, and only two ranges previously split from the same parent can be merged back. This provides clean ordering guarantees — all records in a parent range happen-before records in any child range, and all records in sibling ranges happen-before records in their merged parent. It also naturally aligns keyspaces across different topics, eliminating the need for shuffle stages in stream processing joins.
Unlike indexed partitions, which require a global synchronization barrier when the partition count changes, range splits only interrupt producers writing to the specific range being split.
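The buddy constraint described above can be illustrated with half-open hash ranges. This is a sketch under stated assumptions, not Northguard's implementation: the keyspace size is an arbitrary power of two, every range is assumed to come from repeated halving of the full keyspace, and split and are_buddies are hypothetical helper names.

```python
KEYSPACE = 1 << 16  # assumed power-of-two keyspace size, for illustration only


def split(r: tuple[int, int]) -> tuple[tuple[int, int], tuple[int, int]]:
    """Split a half-open range [start, end) into two equal child ranges."""
    start, end = r
    mid = (start + end) // 2
    return (start, mid), (mid, end)


def are_buddies(a: tuple[int, int], b: tuple[int, int]) -> bool:
    """True only if a and b are the two halves produced by splitting one parent.

    Relies on the assumption that all ranges come from halving [0, KEYSPACE).
    """
    lo, hi = sorted((a, b))
    size = lo[1] - lo[0]
    same_size = hi[1] - hi[0] == size
    adjacent = lo[1] == hi[0]
    # The lower half of a parent always starts on a multiple of the parent's size.
    parent_aligned = size > 0 and lo[0] % (2 * size) == 0
    return same_size and adjacent and parent_aligned


left, right = split((0, KEYSPACE))
left_lo, left_hi = split(left)
right_lo, right_hi = split(right)

print(are_buddies(left_lo, left_hi))   # siblings of the same parent
print(are_buddies(left_hi, right_lo))  # adjacent, but from different parents
```

The second pair is adjacent in the keyspace yet not mergeable under a strict buddy rule, which is the restriction that a neighbor-based merge relaxes.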
Pravega uses a similar segment-based model with splitting and merging. Neighboring keyspace ranges can be merged, which is slightly more flexible than Northguard's strict buddy constraint. Pravega's consumer-side abstractions include:
Pravega's Flink connector experience highlights a key challenge: when range topology changes, the stream processing connector must handle checkpoint restoration across segment boundaries and coordinate watermarks correctly across a dynamic number of streams. For key-ordered processing, Pravega routes events to segments by routing key, ensuring all events for a given key are read by the same parallel Flink instance. However, stateful downstream operators that require key-based ordering still need a Flink keyBy() transformation, which introduces a network shuffle to re-route events to the correct Flink task.
Note: This section does not contain the high-level design for Scalable Topics. The full high-level design — covering functionality, capabilities and quality characteristics, concepts, components, interactions, contracts, and related design decisions — will be defined in a dedicated sub-PIP once PIP-460 has received high-level acceptance from the Pulsar community to proceed with planning.
PIP-460 is intentionally a vision and problem statement document. Its purpose is to establish the motivation, goals, and key challenges that need to be resolved, and to seek community alignment on the direction before detailed design work begins.
The sections below sketch some preliminary design directions to illustrate the problem space. These are subject to change as the detailed design evolves in sub-PIPs.
| Concept | Description |
|---|---|
| Scalable Topic | A new topic type identified by the topic:// URL scheme. Scales transparently through range splitting and merging without requiring recreation or downtime. |
| Range Segment | The fundamental unit of a scalable topic. Covers a contiguous portion of the keyspace (a hash range) and is backed by exactly one managed ledger on one broker. Often referred to as "range" for brevity. |
| Keyspace | The full space of possible key hashes. Each range segment is assigned a non-overlapping slice of the keyspace; together all active segments cover the full keyspace. |
| Segment DAG | The directed acyclic graph that records the full history of range segments for a topic. Parent-child edges are added when segments are split or merged. Encodes a strict happens-before relationship: all messages in a parent segment precede messages in its children. |
| Split | An operation that seals a range segment and creates two child segments, each covering a sub-portion of the parent's keyspace. A local operation on the owning broker. |
| Merge | An operation that seals two or more adjacent range segments and creates a single child segment covering their combined keyspace. Requires cross-broker coordination. |
| Merge Leader | The broker that coordinates a merge operation, responsible for sealing all involved segments and creating the merged child. |
| Watch Session | A persistent connection between a client and a broker through which the broker pushes the initial topic state and subsequent DAG topology updates. Replaces one-shot lookup requests. |
| Consumer Controller | A broker elected via leader election that manages segment-to-consumer assignments for a subscription. Persists assignment state to survive broker failures. |
| Consumer Controller Session | A persistent bidirectional stream between a consumer and the controller broker used to push assignments and receive progress reports. |
| Consumer Identity | A stable name chosen by the client that identifies a consumer controller session. Allows the session to be restored after a reconnect within the grace period. |
| Consumer Session Grace Period | Acts as a persistent lease on segment assignments. When a consumer disconnects, its assignments are held in reserve for the duration of the grace period rather than immediately redistributed. If the consumer reconnects within this window, the lease is renewed and assignments are restored unchanged. |
| StreamConsumer | A consumer type registered with the controller that receives exclusive segment assignments and processes messages in key order within each segment. |
| QueueConsumer | A consumer type that subscribes to all active segments with shared dispatch. No cross-segment ordering guarantees. |
| CheckpointConsumer | A consumer type registered with the controller that tracks read positions externally via a serializable Checkpoint snapshot. Designed for stream processing frameworks such as Flink. |
| Checkpoint | A serializable snapshot of read positions across all assigned segments, used by the CheckpointConsumer to restore processing state after a restart or topology change. |
| Tailing Reads | A consumption pattern where consumers follow closely behind producers on active (non-sealed) segments, making consumer-side I/O metrics potentially useful for scaling decisions. |
A scalable topic is a DAG of range segments. A topic starts with one or more range segments. Each range segment covers a contiguous portion of the keyspace (expressed as a hash range). At produce time, a keyed message is routed to the active segment whose hash range covers the message's key hash.
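Produce-time routing as described above amounts to a lookup in a sorted list of hash ranges. The following is a minimal sketch, not the planned client implementation: the hash-space size, the CRC32-based key_hash, and the Segment and route names are all assumptions made for illustration.

```python
import bisect
import zlib
from dataclasses import dataclass

HASH_SPACE = 1 << 16  # assumed size of the key-hash space


@dataclass(frozen=True)
class Segment:
    segment_id: int
    start: int  # inclusive hash-range start
    end: int    # exclusive hash-range end


def key_hash(key: str) -> int:
    """Assumed hash: CRC32 folded into the hash space."""
    return zlib.crc32(key.encode("utf-8")) % HASH_SPACE


def route(active: list[Segment], key: str) -> Segment:
    """Return the active segment whose [start, end) range covers the key's hash.

    The list must be sorted by start and cover [0, HASH_SPACE) without gaps.
    """
    starts = [s.start for s in active]
    return active[bisect.bisect_right(starts, key_hash(key)) - 1]


active = [Segment(0, 0, 16384), Segment(1, 16384, 40000), Segment(2, 40000, HASH_SPACE)]
target = route(active, "order-42")
print(f"order-42 is routed to segment {target.segment_id}")
```

Because routing depends only on range boundaries, changing the boundaries of one range leaves the routing of keys in every other range untouched.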
A range segment can be split into two child segments, each inheriting a sub-portion of the parent's keyspace. Adjacent range segments can be merged into a single child segment covering the combined keyspace. These operations extend the DAG by adding parent-child edges.
The DAG encodes a strict happens-before relationship: all messages in a parent segment happen-before messages in any of its child segments. Consumers must traverse the DAG in this order to maintain correctness across splits and merges.
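The split, merge, and happens-before rules above can be modeled with a small in-memory graph. This is a sketch under stated assumptions: the SegmentDag class and its method names are hypothetical, splits are taken at the midpoint for simplicity, and a real implementation would persist this state in the metadata store. The parent-before-child read order is obtained from a standard topological sort.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter


@dataclass
class Segment:
    segment_id: int
    start: int  # inclusive hash-range start
    end: int    # exclusive hash-range end
    parents: list[int] = field(default_factory=list)
    sealed: bool = False


class SegmentDag:
    """Toy segment DAG (hypothetical); starts with one segment covering the keyspace."""

    def __init__(self, hash_space: int) -> None:
        self.segments: dict[int, Segment] = {0: Segment(0, 0, hash_space)}

    def _seal(self, segs: list[Segment]) -> None:
        if any(s.sealed for s in segs):
            raise ValueError("cannot split or merge a sealed segment")
        for s in segs:
            s.sealed = True

    def _new(self, start: int, end: int, parents: list[Segment]) -> Segment:
        seg = Segment(len(self.segments), start, end, [p.segment_id for p in parents])
        self.segments[seg.segment_id] = seg
        return seg

    def split(self, segment_id: int) -> tuple[Segment, Segment]:
        """Seal the parent and create two children covering its two halves."""
        parent = self.segments[segment_id]
        self._seal([parent])
        mid = (parent.start + parent.end) // 2
        return self._new(parent.start, mid, [parent]), self._new(mid, parent.end, [parent])

    def merge(self, segment_ids: list[int]) -> Segment:
        """Seal two or more adjacent segments and create one child covering all of them."""
        parts = sorted((self.segments[i] for i in segment_ids), key=lambda s: s.start)
        if len(parts) < 2 or any(a.end != b.start for a, b in zip(parts, parts[1:])):
            raise ValueError("merge needs two or more adjacent segments")
        self._seal(parts)
        return self._new(parts[0].start, parts[-1].end, parts)

    def active(self) -> list[Segment]:
        return sorted((s for s in self.segments.values() if not s.sealed), key=lambda s: s.start)

    def read_order(self) -> list[int]:
        """Segment ids ordered so that every parent precedes its children."""
        graph = {s.segment_id: s.parents for s in self.segments.values()}
        return list(TopologicalSorter(graph).static_order())


dag = SegmentDag(1 << 16)
a, b = dag.split(0)
c, d = dag.split(b.segment_id)
m = dag.merge([a.segment_id, c.segment_id])
order = dag.read_order()
```

Any order returned by read_order is a valid catch-up order for a single reader; segments without a common ancestor-descendant relationship can be read in parallel.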
A "range segment" is often referred to as a "range" for brevity. Each range is backed by exactly one managed ledger and is owned by exactly one broker at any given time.
Splitting does not require cross-broker coordination — since a range is owned by a single broker, the split is a local operation on the owning broker:
Merging requires cross-broker coordination since the segments being merged may be owned by different brokers. One broker acts as the merge leader, coordinating the seal of all involved segments and the creation of the merged child. Merging multiple adjacent segments at once is supported. As with splitting, the updated DAG is pushed to clients via their active watch sessions.
Scalable topics use a dedicated URL scheme to make the topic type unambiguous at the API and routing level:
- topic://{tenant}/{namespace}/{name}
- segment://{tenant}/{namespace}/{name}/{segment-id}

Using a distinct scheme, rather than a prefix in the topic name, avoids ambiguity with existing persistent:// and non-persistent:// URLs and enables topic type detection without an extra metadata round-trip.
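A client can tell the topic type from the scheme alone. The parser below is a minimal sketch of that idea; parse_scalable_url and ScalableTopicName are hypothetical names, and the validation rules (exactly three or four non-empty path components) are assumptions rather than part of the proposal.

```python
from typing import NamedTuple, Optional


class ScalableTopicName(NamedTuple):
    scheme: str
    tenant: str
    namespace: str
    name: str
    segment_id: Optional[str]  # set only for segment:// URLs


def parse_scalable_url(url: str) -> ScalableTopicName:
    """Parse topic://tenant/namespace/name or segment://tenant/namespace/name/segment-id."""
    scheme, sep, rest = url.partition("://")
    if not sep or scheme not in ("topic", "segment"):
        raise ValueError(f"not a scalable topic URL: {url}")
    parts = rest.split("/")
    expected = 3 if scheme == "topic" else 4
    if len(parts) != expected or not all(parts):
        raise ValueError(f"expected {expected} non-empty path components: {url}")
    segment_id = parts[3] if scheme == "segment" else None
    return ScalableTopicName(scheme, parts[0], parts[1], parts[2], segment_id)


print(parse_scalable_url("topic://acme/orders/events"))
print(parse_scalable_url("segment://acme/orders/events/7"))
```

A persistent:// or non-persistent:// URL is rejected at the first check, without any metadata lookup.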
Rather than a one-shot request/response, the lookup for a scalable topic establishes a persistent watch session between the client and a broker. The broker returns the initial topic state — current active segments, their keyspace boundaries, and broker addresses — and then proactively pushes updates whenever the segment DAG changes due to a split or merge. This avoids polling and the need for clients to re-issue lookups on topology changes.
For producers, any broker can serve the watch session, since segment metadata lives in the shared metadata store. For controller-coordinated consumers (StreamConsumer and CheckpointConsumer), the watch session additionally delivers the address of the elected controller broker for the subscription.
Three consumption patterns are supported:
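- StreamConsumer: registers with the controller, receives exclusive segment assignments, and processes messages in key order within each segment.
- QueueConsumer: subscribes to all active segments with shared dispatch, with no cross-segment ordering guarantees.
- CheckpointConsumer: registers with the controller and tracks read positions externally via a serializable Checkpoint snapshot rather than subscription cursors. Designed for use by stream processing frameworks (Flink, Beam) that manage their own state and require restorable read positions across topology changes.

StreamConsumer and CheckpointConsumer require coordinated segment assignment. A single broker, elected as the controller via leader election in the metadata store, manages segment-to-consumer assignments for a given subscription. The controller persists the assignment state so that it can be restored after a broker failure without requiring consumers to rebalance from scratch.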
Consumers register with the controller using a persistent bidirectional stream, establishing a consumer controller session. The controller pushes segment assignment updates through this stream, and consumers report segment completions and acknowledgment positions back. Each session is identified by a stable consumer identity (a name chosen by the client). If the underlying connection drops, the session enters a grace period during which the consumer's assigned segments are held in reserve and not reassigned to other consumers. If the consumer reconnects with the same identity before the grace period expires (configurable, default ~1 minute), the session is restored and the consumer receives its previous assignments unchanged. If the grace period expires without reconnection, the session ends and the segments are redistributed to the remaining consumers.
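The grace-period behavior described above is essentially a lease keyed by consumer identity. The following is a minimal sketch, assuming a 60-second grace period and explicit timestamps in place of a real clock; AssignmentLeases and its methods are hypothetical names, not the controller's actual interface.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Session:
    segments: frozenset
    disconnected_at: Optional[float] = None  # None while the consumer is connected


class AssignmentLeases:
    """Toy model of controller-side session state (hypothetical)."""

    def __init__(self, grace_period_s: float = 60.0) -> None:
        self.grace_period_s = grace_period_s
        self.sessions: dict[str, Session] = {}

    def register(self, identity: str, segments: set[int]) -> None:
        self.sessions[identity] = Session(frozenset(segments))

    def on_disconnect(self, identity: str, now: float) -> None:
        # Assignments are held in reserve; nothing is redistributed yet.
        self.sessions[identity].disconnected_at = now

    def _expired(self, session: Session, now: float) -> bool:
        return (session.disconnected_at is not None
                and now - session.disconnected_at >= self.grace_period_s)

    def on_reconnect(self, identity: str, now: float) -> Optional[frozenset]:
        """Restore the previous assignments, or return None if the session has ended."""
        session = self.sessions.get(identity)
        if session is None or self._expired(session, now):
            return None
        session.disconnected_at = None
        return session.segments

    def expire(self, now: float) -> set[int]:
        """End sessions past the grace period and return the segments to redistribute."""
        freed: set[int] = set()
        for identity, session in list(self.sessions.items()):
            if self._expired(session, now):
                freed |= session.segments
                del self.sessions[identity]
        return freed


leases = AssignmentLeases(grace_period_s=60.0)
leases.register("worker-a", {1, 2})
leases.on_disconnect("worker-a", now=100.0)
restored = leases.on_reconnect("worker-a", now=130.0)  # within the grace period
```

Keeping the lease on the consumer identity rather than on the connection is what lets a brief network interruption pass without a rebalance.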
A new API namespace is introduced (e.g., org.apache.pulsar.client.api.v5) designed from the ground up for scalable topics. The existing API remains unchanged and fully supported in 5.0 LTS releases. The existing API would be deprecated in a later LTS release (6.0 LTS) and removed in the LTS release after that (7.0 LTS).
The new API introduces type-safe, separate consumer interfaces — StreamConsumer, QueueConsumer, and CheckpointConsumer — each exposing only the operations valid for that consumption pattern. This makes invalid operations unrepresentable at compile time rather than producing runtime exceptions. Each interface has its own builder exposing only the configuration options that apply to it.
This replaces Pulsar's existing subscription type model (Exclusive, Failover, Shared, Key_Shared), where a single Consumer interface exposes operations that are silently no-ops depending on the subscription type.
New client API implementations are required for all supported language client SDKs, such as Java, Python, Go, C++, DotNet, and Node.js.
Given the Pulsar 5.0 LTS timeline (targeting October 2026) and the strategy of shipping pre-LTS features in 4.x for community feedback:
The above phases are subject to change as the design evolves.
| Decision | Approach |
|---|---|
| Scaling mechanism | Range splitting and merging via a segment DAG |
| Storage per range | One managed ledger per range segment |
| Key routing | Range-based hash assignment (replaces hash(key) % numPartitions) |
| Split coordination | Single-broker operation (range is owned by one broker) |
| Merge coordination | Cross-broker, leader-based; supports merging multiple adjacent ranges at once |
| Lookup | Persistent watch session with server-pushed DAG updates |
| Consumer coordination | Elected controller broker; persistent bidirectional stream; persisted assignment state |
| Consumer controller session | Identified by consumer identity; grace period (~1 min) holds assigned segments in reserve on disconnect, allowing reconnection without rebalancing |
| Consumer model | StreamConsumer + QueueConsumer + CheckpointConsumer (replaces Exclusive/Failover/Shared/Key_Shared) |
| Read-side scaling within a range | Key_Shared-style dispatch within a segment (StreamConsumer mode) |
| Client API | New API namespace; type-safe separate consumer interfaces |
| Metadata store | Oxia (preferred for Pulsar 5.0); enables streaming watch sessions |
| Delivery | Phased: 4.3.0 → 4.4.0 → 5.0 LTS |
This section summarizes the key challenges that scalable topics introduce. Each is addressed at a high level here; detailed solutions are deferred to sub-PIPs. The presented solutions are subject to change as the design evolves.
When a single range segment becomes a write bottleneck, the system should allow splitting the range segment. The split must be coordinated: the current segment is sealed, child segments are created, and producers are redirected. The key-to-range mapping must update atomically from the client's perspective. Both automatic (I/O-threshold-based) and manual (admin API) triggers should be supported.
Splitting and merging are fundamentally producer-side decisions: they affect where new messages are routed. In tailing reads mode — where consumers follow closely behind producers — consumer-side I/O metrics could also inform scaling decisions, and the number of splits could adapt to the number of available consumers. Once a segment is sealed, its topology is fixed.
Auto-merge is the symmetric case: when I/O on adjacent segments drops below a low-water mark, those segments become candidates for merging. This is harder in practice than auto-split. Merging requires cross-broker coordination, and the buddy algorithm constraint means only segments that share split ancestry are eligible. There is also a risk of thrashing under a fluctuating load if the threshold is naive. Manual merge via the admin API is also supported, which makes it possible to drive merge decisions from an external controller that observes traffic patterns. The detailed auto-merge policy is deferred to a sub-PIP that covers range splitting and merging.
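One common way to limit thrashing is to keep a dead band between the split and merge thresholds and to require several consecutive samples before acting. The sketch below illustrates that general technique only; it is not the auto-merge policy of this proposal, which is deferred to a sub-PIP. The function name, the thresholds, and the window size are all assumptions, and a real merge would additionally require an adjacent segment that is also a candidate.

```python
def scaling_decision(samples: list[float], split_above: float,
                     merge_below: float, window: int) -> str:
    """Return "split", "merge", or "none" for one segment from its recent I/O rate samples.

    A decision is made only when every one of the last `window` samples is beyond
    the relevant threshold; rates between the two thresholds never trigger a change.
    """
    if window < 1:
        raise ValueError("window must be at least 1")
    if merge_below >= split_above:
        raise ValueError("merge_below must be lower than split_above")
    recent = samples[-window:]
    if len(recent) < window:
        return "none"  # not enough history yet
    if all(rate > split_above for rate in recent):
        return "split"
    if all(rate < merge_below for rate in recent):
        return "merge"
    return "none"


print(scaling_decision([900, 950, 990], split_above=800, merge_below=100, window=3))
print(scaling_decision([50, 900, 40], split_above=800, merge_below=100, window=3))
```

A single spike or dip inside the window resets the decision to "none", so a load that oscillates around one threshold does not alternate between splitting and merging.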
When segments are split, the system must make newly created segments available to additional consumer instances. The consumer controller handles rebalancing. In container orchestration environments such as Kubernetes, the current segment count can be exposed as a scaling metric so that the consumer deployment scales out and in automatically.
Segment count is determined by produce-time topology and is fixed once segments are sealed. To scale reads independently of writes, the StreamConsumer supports a Key_Shared-style dispatch mode within a single segment, fanning out consumption to multiple consumers without requiring the segment to be split.
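Fanning out reads within one segment can be pictured as slicing the segment's hash range into sub-ranges, one per consumer, so that a given key always reaches the same consumer. This is a minimal sketch of that idea with equal-width sub-ranges; consumer_for is a hypothetical helper, and the actual dispatch and assignment scheme is left to the StreamConsumer sub-PIP.

```python
def consumer_for(key_hash: int, seg_start: int, seg_end: int, consumers: list[str]) -> str:
    """Pick the consumer for a key hash by slicing [seg_start, seg_end) into equal sub-ranges."""
    if not consumers:
        raise ValueError("at least one consumer is required")
    if not seg_start <= key_hash < seg_end:
        raise ValueError("key hash is outside the segment's range")
    # Integer arithmetic maps the offset within the segment to a consumer index.
    index = (key_hash - seg_start) * len(consumers) // (seg_end - seg_start)
    return consumers[index]


consumers = ["c0", "c1", "c2", "c3"]
print(consumer_for(250, 0, 1000, consumers))
```

Adding a consumer changes the sub-range boundaries but not the segment itself, which is why this kind of read-side scaling does not require a split.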
Current Pulsar Key_Shared consumption requires that producers use BatcherBuilder.KEY_BASED so the broker can route batch entries to the correct consumer. Scalable topics need to eliminate this producer-side requirement.
The new message entry format must allow the broker to route or filter individual messages within a batch entry without requiring the producer and broker to perform costly operations. It must also support selective decompression and decryption so that only the relevant portion of an entry is processed by each consumer or replication path. The format should support both keyed and non-keyed (total-order) modes, and its design affects the MessageId representation, the acknowledgement model, and geo-replication routing. Because existing clients are not compatible with scalable topics, the message entry format can be changed without backward compatibility constraints. The detailed wire format, encoding, and related design decisions will be addressed in a dedicated sub-PIP.
Key-ordered processing must be maintained across splits and merges. When a segment is split, consumers must finish processing all messages in the parent segment before reading from its children — this is the happens-before guarantee encoded by the segment DAG. Consumers that traverse sealed segments during catch-up reads must follow this parent-before-child order to preserve correctness.
Exactly-once messaging semantics depend on three mechanisms: idempotent producers (sequence-number-based deduplication at the broker), broker-side deduplication state, and transactions for atomic multi-segment writes. Each of these is currently scoped to a single topic or partition. Scalable topics introduce a dynamic segment lifecycle where segments are created, sealed, and replaced over time. Deduplication state must be carried over correctly during splits. Transactions that span multiple range segments must commit atomically. All areas require careful design.
Key_Shared-style dispatch within a segment may require independent per-range cursors rather than the single shared cursor used in the current Key_Shared model, allowing the consumer controller to track and rebalance progress at hash-range granularity. Key distribution imbalance — where a small number of hot keys dominate throughput — is a known challenge in the current Key_Shared implementation; the consumer controller's ability to reassign and drain individual hash ranges addresses this at the assignment level. This feature may be deferred beyond Pulsar 5.0, but the design must remain extensible to support it without a redesign. The cursor model and assignment protocol details will be specified in the StreamConsumer sub-PIP.
If the controller broker fails, a new controller is elected and restores the persisted assignment state. Consumers can continue consuming from their already-assigned segments until the new controller is ready. On startup, the new controller treats all sessions as disconnected and immediately starts the grace period for each. Consumers that reconnect within the grace period have their sessions restored and receive the same segment assignments; those that do not reconnect in time have their sessions expired and their segments redistributed.
The StreamConsumer could use cumulative acknowledgements when a single consumer processes a segment sequentially, reducing overhead and metadata store load. However, when Key_Shared-style dispatch fans out messages from a single segment to multiple consumers, messages may be processed out of order, requiring individual acknowledgements. The QueueConsumer also requires individual acknowledgements for the same reason. An additional complication is that a single entry can be delivered to two consumers when it spans a bucket boundary; such entries must be tracked individually regardless of the acknowledgement model. The final model will be determined in the consumer API sub-PIP.
Delayed message delivery requires tracking timer state across all active (non-sealed) range segments, since delayed messages can be spread across multiple segments. The delay timer tracking and dispatch mechanism must account for the multi-segment topology. One approach is to maintain an active subscription and consumer on every segment that still contains undelivered delayed messages.
Pulsar's transaction system is currently partition/topic-oriented. Extending transactions to span records across multiple range segments of the same scalable topic requires careful coordination, especially if a segment is split or merged during the transaction's lifetime. Transaction support is deferred to Phase 4.
Stream processing frameworks manage their own state and require stable, restorable read positions. When a range is split or merged, the Flink source connector must handle the new segment topology and map restored checkpoints — which may reference now-sealed segments — to correct positions in the successor segments. Watermark propagation must account for a dynamically changing number of input streams. A clean integration with Flink's Source/SplitEnumerator model is a design goal; the specifics will be addressed in a follow-on PIP.
A foundational design decision for geo-replication is that range segments are independent per cluster — the segment topology in one cluster is not required to match, or be coordinated with, the topology in another. A cluster is free to split or merge its own segments based on its local traffic patterns, independently of what remote clusters do. Coupling segment topologies across clusters would reintroduce a global coordination requirement that scalable topics are specifically designed to avoid. However, the independence means the segment DAG can diverge between clusters, which creates additional challenges for both message routing and subscription position tracking.
For message replication, this independence means that a replicator cannot assume that a segment with a given identifier exists on the remote cluster. Instead, replication must route each message to whichever active segment on the remote cluster covers the message's key hash at the time of replication — the same routing logic used by producers.
Replicated subscriptions require a fundamentally new mechanism. The existing approach assumes a subscription position is a single point in a linear stream; with scalable topics, it is a set of positions across multiple active range segments whose DAGs can diverge independently between clusters. The detailed design is deferred to a dedicated sub-PIP.
Geo-replication also requires the new message entry format described in Message Entry Format, allowing the receiving broker to route an incoming batch to the correct destination segment without decompressing or decrypting the payload. The interaction between batch boundaries, compression, and encryption at the replication layer will be addressed jointly by the message entry format sub-PIP and the geo-replication sub-PIP.
This section gives a high-level overview of the changes visible to operators and application developers. Each area is intentionally kept brief here — the precise API surface, configuration keys, metric names, and protocol details are defined in the dedicated sub-PIPs.
A new topic type — scalable topic — is introduced and registered in Pulsar's topic type system. It is created via the admin API with a new topic type identifier and is addressable using the topic:// URL scheme.
New admin API endpoints for scalable topics:
The existing Pulsar binary protocol topic lookup command is not used for scalable topics. Broker discovery is handled entirely through the two new bidirectional streams described below — the DAG watch session for segment topology and broker addresses, and the consumer controller stream for assignment coordination.
Two new bidirectional protocol streams are introduced:
Additional protocol changes:
- A MessageId format that includes the range segment identifier. Support for the new message entry format also impacts the message ID format.
- A segment:// topic domain alongside the existing topic:// domain.

A new client module (pulsar-client-v5) introduces type-safe interfaces for scalable topics:
- ScalableTopicProducer: opens a DAG watch session; routes each message to the active segment covering the message's key hash; adapts automatically as the topology changes.
- StreamConsumer: registers with the controller broker; receives exclusive segment assignments; consumes in key order within each assigned segment; uses subscription cursors for position tracking.
- QueueConsumer: subscribes to all active segments with shared dispatch; no controller interaction; messages delivered without cross-segment ordering guarantees.
- CheckpointConsumer: registers with the controller like a StreamConsumer, but tracks positions externally via a Checkpoint snapshot instead of subscription cursors. Requires a group identifier to coordinate parallel instances. Designed for Flink and similar stream processing frameworks.

Each sub-PIP introduces its own configuration options. The examples below are illustrative; the complete list will grow as sub-PIPs are written. All options are expected to be configurable at broker level, with selected options also overridable at namespace and topic level.
Examples of anticipated configuration options:
Each sub-PIP defines the metrics relevant to its area. The examples below are illustrative; the complete set will be specified in a dedicated metrics sub-PIP.
Examples of anticipated metrics:
Scalable topics are introduced as a new, distinct topic type. No changes are made to the existing partitioned or non-partitioned topic implementation. Existing clients continue to work with partitioned and non-partitioned topics without modification.
Scalable topics require a compatible broker and client version. Clients connecting to older brokers will receive an error when attempting to create or connect to a scalable topic, and older clients cannot connect to scalable topics even on a compatible broker.
Scalable topics are not usable on broker versions that predate this feature. Rolling back to an older broker version is possible; existing non-partitioned and partitioned topics continue to work normally. Scalable topic data and metadata should be retained in BookKeeper and the metadata store during a rollback so that upgrading again to a compatible broker version resumes access to scalable topics.
Geo-replication for scalable topics is a new protocol path. Clusters participating in geo-replication of scalable topics must all run a compatible broker version. Replicating scalable topics to a cluster running an older Pulsar version is not supported. Detailed upgrade considerations will be specified in the geo-replication sub-PIP.
Scalable topics follow the same tenant/namespace/topic authorization model as existing Pulsar topics. The new admin API endpoints for segment operations (split, merge, metadata read) will require the same permissions as equivalent topic-level admin operations.
The watch session and the consumer controller stream are authenticated using the same mechanisms as existing Pulsar binary protocol connections (TLS, token authentication, mutual TLS).
Multi-tenancy isolation is preserved: a client can only watch or interact with segments that belong to topics it has permission to access. The controller does not expose the assignment state across subscriptions or tenants.
This is a parent PIP. The following sub-PIPs are planned to cover each major design area in detail. Each sub-PIP will include its own public-facing changes, detailed design, and backward compatibility sections. A single sub-PIP may address multiple related areas from this list if they are tightly coupled.
| Sub-PIP | Area |
|---|---|
| TBD | High-level architecture: functionality, capabilities, concepts, components, interactions, contracts, and design decisions |
| TBD | Range segment data model and metadata store schema |
| TBD | StreamConsumer API and protocol |
| TBD | QueueConsumer API and protocol |
| TBD | CheckpointConsumer API and protocol |
| TBD | Migration strategy |
| TBD | Producer protocol: watch sessions, range routing, topology change handling |
| TBD | Consumer controller: leader election, assignment protocol, persistent state, grace period |
| TBD | Range splitting: broker-side protocol, admin API, auto-split triggers |
| TBD | Range merging: cross-broker coordination protocol, admin API |
| TBD | Message entry format for Scalable Topics |
| TBD | Scalable topic metrics |
| TBD | Transaction support across range segments |
| TBD | Geo-replication for scalable topics |
| TBD | Replicated Subscriptions for scalable topics |
| TBD | Stream processing connector for scalable topics (Flink / Apache Beam) |
Partitioned topics could be improved incrementally (e.g., online repartitioning with a migration period, smarter placement). However, modulo-based routing is fundamentally incompatible with transparent key-preserving repartitioning. A clean-break design is necessary to deliver the desired semantics without carrying forward the modulo constraint.
The scalable topic feature is targeted at Pulsar 5.0 as the LTS milestone. The phased delivery path through 4.3.0 and 4.4.0 is designed to collect implementation experience and community feedback before the LTS commitment.