connector-writer/destination/step-by-step/4-write-operations.md
Prerequisites: Complete 3-write-infrastructure.md - Your DI setup must be complete and you must understand test contexts.
What You'll Build: After completing this guide, you'll have a working connector with:
- `--write` operation working for basic syncs

Goal: Implement actual data writing (Writer, Aggregate, InsertBuffer)
Checkpoint: Can write one record end-to-end
📘 Dependency Context: Now that infrastructure exists (Phases 6-7), add business logic:
Key insight: Infrastructure DI (Phase 7) is separate from business logic DI (Phase 8). Phase 7 validates "can we start?" Phase 8 validates "can we write data?"
File: write/load/{DB}InsertBuffer.kt
package io.airbyte.integrations.destination.{db}.write.load
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.schema.model.TableName
import io.airbyte.integrations.destination.{db}.client.{DB}AirbyteClient
import io.github.oshai.kotlinlogging.KotlinLogging
private val log = KotlinLogging.logger {}
/**
 * Accumulates records for a single stream and flushes them to the database in
 * batches of [flushLimit].
 *
 * NOT a @Singleton - created per-stream by AggregateFactory
 */
class {DB}InsertBuffer(
    private val tableName: TableName,
    private val client: {DB}AirbyteClient,
    private val flushLimit: Int = 1000,
) {
    // Records waiting to be written; reset after every flush attempt.
    private val pending = mutableListOf<Map<String, AirbyteValue>>()
    private var recordCount = 0

    /** Buffers one record and triggers a flush once [flushLimit] records are queued. */
    fun accumulate(recordFields: Map<String, AirbyteValue>) {
        pending += recordFields
        recordCount += 1
        if (recordCount >= flushLimit) {
            // accumulate() is called from a non-suspending path, so bridge into the
            // suspending flush() with runBlocking.
            kotlinx.coroutines.runBlocking { flush() }
        }
    }

    /** Writes every buffered record, then resets the buffer. */
    suspend fun flush() {
        if (pending.isEmpty()) return
        try {
            log.info { "Flushing $recordCount records to ${tableName}..." }
            // Simple multi-row INSERT for now
            // (Optimize in Phase 15: CSV staging, COPY, bulk APIs)
            for (record in pending) {
                insertRecord(tableName, record)
            }
            log.info { "Finished flushing $recordCount records" }
        } finally {
            // NOTE(review): the buffer is cleared even when insertRecord throws, so a
            // failed flush drops its records and relies on the sync failing/retrying.
            pending.clear()
            recordCount = 0
        }
    }

    /** Inserts one record via a parameterized INSERT; map keys become quoted columns. */
    private suspend fun insertRecord(
        tableName: TableName,
        record: Map<String, AirbyteValue>
    ) {
        val columns = record.keys.joinToString(", ") { "\"$it\"" }
        val placeholders = record.keys.joinToString(", ") { "?" }
        val sql = """
INSERT INTO "${tableName.namespace}"."${tableName.name}" ($columns)
VALUES ($placeholders)
"""
        client.executeInsert(sql, record.values.toList())
    }
}
Key points:
Why not @Singleton?
File: Update client/{DB}AirbyteClient.kt
// Add this method to {DB}AirbyteClient
/** Runs [sql] as a prepared statement, binding [values] in order (JDBC indices are 1-based). */
fun executeInsert(sql: String, values: List<AirbyteValue>) {
    // use {} guarantees both connection and statement are closed, even on failure.
    dataSource.connection.use { connection ->
        connection.prepareStatement(sql).use { statement ->
            for ((i, value) in values.withIndex()) {
                setParameter(statement, i + 1, value)
            }
            statement.executeUpdate()
        }
    }
}
// Maps an AirbyteValue onto a JDBC prepared-statement parameter at 1-based [index].
// Unhandled value types fall back to their string representation.
private fun setParameter(statement: PreparedStatement, index: Int, value: AirbyteValue) {
    when (value) {
        is StringValue -> statement.setString(index, value.value)
        is IntegerValue -> statement.setLong(index, value.value)
        is NumberValue -> statement.setBigDecimal(index, value.value)
        is BooleanValue -> statement.setBoolean(index, value.value)
        // NOTE(review): Timestamp.from takes an Instant - confirm TimestampValue.value
        // is an Instant in your CDK version (convert with toInstant() if it is not).
        is TimestampValue -> statement.setTimestamp(index, Timestamp.from(value.value))
        is DateValue -> statement.setDate(index, Date.valueOf(value.value))
        is TimeValue -> statement.setTime(index, Time.valueOf(value.value.toLocalTime()))
        is ObjectValue -> statement.setString(index, value.toJson()) // JSON as string
        is ArrayValue -> statement.setString(index, value.toJson()) // JSON as string
        // NOTE(review): Types.VARCHAR for NULL works on most JDBC drivers; some need
        // the column's actual SQL type - verify against your driver.
        is NullValue -> statement.setNull(index, Types.VARCHAR)
        else -> statement.setString(index, value.toString())
    }
}
Note: For non-JDBC databases, use native client APIs (e.g., MongoDB insertOne, ClickHouse native client)
File: dataflow/{DB}Aggregate.kt
package io.airbyte.integrations.destination.{db}.dataflow
import io.airbyte.cdk.load.dataflow.aggregate.Aggregate
import io.airbyte.cdk.load.dataflow.transform.RecordDTO
import io.airbyte.integrations.destination.{db}.write.load.{DB}InsertBuffer
/**
 * Processes transformed records for a single stream.
 *
 * Dataflow pipeline: Raw record → Transform → RecordDTO → Aggregate.accept() → InsertBuffer
 *
 * NOT a @Singleton - created per-stream by AggregateFactory
 */
