Back to Pulsar

PIP-454: Metadata Store Migration Framework

pip/pip-454.md

4.2.119.5 KB
Original Source

PIP-454: Metadata Store Migration Framework

Motivation

Apache Pulsar currently uses Apache ZooKeeper as its metadata store for broker coordination, topic metadata, namespace policies, and BookKeeper ledger management. While ZooKeeper has served well, there are several motivations for enabling migration to alternative metadata stores:

  1. Operational Simplicity: Alternative metadata stores like Oxia may offer simpler operations, better observability, or reduced operational overhead compared to ZooKeeper ensembles.

  2. Performance Characteristics: Different metadata stores have different performance profiles. Some workloads may benefit from stores optimized for high throughput or low latency.

  3. Deployment Flexibility: Organizations may prefer metadata stores that align better with their existing infrastructure and expertise.

  4. Zero-Downtime Migration: Operators need a safe, automated way to migrate metadata between stores without service interruption.

Currently, there is no supported path to migrate from one metadata store to another without cluster downtime. This PIP proposes a safe, simple migration framework that ensures metadata consistency by avoiding complex dual-write/dual-read patterns. The framework enables:

  • Zero-downtime migration from any metadata store to any other supported store
  • Automatic ephemeral node recreation in the target store
  • Version preservation to ensure conditional writes continue working
  • Automatic failure recovery if issues are detected
  • Minimal configuration changes - no config updates needed until after migration completes

Goal

Provide a safe, automated framework for migrating Apache Pulsar's metadata from one store implementation (e.g., ZooKeeper) to another (e.g., Oxia) with zero service interruption.

In Scope

  • Migration framework supporting any source → any target metadata store
  • Automatic ephemeral node recreation by brokers and bookies
  • Persistent data copy with version preservation
  • CLI commands for migration control and monitoring
  • Automatic failure recovery during migration
  • Support for broker and bookie participation
  • Read-only mode during migration for consistency

Out of Scope

  • Developing new metadata store implementations (Oxia, Etcd support already exists)
  • Cross-cluster metadata synchronization (different use case)
  • Automated rollback after COMPLETED phase (requires manual intervention)
  • Migration of configuration metadata store and geo-replicated clusters (can be done separately)

High Level Design

The migration framework introduces a DualMetadataStore wrapper that transparently handles migration without modifying existing metadata store implementations.

Key Principles

  1. Transparent Wrapping: The DualMetadataStore wraps the existing source store (e.g., ZKMetadataStore) without modifying its implementation.

  2. Lazy Target Initialization: The target store is only initialized when migration begins, triggered by a flag in the source store.

  3. Ephemeral-First Approach: Before copying persistent data, all brokers and bookies recreate their ephemeral nodes in the target store. This ensures the cluster is "live" in both stores during migration.

  4. Read-Only Mode During Migration: To ensure consistency, all metadata writes are blocked during PREPARATION and COPYING phases. Components receive SessionLost events to defer non-critical operations (e.g., ledger rollovers).

  5. Phase-Based Migration: Migration proceeds through well-defined phases (PREPARATION → COPYING → COMPLETED).

  6. Generic Framework: The framework is agnostic to specific store implementations - it works with any source and target that implement the MetadataStore interface.

  7. Guaranteed Consistency: By blocking writes during migration and using atomic copy, metadata is always in a consistent state. No dual-write complexity, no data divergence, no consistency issues.

Detailed Design

Migration Phases

NOT_STARTED
     ↓
PREPARATION ← All brokers/bookies recreate ephemeral nodes in target
             ← Metadata writes are BLOCKED (read-only mode)
     ↓
COPYING ← Coordinator copies persistent data source → target
         ← Metadata writes still BLOCKED
     ↓
COMPLETED ← Migration complete, all services using target store
          ← Metadata writes ENABLED on target
     ↓
After validation period:
 * Update config and restart brokers & bookies 
 * Decommission source store

(If errors occur):
FAILED ← Rollback to source store, writes ENABLED

Phase 1: NOT_STARTED → PREPARATION

Participant Registration (at startup): Each broker and bookie registers itself as a migration participant by creating a sequential ephemeral node:

  • Path: /pulsar/migration-coordinator/participants/id-NNNN (sequential)
  • This allows the coordinator to know how many participants exist before migration starts

