.claude-plugin/plugins/redpanda-connect/skills/pipeline-assistant/resources/recipes/cdc-replication.md
- Pattern: Kafka Patterns - Database CDC Replication
- Difficulty: Advanced
- Components: postgres_cdc, sql_raw, switch, batching
- Use Case: Replicate database changes in real time using Postgres logical replication to keep databases synchronized
This recipe demonstrates Change Data Capture (CDC) for replicating database changes. It streams changes from a Postgres database using logical replication, groups them by transaction, and applies them to a destination database using MERGE (upsert) and DELETE statements. This pattern is a common building block for real-time data synchronization pipelines.
See cdc-replication.yaml for the complete configuration.
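The postgres_cdc input streams row-level changes from the source database using Postgres logical replication. Each message carries metadata such as @operation and @table, which the rest of the pipeline uses for routing.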
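Below is a minimal sketch of the input section. It makes a few assumptions: the source DSN is exported as SOURCE_DSN (as in the test commands), the table lives in the public schema, and the slot is the test_slot created during testing. Check the field names against the postgres_cdc reference for your Redpanda Connect version.

```yaml
input:
  postgres_cdc:
    dsn: "${SOURCE_DSN}"      # assumed: same variable as the test commands
    schema: public            # assumed schema
    tables: [my_src_table]    # table written to in the test commands
    slot_name: test_slot      # logical replication slot to read from
    # Stream a snapshot of existing rows before streaming new changes
    stream_snapshot: true
```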
Changes are grouped by transaction to maintain consistency:
```yaml
batching:
  check: '@operation == "commit"'
  period: 10s
```
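All changes in a transaction are batched together before being applied, which helps preserve foreign key constraints and data consistency on the destination. The check closes a batch when a commit marker arrives, so the input must emit transaction markers (include_transaction_markers: true). The period: 10s setting flushes an incomplete batch after 10 seconds regardless, so a transaction that takes longer than that to stream can be split across batches.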
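Below is a sketch of the full input with markers enabled. A mapping processor inside the batching policy drops the begin and commit marker messages, so only row changes reach the output. The DSN variable, schema and table name are assumptions.

```yaml
input:
  postgres_cdc:
    dsn: "${SOURCE_DSN}"      # assumed environment variable
    schema: public            # assumed schema
    tables: [my_src_table]    # assumed table name
    slot_name: test_slot
    # Emit "begin" and "commit" marker messages around each transaction
    include_transaction_markers: true
    batching:
      # Close the batch when the transaction's commit marker arrives...
      check: '@operation == "commit"'
      # ...or after 10s, even if the commit has not been seen yet
      period: 10s
      processors:
        # Drop the markers so only row changes reach the output
        - mapping: |
            root = if @operation == "begin" || @operation == "commit" {
              deleted()
            } else {
              this
            }
```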
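Different operations require different SQL: inserts and updates are applied with MERGE (an upsert), and deletes with DELETE. A switch output routes each message to the right statement based on its @operation metadata.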
The MERGE statement handles both inserts and updates atomically in a single statement (MERGE requires PostgreSQL 15 or later):
```sql
MERGE INTO dst_table AS old
USING (SELECT $1 id, $2 foo, $3 bar) AS new
ON new.id = old.id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...
```
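This makes the write idempotent: applying the same change twice leaves the destination in the same state, so replaying changes (for example after a restart) is safe.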
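Below is a sketch of how the routing and both SQL statements could fit together in the output section. It rests on several assumptions:

- The destination table my_dst_table (the name used in the test commands) has columns id, foo and bar.
- The destination DSN is in a DEST_DSN environment variable.
- Delete messages carry the id primary key in their body.
- Transaction marker messages have already been dropped upstream.

The ::int and ::text casts are assumptions about column types. They are there so Postgres does not have to infer the parameter types.

```yaml
output:
  switch:
    # Propagate an error if a message matches no case (the default is to drop it)
    strict_mode: true
    cases:
      # Inserts and updates: upsert with MERGE
      # (assumes begin/commit markers were already dropped upstream)
      - check: '@operation != "delete"'
        output:
          sql_raw:
            driver: postgres
            dsn: "${DEST_DSN}"  # assumed environment variable
            query: |
              MERGE INTO my_dst_table AS old
              USING (SELECT $1::int AS id, $2::text AS foo, $3::text AS bar) AS new
              ON new.id = old.id
              WHEN MATCHED THEN
                UPDATE SET foo = new.foo, bar = new.bar
              WHEN NOT MATCHED THEN
                INSERT (id, foo, bar) VALUES (new.id, new.foo, new.bar);
            # Values bound to $1, $2, $3 in order (assumed column names)
            args_mapping: 'root = [this.id, this.foo, this.bar]'
      # Deletes: remove the row by primary key
      - check: '@operation == "delete"'
        output:
          sql_raw:
            driver: postgres
            dsn: "${DEST_DSN}"
            query: 'DELETE FROM my_dst_table WHERE id = $1::int'
            args_mapping: 'root = [this.id]'
```

A repeated DELETE is also harmless: deleting a row that no longer exists affects zero rows.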
Connection strings are injected with environment variable interpolation (${POSTGRES_DSN}). strict_mode: true on the switch output ensures every message matches a case: an unmatched message produces an error instead of being silently dropped.

To test the pipeline:

```bash
# Set environment variables
export SOURCE_DSN="postgres://user:pass@source:5432/db?sslmode=disable"
export DEST_DSN="postgres://user:pass@dest:5432/db?sslmode=disable"

# Logical replication requires wal_level = logical on the source database
psql "$SOURCE_DSN" -c "SHOW wal_level;"

# Create replication slot on source database
psql "$SOURCE_DSN" -c "SELECT pg_create_logical_replication_slot('test_slot', 'pgoutput');"

# Run the pipeline
rpk connect run cdc-replication.yaml

# In another terminal (export SOURCE_DSN there too), make changes to the source database
psql "$SOURCE_DSN" -c "INSERT INTO my_src_table (id, foo, bar) VALUES (1, 'test', 'data');"
psql "$SOURCE_DSN" -c "UPDATE my_src_table SET foo='updated' WHERE id=1;"
psql "$SOURCE_DSN" -c "DELETE FROM my_src_table WHERE id=1;"

# Check destination database
psql "$DEST_DSN" -c "SELECT * FROM my_dst_table;"
```
Kafka as Destination:
```yaml
output:
  switch:
    cases:
      - check: '@operation == "delete"'
        output:
          kafka_franz:
            topic: deletes
      - output:
          kafka_franz:
            topic: upserts
```
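The variation above leaves out the broker addresses that kafka_franz needs. Below is a fuller sketch, assuming a local broker at localhost:9092 and a primary key column named id. Keying each message by primary key means that, with the default partitioner, every change to the same row lands on the same partition, so consumers see that row's changes in order.

```yaml
output:
  switch:
    cases:
      # Deletes go to their own topic
      - check: '@operation == "delete"'
        output:
          kafka_franz:
            seed_brokers: [ "localhost:9092" ]   # assumed broker address
            topic: deletes
            # Key by primary key (assumed column "id") so a row's changes share a partition
            key: '${! json("id") }'
      # Everything else goes to the upserts topic
      - output:
          kafka_franz:
            seed_brokers: [ "localhost:9092" ]
            topic: upserts
            key: '${! json("id") }'
```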
Multi-Table Replication:
```yaml
input:
  postgres_cdc:
    tables: [table1, table2, table3]

output:
  switch:
    cases:
      - check: '@table == "table1"'
        output:
          sql_raw:
            query: |
              MERGE INTO dst_table1 ...
```
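With several tables, each case has to test both the table and the operation. Otherwise a delete on table1 would be sent to table1's MERGE. Below is a sketch of the two cases for table1, assuming it has an id primary key and a hypothetical name column, and that the destination DSN is in DEST_DSN. Only table1 is shown. With strict_mode enabled, changes to table2 and table3 are reported as errors until their own pairs of cases are added.

```yaml
output:
  switch:
    # Fail loudly if a table/operation combination has no case
    strict_mode: true
    cases:
      # table1 inserts and updates (hypothetical columns: id, name)
      - check: '@table == "table1" && @operation != "delete"'
        output:
          sql_raw:
            driver: postgres
            dsn: "${DEST_DSN}"  # assumed environment variable
            query: |
              MERGE INTO dst_table1 AS old
              USING (SELECT $1::int AS id, $2::text AS name) AS new
              ON new.id = old.id
              WHEN MATCHED THEN UPDATE SET name = new.name
              WHEN NOT MATCHED THEN INSERT (id, name) VALUES (new.id, new.name);
            args_mapping: 'root = [this.id, this.name]'
      # table1 deletes
      - check: '@table == "table1" && @operation == "delete"'
        output:
          sql_raw:
            driver: postgres
            dsn: "${DEST_DSN}"
            query: 'DELETE FROM dst_table1 WHERE id = $1::int'
            args_mapping: 'root = [this.id]'
      # Add the same pair of cases for table2 and table3
```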