class {DB}Aggregate(
    private val buffer: {DB}InsertBuffer,
) : Aggregate {
    /** Hands the transformed fields of one record to the per-stream buffer. */
    override fun accept(record: RecordDTO) = buffer.accumulate(record.fields)

    /** Forces any remaining buffered records to the database. */
    override suspend fun flush() = buffer.flush()
}
What this does:
Dataflow pipeline:
Platform → JSONL records
    ↓
AirbyteMessageDeserializer (CDK)
    ↓
RecordTransformer (CDK, uses ColumnNameMapper from Phase 7)
    ↓
RecordDTO (transformed record with mapped column names)
    ↓
Aggregate.accept() ← YOUR CODE STARTS HERE
    ↓
InsertBuffer.accumulate()
    ↓
Database
File: dataflow/{DB}AggregateFactory.kt
package io.airbyte.integrations.destination.{db}.dataflow
import io.airbyte.cdk.load.dataflow.aggregate.Aggregate
import io.airbyte.cdk.load.dataflow.aggregate.AggregateFactory
import io.airbyte.cdk.load.state.StoreKey
import io.airbyte.cdk.load.table.directload.DirectLoadTableExecutionConfig
import io.airbyte.cdk.load.write.StreamStateStore
import io.airbyte.integrations.destination.{db}.client.{DB}AirbyteClient
import io.airbyte.integrations.destination.{db}.write.load.{DB}InsertBuffer
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton
@Factory
class {DB}AggregateFactory(
    private val client: {DB}AirbyteClient,
    private val streamStateStore: StreamStateStore<DirectLoadTableExecutionConfig>,
) : AggregateFactory {
    /**
     * Creates a fresh Aggregate (with its own InsertBuffer) for the stream
     * identified by [key]. Called once per stream by the CDK dataflow pipeline.
     */
    @Singleton
    override fun create(key: StoreKey): Aggregate {
        // StreamStateStore contains execution config for each stream
        // (table name, column mapping, etc.), registered by the StreamLoader.
        // checkNotNull gives a diagnosable failure instead of a bare NPE from `!!`.
        val executionConfig =
            checkNotNull(streamStateStore.get(key)) {
                "No execution config registered for stream $key - was the StreamLoader started?"
            }
        val buffer =
            {DB}InsertBuffer(
                tableName = executionConfig.tableName,
                client = client,
            )
        return {DB}Aggregate(buffer)
    }
}
What this does:
Why factory pattern?
File: write/{DB}Writer.kt
package io.airbyte.integrations.destination.{db}.write
import io.airbyte.cdk.SystemErrorException
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.table.ColumnNameMapping
import io.airbyte.cdk.load.table.DatabaseInitialStatusGatherer
import io.airbyte.cdk.load.table.directload.DirectLoadInitialStatus
import io.airbyte.cdk.load.table.directload.DirectLoadTableAppendStreamLoader
import io.airbyte.cdk.load.table.directload.DirectLoadTableAppendTruncateStreamLoader
import io.airbyte.cdk.load.table.directload.DirectLoadTableExecutionConfig
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.cdk.load.write.StreamStateStore
import io.airbyte.integrations.destination.{db}.client.{DB}AirbyteClient
import jakarta.inject.Singleton
@Singleton
class {DB}Writer(
    private val catalog: DestinationCatalog,
    private val stateGatherer: DatabaseInitialStatusGatherer<DirectLoadInitialStatus>,
    private val streamStateStore: StreamStateStore<DirectLoadTableExecutionConfig>,
    private val client: {DB}AirbyteClient,
) : DestinationWriter {
    // Snapshot of each stream's table state, taken once in setup() and read by
    // every createStreamLoader() call.
    private lateinit var initialStatuses: Map<DestinationStream, DirectLoadInitialStatus>

    /**
     * Creates every target namespace, then gathers the initial status
     * (existing tables, generation IDs, etc.) for all streams in the catalog.
     */
    override suspend fun setup() {
        // Create all namespaces up front so per-stream loaders can assume they exist.
        catalog.streams
            .map {
                checkNotNull(it.tableSchema.tableNames.finalTableName) {
                    "Stream ${it.mappedDescriptor} has no final table name"
                }.namespace
            }
            .toSet()
            .forEach { client.createNamespace(it) }
        // Gather initial state (which tables exist, generation IDs, etc.)
        initialStatuses = stateGatherer.gatherInitialStatus()
    }

    /**
     * Chooses a StreamLoader per sync mode:
     * - minimumGenerationId == 0: append (keep existing data)
     * - minimumGenerationId == generationId: truncate/overwrite (replace data)
     *
     * @throws SystemErrorException for unsupported hybrid refreshes.
     */
    override fun createStreamLoader(stream: DestinationStream): StreamLoader {
        // checkNotNull instead of `!!` so a missing entry fails with a clear message.
        val initialStatus = checkNotNull(initialStatuses[stream]) {
            "No initial status for stream ${stream.mappedDescriptor} - was setup() called?"
        }
        // Access schema directly from stream (modern CDK pattern)
        val tableNames = stream.tableSchema.tableNames
        val realTableName = checkNotNull(tableNames.finalTableName) {
            "Stream ${stream.mappedDescriptor} has no final table name"
        }
        val tempTableName = checkNotNull(tableNames.tempTableName) {
            "Stream ${stream.mappedDescriptor} has no temp table name"
        }
        val columnNameMapping = ColumnNameMapping(
            stream.tableSchema.columnSchema.inputToFinalColumnNames
        )
        // Choose StreamLoader based on sync mode
        return when (stream.minimumGenerationId) {
            0L ->
                // Append mode: just insert records
                DirectLoadTableAppendStreamLoader(
                    stream,
                    initialStatus,
                    realTableName = realTableName,
                    tempTableName = tempTableName,
                    columnNameMapping,
                    client, // TableOperationsClient
                    client, // TableSchemaEvolutionClient
                    streamStateStore,
                )
            stream.generationId ->
                // Overwrite/truncate mode: replace table contents
                DirectLoadTableAppendTruncateStreamLoader(
                    stream,
                    initialStatus,
                    realTableName = realTableName,
                    tempTableName = tempTableName,
                    columnNameMapping,
                    client,
                    client,
                    streamStateStore,
                )
            else ->
                throw SystemErrorException(
                    "Cannot execute a hybrid refresh - current generation ${stream.generationId}; minimum generation ${stream.minimumGenerationId}"
                )
        }
    }
}
What this does:
Modern CDK pattern (stream.tableSchema):
- stream.tableSchema (set by CDK)
- stream.tableSchema.tableNames.finalTableName!!
- stream.tableSchema.columnSchema.inputToFinalColumnNames