Administrator triggers migration:

bash
pulsar-admin metadata-migration start --target oxia://oxia1:6648

Coordinator actions:

  1. Creates migration flag in source store: /pulsar/migration-coordinator/migration
    json
    {
      "phase": "PREPARATION",
      "targetUrl": "oxia://oxia1:6648"
    }
    

Broker/Bookie actions (automatic, triggered by watching the flag):

  1. Detect migration flag via watch on /pulsar/migration-coordinator/migration
  2. Defer non-critical metadata writes (e.g., ledger rollovers, bundle ownership changes)
  3. Initialize connection to target store
  4. Recreate ALL ephemeral nodes in target store
  5. Delete participant registration node to signal "ready"

Coordinator waits for all participant nodes to be deleted (indicating all participants are ready)

Phase 2: PREPARATION → COPYING

Coordinator actions:

  1. Updates phase to COPYING
  2. Performs recursive copy of persistent data from source → target:
    • Skips ephemeral nodes (already recreated)
    • Concurrent operations limited by semaphore (default: 1000 pending ops)
    • Breadth-first traversal to process all paths
    • Progress logged periodically

During this phase:

  • Brokers/bookies continue normal READ operations
  • Metadata WRITES are BLOCKED (return failure)
  • Ephemeral nodes remain alive in both stores
  • All reads still go to source store

During this phase:

  • Metadata writes are BLOCKED (return error to clients)
  • Metadata reads continue normally from source store
  • Data plane operations unaffected: Publish/consume, ledger writes continue normally
  • Version-id and modification count preserved using direct Oxia client
  • Breadth-first traversal with max 1000 concurrent operations

Estimated duration:

  • < 30 seconds for typical deployments with up to 500 MB of metadata in ZooKeeper

Impact on operations:

  • ✅ Existing topics: Publish and consume continue without interruption
  • ✅ BookKeeper: Ledger writes and reads continue normally
  • ✅ Clients: Connected producers and consumers unaffected
  • ❌ Admin operations: Topic/namespace creation blocked temporarily
  • ❌ Bundle operations: Load balancing deferred until completion

Phase 3: COPYING → COMPLETED

Coordinator actions:

  1. Updates phase to COMPLETED
  2. Logs success message with total copied node count

Broker/Bookie actions (automatic, triggered by phase update):

  1. Detect COMPLETED phase
  2. Deferred operations can now proceed
  3. Switch routing:
    • Writes: Go to target store only
    • Reads: Go to target store only

At this point:

  • Cluster is running on target store
  • Source store remains available for safety
  • Metadata writes are enabled again

Operator follow-up (after validation period):

  1. Update configuration files:
    properties
    # Before (ZooKeeper):
    metadataStoreUrl=zk://zk1:2181,zk2:2181/pulsar
    
    # After (Oxia):
    metadataStoreUrl=oxia://oxia1:6648
    
  2. Perform rolling restart with new config
  3. After all services restarted, decommission source store

Failure Handling: ANY_PHASE → FAILED

If migration fails at any point:

  1. Coordinator updates phase to FAILED
  2. Broker/Bookie actions:
    • Detect FAILED phase
    • Discard target store connection
    • Continue using source store
    • Metadata writes enabled again

Operator actions:

  1. Review logs to understand failure cause
  2. Fix underlying issue
  3. Retry migration with pulsar-admin metadata-migration start --target <url>

Implementation Details

Key Implementation Details:

  1. Direct Oxia Client Usage: The coordinator uses AsyncOxiaClient directly instead of going through MetadataStore interface. This allows setting version-id and modification count to match the source values, ensuring conditional writes (compare-and-set operations) continue to work correctly after migration.

  2. Breadth-First Traversal: Processes paths level by level using a work queue, enabling high concurrency while preventing deep recursion.

  3. Concurrent Operations: Uses a semaphore to limit pending operations (default: 1000), balancing throughput with memory usage.

Data Structures

Migration State (/pulsar/migration-coordinator/migration):

json
{
  "phase": "PREPARATION",
  "targetUrl": "oxia://oxia1:6648/default"
}

