connector-writer/destination/dataflow-cdk.md
Summary: The Airbyte Dataflow CDK is a framework that orchestrates destination connector write operations. You implement 4 database-specific components (SQL generator, client, insert buffer, column utilities). The CDK handles message parsing, data flow, table lifecycle, state management, and error handling. Result: Write ~4 custom components, get all sync modes (append, dedupe, overwrite) for free.
main()
→ AirbyteDestinationRunner.run(*args)
→ Parse CLI (--spec, --check, --write)
→ Create Micronaut context
→ Select Operation (SpecOperation, CheckOperation, WriteOperationV2)
→ Execute operation
Write Operation Flow:
WriteOperationV2.execute()
→ DestinationLifecycle.run()
1. Writer.setup() [Create namespaces]
2. Initialize streams [Create StreamLoaders]
3. runDataPipeline() [Process messages]
4. Finalize streams [MERGE/SWAP/cleanup]
5. Teardown [Close connections]
stdin → Database:
Airbyte Platform Connector Pipeline Database
| | |
|-- RECORD messages ------>| |
|-- STATE messages -------->| |
| | |
| Parse JSON |
| Transform types |
| Map column names |
| Batch records |
| | |
| Aggregate.accept() |
| ↓ |
| InsertBuffer.accumulate() |
| | |
| [Buffering] |
| | |
| Aggregate.flush() |
| ↓ |
| InsertBuffer.flush() --------------->| Write batch
| | |
| [Repeat] |
| | |
| StreamLoader.close() |
| ↓ |
| MERGE/SWAP/nothing -------------->| Finalize
|<----- STATE emitted -----| |
Key Insight: Your InsertBuffer only writes batches. The framework handles message parsing, batching triggers, and finalization strategy (MERGE vs SWAP vs direct).
Purpose: Orchestrates per-stream write lifecycle
You don't implement these yourself; you instantiate the right one based on sync mode
4 Variants:
| StreamLoader | Mode | Strategy | Use Case |
|---|---|---|---|
| DirectLoadTableAppendStreamLoader | Append | Direct write to final table | Logs, append-only data |
| DirectLoadTableDedupStreamLoader | Dedupe | Temp table → MERGE with PK dedup | Incremental sync with PK |
| DirectLoadTableAppendTruncateStreamLoader | Overwrite | Temp table → SWAP | Full refresh without PK |
| DirectLoadTableDedupTruncateStreamLoader | Dedupe + Overwrite | Temp table → dedupe → SWAP | Full refresh with PK |
Lifecycle:
StreamLoader.start() {
- Check if final table exists
- Create/evolve final table
- Create temp table if needed (dedupe/truncate)
- Store target table name in state
}
[Records flow through pipeline → your InsertBuffer writes to table]
StreamLoader.close(streamCompleted) {
if (streamCompleted) {
// Dedupe: MERGE temp → final
// Truncate: SWAP temp ↔ final
// Append: nothing (already in final)
}
// Always cleanup temp tables
}
Selection Pattern:
override fun createStreamLoader(stream: DestinationStream): StreamLoader {
return when (stream.minimumGenerationId) {
0L -> when (stream.importType) {
is Dedupe -> DirectLoadTableDedupStreamLoader(...)
else -> DirectLoadTableAppendStreamLoader(...)
}
stream.generationId -> when (stream.importType) {
is Dedupe -> DirectLoadTableDedupTruncateStreamLoader(...)
else -> DirectLoadTableAppendTruncateStreamLoader(...)
}
else -> throw SystemErrorException("Hybrid refresh not supported")
}
}
Purpose: Accumulates records, triggers flushing
Your Implementation (a thin delegate, ~3 lines of logic):
class MyAggregate(private val buffer: MyInsertBuffer) : Aggregate {
override fun accept(record: RecordDTO) {
buffer.accumulate(record.fields)
}
override suspend fun flush() {
buffer.flush()
}
}
Created by: AggregateFactory per stream
@Factory
class MyAggregateFactory(
private val client: MyAirbyteClient,
private val streamStateStore: StreamStateStore<DirectLoadTableExecutionConfig>,
) : AggregateFactory {
override fun create(key: StoreKey): Aggregate {
// Get table name set by StreamLoader
val tableName = streamStateStore.get(key)!!.tableName
val buffer = MyInsertBuffer(tableName, client)
return MyAggregate(buffer)
}
}
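The InsertBuffer the factory constructs is yours to write. A minimal sketch, assuming a JDBC destination and values already formatted for the dialect; the factory above passes a client, but here the buffer talks to JDBC directly to keep the sketch self-contained (all names illustrative, not CDK API):

```kotlin
// Hypothetical buffer: accumulates rows, writes them as one JDBC batch.
class MyInsertBuffer(
    private val tableName: String,
    private val dataSource: javax.sql.DataSource,
    private val maxRows: Int = 1_000,
) {
    private val rows = mutableListOf<Map<String, Any?>>()

    fun accumulate(fields: Map<String, Any?>) {
        rows.add(fields)
        if (rows.size >= maxRows) flush() // optional internal guard; the framework also triggers flushes
    }

    fun flush() {
        if (rows.isEmpty()) return
        val columns = rows.first().keys.toList() // assumes uniform columns within a batch
        val sql = "INSERT INTO $tableName (${columns.joinToString(",")}) " +
            "VALUES (${columns.joinToString(",") { "?" }})"
        dataSource.connection.use { conn ->
            conn.prepareStatement(sql).use { stmt ->
                for (row in rows) {
                    columns.forEachIndexed { i, col -> stmt.setObject(i + 1, row[col]) }
                    stmt.addBatch()
                }
                stmt.executeBatch() // one round trip for the whole batch
            }
        }
        rows.clear()
    }
}
```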
Purpose: Database primitive operations
Key Methods:
interface TableOperationsClient {
suspend fun createNamespace(namespace: String)
suspend fun namespaceExists(namespace: String): Boolean
suspend fun createTable(stream, tableName, columnMapping, replace)
suspend fun tableExists(table: TableName): Boolean
suspend fun dropTable(tableName: TableName)
suspend fun countTable(tableName: TableName): Long?
suspend fun getGenerationId(tableName: TableName): Long
suspend fun overwriteTable(source, target) // For truncate mode
suspend fun copyTable(columnMapping, source, target)
suspend fun upsertTable(stream, columnMapping, source, target) // For dedupe
}
Pattern:
@Singleton
class MyAirbyteClient(
private val dataSource: DataSource,
private val sqlGenerator: MySqlGenerator,
) : TableOperationsClient, TableSchemaEvolutionClient {
override suspend fun createTable(...) {
execute(sqlGenerator.createTable(...))
}
override suspend fun upsertTable(...) {
execute(sqlGenerator.upsertTable(...)) // MERGE statement
}
private suspend fun execute(sql: String) {
dataSource.connection.use { conn ->
conn.createStatement().use { stmt ->
stmt.executeQuery(sql)
}
}
}
}
Separation: Client executes SQL, SqlGenerator generates SQL
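A sketch of the generator half of that split: pure string building, no I/O. Method shapes and dialect details are illustrative, not the CDK's exact API:

```kotlin
// Hypothetical generator; the client injects this and executes its output.
@Singleton
class MySqlGenerator {
    fun createTable(table: String, columnDecls: List<String>): String =
        "CREATE TABLE IF NOT EXISTS $table (${columnDecls.joinToString(", ")})"

    fun upsertTable(source: String, target: String, pkCols: List<String>, allCols: List<String>): String {
        val on = pkCols.joinToString(" AND ") { "target.$it = source.$it" }
        val sets = allCols.joinToString(", ") { "$it = source.$it" }
        return """
            MERGE INTO $target AS target
            USING $source AS source
            ON $on
            WHEN MATCHED THEN UPDATE SET $sets
            WHEN NOT MATCHED THEN INSERT (${allCols.joinToString(", ")})
            VALUES (${allCols.joinToString(", ") { "source.$it" }})
        """.trimIndent()
    }
}
```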
Purpose: Automatic schema adaptation
4-Step Process:
// 1. Discover current schema from database
suspend fun discoverSchema(tableName): TableSchema
// 2. Compute expected schema from stream
fun computeSchema(stream, columnMapping): TableSchema
// 3. Compare (automatic by CDK)
val changeset = ColumnChangeset(
columnsToAdd = ...,
columnsToDrop = ...,
columnsToChange = ...,
)
// 4. Apply changes
suspend fun applyChangeset(..., changeset)
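A sketch of what step 4 can look like, assuming the changeset fields are simple column→type maps and reusing the client's execute helper (names illustrative, not the CDK's exact shapes):

```kotlin
// Hypothetical: emits one ALTER TABLE per change. Real implementations may
// batch these or guard unsafe type changes.
override suspend fun applyChangeset(tableName: TableName, changeset: ColumnChangeset) {
    changeset.columnsToAdd.forEach { (column, type) ->
        execute("ALTER TABLE $tableName ADD COLUMN $column $type")
    }
    changeset.columnsToDrop.forEach { column ->
        execute("ALTER TABLE $tableName DROP COLUMN $column")
    }
    changeset.columnsToChange.forEach { (column, type) ->
        execute("ALTER TABLE $tableName ALTER COLUMN $column TYPE $type")
    }
}
```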
When Called: Automatically by StreamLoader.start() if table exists
Operations:
- ALTER TABLE ADD COLUMN
- ALTER TABLE DROP COLUMN
- ALTER TABLE ALTER COLUMN TYPE (safe type changes only)
Purpose: Orchestration layer
Your Implementation:
@Singleton
class MyWriter(
private val names: TableCatalog,
private val stateGatherer: DatabaseInitialStatusGatherer<DirectLoadInitialStatus>,
private val streamStateStore: StreamStateStore<DirectLoadTableExecutionConfig>,
private val client: MyAirbyteClient,
private val tempTableNameGenerator: TempTableNameGenerator,
) : DestinationWriter {
private lateinit var initialStatuses: Map<DestinationStream, DirectLoadInitialStatus>
override suspend fun setup() {
// Create all namespaces
names.values
.map { it.tableNames.finalTableName!!.namespace }
.toSet()
.forEach { client.createNamespace(it) }
// Gather initial state (table exists? gen ID? columns?)
initialStatuses = stateGatherer.gatherInitialStatus(names)
}
override fun createStreamLoader(stream: DestinationStream): StreamLoader {
val initialStatus = initialStatuses[stream]!!
val tableNames = names[stream]!!.tableNames
val columnMapping = names[stream]!!.columnNameMapping
return /* Select appropriate StreamLoader */
}
}
Key Responsibilities:
| Component | Responsibilities | Your Interaction |
|---|---|---|
| DestinationLifecycle | Overall orchestration | None - runs automatically |
| Data Pipeline | Parse messages, transform types, batch records | Configure via ColumnUtils |
| 4 StreamLoaders | Table lifecycle, finalization strategy | Instantiate the right one |
| StreamStateStore | Coordinate InsertBuffer ↔ StreamLoader | Read from in AggregateFactory |
| TableCatalog | Column name mapping (logical → physical) | Query for mapped column names |
| State Management | Track checkpoints, emit STATE messages | Automatic after successful flush |
| Error Handling | Classify errors, emit TRACE messages | Throw ConfigError/SystemError |
| Base Class | Purpose | Customization Needed |
|---|---|---|
| BaseDirectLoadInitialStatusGatherer | Gather table state before sync | Usually none - just extend |
| DefaultTempTableNameGenerator | Generate temp table names | Usually none - use as-is |
| Component | Effort | Purpose | Lines of Code |
|---|---|---|---|
| SQL Generator | High | Generate DB-specific SQL | 300-500 |
| Database Client | High | Execute SQL, handle errors | 400-600 |
| Insert Buffer | Medium | Efficient batch writes | 200-300 |
| Column Utilities | Medium | Type mapping, column declarations | 100-200 |
| Component | Effort | Purpose | Lines of Code |
|---|---|---|---|
| Configuration | Low | Config spec, factory, validation | 100-150 |
| Name Generators | Low | Table/column name formatting | 50-100 |
| Checker | Low | Connection validation | 50-80 |
| Writer | Low | Orchestration (setup, select loaders) | 80-120 |
| Component | Effort | Purpose | Lines of Code |
|---|---|---|---|
| Aggregate | Minimal | Delegate to buffer | 10-15 |
| AggregateFactory | Minimal | Create aggregate per stream | 20-30 |
| WriteOperationV2 | Minimal | Entry point for write operation | 10-15 |
| BeanFactory | Low | Micronaut DI setup | 50-100 |
Total: ~20 components, ~2000-3000 lines of code
Critical Path: SqlGenerator → Client → InsertBuffer → ColumnUtils
Why: temp tables keep in-flight records out of the final table until finalization succeeds.
When: created for dedupe and truncate modes; plain append writes directly to the final table.
Naming: _airbyte_tmp_{uuid}_{timestamp} in internal schema
Lifecycle:
StreamLoader.start() → createTable(tempTable)
Records written to tempTable via InsertBuffer
StreamLoader.close() → finalize from tempTable → dropTable(tempTable)
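Composed from the TableOperationsClient primitives, the dedupe loader's finalization conceptually reduces to the following (illustrative only; the CDK's actual loaders also handle resume and generation checks):

```kotlin
// Illustrative dedupe-mode close(): MERGE temp into final, then clean up.
override suspend fun close(streamCompleted: Boolean) {
    if (streamCompleted) {
        client.upsertTable(stream, columnMapping, source = tempTable, target = finalTable)
    }
    client.dropTable(tempTable) // always runs, even on failure
}
```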
Key Guarantee: Readers never see empty or partial tables during sync.
How: Temp table + atomic swap pattern
Why Atomic: readers querying mid-sync see either the complete old data or the complete new data, never an empty or half-loaded table.
Traditional ETL comparison: a "TRUNCATE then reload" approach leaves a window where the table is empty; the swap pattern has no such window.
Use Cases: full refresh (truncate) syncs, where the table's entire contents are replaced each run.
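One way overwriteTable can implement that swap, assuming transactional DDL (Postgres-style) and hypothetical fq()/name accessors on TableName:

```kotlin
// Hypothetical swap: atomic where DDL participates in transactions.
override suspend fun overwriteTable(source: TableName, target: TableName) {
    dataSource.connection.use { conn ->
        conn.autoCommit = false
        conn.createStatement().use { stmt ->
            stmt.execute("DROP TABLE IF EXISTS ${target.fq()}")
            stmt.execute("ALTER TABLE ${source.fq()} RENAME TO ${target.name}")
        }
        conn.commit() // readers see the old table or the new one, never neither
    }
}
```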
| User Setting | minimumGenerationId | importType | StreamLoader |
|---|---|---|---|
| Incremental | 0 | Append | DirectLoadTableAppendStreamLoader |
| Incremental | 0 | Dedupe | DirectLoadTableDedupStreamLoader |
| Full Refresh (Append) | generationId | Append | DirectLoadTableAppendTruncateStreamLoader |
| Full Refresh (Overwrite) | generationId | Dedupe | DirectLoadTableDedupTruncateStreamLoader |
Generation IDs:
- minimumGenerationId = 0: Keep all existing data (incremental)
- minimumGenerationId = generationId: Replace all data (full refresh)
- minimumGenerationId != 0 and != generationId: Hybrid refresh (NOT SUPPORTED)
Resume Logic (Truncate Mode):
When StreamLoader.start() is called, it checks whether a temp table from a previous attempt already exists and whether its generation ID matches the current sync's; a match means the earlier data can be reused instead of rewritten (see the sketch below).
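A sketch of that check, built from the TableOperationsClient primitives (control flow illustrative):

```kotlin
// Illustrative resume check for truncate mode.
val tempExists = client.tableExists(tempTable)
if (tempExists && client.getGenerationId(tempTable) == stream.generationId) {
    // Same generation: keep the temp table and append to it (resume).
} else {
    if (tempExists) client.dropTable(tempTable) // stale attempt from an older generation
    client.createTable(stream, tempTable, columnMapping, replace = true)
}
```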
Generation ID Purpose:
- The _airbyte_generation_id column in every record ties data to the sync generation that wrote it, which is what makes the resume check above possible.
Dedupe StreamLoader uses a window function:
WITH deduped AS (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY primary_key
ORDER BY _airbyte_extracted_at DESC
) AS rn
FROM temp_table
)
SELECT * FROM deduped WHERE rn = 1
Then MERGE into final:
MERGE INTO final_table AS target
USING deduped AS source
ON target.pk = source.pk
WHEN MATCHED AND source.cursor > target.cursor THEN UPDATE ...
WHEN NOT MATCHED THEN INSERT ...
CDC Handling (if enabled):
MERGE INTO final_table AS target
USING deduped AS source
ON target.pk = source.pk
WHEN MATCHED AND source._ab_cdc_deleted_at IS NOT NULL THEN DELETE -- Hard delete
WHEN MATCHED AND source.cursor > target.cursor THEN UPDATE ...
WHEN NOT MATCHED AND source._ab_cdc_deleted_at IS NULL THEN INSERT ...
Critical Guarantee: STATE only emitted after data persisted to database.
Flow:
RECORD messages → buffer
STATE message → flush buffers → database COMMIT → emit STATE
Timing: STATE is emitted only after the flush succeeds and the database commit completes; the incoming STATE message itself is what triggers the flush.
Recovery on Failure: because STATE is never emitted ahead of persistence, the platform resumes from the last emitted STATE and replays anything after it; no acknowledged data is lost.
Always included (managed by framework):
| Column | Type | Purpose |
|---|---|---|
| _airbyte_raw_id | UUID/String | Unique record identifier |
| _airbyte_extracted_at | Timestamp | Extraction timestamp |
| _airbyte_meta | JSON | Errors, warnings, transformations |
| _airbyte_generation_id | Integer | Sync generation tracking |
These columns are filtered out during schema discovery and computation, so they never appear in a ColumnChangeset.
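A sketch of that filtering during discovery, assuming a hypothetical AIRBYTE_COLUMNS constant and a map-shaped schema:

```kotlin
// Hypothetical: strip framework-managed columns before diffing schemas.
val AIRBYTE_COLUMNS = setOf(
    "_airbyte_raw_id", "_airbyte_extracted_at", "_airbyte_meta", "_airbyte_generation_id",
)

fun userColumns(discovered: Map<String, String>): Map<String, String> =
    discovered.filterKeys { it !in AIRBYTE_COLUMNS }
```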
AirbyteValue → Database Format:
Pipeline receives: JSON message
↓ Deserialize
AirbyteValue (StringValue, IntegerValue, etc.)
↓ Transform
Database-specific format (via ColumnUtils)
↓ Buffer
InsertBuffer accumulates
↓ Format
CSV, binary, JSON, etc.
↓ Write
Database client writes batch
Example:
// Source
{"id": 123, "name": "Alice", "created_at": "2024-01-01T12:00:00Z"}
// After transformation (RecordDTO.fields)
{
"id": IntegerValue(123),
"name": StringValue("Alice"),
"created_at": TimestampValue("2024-01-01T12:00:00Z"),
"_airbyte_raw_id": StringValue("uuid..."),
"_airbyte_extracted_at": TimestampValue("2024-01-01T12:00:00Z"),
"_airbyte_meta": ObjectValue(...),
"_airbyte_generation_id": IntegerValue(42)
}
// InsertBuffer formats for database
CSV: "123,Alice,2024-01-01 12:00:00,uuid...,2024-01-01 12:00:00,{},42"
Binary: [0x7B, 0x00, ...] (database-specific format)
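The Transform and Format steps are driven by your column utilities. A sketch of the type-mapping half, with dialect type names as placeholders to adjust per database:

```kotlin
// Hypothetical AirbyteType -> dialect type mapping.
fun toDialectType(type: AirbyteType): String = when (type) {
    is StringType -> "VARCHAR"
    is IntegerType -> "BIGINT"
    is NumberType -> "DOUBLE PRECISION"
    is BooleanType -> "BOOLEAN"
    is TimestampTypeWithTimezone -> "TIMESTAMP WITH TIME ZONE"
    else -> "JSON" // objects, arrays, unions: store as semi-structured
}
```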
Logical → Physical:
Stream schema: {"field_name": StringType}
↓
ColumnNameGenerator: "field_name" → "FIELD_NAME" (Snowflake)
→ "field_name" (ClickHouse)
↓
TableCatalog stores: {"field_name": "FIELD_NAME"}
↓
Your code queries: columnMapping["field_name"] → "FIELD_NAME"
Use TableCatalog; don't implement this mapping yourself
| Exception | When to Use | Platform Action |
|---|---|---|
| ConfigErrorException | User-fixable (bad creds, permissions, invalid config) | NO RETRY - notify user |
| TransientErrorException | Temporary (network timeout, DB unavailable) | RETRY with backoff |
| SystemErrorException | Internal errors, bugs | LIMITED RETRY - likely bug |
Pattern:
try {
connection.createStatement().use { it.execute(sql) }
} catch (e: SQLException) {
when {
e.message?.contains("permission") == true ->
throw ConfigErrorException("Permission denied. Grant privileges.", e)
e.message?.contains("timeout") == true ->
throw TransientErrorException("Network timeout. Will retry.", e)
else ->
throw SystemErrorException("Unexpected SQL error", e)
}
}
On Failure:
StreamLoader.close(streamCompleted = false) {
// Skip finalization (no MERGE/SWAP)
// Drop temp tables (cleanup)
// Final table unchanged
}
Result: the final table is untouched, temp tables are cleaned up, and the platform can safely retry the sync from the last checkpoint.
Where framework calls your code:
| Phase | Framework Calls | Your Code Executes |
|---|---|---|
| Setup | Writer.setup() | Create namespaces, gather initial state |
| Stream Init | Writer.createStreamLoader() | Select appropriate StreamLoader |
| | StreamLoader.start() | createTable(), ensureSchemaMatches() |
| | AggregateFactory.create() | Create InsertBuffer with target table |
| Data Flow | Aggregate.accept() | InsertBuffer.accumulate() |
| | Aggregate.flush() | InsertBuffer.flush() → write batch |
| Finalize | StreamLoader.close() | upsertTable() or overwriteTable() |
| Always | StreamLoader.close() | dropTable(tempTable) |
Key Insight: Framework orchestrates when to call what. You implement the "what" (database operations), framework handles the "when" and "how".
Separation of concerns: the framework owns orchestration (when to create tables, flush, finalize); your components own database specifics (what SQL to run, how to write batches).
Benefits: every sync mode works for free, checkpointing and error handling stay consistent across destinations, and the database-specific surface stays small (~4 core components).
Schema evolution runs automatically during StreamLoader.start() if the final table already exists and its discovered schema differs from the schema computed from the stream. It is never triggered if the table doesn't exist yet; in that case the table is simply created with the expected schema.
Options for databases without a native MERGE statement:
-- Dedupe in temp
CREATE TABLE deduped AS SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY cursor DESC) AS rn
FROM temp
) AS ranked WHERE rn = 1;
-- DELETE + INSERT
DELETE FROM final WHERE pk IN (SELECT pk FROM deduped);
INSERT INTO final SELECT * FROM deduped;
-- Update existing
UPDATE final SET col1 = temp.col1, ... FROM temp WHERE final.pk = temp.pk;
-- Insert new
INSERT INTO final SELECT * FROM temp WHERE pk NOT IN (SELECT pk FROM final);
-- Postgres: INSERT ... ON CONFLICT
INSERT INTO final SELECT * FROM temp
ON CONFLICT (pk) DO UPDATE SET col1 = EXCLUDED.col1, ...;
-- MySQL: INSERT ... ON DUPLICATE KEY UPDATE
INSERT INTO final SELECT * FROM temp
ON DUPLICATE KEY UPDATE col1 = VALUES(col1), ...;
Levels: unit tests for SQL generation and type mapping; integration tests against a real database.
BasicFunctionalityIntegrationTest provides: end-to-end coverage of the standard sync modes, supplied by the CDK test fixtures.
Use Testcontainers for reproducible, isolated tests (see the sketch below).
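A minimal shape for such a test, assuming a Postgres-backed destination, JUnit 5, and Testcontainers (all names illustrative):

```kotlin
import org.junit.jupiter.api.Test
import org.testcontainers.containers.PostgreSQLContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers

@Testcontainers
class MyDestinationIntegrationTest {
    @Container
    private val db = PostgreSQLContainer<Nothing>("postgres:16")

    @Test
    fun `writes a batch to the final table`() {
        // Hypothetical config wiring; real tests build the connector's config spec.
        val config = mapOf(
            "host" to db.host,
            "port" to db.firstMappedPort,
            "database" to db.databaseName,
            "username" to db.username,
            "password" to db.password,
        )
        // Run a small sync against the container, then assert on table contents.
    }
}
```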
Framework triggers flush at:
- STATE message boundaries (the checkpointing flow above)
- Configured batch-size thresholds
Your InsertBuffer can add:
- Its own internal limits (e.g., max rows or bytes per write) on top of the framework's triggers
Framework parallelizes:
- Stream processing (configurable via num-open-stream-workers)
Your code should:
- Be thread-safe where shared across streams as @Singleton components
Framework provides:
- Setup and teardown hooks around the sync (teardown closes connections)
Your InsertBuffer should:
- Release resources (connections, buffers, file handles) in finally blocks
All production connectors must pin to a specific CDK version:
File: destination-{db}/gradle.properties
cdkVersion=0.1.76 # Pin to specific version
Never use in production:
cdkVersion=local # Only for CDK development
The airbyte-bulk-connector plugin:
- Reads cdkVersion from gradle.properties
- Resolves the published artifact, e.g. io.airbyte.bulk-cdk:bulk-cdk-core-load:0.1.76
- Treats cdkVersion=local as a substitution for the local CDK project (development only)
Verify pinning:
./gradlew :destination-{db}:dependencies --configuration runtimeClasspath | grep bulk-cdk
Expected: io.airbyte.bulk-cdk:bulk-cdk-core-load:0.1.76 (not project :airbyte-cdk:bulk:...)
Upgrade CDK:
./gradlew destination-{db}:upgradeCdk --cdkVersion=0.1.76
What you must provide:
- The 4 database-specific components (SQL generator, client, insert buffer, column utilities) plus the low-effort wiring above
- A pinned cdkVersion in gradle.properties
What the CDK provides:
Result: a production-ready destination with all sync modes, checkpointing, schema evolution, and error classification supplied by the framework.
Effort: ~1 week for experienced developer (4 core components + patterns + testing)