StreamLoader selection:
- minimumGenerationId == 0: Append mode (DirectLoadTableAppendStreamLoader)
- minimumGenerationId == generationId: Overwrite mode (DirectLoadTableAppendTruncateStreamLoader)

StreamLoader responsibilities:
CDK provides implementations:
File: src/test-integration/kotlin/.../component/{DB}WiringTest.kt
package io.airbyte.integrations.destination.{db}.component
import io.airbyte.cdk.load.component.ConnectorWiringSuite
import io.airbyte.cdk.load.component.TableOperationsClient
import io.airbyte.cdk.load.dataflow.aggregate.AggregateFactory
import io.airbyte.cdk.load.write.DestinationWriter
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import org.junit.jupiter.api.Test
@MicronautTest(environments = ["component"])
class {DB}WiringTest(
    // Constructor-injected by Micronaut; if any bean fails to resolve, every
    // test in this class fails with a DI error.
    override val writer: DestinationWriter,
    override val client: TableOperationsClient,
    override val aggregateFactory: AggregateFactory,
) : ConnectorWiringSuite {
    // Optional: Override test namespace if different from "test"
    // override val testNamespace = "my_database"
    // Each override simply re-exposes a CDK suite test so JUnit discovers it here.
    @Test
    override fun `all beans are injectable`() {
        super.`all beans are injectable`()
    }
    @Test
    override fun `writer setup completes`() {
        super.`writer setup completes`()
    }
    @Test
    override fun `can create append stream loader`() {
        super.`can create append stream loader`()
    }
    // The most important test: exercises the full write path end-to-end.
    @Test
    override fun `can write one record`() {
        super.`can write one record`()
    }
}
What ConnectorWiringSuite does:
Test 1: all beans are injectable
Test 2: writer setup completes
Test 3: can create append stream loader
Test 4: can write one record ← MOST IMPORTANT
Test context:
Why MockDestinationCatalog?
Validate:
$ ./gradlew :destination-{db}:testComponentAllBeansAreInjectable \
:destination-{db}:testComponentWriterSetupCompletes \
:destination-{db}:testComponentCanCreateAppendStreamLoader \
:destination-{db}:testComponentCanWriteOneRecord # 4 tests should pass
$ ./gradlew :destination-{db}:componentTest # 9 tests should pass
$ ./gradlew :destination-{db}:integrationTest # 3 tests should pass
If can write one record FAILS:
DI errors:
- Check Phase 7 infrastructure (WriteOperationV2, DatabaseInitialStatusGatherer, ColumnNameMapper)
- Check Phase 6 name generators all have @Singleton