Fields:

  • phase: Current migration phase (NOT_STARTED, PREPARATION, COPYING, COMPLETED, FAILED)
  • targetUrl: Target metadata store URL (e.g., oxia://oxia1:6648/default)

Participant Registration (/pulsar/migration-coordinator/participants/id-NNNN):

  • Sequential ephemeral node created by each broker/bookie at startup
  • Empty data (presence indicates participation)
  • Deleted by participant when preparation complete (signals "ready")
  • Coordinator waits for all to be deleted before proceeding to COPYING phase

No additional state tracking: The simplified design removes complex state tracking and checksums. Migration state is kept minimal.

CLI Commands

bash
# Start migration
pulsar-admin metadata-migration start --target <target-url>

# Check status
pulsar-admin metadata-migration status

The simplified design only requires two commands. Rollback happens automatically if migration fails (phase transitions to FAILED).

REST API

POST   /admin/v2/metadata/migration/start
       Body: { "targetUrl": "oxia://..." }

GET    /admin/v2/metadata/migration/status
       Returns: { "phase": "COPYING", "targetUrl": "oxia://..." }

Safety Guarantees

Why This Approach is Safe

The migration design guarantees metadata consistency by avoiding dual-write and dual-read patterns entirely:

  1. Single Source of Truth: At any given time, there is exactly ONE active metadata store:

    • Before migration: Source store (ZooKeeper)
    • During PREPARATION and COPYING: Source store (read-only)
    • After COMPLETED: Target store (Oxia)
  2. No Dual-Write Complexity: Unlike approaches that write to both stores simultaneously, this design eliminates:

    • Write synchronization issues
    • Conflict resolution between stores
    • Data divergence problems
    • Partial failure handling complexity
  3. No Dual-Read Complexity: Unlike approaches that read from both stores, this design eliminates:

    • Read consistency issues
    • Cache invalidation across stores
    • Stale data problems
    • Complex fallback logic
  4. Atomic Cutover: All participants switch stores simultaneously when COMPLETED phase is detected. There is no ambiguous state where some participants use one store and others use another.

  5. Fast Migration Window: With < 30 seconds for typical metadata sizes (even up to 500 MB), the read-only window is minimal and acceptable for most production environments.

Bottom line: Metadata is always in a consistent state - either fully in the source store or fully in the target store, never split or diverged between them.

Data Integrity

  1. Version Preservation: All persistent data is copied with original version-id and modification count preserved. This ensures conditional writes (compare-and-set operations) continue working after migration.

  2. Ephemeral Node Recreation: All ephemeral nodes are recreated by their owning brokers/bookies before persistent data copy begins.

  3. Read-Only Mode: All metadata writes are blocked during PREPARATION and COPYING phases, ensuring no data inconsistencies during migration.

    Important: Read-only mode only affects metadata operations. Data plane operations continue normally:

    • Publishing and consuming messages works without interruption
    • Reading from existing topics and subscriptions works normally
    • Ledger writes to BookKeeper continue unaffected
    • Creating new topics or subscriptions will be blocked temporarily
    • Namespace/policy updates will be blocked temporarily
    • Bundle ownership changes will be deferred until migration completes

Operational Safety

  1. No Downtime: Brokers and bookies remain online throughout the migration. Data plane operations (publish/consume) continue without interruption. Only metadata operations are temporarily blocked during the migration phases.

  2. Graceful Failure: If migration fails at any point, phase transitions to FAILED and cluster returns to source store automatically.

  3. Session Events: Components receive SessionLost event during migration to defer non-critical writes (e.g., ledger rollovers), and SessionReestablished when migration completes or fails.

  4. Participant Coordination: Migration waits for all participants to complete preparation before copying data.

Consistency

  1. Atomic Cutover: All participants switch to target store simultaneously when COMPLETED phase is detected.

  2. Ephemeral Session Consistency: Each participant manages its own ephemeral nodes in target store with proper session management.

  3. No Dual-Write Complexity: By blocking writes during migration, we avoid complex dual-write error handling and data divergence issues.

Configuration

No Configuration Changes for Migration

The beauty of this design is that no configuration changes are needed to start migration:

  • Brokers and bookies continue using their existing metadataStoreUrl config
  • The DualMetadataStore wrapper is automatically applied when using ZooKeeper
  • Target URL is provided only when triggering migration via CLI

Post-Migration Configuration

After migration completes and validation period ends, update config files:

properties
# Before migration
metadataStoreUrl=zk://zk1:2181,zk2:2181,zk3:2181/pulsar

# After migration (update and rolling restart)
metadataStoreUrl=oxia://oxia1:6648

Comparison with Kafka's ZooKeeper → KRaft Migration

Apache Kafka faced a similar challenge migrating from ZooKeeper to KRaft (Kafka Raft). Their approach provides useful comparison points:

Kafka's Approach (KIP-866)

Migration Strategy:

  • Dual-mode operation: Kafka brokers run in a hybrid mode where the KRaft controller reads from ZooKeeper
  • Metadata synchronization: KRaft controller actively mirrors metadata from ZooKeeper to KRaft
  • Phased cutover: Operators manually transition from ZK_MIGRATION mode to KRAFT mode
  • Write forwarding: During migration, metadata writes go to ZooKeeper and are replicated to KRaft

Timeline:

  • Migration can take hours or days as metadata is continuously synchronized
  • Requires careful monitoring of lag between ZooKeeper and KRaft
  • Rollback possible until final KRAFT mode is committed

Pulsar's Approach (This PIP)

Migration Strategy:

  • Transparent wrapper: DualMetadataStore wraps existing store without broker code changes
  • Read-only migration: Metadata writes blocked during migration (< 30 seconds for most clusters)
  • Atomic copy: All persistent data copied in one operation with version preservation
  • Single source of truth: No dual-write or dual-read - metadata always consistent
  • Automatic cutover: All participants switch simultaneously when COMPLETED phase detected

Timeline:

  • Migration completes in < 30 seconds for typical deployments (even up to 500 MB metadata)
  • No lag monitoring needed
  • Automatic rollback on failure (FAILED phase)

Key Differences

AspectKafka (ZK → KRaft)Pulsar (ZK → Oxia)
Migration DurationHours to days< 30 seconds (up to 500 MB)
Metadata WritesContinue during migrationBlocked during migration
Data PlaneUnaffectedUnaffected (publish/consume continues)
ApproachContinuous sync + dual-modeAtomic copy + read-only mode
ConsistencyDual-write (eventual consistency)Single source of truth (always consistent)
ComplexityHigh (dual-mode broker logic)Low (transparent wrapper)
Version PreservationNot applicable (different metadata models)Yes (conditional writes preserved)
RollbackManual, complexAutomatic on failure
MonitoringRequires lag trackingSimple phase monitoring

Why Pulsar's Approach Differs

  1. Data Plane Independence: The key insight is that Pulsar's data plane (publish/consume, ledger writes) does not require metadata writes to function. This architectural property allows pausing metadata writes for a brief period (< 30 seconds) without affecting data operations. This is what makes the migration provably safe and consistent, not the metadata size.

  2. Write-Pause Safety: Pausing writes during copy ensures:

    • No dual-write complexity
    • No data divergence between stores
    • No conflict resolution needed
    • Guaranteed consistency

    This works regardless of metadata size - whether 50K nodes or millions of topics. The migration handles large metadata volumes through high concurrency (1000 parallel operations), completing in < 30 seconds even for 500 MB.

  3. Ephemeral Node Handling: Pulsar has significant ephemeral metadata (broker registrations, bundle ownership), making dual-write complex. Read-only mode simplifies this.

  4. Conditional Writes: Pulsar relies heavily on compare-and-set operations. Version preservation ensures these continue working post-migration, which Kafka doesn't need to address.

  5. Architectural Enabler: Pulsar's separation of data plane and metadata plane allows brief metadata write pauses without data plane impact, enabling a simpler, safer migration approach.

Lessons from Kafka's Experience

Pulsar's design incorporates lessons from Kafka's migration:

  • Avoid dual-write complexity: Kafka found dual-mode operation added significant code complexity. Pulsar's read-only approach is simpler and guarantees consistency.
  • Clear phase boundaries: Kafka's migration has unclear "completion" point. Pulsar has explicit COMPLETED phase.
  • Automatic participant coordination: Kafka requires manual broker restarts. Pulsar participants coordinate automatically.
  • Fast migration: < 30 seconds read-only window is acceptable for most production environments
  • Brief write unavailability: Pulsar accepts brief metadata write unavailability (< 30 sec) vs Kafka's continuous operation, but gains guaranteed consistency and simplicity.

References