PIP-475: Regular-to-Scalable Topic Migration

Sub-PIP of PIP-460: Scalable Topics

Motivation

PIP-460 introduces scalable topics (topic://...) as a new topic type that supports range splitting and merging without breaking key ordering. For this to be adoptable in real deployments, users with existing partitioned and non-partitioned topics need a migration path that:

  1. Doesn't require recreating their topics from scratch. Existing topics may hold months of retained data and have many active subscriptions. Re-create + re-publish is not a viable upgrade story.
  2. Lets clients adopt the V5 SDK before any topic is migrated. Operationally, applications need to be upgraded one at a time over weeks, while the topics they read and write keep working as-is. The V5 SDK has to interoperate with the old topic types until the migration moment.
  3. Keeps the migration moment small and surgical. Once all clients are on the V5 SDK, an admin command flips a topic from regular to scalable in a single atomic step, without copying data or moving cursors.
  4. Cannot be reversed. Once a topic is scalable, regressing to a regular topic is unsafe (the new layout may already have split, leaving data in segments that don't map back to a fixed partition count). The metadata transition has to be one-way.

PIP-460 lists "Tooling for migrating existing partitioned topics to scalable topics" in its postponed section. This PIP closes that gap.

This PIP also clarifies the V5 SDK's behavior when given a topic name that may or may not be scalable, and tightens the broker so that a v4 client cannot accidentally write to (or auto-create) a regular topic that has already been migrated.

The longer-term direction for Pulsar is for scalable topics to fully replace partitioned and non-partitioned topics: the existing topic types stay supported for backward compatibility, but new development on the topic surface targets scalable topics, and migration tooling like this PIP is what lets existing deployments make that transition incrementally instead of all at once.


Background Knowledge

Topic domains in Pulsar today

A Pulsar topic name encodes its domain in a URI scheme:

  • persistent://t/n/x — durable topic backed by a managed ledger.
  • non-persistent://t/n/x — in-memory topic, no durability.
  • topic://t/n/x — scalable topic introduced by PIP-460. Backed by a DAG of segments; each segment is itself a segment://... topic with its own managed ledger.

A short name like my-topic is normalized to persistent://public/default/my-topic by the v4 client. The V5 SDK keeps the same normalization, then opens a scalable-topic lookup session for the resolved name (see V5 SDK resolution rule below).

Partitioned vs. non-partitioned regular topics

Two distinct shapes:

  • Non-partitioned: a single persistent://t/n/x with one managed ledger.
  • Partitioned: persistent://t/n/x is a logical name; the actual data lives in persistent://t/n/x-partition-0 through persistent://t/n/x-partition-(N-1), each with its own managed ledger. The v4 client's partitioned producer routes each keyed message to a partition by signSafeMod(murmurHash3_32(key), N).
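The v4 partition choice can be sketched in Java as follows. This is a minimal illustration, assuming the standard MurmurHash3_x86_32 with seed 0 over the key's UTF-8 bytes; the class ModNRouting and its methods are hypothetical, and the hash is not claimed to match the Pulsar client's hash wrapper bit for bit.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: v4-style partition selection, signSafeMod(murmurHash3_32(key), N).
// Assumes standard MurmurHash3_x86_32 with seed 0; not guaranteed identical to Pulsar's client.
final class ModNRouting {

    // Standard MurmurHash3_x86_32 over the given bytes.
    static int murmur3_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int roundedEnd = data.length & ~3; // length rounded down to a multiple of 4
        for (int i = 0; i < roundedEnd; i += 4) {
            // Little-endian 4-byte block.
            int k1 = (data[i] & 0xff)
                    | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16)
                    | ((data[i + 3] & 0xff) << 24);
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        // Tail: the remaining 1 to 3 bytes.
        int k1 = 0;
        switch (data.length & 3) {
            case 3:
                k1 ^= (data[roundedEnd + 2] & 0xff) << 16;
                // fall through
            case 2:
                k1 ^= (data[roundedEnd + 1] & 0xff) << 8;
                // fall through
            case 1:
                k1 ^= data[roundedEnd] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
                break;
            default:
                break;
        }
        // Finalization mix (fmix32).
        h1 ^= data.length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Modulo that always lands in [0, divisor), even for a negative dividend.
    static int signSafeMod(long dividend, int divisor) {
        int mod = (int) (dividend % divisor);
        return mod < 0 ? mod + divisor : mod;
    }

    // Partition a v4 partitioned producer would pick for a keyed message.
    static int partitionFor(String key, int numPartitions) {
        int hash = murmur3_32(key.getBytes(StandardCharsets.UTF_8), 0);
        return signSafeMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"order-1", "order-2", "user-42"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```

signSafeMod, rather than Math.abs(hash) % N, is what keeps negative hashes in range: in Java, Math.abs(Integer.MIN_VALUE) is still negative.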

V5 segment routing

A scalable topic's hash space is [0x0000, 0xFFFF]. Each active segment owns a contiguous sub-range. The V5 producer routes a message with key K to the segment whose range contains murmurHash3_32(K) & 0xFFFF. Segments split into two halves; halves can later be merged back. The routing function is range-based, not modulo-based.
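Here is a minimal sketch of range-based routing and a split, for contrast with mod-N. RangeSegment, route, and split are hypothetical names, not V5 SDK classes. Ranges are inclusive, and the sketch covers only the routing arithmetic, not sealing or DAG bookkeeping.

```java
import java.util.List;

// Hypothetical sketch of range-based routing over the 16-bit hash space [0x0000, 0xFFFF].
final class RangeRouting {

    // A segment owning the inclusive hash range [start, end].
    record RangeSegment(long id, int start, int end) {
        boolean contains(int point) {
            return point >= start && point <= end;
        }
    }

    // Route a 32-bit key hash to the active segment whose range contains (hash & 0xFFFF).
    static RangeSegment route(int keyHash, List<RangeSegment> active) {
        int point = keyHash & 0xFFFF;
        for (RangeSegment s : active) {
            if (s.contains(point)) {
                return s;
            }
        }
        throw new IllegalStateException("no active segment covers point " + point);
    }

    // Split a segment into two contiguous halves with new IDs; the parent would then be sealed.
    static List<RangeSegment> split(RangeSegment parent, long firstNewId) {
        int mid = parent.start() + (parent.end() - parent.start()) / 2;
        return List.of(
                new RangeSegment(firstNewId, parent.start(), mid),
                new RangeSegment(firstNewId + 1, mid + 1, parent.end()));
    }

    public static void main(String[] args) {
        List<RangeSegment> active = split(new RangeSegment(0, 0x0000, 0xFFFF), 1);
        System.out.println(active);
        System.out.println(route(0x1234_0010, active).id()); // point 0x0010, lower half
        System.out.println(route(0x0000_F000, active).id()); // point 0xF000, upper half
    }
}
```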

This difference matters for partitioned-topic migration: the migrated layout can mirror the partitions one-to-one, but range-based routing over those segments would not send every key to the same place that v4's mod-N routing does. The fix is described in Routing across migration below.


Goals

In Scope

  • A V5 SDK that operates against existing regular topics (partitioned or non-partitioned) without requiring source changes from the application.
  • An admin command — admin.scalableTopics().migrateToScalable(topic) — that converts an existing regular topic into a scalable topic in a single atomic step, with no data copy and no cursor migration.
  • A V5 SDK resolution rule that picks between the scalable code path and a v4-compatible fallback based on the input topic name and broker-side state, with a strict "once scalable, always scalable" guarantee.
  • A broker-side guard that prevents new v4 clients from accidentally writing to (or auto-creating) a regular topic whose name has already been claimed by a scalable topic.
  • Preservation of per-key ordering across the migration moment, via the same drain-before-assign protocol the controller uses for segment splits: old partitions become sealed parent segments and the subscription controller drains them before activating consumers on the new children.
  • Rejection of non-persistent:// topics in V5 builders. Non-persistent topics are out of scope for V5 entirely.

Out of Scope

  • Cross-cluster (geo-replication) migration coordination. A topic migrated in one cluster while still being replicated to another is a separate problem, deferred until geo-replication on scalable topics lands (PIP-460 sub-PIP).
  • Reverse migration (scalable → regular). Not supported; the metadata transition is one-way.
  • Migration triggered by traffic or load thresholds. Migration is operator-initiated only.
  • Per-message format conversion. Messages stay byte-identical on disk before and after migration; only the topic-name surface above the managed ledger changes.

High-Level Design

V5 SDK resolution rule

When a V5 application calls client.newProducer(...).topic(input) (or any consumer builder), the SDK opens a scalable-topic lookup session for the topic. This is the same long-lived, push-based session that PIP-468 defines for topic://... discovery. The lookup session is the single source of truth for what the topic looks like and how it routes; there is no separate "probe" call and no client-side TTL cache.

The broker responds to the lookup session based on the input form and the topic's current state:

| Input form | Lookup response |
|---|---|
| topic://t/n/x | Real DAG layout (or NotFound if the scalable topic doesn't exist, same as today). |
| persistent://t/n/x | If scalable metadata exists for the equivalent topic://t/n/x: real DAG layout, with the topic identity promoted to topic://... for all subsequent operations. Otherwise: a synthetic layout that models the regular topic's partitions as legacy segments (see Legacy segments below). |
| my-topic (or any short / unscoped form) | Normalize to persistent://public/default/my-topic, then apply the rule above. |
| non-persistent://... | Reject at create() / subscribe() with UnsupportedOperationException. V5 does not support non-persistent topics. |
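The resolution table can be read as a pure function. In this hypothetical sketch, the predicate scalableExists stands in for the broker's check for ScalableTopicMetadata. In the actual design the broker answers through the lookup session, not a client-side predicate. The normalize step is also simplified: it handles only the bare-name and tenant/namespace/name short forms.

```java
import java.util.function.Predicate;

// Hypothetical sketch of the V5 resolution rule; names are illustrative, not SDK API.
final class TopicResolution {

    enum LayoutKind { REAL_DAG, SYNTHETIC, NOT_FOUND }

    // identity is always the promoted topic:// name.
    record Resolution(String identity, LayoutKind kind) {}

    // Simplified v4-style normalization: bare name or tenant/namespace/name.
    static String normalize(String input) {
        if (input.contains("://")) {
            return input;
        }
        String[] parts = input.split("/");
        if (parts.length == 1) {
            return "persistent://public/default/" + input;
        }
        if (parts.length == 3) {
            return "persistent://" + input;
        }
        throw new IllegalArgumentException("unsupported short form: " + input);
    }

    // scalableExists: stand-in for "ScalableTopicMetadata exists for this topic:// name".
    static Resolution resolve(String input, Predicate<String> scalableExists) {
        if (input.startsWith("non-persistent://")) {
            throw new UnsupportedOperationException("V5 does not support non-persistent topics: " + input);
        }
        String name = normalize(input);
        if (name.startsWith("topic://")) {
            return new Resolution(name, scalableExists.test(name) ? LayoutKind.REAL_DAG : LayoutKind.NOT_FOUND);
        }
        if (!name.startsWith("persistent://")) {
            throw new IllegalArgumentException("unsupported domain: " + name);
        }
        String promoted = "topic://" + name.substring("persistent://".length());
        // Scalable metadata wins; otherwise the regular topic is served as a synthetic layout.
        LayoutKind kind = scalableExists.test(promoted) ? LayoutKind.REAL_DAG : LayoutKind.SYNTHETIC;
        return new Resolution(promoted, kind);
    }

    public static void main(String[] args) {
        Predicate<String> migrated = "topic://t/n/x"::equals;
        System.out.println(resolve("my-topic", migrated));           // kind=SYNTHETIC
        System.out.println(resolve("persistent://t/n/x", migrated)); // kind=REAL_DAG
        System.out.println(resolve("topic://t/n/y", migrated));      // kind=NOT_FOUND
    }
}
```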

Because the lookup session is push-based, the broker can update the layout in place when the topic is migrated. The V5 SDK already handles layout changes — splits and merges go through the same machinery — so a migration is observed as one more layout-change event: synthetic layout → real DAG. The application sees its Producer<T> / Consumer<T> keep working through the transition; the SDK's existing reconnection logic handles the underlying connection swap internally.

This gives the "once scalable, always scalable" guarantee (Motivation, item 4) for free: once the lookup session has reported a real DAG, future updates can only refine the DAG via splits / merges; the broker exposes no "downgrade to synthetic" path.

Legacy segments

A legacy segment is the lookup-session encoding of "this slice of the keyspace is not yet a real segment://... topic — it's the existing persistent://t/n/x[-partition-K] topic instead." It carries the same fields as a regular segment plus a marker that points at the underlying persistent://... name.

The V5 SDK's per-segment producer and consumer infrastructure already attaches to a topic name; the only difference for a legacy segment is that the name uses the persistent:// domain instead of segment://. No separate code path, no separate wrapper class hierarchy.

For routing, the layout carries the routing function as data:

  • Synthetic layout for an N-partitioned regular topic: N legacy segments, one per partition; routing is signSafeMod(murmurHash3_32(key), N), exactly v4's partitioned-producer routing. V5 producers route each key to the same partition the v4 SDK would, preserving per-key ordering while v4 and V5 clients coexist before migration.
  • Synthetic layout for a non-partitioned regular topic: 1 legacy segment covering the full hash range; routing is trivial (every key goes to that single segment).
  • Real DAG (post-migration or natively scalable): range-based routing — the standard scalable-topic semantics. Producers route by hash range; per-key ordering across the migration boundary is preserved by the controller's drain-before-assign protocol (see Routing across migration below).
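Here is a sketch of a layout that carries its routing function as data, under stated assumptions. Segment, Routing, Layout, and synthetic(...) are hypothetical names. A partition count of 0 denotes a non-partitioned topic, and the segment:// descriptors in main are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the layout carries its routing function as data.
final class LayoutRouting {

    // A segment entry; legacy == true means the URI is a persistent:// topic.
    record Segment(String uri, boolean legacy, int start, int end) {}

    // Routing function: returns an index into the active segment list.
    interface Routing {
        int pick(int keyHash, List<Segment> active);
    }

    // Synthetic layout: v4 mod-N routing (sign-safe modulo).
    static final Routing MOD_N = (hash, active) -> {
        int mod = hash % active.size();
        return mod < 0 ? mod + active.size() : mod;
    };

    // Real DAG: range-based routing on the low 16 bits of the hash.
    static final Routing RANGE = (hash, active) -> {
        int point = hash & 0xFFFF;
        for (int i = 0; i < active.size(); i++) {
            if (point >= active.get(i).start() && point <= active.get(i).end()) {
                return i;
            }
        }
        throw new IllegalStateException("no segment covers " + point);
    };

    record Layout(List<Segment> active, Routing routing) {
        Segment route(int keyHash) {
            return active.get(routing.pick(keyHash, active));
        }
    }

    // partitions == 0 means a non-partitioned topic: one legacy segment over the full range.
    static Layout synthetic(String persistentName, int partitions) {
        List<Segment> segments = new ArrayList<>();
        if (partitions == 0) {
            segments.add(new Segment(persistentName, true, 0x0000, 0xFFFF));
        } else {
            for (int k = 0; k < partitions; k++) {
                segments.add(new Segment(persistentName + "-partition-" + k, true, 0x0000, 0xFFFF));
            }
        }
        return new Layout(List.copyOf(segments), MOD_N);
    }

    public static void main(String[] args) {
        Layout pre = synthetic("persistent://t/n/x", 4);
        System.out.println(pre.route(-7).uri()); // mod-N: -7 maps to partition 1

        Layout post = new Layout(List.of(
                new Segment("segment://t/n/x/a", false, 0x0000, 0x7FFF),
                new Segment("segment://t/n/x/b", false, 0x8000, 0xFFFF)), RANGE);
        System.out.println(post.route(-7).uri()); // range: -7 & 0xFFFF = 0xFFF9, upper half
    }
}
```

Because the routing function is a field of the layout, replacing the synthetic layout with the real DAG also replaces mod-N with range-based routing. The producer code that calls route does not change.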

Migration protocol (operator's view)

Pre-migration (steady state):
  • Topic exists as persistent://t/n/x (with N partitions, possibly N=0)
  • Some clients are still on the v4 SDK; others have been upgraded to V5.
  • V5 clients see a SYNTHETIC layout from the lookup session: N legacy
    segments (or 1 for non-partitioned) pointing at the existing
    persistent://...-partition-K topics, with mod-N routing.

Step 1 — Operator upgrades all clients to the V5 SDK.
  Step 2 enforces this by default: the migration command inspects the
  topic's attached producers/consumers and fails with HTTP 409 if any
  *legacy v4* connection remains. V5 connections are recognised by a
  reserved metadata marker the V5 SDK sets on every per-segment v4
  producer/consumer it creates, so a topic served only by upgraded V5
  clients passes the check even though those clients are attached to the
  underlying persistent:// partitions via the synthetic layout. The
  operator can override the check with --force (e.g. to evict a stale v4
  client). Old code keeps working unchanged before migration because the
  synthetic layout exposes the existing persistent topics through the V5
  surface; routing is mod-N so per-key destinations are identical to v4
  partitioned-topic routing.

Step 2 — Operator runs:
    pulsar-admin scalable-topics migrate persistent://t/n/x [--force]

  The broker:
   2a. Validates that the topic exists and is not already scalable (else
       404 / 409). Unless --force is set, validates that no legacy v4
       producer/consumer connection is attached (counting only connections
       without the V5-managed marker); if any are, fails with HTTP 409.
   2b. Builds ScalableTopicMetadata with:
        • N sealed legacy parent segments (or 1 for non-partitioned), each
          wrapping the existing persistent://t/n/x-partition-K (or
          persistent://t/n/x for non-partitioned). The managed ledgers are
          unchanged; no data copy. Each parent spans the full hash range
          (v4 mod-N routing scattered keys for any child range across every
          partition).
        • N active child segments (or 1) with equal-width contiguous hash
          ranges covering [0x0000, 0xFFFF], using standard range-based
          routing. Each child has all N parents as predecessors in the DAG.
       New writes route to children; subscriptions drain the parents
       before consuming from children — the same drain-before-assign
       protocol the controller already uses for segment splits / merges.
   2c. Creates the backing segment topics for the new active children
       (the sealed parents reuse the existing persistent:// managed ledgers).
   2d. Atomically writes the metadata, taking the topic from
       "regular" to "scalable" in one CAS on the metadata store. This is
       the commit point; connected lookup sessions are pushed the real DAG
       via the metadata-store watch.
   2e. Terminates the old persistent:// topic(s) so no further v4 writes
       can land — they become the drainable sealed parent segments.

Step 3 — Connected V5 clients receive the layout-update push on their
  lookup session. Synthetic layout → real DAG. The SDK's existing
  layout-change handling (used for split / merge) drives any internal
  reconnection. Application-visible behaviour: nothing changes.
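Step 2b can be sketched as a function that builds the initial DAG. SegmentDesc, Metadata, and the segment://t/n/x/<id> child URI are hypothetical; the PIP leaves the child descriptor unspecified. When 0x10000 is not divisible by N, this sketch lets the last child absorb the remainder, so the children still tile [0x0000, 0xFFFF] exactly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the initial metadata built at migration time (step 2b).
final class MigratedMetadataSketch {

    record SegmentDesc(long id, String uri, boolean sealed, int start, int end, List<Long> parents) {}

    record Metadata(long epoch, long nextSegmentId, List<SegmentDesc> segments) {}

    // partitions == 0 denotes a non-partitioned source topic.
    static Metadata createMigratedMetadata(String persistentName, int partitions) {
        int n = Math.max(partitions, 1);
        String path = persistentName.substring("persistent://".length());
        List<SegmentDesc> segments = new ArrayList<>();
        List<Long> parentIds = new ArrayList<>();

        // Sealed legacy parents: wrap the existing topics, each spanning the full hash range.
        for (int k = 0; k < n; k++) {
            String uri = partitions == 0 ? persistentName : persistentName + "-partition-" + k;
            segments.add(new SegmentDesc(k, uri, true, 0x0000, 0xFFFF, List.of()));
            parentIds.add((long) k);
        }

        // Active children: contiguous ranges tiling [0x0000, 0xFFFF]; every child has all N parents.
        int width = 0x10000 / n;
        for (int i = 0; i < n; i++) {
            int start = i * width;
            int end = (i == n - 1) ? 0xFFFF : start + width - 1; // last child absorbs any remainder
            long id = n + i;
            String uri = "segment://" + path + "/" + id; // hypothetical descriptor format
            segments.add(new SegmentDesc(id, uri, false, start, end, List.copyOf(parentIds)));
        }
        return new Metadata(0, 2L * n, List.copyOf(segments));
    }

    public static void main(String[] args) {
        Metadata m = createMigratedMetadata("persistent://t/n/x", 4);
        m.segments().forEach(System.out::println);
        System.out.println("nextSegmentId = " + m.nextSegmentId());
    }
}
```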

Routing across migration

The synthetic layout (pre-migration) routes mod-N so that V5 producers write to the same partitions v4 producers would. The post-migration real DAG routes by hash range — the standard scalable-topic semantics. The two routings are not equivalent: a given key's destination segment can change at the migration moment.

Per-key ordering is preserved by the existing scalable-topic drain-before-assign protocol, the same one the subscription controller already uses for splits and merges:

  • Each old partition becomes a sealed parent segment in the new DAG.
  • N new active child segments are created alongside, with equal-width contiguous hash ranges; every child has all N parents as predecessors.
  • The subscription controller fully drains the parents before assigning their children to a consumer. By the time a consumer first sees a message from a child, every message previously published to those parents (including any with the same key) has already been delivered.

Producers route to children using standard range-based routing immediately after the migration commit; their writes land in the new segments while the parents are draining behind them. No special routing flag is needed: the migration reuses the same machinery that handles splits, with the parent fan-in (each child has N parents instead of 1) being the only structural difference.

For non-partitioned topics (N=1) the same protocol applies trivially: one sealed parent and one active child covering the full hash range.
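The ordering argument can be checked with a simplified, single-consumer model of drain-before-assign. The names are hypothetical, and real subscriptions assign segments to consumers concurrently. The sketch shows only the invariant that no child is read before all of its parents are drained.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified, single-consumer model of drain-before-assign (hypothetical names).
final class DrainBeforeAssign {

    // A segment may be assigned only once every predecessor has been fully drained.
    static boolean canAssign(long segment, Map<Long, List<Long>> predecessors, Set<Long> drained) {
        return drained.containsAll(predecessors.getOrDefault(segment, List.of()));
    }

    // Deliver every backlog, never reading a segment before its predecessors are drained.
    static List<String> deliverAll(Map<Long, List<String>> backlog, Map<Long, List<Long>> predecessors) {
        List<String> delivered = new ArrayList<>();
        Set<Long> drained = new HashSet<>();
        boolean progress = true;
        while (drained.size() < backlog.size() && progress) {
            progress = false;
            for (Map.Entry<Long, List<String>> e : new TreeMap<>(backlog).entrySet()) {
                if (!drained.contains(e.getKey()) && canAssign(e.getKey(), predecessors, drained)) {
                    delivered.addAll(e.getValue());
                    drained.add(e.getKey());
                    progress = true;
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Parents 0 and 1 are the old partitions; children 2 and 3 each have both as predecessors.
        // Key K used partition 1 under mod-N and lands in child 3 after migration.
        Map<Long, List<String>> backlog = Map.of(
                0L, List.of("A#1"),
                1L, List.of("K#1", "K#2"),
                2L, List.of("A#2"),
                3L, List.of("K#3"));
        Map<Long, List<Long>> preds = Map.of(2L, List.of(0L, 1L), 3L, List.of(0L, 1L));
        System.out.println(deliverAll(backlog, preds)); // [A#1, K#1, K#2, A#2, K#3]
    }
}
```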


Detailed Design

1. V5 SDK changes

The SDK reuses its existing scalable code path (the lookup session, the per-segment producer / consumer infrastructure, and the layout-change handler) for every topic, including regular ones. The only new pieces are: how the lookup session is opened from persistent:// / short-form input, how the SDK interprets a legacy-segment entry in the layout, and how it applies the routing function carried by the layout.

1.1 Lookup session opens for any input form

PulsarClientV5 opens a lookup session for the user's topic regardless of domain. The existing scalable-topic lookup-session machinery is extended so the broker accepts persistent://... and short-form names in addition to topic://.... The session response carries:

  • The promoted topic identity (always topic://t/n/x after normalization).
  • The current layout — either a real DAG or a synthetic layout (see Legacy segments above).
  • The routing function (mod-N for the synthetic layout, range-based for the real DAG).

non-persistent://... inputs are rejected at the V5 builder before the lookup is opened, with UnsupportedOperationException.

1.2 Legacy-segment handling in the per-segment infrastructure

A regular Segment carries a segment://... URI. A legacy segment carries a persistent://... URI instead, plus a flag marking it as a legacy segment. The SDK's per-segment producer (PerSegmentProducer) and per-segment consumer (PerSegmentConsumer) attach to whatever URI the segment carries; the v4 producer / consumer beneath them already accepts persistent:// and segment:// alike. No separate adapter classes are introduced.

V5-specific features that don't apply on a regular topic surface as ordinary "this layout doesn't support that" errors at the API surface:

  • CheckpointConsumer requires a sealed-segment history; on a synthetic layout there is none, so subscribe fails with a clear error pointing at the migration command.
  • splitSegment / mergeSegments admin operations on a synthetic layout fail at the broker with the same error class — the operations require real ScalableTopicMetadata.
  • topic://...-domain DLQ targets work transparently for both layouts because the DLQ is its own scalable topic; nothing about it depends on the source topic's layout shape.
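Here is a minimal sketch of how a layout-dependent feature might refuse a synthetic layout. requireRealDag, LayoutKind, and the choice of IllegalStateException are assumptions. The PIP specifies only that the error is clear and points at the migration command.

```java
// Hypothetical guard for V5 features that need real ScalableTopicMetadata.
final class LayoutFeatureGuard {

    enum LayoutKind { SYNTHETIC, REAL_DAG }

    // Throws unless the topic is already served by a real DAG.
    static void requireRealDag(LayoutKind kind, String feature, String topic) {
        if (kind != LayoutKind.REAL_DAG) {
            throw new IllegalStateException(feature + " requires a scalable topic; migrate it first with: "
                    + "pulsar-admin scalable-topics migrate " + topic);
        }
    }

    public static void main(String[] args) {
        try {
            requireRealDag(LayoutKind.SYNTHETIC, "CheckpointConsumer", "persistent://t/n/x");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        requireRealDag(LayoutKind.REAL_DAG, "CheckpointConsumer", "persistent://t/n/x"); // passes silently
    }
}
```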

1.3 Layout-change handling drives the migration transition

Layout updates pushed by the lookup session already trigger the SDK's per-segment reconcile: segments that disappeared get their per-segment producers / consumers torn down; new segments get fresh ones; segments whose URI changed get rebuilt on the new URI.

A migration is exactly this kind of update. The legacy segments with URIs persistent://t/n/x-partition-K leave the active set and become sealed parents, which still resolve to their existing managed ledgers. New active child segments with URIs segment://t/n/x/<descriptor> appear, each backed by its own new managed ledger. The reconciler tears down the per-segment v4 producers attached to the persistent:// URIs and attaches new ones to the segment:// children. Per-segment consumers keep draining the sealed parents before the children are assigned, as with any split. From the application's perspective, the SDK's internal reconnect happens (as it does for any layout change), but the Producer<T> / Consumer<T> reference and the publish / receive flow are unaffected.
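The producer-side reconcile at the migration moment can be sketched as a diff over active segments keyed by segment ID. Seg, Plan, reconcile, and the segment:// URIs are hypothetical, and keying by ID is an assumption. Consumers also keep the sealed parents until they are drained, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the per-segment reconcile run on each layout push.
final class LayoutReconciler {

    record Seg(long id, String uri) {}

    record Plan(List<Seg> tearDown, List<Seg> create) {}

    // Diff the current and next active segment sets, keyed by segment ID.
    static Plan reconcile(Map<Long, Seg> current, Map<Long, Seg> next) {
        List<Seg> tearDown = new ArrayList<>();
        List<Seg> create = new ArrayList<>();
        for (Seg s : current.values()) {
            Seg replacement = next.get(s.id());
            // Disappeared, or same ID on a new URI: tear down the old attachment.
            if (replacement == null || !replacement.uri().equals(s.uri())) {
                tearDown.add(s);
            }
        }
        for (Seg s : next.values()) {
            Seg existing = current.get(s.id());
            // New segment, or URI changed: attach a fresh per-segment producer.
            if (existing == null || !existing.uri().equals(s.uri())) {
                create.add(s);
            }
        }
        return new Plan(tearDown, create);
    }

    public static void main(String[] args) {
        Map<Long, Seg> synthetic = Map.of(
                0L, new Seg(0, "persistent://t/n/x-partition-0"),
                1L, new Seg(1, "persistent://t/n/x-partition-1"));
        Map<Long, Seg> migrated = Map.of(
                2L, new Seg(2, "segment://t/n/x/2"),
                3L, new Seg(3, "segment://t/n/x/3"));
        Plan plan = reconcile(synthetic, migrated);
        System.out.println("tear down: " + plan.tearDown());
        System.out.println("create:    " + plan.create());
    }
}
```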

There is no TopicMigratedException exposed to the application. A future hook could let applications observe migrations by subscribing to the lookup session's layout-change events directly; that hook is out of scope for this PIP.

2. Migration command

2.1 Admin REST endpoint

POST /admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate — authorized via a dedicated TopicOperation.MIGRATE_TO_SCALABLE operation, which the default authorization provider maps to PRODUCE permission on the topic (the same permission needed to write to it in the first place). Exposing it as its own operation lets third-party RBAC implementations restrict migration independently of produce. Migration is irreversible but does not destroy data, so the blast radius is bounded by what a write-permissioned user can already do; super-user is not required.

Query parameter: force (boolean, default false).

No request body.

Unless force=true, the broker enumerates the producers/consumers attached to the source topic (across all partitions, via the topic-stats path) and rejects the migration with HTTP 409 if any legacy v4 connection is present. A connection is recognised as V5-managed — and therefore excluded — by a reserved metadata marker the V5 SDK sets on every per-segment v4 producer/consumer it creates; everything else counts as a legacy v4 connection. force=true skips this check.

Response: 204 No Content on success. 409 Conflict if scalable metadata already exists or legacy v4 connections are still attached (and force is not set). 404 if the topic doesn't exist.
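The precheck can be sketched as follows. The marker key __v5_managed is a hypothetical placeholder; the PIP says only that the marker is reserved metadata set by the V5 SDK. Connection and Outcome are illustrative types, and the check order mirrors the broker-side precheck in section 2.4.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the migration precheck.
final class MigrationPrecheck {

    // Assumption: illustrative key name; the PIP only says a reserved marker exists.
    static final String V5_MANAGED_MARKER = "__v5_managed";

    record Connection(String clientName, Map<String, String> metadata) {}

    enum Outcome {
        PROCEED(0), ALREADY_SCALABLE(409), NOT_FOUND(404), LEGACY_CLIENTS_ATTACHED(409);

        final int httpStatus;

        Outcome(int httpStatus) {
            this.httpStatus = httpStatus;
        }
    }

    // Connections without the V5-managed marker count as legacy v4 connections.
    static long legacyConnections(List<Connection> connections) {
        return connections.stream()
                .filter(c -> !c.metadata().containsKey(V5_MANAGED_MARKER))
                .count();
    }

    // Order: already scalable, missing topic, then legacy clients (skipped when force is set).
    static Outcome precheck(boolean scalableExists, boolean topicExists, List<Connection> connections, boolean force) {
        if (scalableExists) {
            return Outcome.ALREADY_SCALABLE;
        }
        if (!topicExists) {
            return Outcome.NOT_FOUND;
        }
        if (!force && legacyConnections(connections) > 0) {
            return Outcome.LEGACY_CLIENTS_ATTACHED;
        }
        return Outcome.PROCEED;
    }

    public static void main(String[] args) {
        List<Connection> conns = List.of(
                new Connection("v5-app", Map.of(V5_MANAGED_MARKER, "true")),
                new Connection("old-v4-app", Map.of()));
        System.out.println(precheck(false, true, conns, false)); // LEGACY_CLIENTS_ATTACHED
        System.out.println(precheck(false, true, conns, true));  // PROCEED
    }
}
```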

2.2 Admin client

```java
package org.apache.pulsar.client.admin;

import java.util.concurrent.CompletableFuture;

public interface ScalableTopics {
    /** Migrate an existing partitioned or non-partitioned topic to a scalable topic (force = false). */
    void migrateToScalable(String topic) throws PulsarAdminException;

    /** As above; force = true skips the legacy v4 connection check. */
    void migrateToScalable(String topic, boolean force) throws PulsarAdminException;

    CompletableFuture<Void> migrateToScalableAsync(String topic);               // force = false

    CompletableFuture<Void> migrateToScalableAsync(String topic, boolean force);
}
```

2.3 CLI

pulsar-admin scalable-topics migrate persistent://t/n/x [--force]

2.4 Broker-side migration steps

Executed by the topic's owning broker:

  1. Precheck:
    • No ScalableTopicMetadata already exists at the path. If it does, fail 409.
    • Topic exists (as either partitioned or non-partitioned). If not, fail 404.
    • Unless force, no legacy v4 connection is attached (counting only producers/consumers whose metadata lacks the V5-managed marker). If any are, fail 409.
  2. Build initial layout (ScalableTopicController.createMigratedMetadata):
    • N sealed legacy parent segments (or 1 for non-partitioned), each wrapping persistent://t/n/x-partition-K (or persistent://t/n/x). Each parent spans the full hash range.
    • N active child segments (or 1) with equal-width contiguous hash ranges covering [0x0000, 0xFFFF]. Routing is range-based.
    • DAG edges: every child has all N parents as predecessors. The subscription controller's drain-before-assign protocol (already used for splits / merges) preserves per-key ordering.
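    • Set epoch = 0 and nextSegmentId = 2N, since the N parents and N children already use 2N segment IDs (N = 1 for a non-partitioned source).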
  3. Create child segment topics: materialize the backing topic for each new active child. Sealed legacy parents reuse the existing persistent:// managed ledgers, so they are skipped.
  4. Atomic flip: write ScalableTopicMetadata to /topics/t/n/x via metadata-store CAS. This is the commit point — once it succeeds, the topic is scalable. Connected lookup sessions transition from the synthetic layout to the real DAG via the metadata-store watch.
  5. Terminate the old topic(s): terminate persistent://t/n/x (or every partition, for a partitioned source) so no further v4 writes can land. Termination is exactly the sealed-parent state — existing data is still drainable by consumers, and the sealed legacy parent segments reference these terminated topics.

Failures before step 4 leave the topic untouched; failures after step 4 leave the topic scalable.
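The commit-point semantics can be illustrated with an in-memory stand-in for the metadata store, where putIfAbsent plays the role of the CAS. FakeMetadataStore and migrate are hypothetical, and the precheck and metadata construction are omitted. The two Runnable hooks stand for step 3 (create child topics) and step 5 (terminate old topics).

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of steps 3-5 around a single commit point.
final class MigrationCommit {

    // In-memory stand-in for the metadata store; putIfAbsent plays the role of the CAS.
    static final class FakeMetadataStore {
        private final ConcurrentHashMap<String, String> data = new ConcurrentHashMap<>();

        boolean createIfAbsent(String path, String value) {
            return data.putIfAbsent(path, value) == null;
        }

        String get(String path) {
            return data.get(path);
        }
    }

    // Returns false if another migration already committed (the caller would answer 409).
    static boolean migrate(FakeMetadataStore store, String path, String metadata,
                           Runnable createChildTopics, Runnable terminateOldTopics) {
        createChildTopics.run();                     // step 3: a throw here leaves no scalable metadata
        if (!store.createIfAbsent(path, metadata)) { // step 4: the commit point
            return false;
        }
        terminateOldTopics.run();                    // step 5: a throw here leaves the topic scalable
        return true;
    }

    public static void main(String[] args) {
        FakeMetadataStore store = new FakeMetadataStore();
        Runnable noop = () -> { };
        System.out.println(migrate(store, "/topics/t/n/x", "layout-v0", noop, noop)); // true
        System.out.println(migrate(store, "/topics/t/n/x", "layout-v0", noop, noop)); // false
    }
}
```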

3. v4 write guard

Step 5 above (terminating the old topics) is the primary guard: an already-connected v4 producer's send() fails with TopicTerminatedError, and a newly connected one is rejected the same way. This keeps stale v4 producers from writing to a topic that has been migrated.

To make the "once scalable, always scalable" guarantee robust even after the old (terminated, drained) topics are eventually removed by retention, the broker also refuses to auto-create a persistent://t/n/x topic when ScalableTopicMetadata exists for the equivalent topic://t/n/x. This prevents a stray v4 client from recreating a regular topic that would shadow the scalable one. The rejection reuses TopicTerminatedError (binary) / the equivalent REST status; no new error code is introduced. V5 clients are unaffected — they always use the scalable lookup session.
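Here is a sketch of the auto-create guard. AutoCreateGuard and its methods are hypothetical. Mapping a partition name such as x-partition-3 back to its base topic is an assumption; the PIP states the rule only for persistent://t/n/x.

```java
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical sketch of the broker's auto-create guard.
final class AutoCreateGuard {

    // Assumption: a partition name such as x-partition-3 maps back to its base topic.
    private static final Pattern PARTITION_SUFFIX = Pattern.compile("-partition-\\d+$");

    static String equivalentScalableName(String persistentName) {
        if (!persistentName.startsWith("persistent://")) {
            throw new IllegalArgumentException("not a persistent topic: " + persistentName);
        }
        String path = persistentName.substring("persistent://".length());
        return "topic://" + PARTITION_SUFFIX.matcher(path).replaceFirst("");
    }

    // Refuse auto-creation when a scalable topic already claims the name.
    static boolean mayAutoCreate(String persistentName, Set<String> scalableTopics) {
        return !scalableTopics.contains(equivalentScalableName(persistentName));
    }

    public static void main(String[] args) {
        Set<String> scalable = Set.of("topic://t/n/x");
        System.out.println(mayAutoCreate("persistent://t/n/x", scalable));             // false
        System.out.println(mayAutoCreate("persistent://t/n/x-partition-3", scalable)); // false
        System.out.println(mayAutoCreate("persistent://t/n/other", scalable));         // true
    }
}
```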

4. migratedFrom on ScalableTopicMetadata

An optional informational field migratedFrom: TopicMigrationOrigin? records pre-migration metadata (partition count, original persistent:// name) for debugging, metrics labelling, and future tooling. Not consulted on hot paths.

No legacyModNRouting flag or other migration-specific routing knob is needed: the post-migration real DAG uses standard range-based routing and per-key ordering is preserved by the controller's existing drain-before-assign protocol (see Routing across migration).


Public-facing changes

REST API

New endpoint:

  • POST /admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate: authorized via TopicOperation.MIGRATE_TO_SCALABLE (mapped to produce permission on the topic by the default authorization provider); query param force (default false); no request body; 204 on success, 409 if already scalable or legacy v4 connections are still attached (and force is not set), 404 if topic missing.

Modified behaviour:

  • Auto-creation of a persistent://t/n/x topic is refused when scalable metadata exists for the equivalent topic://t/n/x (reusing TopicTerminatedError); together with the post-migration termination of the old topics, this keeps v4 clients off a migrated topic.

Binary protocol

  • The existing scalable-topic lookup-session command (PIP-468) is extended to accept persistent://... and short-form names in addition to topic://.... For non-scalable topics it returns a synthetic layout with legacy-segment entries.
  • No new error code: the v4 write guard reuses the existing TopicTerminatedError.

CLI

  • pulsar-admin scalable-topics migrate <topic> [--force]

V5 SDK behavior

  • V5 builders accept persistent://... and short-form names; the SDK opens a scalable-topic lookup session that the broker resolves to either a real DAG or a synthetic layout.
  • V5 builders reject non-persistent://... with UnsupportedOperationException.
  • Migration is observed as a layout-change push on the lookup session; no new SDK exception is exposed to the application.

Backward & forward compatibility

Upgrade

The upgrade story this PIP supports is:

  1. Upgrade brokers to the version containing this PIP. Brokers handle both old persistent:// clients and new topic:// clients side by side; no behavior change for existing topics.
  2. Upgrade applications to the V5 SDK at the operator's pace. No topic changes are required; the V5 SDK reaches un-migrated topics through the synthetic layout served by the lookup session.
  3. Once all applications on a given topic are V5, run migrate. The migration is atomic and one-way.

Old client versions continue to work with the cluster on un-migrated topics.

Downgrade / Rollback

  • A broker downgrade before any topic has been migrated is fully supported.
  • A broker downgrade after a topic has been migrated is not supported: older brokers don't understand the scalable-metadata layout. Recovery would require restoring the metadata store from backup. The migration command should be treated as a one-way commit.
  • A V5 SDK client can be downgraded back to v4 only if the topic was never migrated. Once migrated, the topic only speaks the scalable protocol.

Geo-replication

Out of scope; see Goals: Out of Scope.


Alternatives

Alt 1: Probe-and-wrapper in the SDK (rejected)

An earlier draft of this PIP had the V5 SDK probe via admin.scalableTopics().getMetadataAsync(topic) at every builder call, cache the verdict with a TTL, and use a separate v4-wrapper code path for regular topics. Migration would be observed by connected clients as TopicTerminatedException from the v4 wrapper, which the SDK would catch and use as the cue to re-probe and rebuild on the scalable path.

Rejected in favour of the lookup-session approach because:

  • Two SDK code paths (scalable + v4 wrapper) would have to be maintained and tested in parallel.
  • The lookup session is push-based; the probe approach forces a TTL trade-off (long TTL = slow to notice migration; short TTL = constant load).
  • Migration would have been visible to applications as a TopicMigratedException on receive futures, however briefly; the lookup-session approach makes it strictly an internal SDK reconnect, with no application-visible signal at all.
  • The hash-routing equivalence problem (see Routing across migration) would have had to be papered over with a documented constraint. With the lookup session carrying the routing function as data, the synthetic layout's mod-N routing and the real DAG's range-based routing coexist naturally, and the controller's drain-before-assign protocol preserves per-key ordering.

Alt 2: Migration via per-message data copy

The migration command would create a new scalable topic alongside the regular one and stream all retained data through. Producers and consumers would cut over after the copy completes.

Rejected because:

  • The metadata-flip approach in this PIP achieves the same result with no data movement.
  • A retained topic with months of data could take hours to copy, during which producers and consumers would have no clear cut-over moment.

General Notes

  • migratedFrom is an optional informational field on ScalableTopicMetadata (partition count, original persistent:// name) for debugging and metrics labelling; not consulted on hot paths.