pip/pip-475.md
Sub-PIP of PIP-460: Scalable Topics
PIP-460 introduces scalable topics (topic://...) as a new topic type that supports range splitting and merging without breaking key ordering. For this to be adoptable in real deployments, users with existing partitioned and non-partitioned topics need a migration path that:
PIP-460 lists "Tooling for migrating existing partitioned topics to scalable topics" in its postponed section. This PIP closes that gap.
This PIP also clarifies the V5 SDK's behavior when given a topic name that may or may not be scalable, and tightens the broker so that a v4 client cannot accidentally write to (or auto-create) a regular topic that has already been migrated.
The longer-term direction for Pulsar is for scalable topics to fully replace partitioned and non-partitioned topics: the existing topic types stay supported for backward compatibility, but new development on the topic surface targets scalable topics, and migration tooling like this PIP is what lets existing deployments make that transition incrementally instead of all at once.
A Pulsar topic name encodes its domain in a URI scheme:
- persistent://t/n/x: durable topic backed by a managed ledger.
- non-persistent://t/n/x: in-memory topic, no durability.
- topic://t/n/x: scalable topic introduced by PIP-460. Backed by a DAG of segments; each segment is itself a segment://... topic with its own managed ledger.

A short name like my-topic is normalized to persistent://public/default/my-topic by the v4 client. The V5 SDK keeps the same normalization, then opens a scalable-topic lookup session for the resolved name (see V5 SDK resolution rule below).
A regular topic comes in two distinct shapes:
- Non-partitioned: persistent://t/n/x with one managed ledger.
- Partitioned (N partitions): persistent://t/n/x is a logical name; the actual data lives in persistent://t/n/x-partition-0 … persistent://t/n/x-partition-(N-1), each with its own managed ledger. The v4 client's partitioned producer routes each message to a partition by signSafeMod(murmurHash3_32(key), N).

A scalable topic's hash space is [0x0000, 0xFFFF]. Each active segment owns a contiguous sub-range. The V5 producer routes a message with key K to the segment whose range contains murmurHash3_32(K) & 0xFFFF. A segment can be split into two halves, and the halves can later be merged back. The routing function is range-based, not modulo-based.
This difference matters for partitioned-topic migration: the migrated layout's initial segments line up 1:1 with the partitions, but their hash-range routing wouldn't agree with v4's mod-N routing for all keys. The fix is described in Routing across migration below.
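To make the disagreement concrete, here is a small Java sketch that routes the same keys both ways. It makes three assumptions. First, murmurHash3_32 is the standard MurmurHash3 x86_32 with seed 0 over the key's UTF-8 bytes; Pulsar's own hash helper may differ in details such as masking. Second, the scalable layout has N equal-width ranges. Third, the class and method names are illustrative only.

```java
import java.nio.charset.StandardCharsets;

// Illustrative comparison of v4 mod-N routing and scalable-topic range routing.
// Assumption: murmurHash3_32 = standard MurmurHash3 x86_32, seed 0, UTF-8 key bytes.
final class RoutingComparison {

    static int murmurHash3_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int nblocks = data.length / 4;
        for (int i = 0; i < nblocks; i++) {
            int j = i * 4;
            int k1 = (data[j] & 0xff) | ((data[j + 1] & 0xff) << 8)
                    | ((data[j + 2] & 0xff) << 16) | ((data[j + 3] & 0xff) << 24);
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        int tail = nblocks * 4;
        int k1 = 0;
        switch (data.length & 3) {
            case 3: k1 ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2: k1 ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1:
                k1 ^= data[tail] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
        }
        // Finalization mix.
        h1 ^= data.length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    static int hash(String key) {
        return murmurHash3_32(key.getBytes(StandardCharsets.UTF_8), 0);
    }

    // Non-negative remainder, as used by v4 partitioned producers.
    static int signSafeMod(int dividend, int divisor) {
        int mod = dividend % divisor;
        return mod < 0 ? mod + divisor : mod;
    }

    // v4: partition index.
    static int routeModN(String key, int n) {
        return signSafeMod(hash(key), n);
    }

    // Scalable topic: index of the equal-width range containing (hash & 0xFFFF).
    static int routeByRange(String key, int n) {
        int point = hash(key) & 0xFFFF;             // 0 .. 65535
        return (int) ((long) point * n / 0x10000);  // contiguous equal-width buckets
    }
}
```

With N = 4, signSafeMod selects by the hash's two lowest bits, while the range lookup selects by bits 14 and 15. For a well-mixed hash, roughly three keys in four therefore get a different index.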
- admin.scalableTopics().migrateToScalable(topic): an admin operation that converts an existing regular topic into a scalable topic in a single atomic step, with no data copy and no cursor migration.
- Rejection of non-persistent:// topics in V5 builders. Non-persistent topics are out of scope for V5 entirely.

When a V5 application calls client.newProducer(...).topic(input) (or any consumer builder), the SDK opens a scalable-topic lookup session for the topic. This is the same long-lived, push-based session that PIP-468 defines for topic://... discovery. The lookup session is the single source of truth for what the topic looks like and how it routes; there is no separate "probe" call and no client-side TTL cache.
The broker responds to the lookup session based on the input form and the topic's current state:
| Input form | Lookup response |
|---|---|
| topic://t/n/x | Real DAG layout (or NotFound if the scalable topic doesn't exist, same as today). |
| persistent://t/n/x | If scalable metadata exists for the equivalent topic://t/n/x: real DAG layout, with the topic identity promoted to topic://... for all subsequent operations. Otherwise: a synthetic layout that models the regular topic's partitions as legacy segments (see Legacy segments below). |
| my-topic (or any short / unscoped form) | Normalize to persistent://public/default/my-topic, then apply the rule above. |
| non-persistent://... | Reject at create() / subscribe() with UnsupportedOperationException. V5 does not support non-persistent topics. |
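The client-side half of the table can be sketched as a pure function. This sketch covers two things: how the V5 SDK might normalize the input before opening the lookup session, and where it might reject non-persistent://... names. Promotion from persistent:// to topic:// happens on the broker and is not modeled. LookupNameResolver is a hypothetical name. The tenant/namespace/topic branch assumes the v4 client's normalization for three-part names.

```java
// Hypothetical sketch of the V5 input-normalization rules; not the SDK's real code.
final class LookupNameResolver {

    static String resolve(String input) {
        if (input == null || input.isEmpty()) {
            throw new IllegalArgumentException("Empty topic name");
        }
        if (input.startsWith("non-persistent://")) {
            // Rejected in the builder, before any lookup session is opened.
            throw new UnsupportedOperationException(
                    "V5 does not support non-persistent topics: " + input);
        }
        if (input.startsWith("topic://") || input.startsWith("persistent://")) {
            // Fully qualified: the broker decides between a real DAG and a synthetic layout.
            return input;
        }
        if (input.contains("://")) {
            throw new IllegalArgumentException("Unsupported topic domain: " + input);
        }
        String[] parts = input.split("/");
        if (parts.length == 1) {
            // Short name: same default tenant/namespace as the v4 client.
            return "persistent://public/default/" + input;
        }
        if (parts.length == 3) {
            // tenant/namespace/topic without a domain (assumed v4 behaviour).
            return "persistent://" + input;
        }
        throw new IllegalArgumentException("Invalid topic name: " + input);
    }
}
```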
Because the lookup session is push-based, the broker can update the layout in place when the topic is migrated. The V5 SDK already handles layout changes — splits and merges go through the same machinery — so a migration is observed as one more layout-change event: synthetic layout → real DAG. The application sees its Producer<T> / Consumer<T> keep working through the transition; the SDK's existing reconnection logic handles the underlying connection swap internally.
This gives the "once scalable, always scalable" guarantee (Goal 4) for free: once the lookup session has reported a real DAG, future updates can only refine the DAG via splits / merges; there is no "downgrade to synthetic" path the broker exposes.
A legacy segment is the lookup-session encoding of "this slice of the keyspace is not yet a real segment://... topic — it's the existing persistent://t/n/x[-partition-K] topic instead." It carries the same fields as a regular segment plus a marker that points at the underlying persistent://... name.
The V5 SDK's per-segment producer and consumer infrastructure already attaches to a topic name; the only difference for a legacy segment is that the name uses the persistent:// domain instead of segment://. No separate code path, no separate wrapper class hierarchy.
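A minimal model of a layout that carries legacy segments and its routing function as data might look like this. Every type and field name here is hypothetical, not the PIP's wire format. One further assumption: under mod-N routing the synthetic layout's entries ignore their ranges and are selected by index, so each one is simply given the full range.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical lookup-session layout model; names are illustrative only.
final class LayoutSketch {

    enum Routing { MOD_N, RANGE }

    static final class SegmentEntry {
        final String uri;       // segment://... normally, persistent://... for a legacy segment
        final int rangeStart;   // inclusive; only consulted under RANGE routing
        final int rangeEnd;     // inclusive
        final boolean legacy;   // true while this slice is still a regular topic

        SegmentEntry(String uri, int rangeStart, int rangeEnd, boolean legacy) {
            this.uri = uri;
            this.rangeStart = rangeStart;
            this.rangeEnd = rangeEnd;
            this.legacy = legacy;
        }
    }

    static final class Layout {
        final Routing routing;
        final List<SegmentEntry> segments; // the writable segments, in order

        Layout(Routing routing, List<SegmentEntry> segments) {
            this.routing = routing;
            this.segments = Collections.unmodifiableList(new ArrayList<>(segments));
        }

        // Picks the target segment for a key hash using the layout's routing function.
        SegmentEntry route(int keyHash) {
            if (routing == Routing.MOD_N) {
                int n = segments.size();
                int mod = keyHash % n;
                return segments.get(mod < 0 ? mod + n : mod);
            }
            int point = keyHash & 0xFFFF;
            for (SegmentEntry s : segments) {
                if (point >= s.rangeStart && point <= s.rangeEnd) {
                    return s;
                }
            }
            throw new IllegalStateException("No segment covers hash point " + point);
        }
    }

    // Synthetic layout for a regular topic; partitions == 0 means non-partitioned.
    static Layout synthetic(String persistentName, int partitions) {
        List<SegmentEntry> entries = new ArrayList<>();
        if (partitions == 0) {
            entries.add(new SegmentEntry(persistentName, 0x0000, 0xFFFF, true));
        } else {
            for (int k = 0; k < partitions; k++) {
                entries.add(new SegmentEntry(persistentName + "-partition-" + k, 0x0000, 0xFFFF, true));
            }
        }
        return new Layout(Routing.MOD_N, entries);
    }
}
```

Because both kinds of entry are just a URI, the same per-segment producer or consumer can attach to either one. This matches the "no separate code path" point above.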
For routing, the layout carries the routing function as data:
- Synthetic layout: signSafeMod(murmurHash3_32(key), N), exactly v4's partitioned-producer routing. V5 producers route the same way the v4 SDK would, which preserves per-key ordering during the migration window.
- Real DAG: standard range-based routing.

Pre-migration (steady state):
• Topic exists as persistent://t/n/x (with N partitions, possibly N=0)
• Some clients are still on the v4 SDK; others have been upgraded to V5.
• V5 clients see a SYNTHETIC layout from the lookup session: N special
segments (or 1 for non-partitioned) pointing at the existing
persistent://...-partition-K topics, with mod-N routing.
Step 1 — Operator upgrades all clients to the V5 SDK.
Step 2 enforces this by default: the migration command inspects the
topic's attached producers/consumers and fails with HTTP 409 if any
*legacy v4* connection remains. V5 connections are recognised by a
reserved metadata marker the V5 SDK sets on every per-segment v4
producer/consumer it creates, so a topic served only by upgraded V5
clients passes the check even though those clients are attached to the
underlying persistent:// partitions via the synthetic layout. The
operator can override the check with --force (e.g. to evict a stale v4
client). Old code keeps working unchanged before migration because the
synthetic layout exposes the existing persistent topics through the V5
surface; routing is mod-N so per-key destinations are identical to v4
partitioned-topic routing.
Step 2 — Operator runs:
pulsar-admin scalable-topics migrate persistent://t/n/x [--force]
The broker:
2a. Validates that the topic exists and is not already scalable (else
404 / 409). Unless --force is set, validates that no legacy v4
producer/consumer connection is attached (counting only connections
without the V5-managed marker); if any are, fails with HTTP 409.
2b. Builds ScalableTopicMetadata with:
• N sealed legacy parent segments (or 1 for non-partitioned), each
wrapping the existing persistent://t/n/x-partition-K (or
persistent://t/n/x for non-partitioned). The managed ledgers are
unchanged; no data copy. Each parent spans the full hash range
(v4 mod-N routing scattered keys for any child range across every
partition).
• N active child segments (or 1) with equal-width contiguous hash
ranges covering [0x0000, 0xFFFF], using standard range-based
routing. Each child has all N parents as predecessors in the DAG.
New writes route to children; subscriptions drain the parents
before consuming from children — the same drain-before-assign
protocol the controller already uses for segment splits / merges.
2c. Creates the backing segment topics for the new active children
(the sealed parents reuse the existing persistent:// managed ledgers).
2d. Atomically writes the metadata, taking the topic from
"regular" to "scalable" in one CAS on the metadata store. This is
the commit point; connected lookup sessions are pushed the real DAG
via the metadata-store watch.
2e. Terminates the old persistent:// topic(s) so no further v4 writes
can land — they become the drainable sealed parent segments.
Step 3 — Connected V5 clients receive the layout-update push on their
lookup session. Synthetic layout → real DAG. The SDK's existing
layout-change handling (used for split / merge) drives any internal
reconnection. Application-visible behaviour: nothing changes.
The synthetic layout (pre-migration) routes mod-N so that V5 producers write to the same partitions v4 producers would. The post-migration real DAG routes by hash range — the standard scalable-topic semantics. The two routings are not equivalent: a given key's destination segment can change at the migration moment.
Per-key ordering is preserved by the existing scalable-topic drain-before-assign protocol, the same one the subscription controller already uses for splits and merges:
Producers route to children using standard range-based routing immediately after the migration commit; their writes land in the new segments while the parents are draining behind them. No special routing flag is needed: the migration reuses the same machinery that handles splits, with the parent fan-in (each child has N parents instead of 1) being the only structural difference.
For non-partitioned topics (N=1) the same protocol applies trivially: one sealed parent and one active child covering the full hash range.
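The drain-before-assign rule reduces to a predicate over the segment DAG: a subscription may be assigned a segment only after it has drained every predecessor of that segment. The sketch below is a hypothetical per-subscription view. It does not show how the controller actually tracks drain state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical per-subscription view of the drain-before-assign rule.
final class DrainBeforeAssign {
    private final Map<Long, List<Long>> predecessors = new HashMap<>();
    private final Set<Long> drained = new HashSet<>();

    void addSegment(long id, List<Long> parents) {
        predecessors.put(id, new ArrayList<>(parents));
    }

    // Called once this subscription has consumed everything in a sealed segment.
    void markDrained(long id) {
        drained.add(id);
    }

    // A segment may be assigned only after every one of its predecessors is drained.
    boolean canAssign(long id) {
        List<Long> parents = predecessors.get(id);
        if (parents == null) {
            throw new IllegalArgumentException("Unknown segment " + id);
        }
        return drained.containsAll(parents);
    }
}
```

For a migrated two-partition topic, parents 0 and 1 have no predecessors and are assignable at once. Children 2 and 3 both list [0, 1] and become assignable only after both parents are drained. A split differs only in that a child has a single parent.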
The SDK reuses its existing scalable code path (the lookup session, the per-segment producer / consumer infrastructure, and the layout-change handler) for every topic, including regular ones. Only three things are new: how the lookup session is opened from persistent:// or short-form input, how the SDK interprets a "legacy segment" entry in the layout, and how it applies the routing function carried by the layout.
PulsarClientV5 opens a lookup session for the user's topic regardless of domain. The existing scalable-topic lookup-session machinery is extended so the broker accepts persistent://... and short-form names in addition to topic://.... The session response carries:
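- The canonical topic identity (always topic://t/n/x after normalization).
- The current layout: real segment://... entries for a scalable topic, or legacy-segment entries for a regular one.
- The routing function (mod-N for the synthetic layout, range-based for the real DAG).

The V5 builder rejects non-persistent://... inputs with UnsupportedOperationException before any lookup session is opened.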
A regular Segment carries a segment://... URI. A legacy segment carries a persistent://... URI instead, plus a flag marking it as a legacy segment. The SDK's per-segment producer (PerSegmentProducer) and per-segment consumer (PerSegmentConsumer) attach to whatever URI the segment carries, because the v4 producer / consumer beneath them already accepts both persistent:// and segment:// names. No separate adapter classes are introduced.
V5-specific features that don't apply on a regular topic surface as ordinary "this layout doesn't support that" errors at the API surface:
- CheckpointConsumer requires a sealed-segment history. A synthetic layout has none, so subscribe fails with a clear error pointing at the migration command.
- splitSegment / mergeSegments admin operations on a synthetic layout fail at the broker with the same error class, because those operations require real ScalableTopicMetadata.
- topic://...-domain DLQ targets work transparently for both layouts because the DLQ is its own scalable topic; nothing about it depends on the source topic's layout shape.

Layout updates pushed by the lookup session already trigger the SDK's per-segment reconcile. Segments that disappeared get their per-segment producers / consumers torn down, new segments get fresh ones, and segments whose URI changed get rebuilt on the new URI.
A migration is exactly this. In the new layout, each legacy segment persistent://t/n/x-partition-K stays as a sealed parent with the same URI and the same managed ledger. Alongside the parents, N new active child segments appear with segment://t/n/x/<descriptor> URIs. The reconciler tears down the per-segment v4 producers attached to the now-terminated persistent:// topics and attaches new ones to the child segment:// topics. Per-segment consumers keep reading the sealed parents until those are drained, then attach to the children. The application sees the SDK's internal reconnect (as it does for any layout change), but its Producer<T> / Consumer<T> reference and the publish/receive flow are unaffected.
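The producer-side part of that reconcile can be sketched as a diff between the writable segments (id to URI) of the old and new layouts. This is an illustration with several assumptions. The synthetic layout's entries are taken to share ids 0..N-1 with the post-migration parents, and the children get ids N..2N-1. The seg-K descriptors in the usage below are placeholders. Consumer-side draining is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical producer-side reconcile over the writable segments of two layouts.
final class LayoutReconciler {

    static final class Plan {
        final List<Long> tearDown = new ArrayList<>(); // no longer writable: close producer
        final List<Long> create = new ArrayList<>();   // newly writable: open producer
        final List<Long> rebuild = new ArrayList<>();  // same id, new URI: reopen on new URI
    }

    static Plan diff(Map<Long, String> previous, Map<Long, String> next) {
        Plan plan = new Plan();
        // TreeMap copies give a deterministic, id-ordered plan.
        for (Map.Entry<Long, String> e : new TreeMap<>(previous).entrySet()) {
            String nextUri = next.get(e.getKey());
            if (nextUri == null) {
                plan.tearDown.add(e.getKey());
            } else if (!nextUri.equals(e.getValue())) {
                plan.rebuild.add(e.getKey());
            }
        }
        for (Long id : new TreeMap<>(next).keySet()) {
            if (!previous.containsKey(id)) {
                plan.create.add(id);
            }
        }
        return plan;
    }
}
```

For a two-partition migration, the writable set goes from {0: ...-partition-0, 1: ...-partition-1} to {2: segment://t/n/x/seg-2, 3: segment://t/n/x/seg-3}. The plan therefore tears down producers 0 and 1 and creates 2 and 3, which is the same plan a split or merge would produce.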
There is no TopicMigratedException exposed to the application by default. Applications that want to observe migrations can subscribe to the lookup session's layout-change events directly via a future hook — out of scope for this PIP.
POST /admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate — authorized via a dedicated TopicOperation.MIGRATE_TO_SCALABLE operation, which the default authorization provider maps to PRODUCE permission on the topic (the same permission needed to write to it in the first place). Exposing it as its own operation lets third-party RBAC implementations restrict migration independently of produce. Migration is irreversible but does not destroy data, so the blast radius is bounded by what a write-permissioned user can already do; super-user is not required.
Query parameter: force (boolean, default false).
No request body.
Unless force=true, the broker enumerates the producers/consumers attached to the source topic (across all partitions, via the topic-stats path) and rejects the migration with HTTP 409 if any legacy v4 connection is present. A connection is recognised as V5-managed — and therefore excluded — by a reserved metadata marker the V5 SDK sets on every per-segment v4 producer/consumer it creates; everything else counts as a legacy v4 connection. force=true skips this check.
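The pre-check amounts to counting connections that lack the marker. The PIP does not name the reserved metadata key, so V5_MANAGED_MARKER below is a placeholder. The input is assumed to be the metadata maps of every attached producer and consumer, collected across all partitions through the topic-stats path.

```java
import java.util.List;
import java.util.Map;

// Hypothetical migration pre-check; the marker key is a placeholder.
final class LegacyConnectionCheck {
    static final String V5_MANAGED_MARKER = "hypothetical.v5-managed";

    // Each entry is the metadata of one attached producer or consumer.
    static long countLegacy(List<Map<String, String>> attached) {
        return attached.stream()
                .filter(metadata -> !metadata.containsKey(V5_MANAGED_MARKER))
                .count();
    }

    // true: proceed with migration; false: reject with HTTP 409.
    static boolean mayMigrate(List<Map<String, String>> attached, boolean force) {
        return force || countLegacy(attached) == 0;
    }
}
```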
Response: 204 No Content on success. 409 Conflict if scalable metadata already exists or legacy v4 connections are still attached (and force is not set). 404 if the topic doesn't exist.
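For illustration, a client could build this request with the JDK's java.net.http API (Java 11+) and interpret the documented status codes. The admin URL and the helper name are hypothetical. Authentication, TLS and URL-encoding of topic names are left out.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for the migrate endpoint; auth, TLS and URL-encoding omitted.
final class MigrateRequestSketch {

    static HttpRequest build(String adminUrl, String tenant, String namespace,
                             String topic, boolean force) {
        URI uri = URI.create(adminUrl + "/admin/v2/scalable-topics/"
                + tenant + "/" + namespace + "/" + topic + "/migrate?force=" + force);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody()) // the endpoint takes no body
                .build();
    }

    // Maps the documented response codes to a short description.
    static String describe(int status) {
        switch (status) {
            case 204: return "migrated";
            case 404: return "topic does not exist";
            case 409: return "already scalable, or legacy v4 connections still attached";
            default:  return "unexpected status " + status;
        }
    }
}
```

To send it, pass the request to HttpClient.send together with a body handler such as HttpResponse.BodyHandlers.discarding(), then feed statusCode() to describe.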
```java
package org.apache.pulsar.client.admin;

import java.util.concurrent.CompletableFuture;

public interface ScalableTopics {
    /** Migrate an existing partitioned or non-partitioned topic to a scalable topic. */
    void migrateToScalable(String topic) throws PulsarAdminException; // force = false

    void migrateToScalable(String topic, boolean force) throws PulsarAdminException;

    CompletableFuture<Void> migrateToScalableAsync(String topic); // force = false

    CompletableFuture<Void> migrateToScalableAsync(String topic, boolean force);
}
```
```
pulsar-admin scalable-topics migrate persistent://t/n/x [--force]
```
Executed by the topic's owning broker:
1. Validate the request:
   - The topic exists; otherwise fail 404.
   - No ScalableTopicMetadata already exists at the path. If it does, fail 409.
   - Unless force is set, no legacy v4 connection is attached (counting only producers/consumers whose metadata lacks the V5-managed marker). If any are, fail 409.
2. Build the metadata with ScalableTopicController.createMigratedMetadata:
   - N sealed legacy parent segments (or 1 for non-partitioned), each wrapping persistent://t/n/x-partition-K (or persistent://t/n/x). Each parent spans the full hash range.
   - N active child segments (or 1) with equal-width contiguous hash ranges covering [0x0000, 0xFFFF]; each child has all N parents as predecessors. Routing is range-based.
   - epoch = 0, nextSegmentId = 2N.
3. Create the backing segment topics for the active children. The sealed parents reuse the existing persistent:// managed ledgers, so they are skipped.
4. Write ScalableTopicMetadata to /topics/t/n/x via metadata-store CAS. This is the commit point: once it succeeds, the topic is scalable. Connected lookup sessions transition from the synthetic layout to the real DAG via the metadata-store watch.
5. Terminate persistent://t/n/x (or every partition, for a partitioned source) so no further v4 writes can land. A terminated topic is exactly the sealed-parent state: consumers can still drain its existing data, and the sealed legacy parent segments reference these terminated topics.

Failures before step 4 leave the topic untouched; failures after step 4 leave the topic scalable.
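Step 2 can be sketched as follows. This is a hypothetical shape, not the real ScalableTopicMetadata or ScalableTopicController. It assumes parents take ids 0..N-1 and children take ids N..2N-1, which is consistent with nextSegmentId = 2N. The child URI prefix stands in for the segment://t/n/x/<descriptor> naming, whose format the PIP does not spell out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the metadata built at migration time.
final class MigratedMetadataSketch {

    static final class Segment {
        final long id;
        final String uri;
        final int rangeStart;          // inclusive
        final int rangeEnd;            // inclusive
        final boolean sealed;
        final List<Long> predecessors;

        Segment(long id, String uri, int rangeStart, int rangeEnd,
                boolean sealed, List<Long> predecessors) {
            this.id = id;
            this.uri = uri;
            this.rangeStart = rangeStart;
            this.rangeEnd = rangeEnd;
            this.sealed = sealed;
            this.predecessors = Collections.unmodifiableList(new ArrayList<>(predecessors));
        }
    }

    static final class Metadata {
        final long epoch;
        final long nextSegmentId;
        final List<Segment> segments;

        Metadata(long epoch, long nextSegmentId, List<Segment> segments) {
            this.epoch = epoch;
            this.nextSegmentId = nextSegmentId;
            this.segments = Collections.unmodifiableList(new ArrayList<>(segments));
        }
    }

    // partitions == 0 means a non-partitioned source topic.
    static Metadata createMigratedMetadata(String persistentName, int partitions,
                                           String childUriPrefix) {
        int n = Math.max(partitions, 1);
        List<Segment> segments = new ArrayList<>();
        List<Long> parentIds = new ArrayList<>();
        // Sealed legacy parents: wrap the existing topics and span the full hash range.
        for (int k = 0; k < n; k++) {
            String uri = partitions == 0 ? persistentName : persistentName + "-partition-" + k;
            segments.add(new Segment(k, uri, 0x0000, 0xFFFF, true, Collections.<Long>emptyList()));
            parentIds.add((long) k);
        }
        // Active children: equal-width contiguous ranges, every parent as predecessor.
        for (int i = 0; i < n; i++) {
            int start = (int) ((long) i * 0x10000 / n);
            int end = (int) ((long) (i + 1) * 0x10000 / n) - 1;
            long id = n + i;
            segments.add(new Segment(id, childUriPrefix + id, start, end, false, parentIds));
        }
        return new Metadata(0, 2L * n, segments);
    }
}
```

With N = 4, the children cover [0, 16383], [16384, 32767], [32768, 49151] and [49152, 65535], and nextSegmentId is 8.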
Step 5 above (terminating the old topics) is the primary guard: an already-connected v4 producer's send() fails with TopicTerminatedError, and a freshly-connected one hits the same wall. This keeps stale v4 producers from writing to a topic that has been migrated.
To make the "once scalable, always scalable" guarantee robust even after the old (terminated, drained) topics are eventually removed by retention, the broker also refuses to auto-create a persistent://t/n/x topic when ScalableTopicMetadata exists for the equivalent topic://t/n/x. This prevents a stray v4 client from recreating a regular topic that would shadow the scalable one. The rejection reuses TopicTerminatedError (binary) / the equivalent REST status; no new error code is introduced. V5 clients are unaffected — they always use the scalable lookup session.
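The auto-create guard is a name mapping plus one metadata lookup. In the sketch below, the metadataExists predicate stands in for a metadata-store read of ScalableTopicMetadata; it is a hypothetical parameter. The sketch handles only the base persistent://t/n/x name. Whether partition names are mapped the same way is not covered here.

```java
import java.util.function.Predicate;

// Hypothetical broker-side guard against re-creating a migrated regular topic.
final class AutoCreateGuard {
    private static final String PERSISTENT = "persistent://";

    // persistent://t/n/x -> topic://t/n/x
    static String scalableEquivalent(String persistentName) {
        if (!persistentName.startsWith(PERSISTENT)) {
            throw new IllegalArgumentException("Not a persistent topic: " + persistentName);
        }
        return "topic://" + persistentName.substring(PERSISTENT.length());
    }

    // false means: refuse auto-creation (the broker would answer with TopicTerminatedError).
    static boolean allowAutoCreate(String persistentName, Predicate<String> metadataExists) {
        return !metadataExists.test(scalableEquivalent(persistentName));
    }
}
```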
migratedFrom on ScalableTopicMetadata

An optional informational field migratedFrom: TopicMigrationOrigin? records pre-migration metadata (partition count, original persistent:// name) for debugging, metrics labelling, and future tooling. It is not consulted on hot paths.
No legacyModNRouting flag or other migration-specific routing knob is needed: the post-migration real DAG uses standard range-based routing and per-key ordering is preserved by the controller's existing drain-before-assign protocol (see Routing across migration).
New endpoint:
- POST /admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate: requires produce permission on the topic; query param force (default false); no request body; 204 on success, 409 if already scalable or legacy v4 connections are still attached (and force is not set), 404 if topic missing.

Modified behaviour:
- Auto-creation of a persistent://t/n/x topic is refused when scalable metadata exists for the equivalent topic://t/n/x (reusing TopicTerminatedError). Together with the post-migration termination of the old topics, this keeps v4 clients off a migrated topic.
- The scalable-topic lookup session accepts persistent://... and short-form names in addition to topic://.... For non-scalable topics it returns a synthetic layout with legacy-segment entries.
- No new error codes: v4 clients that hit a migrated topic (terminated topic or refused auto-create) get TopicTerminatedError.
- New CLI command: pulsar-admin scalable-topics migrate <topic> [--force]
- V5 builders accept persistent://... and short-form names; the SDK opens a scalable-topic lookup session that the broker resolves to either a real DAG or a synthetic layout.
- V5 builders reject non-persistent://... with UnsupportedOperationException.

The upgrade story this PIP supports is:
1. Upgrade the brokers. The cluster then serves old persistent:// clients and new topic:// clients side by side, with no behavior change for existing topics.
2. Upgrade a topic's clients to the V5 SDK.
3. Run migrate on that topic. The migration is atomic and one-way.

Old client versions continue to work with the cluster on un-migrated topics.
Out of scope; see Goals: Out of Scope.
An earlier draft of this PIP had the V5 SDK probe via admin.scalableTopics().getMetadataAsync(topic) at every builder call, cache the verdict with a TTL, and use a separate v4-wrapper code path for regular topics. Migration would be observed by connected clients as TopicTerminatedException from the v4 wrapper, which the SDK would catch and use as the cue to re-probe and rebuild on the scalable path.
Rejected in favour of the lookup-session approach because:
- Applications would have seen TopicMigratedException on receive futures, however briefly. The lookup-session approach makes migration strictly an internal SDK reconnect, with no application-visible signal at all.

Another rejected alternative was copy-based migration. The migration command would create a new scalable topic alongside the regular one and stream all retained data through it. Producers and consumers would cut over after the copy completes.
Rejected because:
migratedFrom is an optional informational field on ScalableTopicMetadata (partition count, original persistent:// name) for debugging and metrics labelling; not consulted on hot paths.