Table creation errors:
- Check TableOperationsClient.createTable() implementation (Phase 4)
- Check SqlGenerator.createTable() SQL syntax

Insert errors:
- Check InsertBuffer.insertRecord() implementation
- Check client.executeInsert() and setParameter() logic
- Check column name mapping

Record not found in database:
- Check buffer.flush() is called
- Check SQL INSERT statement is correct
- Query database directly to debug
✅ Checkpoint: First working sync + all previous phases still work
Goal: Track sync generations for refresh handling
Checkpoint: Can retrieve generation IDs
📘 What's a Generation ID?
- Stored in the _airbyte_generation_id column

When used:
File: Update src/test-integration/kotlin/.../component/{DB}TableOperationsTest.kt
// Opts into the CDK's generation-id test (verifies _airbyte_generation_id handling).
@Test
override fun `get generation id`() {
    super.`get generation id`()
}
What this tests:
- Reads back the _airbyte_generation_id column

Validate:
$ ./gradlew :destination-{db}:testComponentGetGenerationId # 1 test should pass
$ ./gradlew :destination-{db}:componentTest # 10 tests should pass
✅ Checkpoint: Generation ID tracking works + all previous phases still work
Goal: Support full refresh (replace all data)
Checkpoint: Can replace table contents atomically
📘 How Overwrite Works:
Sync modes:
File: Update client/{DB}SqlGenerator.kt
/**
 * Returns the SQL statements that replace [target]'s contents with [source]'s.
 *
 * Pick exactly ONE option for your database. The original draft stacked four
 * `return` statements, leaving options 2-4 unreachable; only one is live now
 * and the alternatives are kept as commented-out reference.
 */
