internal/impl/redpanda/migrator/README.md
Comprehensive data migration system for Apache Kafka and Redpanda clusters, coordinating topics, schemas, and consumer groups.
The unified migrator orchestrates three specialized migrators working in concert to provide complete cluster-to-cluster migration.
classDiagram
class MigratorInput {
<<BatchInput>>
+Connect()
+ReadBatch()
}
class MigratorOutput {
<<BatchOutput>>
+Connect()
+WriteBatch()
}
class Migrator {
+topicMigrator topic
+schemaRegistryMigrator sr
+groupsMigrator groups
+messageBatchToFranzRecords()
-onInputConnected()
-onOutputConnected()
}
class topicMigrator {
+TopicMigratorConfig conf
+SyncOnce()
+Sync()
+CreateTopicIfNeeded()
+SyncACLs()
-knownTopics map
}
class schemaRegistryMigrator {
+SchemaRegistryMigratorConfig conf
+Sync()
+SyncLoop()
+DestinationSchemaID()
-knownSubjects map
-knownSchemas map
}
class groupsMigrator {
+GroupsMigratorConfig conf
+Sync()
+SyncLoop()
+ListGroupOffsets()
-translateOffset()
-tryFindExactOffset()
-commitedOffsets map
}
class KadmClient {
<<franz-go>>
}
class SrClient {
<<franz-go>>
}
class KgoClient {
<<franz-go>>
}
MigratorInput --> Migrator : uses
MigratorOutput --> Migrator : uses
Migrator *-- topicMigrator : contains
Migrator *-- schemaRegistryMigrator : contains
Migrator *-- groupsMigrator : contains
topicMigrator --> KadmClient : src/dst admin
schemaRegistryMigrator --> SrClient : src/dst SR
groupsMigrator --> KadmClient : src/dst admin
groupsMigrator --> KgoClient : src/dst client
Migrator - Central coordinator
topicMigrator - Topic infrastructure
schemaRegistryMigrator - Schema synchronization
groupsMigrator - Consumer group offset translation
How input messages are transformed into franz-go records for the destination cluster.
flowchart TD
A[service.Message] --> B{Extract Metadata}
B --> C[kafka_key]
B --> D[kafka_value]
B --> E[kafka_topic]
B --> F[kafka_partition]
B --> G[kafka_timestamp_ms]
B --> H[kafka_offset]
B --> I[kafka_headers]
C --> J[kgo.Record.Key]
D --> K{Schema ID?}
K -->|Yes| L[Parse Schema ID]
L --> M[Translate ID]
M --> N[Update Schema ID]
N --> O[kgo.Record.Value]
K -->|No| O
E --> P[Resolve Destination Topic]
P --> Q{Topic Exists?}
Q -->|No| R[Create Topic]
R --> S[kgo.Record.Topic]
Q -->|Yes| S
F --> T[kgo.Record.Partition]
G --> U[kgo.Record.Timestamp]
I --> V[Extract Headers]
H --> W{Groups Enabled?}
W -->|Yes| X[Add Offset Header]
W -->|No| Y[Skip]
X --> Z[kgo.Record.Headers]
V --> Z
AA{Provenance Header?} -->|Enabled| AB[Add Source Cluster ID]
AA -->|Disabled| AC[Skip]
AB --> Z
AC --> Z
J --> AD[kgo.Record]
O --> AD
S --> AD
T --> AD
U --> AD
Z --> AD
AD --> AE[Write to Destination]
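
The schema ID branch of the flow above can be sketched in Go. This is a minimal illustration and not the migrator's actual code: it assumes values use the Confluent wire format (a zero magic byte followed by a 4-byte big-endian schema ID), and `translateSchemaID` and `idMap` are hypothetical names. Returning an error for an unmapped ID is also an assumption.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// translateSchemaID rewrites the schema ID in a payload that uses the
// Confluent wire format: magic byte 0x00 followed by a 4-byte big-endian ID.
// idMap is a hypothetical source-to-destination schema ID mapping.
func translateSchemaID(value []byte, idMap map[int]int) ([]byte, error) {
	if len(value) < 5 || value[0] != 0 {
		// No schema ID prefix: pass the value through unchanged.
		return value, nil
	}
	srcID := int(binary.BigEndian.Uint32(value[1:5]))
	dstID, ok := idMap[srcID]
	if !ok {
		// Assumption: an unmapped ID is treated as an error in this sketch.
		return nil, fmt.Errorf("no destination ID for source schema ID %d", srcID)
	}
	// Copy so the caller's input slice is never mutated.
	out := make([]byte, len(value))
	copy(out, value)
	binary.BigEndian.PutUint32(out[1:5], uint32(dstID))
	return out, nil
}

func main() {
	src := []byte{0, 0, 0, 0, 7, 'h', 'i'}
	dst, err := translateSchemaID(src, map[int]int{7: 42})
	fmt.Println(dst, err)
}
```

The payload bytes after the 5-byte prefix are copied untouched, so only the ID changes.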
When translate_ids: true, source schema IDs are mapped to destination IDs via schema registry lookup.

Topic creation and synchronization sequence.
sequenceDiagram
participant M as Migrator
participant TM as topicMigrator
participant SrcAdm as Source Admin
participant DstAdm as Dest Admin
M->>TM: Sync(srcAdm, dstAdm, getTopics)
TM->>TM: getTopics()
loop For each topic
TM->>TM: Check knownTopics cache
alt Topic cached
TM-->>M: Skip (already created)
else Topic not cached
TM->>TM: resolveTopic(srcTopic)
Note over TM: Apply name interpolation
TM->>SrcAdm: ListTopics(srcTopic)
SrcAdm-->>TM: TopicDetail (partitions, RF)
TM->>SrcAdm: DescribeTopicConfigs(srcTopic)
SrcAdm-->>TM: ResourceConfig
TM->>TM: Filter supported configs
Note over TM: Serverless-aware subset
TM->>DstAdm: CreateTopic(dstTopic, partitions, RF, configs)
alt Topic exists
DstAdm-->>TM: TopicAlreadyExists
TM->>DstAdm: ListTopics(dstTopic)
DstAdm-->>TM: TopicDetail
alt Partition mismatch (src > dst)
TM->>DstAdm: CreatePartitions(dstTopic, delta)
DstAdm-->>TM: Success
else Partition mismatch (dst > src)
Note over TM: Log warning, use dst count
end
else Topic created
DstAdm-->>TM: Success
TM->>TM: Record metrics
end
opt SyncACLs enabled
TM->>SrcAdm: DescribeACLs(srcTopic)
SrcAdm-->>TM: ACL list
TM->>TM: Filter & transform ACLs
Note over TM: Exclude WRITE, downgrade ALL→READ
TM->>DstAdm: CreateACLs(dstTopic, transformedACLs)
DstAdm-->>TM: Success
end
TM->>TM: Cache topic mapping
end
end
TM-->>M: Sync complete
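
Two decisions in the sequence above, partition reconciliation and the ACL transform, can be sketched as plain functions. This is a hedged illustration only: the `ACL` struct, `partitionsToAdd`, and `transformACLs` are hypothetical names, and copying DENY entries unchanged is an assumption that the diagram does not confirm.

```go
package main

import "fmt"

// ACL is a hypothetical, simplified ACL entry used only for this sketch.
type ACL struct {
	Principal  string
	Operation  string // e.g. "READ", "WRITE", "ALL"
	Permission string // "ALLOW" or "DENY"
}

// partitionsToAdd returns how many partitions the destination topic is
// missing. It returns 0 when the destination already has at least as many
// partitions as the source (the "log warning, use dst count" branch).
func partitionsToAdd(src, dst int) int {
	if src > dst {
		return src - dst
	}
	return 0
}

// transformACLs drops ALLOW WRITE entries and downgrades ALLOW ALL to
// ALLOW READ. All other entries are copied unchanged (an assumption).
func transformACLs(in []ACL) []ACL {
	out := make([]ACL, 0, len(in))
	for _, a := range in {
		if a.Permission == "ALLOW" {
			switch a.Operation {
			case "WRITE":
				continue // skip this entry entirely
			case "ALL":
				a.Operation = "READ"
			}
		}
		out = append(out, a)
	}
	return out
}

func main() {
	fmt.Println(partitionsToAdd(6, 4))
	fmt.Println(transformACLs([]ACL{
		{"User:writer", "WRITE", "ALLOW"},
		{"User:admin", "ALL", "ALLOW"},
	}))
}
```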
Schema and compatibility synchronization sequence.
sequenceDiagram
participant M as Migrator
participant SR as schemaRegistryMigrator
participant SrcSR as Source SR
participant DstSR as Dest SR
M->>SR: Sync(ctx)
SR->>DstSR: GetMode()
DstSR-->>SR: READWRITE or IMPORT
Note over SR: Validate mode
SR->>SrcSR: Subjects(ctx, includeDeleted)
SrcSR-->>SR: Subject list
SR->>SR: Filter subjects (include/exclude regex)
loop For each subject
SR->>SrcSR: Versions(ctx, subject)
SrcSR-->>SR: Version list
alt Versions == "latest"
SR->>SR: Keep only latest version
else Versions == "all"
SR->>SR: Keep all versions
end
loop For each version
SR->>SR: Check knownSubjects cache
alt Schema cached
SR-->>M: Skip (already synced)
else Schema not cached
SR->>SrcSR: SchemaByVersion(ctx, subject, version)
SrcSR-->>SR: SubjectSchema
SR->>SR: resolveSubject(subject, version)
Note over SR: Apply name interpolation
opt Serverless mode
SR->>SR: Strip metadata & rule sets
end
alt TranslateIDs enabled
SR->>DstSR: CreateSchema(dstSubject, schema)
Note over SR: Destination assigns new ID
DstSR-->>SR: SubjectSchema (new ID)
else Fixed IDs
SR->>DstSR: CreateSchemaWithIDAndVersion(dstSubject, schema, srcID, srcVersion)
Note over SR: Preserve source ID & version
DstSR-->>SR: SubjectSchema (same ID)
end
SR->>SR: Record metrics
SR->>SR: Cache schema mapping
SR->>SrcSR: GetCompatibility(subject)
SrcSR-->>SR: Compatibility level
alt Compatibility explicitly set
SR->>DstSR: UpdateCompatibility(dstSubject, level)
DstSR-->>SR: Success
SR->>SR: Record metrics
else Global compatibility
Note over SR: Skip (don't force global mode)
end
end
end
end
SR-->>M: Sync complete
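
The version selection and cache check in the loop above might look like the following sketch. The names `schemaKey`, `selectVersions`, and `pendingVersions` are hypothetical, and the cache key shape is an assumption; the real `knownSubjects` and `knownSchemas` maps may be keyed differently.

```go
package main

import (
	"fmt"
	"sort"
)

// schemaKey is a hypothetical cache key for an already synced schema.
type schemaKey struct {
	Subject string
	Version int
}

// selectVersions applies the versions setting: "latest" keeps only the
// highest version; any other value (such as "all") keeps every version.
func selectVersions(versions []int, mode string) []int {
	if len(versions) == 0 {
		return nil
	}
	sorted := append([]int(nil), versions...)
	sort.Ints(sorted)
	if mode == "latest" {
		return sorted[len(sorted)-1:]
	}
	return sorted
}

// pendingVersions returns the selected versions that are not in the cache.
func pendingVersions(subject string, versions []int, mode string, known map[schemaKey]bool) []int {
	var out []int
	for _, v := range selectVersions(versions, mode) {
		if known[schemaKey{Subject: subject, Version: v}] {
			continue // already synced
		}
		out = append(out, v)
	}
	return out
}

func main() {
	known := map[schemaKey]bool{{Subject: "orders-value", Version: 1}: true}
	fmt.Println(pendingVersions("orders-value", []int{3, 1, 2}, "all", known))
	fmt.Println(pendingVersions("orders-value", []int{3, 1, 2}, "latest", known))
}
```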
Schema sync repeats according to the interval setting.

Consumer group offset translation and commit sequence.
sequenceDiagram
participant M as Migrator
participant GM as groupsMigrator
participant SrcAdm as Source Admin
participant DstAdm as Dest Admin
participant SrcCl as Source Client
participant DstCl as Dest Client
M->>GM: Sync(ctx, getTopics)
GM->>GM: getTopics()
GM->>GM: filterTopics(mappings)
GM->>SrcAdm: ListGroups(ctx)
SrcAdm-->>GM: Group list with states
GM->>GM: Filter groups (include/exclude regex)
GM->>GM: Filter by state (Empty or not Dead)
GM->>SrcAdm: FetchManyOffsets(ctx, groups)
SrcAdm-->>GM: Group offsets
GM->>GM: Filter groups with no offsets for topics
GM->>SrcAdm: ListStartOffsets(ctx, topics)
SrcAdm-->>GM: Topic start offsets
GM->>SrcAdm: ListEndOffsets(ctx, topics)
SrcAdm-->>GM: Topic end offsets
GM->>DstAdm: ListEndOffsets(ctx, dstTopics)
DstAdm-->>GM: Dest topic end offsets
par Translate offsets in parallel
loop For each group offset
GM->>GM: Check commitedOffsets cache
alt Offset cached
GM-->>GM: Skip (already committed)
else Offset not cached
GM->>GM: Validate partition counts match
alt Partition mismatch
Note over GM: Log error, skip partition
else Partitions match
GM->>SrcCl: Fetch(ctx, topic, partition, offset-1)
Note over GM: Read previous record
SrcCl-->>GM: Record with timestamp
GM->>DstAdm: ListOffsetsAfterMilli(ctx, dstTopic, partition, timestamp)
Note over GM: Find offset after timestamp
DstAdm-->>GM: Approximate offset (o1)
opt Exact offset refinement
GM->>GM: tryFindExactOffset(dstTopic, partition, srcOffset, endOffset, o1)
loop Max 5 attempts
GM->>DstCl: Fetch(ctx, dstTopic, partition, o1)
DstCl-->>GM: Record with offset header
GM->>GM: Decode offset header
GM->>GM: Calculate delta = srcOffset - headerOffset
alt Delta == 0
GM-->>GM: Exact offset found
else Delta != 0
GM->>GM: Adjust o1 += delta
Note over GM: Retry with adjusted offset
end
end
end
GM->>GM: Record metrics (translation)
end
end
end
end
GM->>GM: Group translated offsets by group
par Commit offsets in parallel
loop For each group
GM->>DstAdm: CommitOffsets(ctx, group, offsets)
DstAdm-->>GM: Success
GM->>GM: Record metrics (commit)
GM->>GM: Cache committed offsets
end
end
GM-->>M: Sync complete
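
The exact offset refinement loop in the diagram can be sketched as follows. This is an illustration under stated assumptions, not the body of `tryFindExactOffset`: `refineOffset` and the `fetchHeaderOffset` callback are hypothetical, and falling back to the approximate offset on failure is an assumption.

```go
package main

import "fmt"

// fetchHeaderOffset is a hypothetical callback. It returns the source offset
// embedded in the header of the destination record at dstOffset.
type fetchHeaderOffset func(dstOffset int64) (srcOffset int64, ok bool)

// refineOffset starts from the timestamp-based approximation and moves by
// delta = wantSrcOffset - headerOffset until the embedded offset matches.
// It gives up after 5 attempts or when the candidate leaves [0, endOffset).
func refineOffset(fetch fetchHeaderOffset, wantSrcOffset, approx, endOffset int64) (int64, bool) {
	const maxAttempts = 5
	o := approx
	for i := 0; i < maxAttempts; i++ {
		if o < 0 || o >= endOffset {
			return approx, false
		}
		got, ok := fetch(o)
		if !ok {
			return approx, false
		}
		delta := wantSrcOffset - got
		if delta == 0 {
			return o, true // exact offset found
		}
		o += delta // retry with the adjusted offset
	}
	return approx, false
}

func main() {
	// Simulated destination partition: index is the destination offset,
	// value is the source offset stored in that record's header.
	headers := []int64{10, 11, 12, 13, 14, 15}
	fetch := func(o int64) (int64, bool) {
		if o < 0 || o >= int64(len(headers)) {
			return 0, false
		}
		return headers[o], true
	}
	exact, ok := refineOffset(fetch, 13, 1, int64(len(headers)))
	fmt.Println(exact, ok)
}
```

When the source has no gaps, one adjustment is enough. Gaps caused by compaction or transactions can require several, which is presumably why the attempt count is bounded.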
Consumer group sync repeats according to the interval setting.

Uses ListOffsetsAfterMilli for the approximate offset
Reads the record at srcOffset - 1 to get its timestamp
Calls ListOffsetsAfterMilli to find the offset after that timestamp
Computes delta = srcOffset - embeddedOffset and adjusts by delta

Schema registry sync runs every schema_registry.interval (if > 0)
Consumer group sync runs every consumer_groups.interval (if > 0)

input:
  redpanda_migrator:
    seed_brokers: ["source:9092"]
    topics: ["orders", "payments"]
    consumer_group: "migration"
output:
  redpanda_migrator:
    seed_brokers: ["destination:9092"]
    topic: ${! @kafka_topic } # Preserve names

output:
  redpanda_migrator:
    topic: prod_${! @kafka_topic } # Add prefix

output:
  redpanda_migrator:
    schema_registry:
      url: "http://dest-registry:8081"
      translate_ids: true # Create-or-reuse mode
      versions: all # Migrate all versions

output:
  redpanda_migrator:
    consumer_groups:
      interval: 1m
      include: ["app-.*"] # Only app- prefixed groups
      exclude: ["migration"] # Exclude migrator itself
      only_empty: true # Only Empty state groups
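
The consumer_groups filters in the configuration above could be applied roughly as in this Go sketch. All names here (`group`, `compileAll`, `filterGroups`) are hypothetical. Unanchored regex matching and skipping only Dead groups when only_empty is false are both assumptions, the latter inferred from the "Empty or not Dead" step in the sequence diagram.

```go
package main

import (
	"fmt"
	"regexp"
)

// group is a hypothetical, simplified consumer group listing entry.
type group struct {
	Name  string
	State string // e.g. "Empty", "Stable", "Dead"
}

// compileAll compiles each pattern and panics on an invalid one.
func compileAll(patterns ...string) []*regexp.Regexp {
	out := make([]*regexp.Regexp, 0, len(patterns))
	for _, p := range patterns {
		out = append(out, regexp.MustCompile(p))
	}
	return out
}

func matchesAny(patterns []*regexp.Regexp, s string) bool {
	for _, re := range patterns {
		if re.MatchString(s) {
			return true
		}
	}
	return false
}

// filterGroups keeps groups that match include (if any patterns are set),
// do not match exclude, and pass the state check.
func filterGroups(groups []group, include, exclude []*regexp.Regexp, onlyEmpty bool) []string {
	var out []string
	for _, g := range groups {
		if len(include) > 0 && !matchesAny(include, g.Name) {
			continue
		}
		if matchesAny(exclude, g.Name) {
			continue
		}
		if onlyEmpty && g.State != "Empty" {
			continue
		}
		if !onlyEmpty && g.State == "Dead" {
			continue
		}
		out = append(out, g.Name)
	}
	return out
}

func main() {
	groups := []group{
		{"app-orders", "Empty"},
		{"app-billing", "Stable"},
		{"migration", "Empty"},
	}
	fmt.Println(filterGroups(groups, compileAll("app-.*"), compileAll("migration"), true))
}
```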

output:
  redpanda_migrator:
    serverless: true # Restrict configs to serverless subset
    schema_registry:
      url: "https://serverless.redpanda.com:8081"
      translate_ids: true

redpanda_migrator_topics_created_total - Topics successfully created
redpanda_migrator_topic_create_errors_total - Topic creation failures
redpanda_migrator_topic_create_latency_ns - Topic creation latency

redpanda_migrator_sr_schemas_created_total - Schemas successfully created
redpanda_migrator_sr_schema_create_errors_total - Schema creation failures
redpanda_migrator_sr_schema_create_latency_ns - Schema creation latency
redpanda_migrator_sr_compatibility_updates_total - Compatibility updates applied
redpanda_migrator_sr_compatibility_update_errors_total - Compatibility update failures
redpanda_migrator_sr_compatibility_update_latency_ns - Compatibility update latency

Per-group metrics with group label:
redpanda_migrator_cg_offsets_translated_total - Offsets successfully translated
redpanda_migrator_cg_offset_translation_errors_total - Offset translation failures
redpanda_migrator_cg_offset_translation_latency_ns - Offset translation latency
redpanda_migrator_cg_offsets_committed_total - Offsets successfully committed
redpanda_migrator_cg_offset_commit_errors_total - Offset commit failures
redpanda_migrator_cg_offset_commit_latency_ns - Offset commit latency

Per-partition metrics with topic and partition labels:
redpanda_lag - Current consumer lag in messages

Provenance headers prevent circular migration:
output:
  redpanda_migrator:
    provenance_header: "redpanda-migrator-provenance" # Default
Records with provenance header matching destination cluster ID are skipped.
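
The skip rule can be sketched in Go as a header check. The `header` type, `shouldSkip`, and `withProvenance` are hypothetical names, and storing the cluster ID as a raw string in the header value is an assumption about the encoding.

```go
package main

import "fmt"

// header is a hypothetical, simplified record header.
type header struct {
	Key   string
	Value []byte
}

// shouldSkip reports whether a record carries a provenance header whose
// value equals the destination cluster ID, meaning it originated there.
func shouldSkip(headers []header, headerName, dstClusterID string) bool {
	for _, h := range headers {
		if h.Key == headerName && string(h.Value) == dstClusterID {
			return true
		}
	}
	return false
}

// withProvenance returns a copy of headers with the source cluster ID added.
func withProvenance(headers []header, headerName, srcClusterID string) []header {
	out := append([]header(nil), headers...)
	return append(out, header{Key: headerName, Value: []byte(srcClusterID)})
}

func main() {
	const name = "redpanda-migrator-provenance"
	// A record migrated from cluster-a to cluster-b gets cluster-a's ID.
	hs := withProvenance(nil, name, "cluster-a")
	fmt.Println(shouldSkip(hs, name, "cluster-a")) // writing back to cluster-a
	fmt.Println(shouldSkip(hs, name, "cluster-c")) // writing on to cluster-c
}
```

With this check, a record that came from cluster A is not written back to A by a reverse migration, which is what breaks the cycle.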
Safe ACL transforms for read-only migration:
output:
  redpanda_migrator:
    sync_topic_acls: true

Excludes ALLOW WRITE entries
Downgrades ALLOW ALL to ALLOW READ

Normalize schemas on create for consistency:
output:
  redpanda_migrator:
    schema_registry:
      normalize: true

Embedded offset headers enable exact consumer group parking:
Used by tryFindExactOffset to refine timestamp-based translation

The migrator has comprehensive test coverage across unit, integration, and soak test categories.
migrator/
├── *_test.go # Unit tests
├── *_integration_test.go # Integration tests
└── integration_soak_test.go # Long-running soak test
Configuration & Validation - migrator_test.go
Data Conversion - conv_test.go
Schema Registry - migrator_schema_registry_test.go
Consumer Groups - migrator_groups_test.go
End-to-End Migration - integration_test.go
Topic Migration - migrator_topic_integration_test.go
Schema Registry Migration - migrator_schema_registry_integration_test.go
Consumer Groups Migration - migrator_groups_integration_test.go
Long-Running Stability - integration_soak_test.go
Embedded Clusters - integration_helpers_test.go
Test Characteristics
assert.Eventually
Critical Paths Tested
Edge Cases Covered
knownTopics map prevents redundant creation attempts
knownSubjects and knownSchemas maps prevent redundant schema operations
commitedOffsets map prevents offset rewind