fun overwriteTable(source: TableName, target: TableName): List<String> {
    // Option 1: SWAP (Snowflake)
    return listOf(
        "ALTER TABLE ${fullyQualifiedName(target)} SWAP WITH ${fullyQualifiedName(source)}".andLog(),
        "DROP TABLE IF EXISTS ${fullyQualifiedName(source)}".andLog(),
    )

    // Option 2: EXCHANGE (ClickHouse)
    // return listOf(
    //     "EXCHANGE TABLES ${fullyQualifiedName(target)} AND ${fullyQualifiedName(source)}".andLog(),
    //     "DROP TABLE IF EXISTS ${fullyQualifiedName(source)}".andLog(),
    // )

    // Option 3: DROP + RENAME (fallback for most databases)
    // return listOf(
    //     "DROP TABLE IF EXISTS ${fullyQualifiedName(target)}".andLog(),
    //     "ALTER TABLE ${fullyQualifiedName(source)} RENAME TO ${target.name.quote()}".andLog(),
    // )

    // Option 4: BEGIN TRANSACTION + DROP + RENAME + COMMIT (for ACID guarantees)
    // return listOf(
    //     "BEGIN TRANSACTION".andLog(),
    //     "DROP TABLE IF EXISTS ${fullyQualifiedName(target)}".andLog(),
    //     "ALTER TABLE ${fullyQualifiedName(source)} RENAME TO ${target.name.quote()}".andLog(),
    //     "COMMIT".andLog(),
    // )
}
Database-specific notes:
File: Update client/{DB}AirbyteClient.kt
/** Replaces [targetTableName]'s contents with [sourceTableName]'s, statement by statement. */
override suspend fun overwriteTable(
    sourceTableName: TableName,
    targetTableName: TableName
) {
    // Delegate SQL generation, then run each statement in order.
    sqlGenerator.overwriteTable(sourceTableName, targetTableName).forEach { statement ->
        execute(statement)
    }
}
File: Update write/{DB}Writer.kt
/**
 * Creates the StreamLoader for [stream], tolerating streams that setup() never
 * saw (test harnesses may invoke this without a full catalog).
 */
override fun createStreamLoader(stream: DestinationStream): StreamLoader {
    // Defensive: Handle streams not in catalog (for test compatibility)
    val initialStatus = if (::initialStatuses.isInitialized) {
        initialStatuses[stream] ?: DirectLoadInitialStatus(null, null)
    } else {
        DirectLoadInitialStatus(null, null)
    }
    // NOTE(review): `names` and `tempTableNameGenerator` are fields of the Writer
    // that this snippet does not show - confirm they exist in your class.
    val tableNameInfo = names[stream]
    val (realTableName, tempTableName, columnNameMapping) = if (tableNameInfo != null) {
        Triple(
            tableNameInfo.tableNames.finalTableName!!,
            tempTableNameGenerator.generate(tableNameInfo.tableNames.finalTableName!!),
            tableNameInfo.columnNameMapping
        )
    } else {
        // Fallback naming for streams outside the catalog (e.g. component tests).
        val tableName = TableName(
            namespace = stream.mappedDescriptor.namespace ?: "test",
            name = stream.mappedDescriptor.name
        )
        Triple(tableName, tempTableNameGenerator.generate(tableName), ColumnNameMapping(emptyMap()))
    }
    // Choose StreamLoader based on sync mode
    return when (stream.minimumGenerationId) {
        // minimumGenerationId == 0: append mode (keep existing data)
        0L -> DirectLoadTableAppendStreamLoader(
            stream, initialStatus, realTableName, tempTableName,
            columnNameMapping, client, client, streamStateStore
        )
        // minimumGenerationId == generationId: truncate mode (replace existing data)
        stream.generationId -> DirectLoadTableAppendTruncateStreamLoader(
            stream, initialStatus, realTableName, tempTableName,
            columnNameMapping, client, client, streamStateStore
        )
        else -> throw SystemErrorException("Hybrid refresh not supported")
    }
}
What changed:
- Added a when statement to choose the StreamLoader based on minimumGenerationId
- minimumGenerationId = 0: Append mode (keep old data)
- minimumGenerationId = generationId: Truncate mode (replace old data)

StreamLoader behavior:
File: Update src/test-integration/kotlin/.../component/{DB}TableOperationsTest.kt
// Opts into the CDK's overwrite test (exercises overwriteTable end-to-end).
@Test
override fun `overwrite tables`() {
    super.`overwrite tables`()
}
Validate:
$ ./gradlew :destination-{db}:testComponentOverwriteTables # 1 test should pass
$ ./gradlew :destination-{db}:componentTest # 11 tests should pass
$ ./gradlew :destination-{db}:integrationTest # 3 tests should pass
✅ Checkpoint: Full refresh mode works + all previous phases still work
Goal: Support table copying (used internally by some modes)
Checkpoint: Can copy data between tables
📘 When Copy is Used:
File: Update client/{DB}SqlGenerator.kt
// Builds an INSERT ... SELECT that copies every mapped user column from [source]
// into [target]. NOTE(review): this version copies only user columns - see the
// metadata-column variant below if Airbyte columns must be carried over too.
fun copyTable(
    columnMapping: ColumnNameMapping,
    source: TableName,
    target: TableName
): String {
    val columnList = columnMapping.values.joinToString(", ") { "\"$it\"" }
    return """
INSERT INTO ${fullyQualifiedName(target)} ($columnList)
SELECT $columnList
FROM ${fullyQualifiedName(source)}
""".trimIndent().andLog()
}
What this does:
Alternative: Include Airbyte metadata columns explicitly:
// Variant of copyTable that also carries the Airbyte metadata columns across.
fun copyTable(
    columnMapping: ColumnNameMapping,
    source: TableName,
    target: TableName
): String {
    // Include Airbyte metadata + user columns
    val allColumns = listOf(
        "_airbyte_raw_id",
        "_airbyte_extracted_at",
        "_airbyte_meta",
        "_airbyte_generation_id"
    ) + columnMapping.values
    val columnList = allColumns.joinToString(", ") { "\"$it\"" }
    return """
INSERT INTO ${fullyQualifiedName(target)} ($columnList)
SELECT $columnList
FROM ${fullyQualifiedName(source)}
""".trimIndent().andLog()
}
File: Update client/{DB}AirbyteClient.kt
/** Copies all mapped columns from [sourceTableName] into [targetTableName]. */
override suspend fun copyTable(
    columnNameMapping: ColumnNameMapping,
    sourceTableName: TableName,
    targetTableName: TableName
) {
    // Build the INSERT ... SELECT statement, then run it.
    val sql = sqlGenerator.copyTable(columnNameMapping, sourceTableName, targetTableName)
    execute(sql)
}
File: Update src/test-integration/kotlin/.../component/{DB}TableOperationsTest.kt
// Opts into the CDK's copy test (exercises copyTable end-to-end).
@Test
override fun `copy tables`() {
    super.`copy tables`()
}
Validate:
$ ./gradlew :destination-{db}:testComponentCopyTables # 1 test should pass
$ ./gradlew :destination-{db}:componentTest # 12 tests should pass
$ ./gradlew :destination-{db}:integrationTest # 3 tests should pass
✅ Checkpoint: Copy operation works + all previous phases still work
Next: Your connector now works for basic use cases! Continue to 5-advanced-features.md for production-ready features, or jump to 6-testing.md to run the full test suite.