connector-writer/destination/step-by-step/6-testing.md
Summary: Comprehensive guide for implementing the full CDK integration test suite. This test validates edge cases, type handling, schema evolution, and CDC support. Required for production certification.
When to use this: After Phase 8 (working connector with ConnectorWiringSuite passing)
Time estimate: 4-8 hours for complete implementation
Comprehensive test coverage (50+ scenarios), including:
- testAppend() - Incremental append without deduplication
- testDedupe() - Incremental append with primary key deduplication
- testTruncate() - Full refresh (replace all data)
- testAppendSchemaEvolution() - Schema changes during append

Before starting, you must have:
Purpose: Read data from database for test verification
File: src/test-integration/kotlin/.../{DB}DataDumper.kt
```kotlin
package io.airbyte.integrations.destination.{db}

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.*
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.test.util.destination.DestinationDataDumper
import java.time.Instant
import javax.sql.DataSource

class {DB}DataDumper(
    private val dataSource: DataSource,
) : DestinationDataDumper {
    override fun dumpRecords(stream: DestinationStream): List<OutputRecord> {
        val tableName = stream.descriptor.name // Or use your table name generator
        val namespace = stream.descriptor.namespace ?: "test"
        val records = mutableListOf<OutputRecord>()
        dataSource.connection.use { connection ->
            val sql = "SELECT * FROM \"$namespace\".\"$tableName\""
            connection.createStatement().use { statement ->
                val rs = statement.executeQuery(sql)
                val metadata = rs.metaData
                while (rs.next()) {
                    val data = mutableMapOf<String, AirbyteValue>()
                    for (i in 1..metadata.columnCount) {
                        val columnName = metadata.getColumnName(i)
                        val value = rs.getObject(i)
                        // Convert database value to AirbyteValue
                        data[columnName] = when {
                            value == null -> NullValue
                            value is String -> StringValue(value)
                            value is Int -> IntegerValue(value.toLong())
                            value is Long -> IntegerValue(value)
                            value is Boolean -> BooleanValue(value)
                            value is java.math.BigDecimal -> NumberValue(value)
                            value is java.sql.Timestamp ->
                                TimestampWithTimezoneValue(value.toInstant().toString())
                            value is java.sql.Date -> DateValue(value.toLocalDate().toString())
                            // Add more type conversions as needed
                            else -> StringValue(value.toString())
                        }
                    }
                    // Extract Airbyte metadata columns.
                    // Parse the ISO-8601 timestamp into epoch millis (a plain
                    // .toLong() on the timestamp string would throw).
                    val extractedAt =
                        (data["_airbyte_extracted_at"] as? TimestampWithTimezoneValue)
                            ?.let { Instant.parse(it.value.toString()).toEpochMilli() }
                            ?: 0L
                    val generationId =
                        (data["_airbyte_generation_id"] as? IntegerValue)?.value?.toLong() ?: 0L
                    val meta = data["_airbyte_meta"] // ObjectValue with errors/changes
                    records.add(
                        OutputRecord(
                            extractedAt = extractedAt,
                            generationId = generationId,
                            data = data.filterKeys { !it.startsWith("_airbyte") },
                            airbyteMeta = parseAirbyteMeta(meta)
                        )
                    )
                }
            }
        }
        return records
    }

    private fun parseAirbyteMeta(meta: AirbyteValue?): OutputRecord.Meta {
        // Parse the _airbyte_meta value into OutputRecord.Meta.
        // For now, a simple placeholder implementation:
        return OutputRecord.Meta(syncId = 0)
    }
}
```
What this does: reads every row back from the destination table, converts each JDBC value to the corresponding AirbyteValue, then splits the Airbyte metadata columns (_airbyte_extracted_at, _airbyte_generation_id, _airbyte_meta) out from the user data so the test framework can compare records.
Purpose: Clean up test data between test runs
File: src/test-integration/kotlin/.../{DB}Cleaner.kt
```kotlin
package io.airbyte.integrations.destination.{db}

import io.airbyte.cdk.load.test.util.destination.DestinationCleaner
import javax.sql.DataSource

class {DB}Cleaner(
    private val dataSource: DataSource,
    private val testNamespace: String = "test",
) : DestinationCleaner {
    override fun cleanup() {
        dataSource.connection.use { connection ->
            // Find all test tables
            val sql =
                """
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = '$testNamespace'
                """.trimIndent()
            connection.createStatement().use { statement ->
                val rs = statement.executeQuery(sql)
                val tablesToDrop = mutableListOf<String>()
                while (rs.next()) {
                    tablesToDrop.add(rs.getString("table_name"))
                }
                // Drop each table
                tablesToDrop.forEach { tableName ->
                    try {
                        statement.execute(
                            "DROP TABLE IF EXISTS \"$testNamespace\".\"$tableName\" CASCADE"
                        )
                    } catch (e: Exception) {
                        // Ignore errors during cleanup
                    }
                }
            }
            // Optionally drop the test namespace itself
            try {
                connection.createStatement().use {
                    it.execute("DROP SCHEMA IF EXISTS \"$testNamespace\" CASCADE")
                }
            } catch (e: Exception) {
                // Ignore
            }
        }
    }
}
```
What this does: queries information_schema for every table in the test schema, drops each one (ignoring individual failures), and optionally drops the schema itself so each test run starts from a clean database.
BasicFunctionalityIntegrationTest has a long list of required constructor parameters (useDataFlowPipeline is the one added for the dataflow CDK):
| Parameter | Type | Purpose | Common Value |
|---|---|---|---|
| configContents | String | Database config JSON | Load from secrets/config.json |
| configSpecClass | Class<T> | Specification class | {DB}Specification::class.java |
| dataDumper | DestinationDataDumper | Read data for verification | {DB}DataDumper(dataSource) |
| destinationCleaner | DestinationCleaner | Clean between tests | {DB}Cleaner(dataSource) |
| isStreamSchemaRetroactive | Boolean | Schema changes apply retroactively | true (usually) |
| dedupBehavior | DedupBehavior? | CDC deletion mode | DedupBehavior(CdcDeletionMode.HARD_DELETE) |
| stringifySchemalessObjects | Boolean | Convert objects without schema to strings | false |
| schematizedObjectBehavior | SchematizedNestedValueBehavior | How to handle nested objects | PASS_THROUGH or STRINGIFY |
| schematizedArrayBehavior | SchematizedNestedValueBehavior | How to handle nested arrays | STRINGIFY (usually) |
| unionBehavior | UnionBehavior | How to handle union types | STRINGIFY or PROMOTE_TO_OBJECT |
| supportFileTransfer | Boolean | Supports file uploads | false (for databases) |
| commitDataIncrementally | Boolean | Commit during sync vs at end | true |
| allTypesBehavior | AllTypesBehavior | Type handling configuration | StronglyTyped(...) |
| unknownTypesBehavior | UnknownTypesBehavior | Unknown type handling | PASS_THROUGH |
| nullEqualsUnset | Boolean | Null same as missing field | true |
| useDataFlowPipeline | Boolean | Use dataflow CDK architecture | true ⭐ REQUIRED for dataflow CDK |
File: src/test-integration/kotlin/.../{DB}BasicFunctionalityTest.kt
```kotlin
package io.airbyte.integrations.destination.{db}

import com.zaxxer.hikari.HikariDataSource
import io.airbyte.cdk.load.test.util.destination.DestinationCleaner
import io.airbyte.cdk.load.test.util.destination.DestinationDataDumper
import io.airbyte.cdk.load.write.AllTypesBehavior
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.DedupBehavior
import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.UnionBehavior
import io.airbyte.cdk.load.write.UnknownTypesBehavior
import io.airbyte.integrations.destination.{db}.spec.{DB}Specification
import java.nio.file.Path
import javax.sql.DataSource
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test

class {DB}BasicFunctionalityTest :
    BasicFunctionalityIntegrationTest(
        configContents = Path.of("secrets/config.json").toFile().readText(),
        configSpecClass = {DB}Specification::class.java,
        dataDumper = createDataDumper(),
        destinationCleaner = createCleaner(),
        // Schema behavior
        isStreamSchemaRetroactive = true,
        // CDC deletion mode
        dedupBehavior = DedupBehavior(DedupBehavior.CdcDeletionMode.HARD_DELETE),
        // Type handling
        stringifySchemalessObjects = false,
        schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
        schematizedArrayBehavior = SchematizedNestedValueBehavior.STRINGIFY,
        unionBehavior = UnionBehavior.STRINGIFY,
        // Feature support
        supportFileTransfer = false, // Database destinations don't transfer files
        commitDataIncrementally = true,
        // Type system behavior
        allTypesBehavior =
            AllTypesBehavior.StronglyTyped(
                integerCanBeLarge = false, // true if your DB has unlimited integers
                numberCanBeLarge = false, // true if your DB has unlimited precision
                nestedFloatLosesPrecision = false,
            ),
        unknownTypesBehavior = UnknownTypesBehavior.PASS_THROUGH,
        nullEqualsUnset = true,
        // Dataflow CDK architecture (REQUIRED for new CDK)
        useDataFlowPipeline = true, // ⚠️ Must be true for dataflow CDK connectors
    ) {
    companion object {
        private lateinit var testDataSource: DataSource

        @JvmStatic
        @BeforeAll
        fun beforeAll() {
            // Set up test database (Testcontainers or real DB)
            testDataSource = createTestDataSource()
        }

        private fun createDataDumper(): DestinationDataDumper {
            return {DB}DataDumper(testDataSource)
        }

        private fun createCleaner(): DestinationCleaner {
            return {DB}Cleaner(testDataSource)
        }

        private fun createTestDataSource(): DataSource {
            // Initialize Testcontainers or a connection pool
            val container = {DB}Container("{db}:latest")
            container.start()
            return HikariDataSource().apply {
                jdbcUrl = container.jdbcUrl
                username = container.username
                password = container.password
            }
        }
    }

    // Test methods - uncomment as you implement features
    @Test
    override fun testAppend() {
        super.testAppend()
    }

    @Test
    override fun testTruncate() {
        super.testTruncate()
    }

    @Test
    override fun testAppendSchemaEvolution() {
        super.testAppendSchemaEvolution()
    }

    @Test
    override fun testDedupe() {
        super.testDedupe()
    }
}
```
| Parameter | Typical Value | Purpose |
|---|---|---|
| configContents | Path.of("secrets/config.json").toFile().readText() | DB connection config |
| configSpecClass | {DB}Specification::class.java | Your spec class |
| dataDumper | {DB}DataDumper(testDataSource) | Read test data (from Step 1) |
| destinationCleaner | {DB}Cleaner(testDataSource) | Cleanup test data (from Step 1) |
| isStreamSchemaRetroactive | true | Schema changes apply to existing data |
| supportFileTransfer | false | Database destinations don't support files |
| commitDataIncrementally | true | Commit batches as written |
| nullEqualsUnset | true | Treat {"x": null} same as {} |
| stringifySchemalessObjects | false | Use native JSON if available |
| unknownTypesBehavior | PASS_THROUGH | Store unrecognized types as-is |
| unionBehavior | STRINGIFY | Convert union types to JSON string |
| schematizedObjectBehavior | PASS_THROUGH or STRINGIFY | See below |
| schematizedArrayBehavior | STRINGIFY | See below |
Purpose: How to handle CDC deletions
Options:
```kotlin
// Hard delete - remove CDC-deleted records
DedupBehavior(DedupBehavior.CdcDeletionMode.HARD_DELETE)

// Soft delete - keep tombstone records
DedupBehavior(DedupBehavior.CdcDeletionMode.SOFT_DELETE)

// No CDC support yet
null
```
Purpose: Configure type precision limits
```kotlin
// Snowflake/BigQuery: Unlimited precision
AllTypesBehavior.StronglyTyped(
    integerCanBeLarge = true,
    numberCanBeLarge = true,
    nestedFloatLosesPrecision = false,
)

// MySQL/Postgres: Limited precision
AllTypesBehavior.StronglyTyped(
    integerCanBeLarge = false, // BIGINT limits
    numberCanBeLarge = false, // DECIMAL limits
    nestedFloatLosesPrecision = false,
)
```
Purpose: How to store nested objects and arrays
Options:
- PASS_THROUGH: Use native JSON/array types (Postgres JSONB, Snowflake VARIANT)
- STRINGIFY: Convert to JSON strings (fallback for databases without native types)

Recommendations:
- Objects: PASS_THROUGH if your DB has a native JSON type, else STRINGIFY
- Arrays: STRINGIFY (most DBs don't have typed arrays, except Postgres)

useDataFlowPipeline
Value: true - REQUIRED for dataflow CDK connectors
Why critical: Setting to false uses old CDK code paths that don't work with Aggregate/InsertBuffer pattern. Always use true.
NEVER rationalize test failures away. Test failures mean ONE of two things:
1. Your implementation has a bug. Fix: Debug and fix your implementation.
2. A test parameter doesn't describe your connector's actual behavior. Fix: Update test parameters with clear rationale.
Key principle: If tests fail, the connector is NOT working correctly for production use.
Example rationalizations to REJECT:
❌ "Many tests failing due to state message comparison - cosmetic" → State messages are HOW Airbyte tracks progress. Wrong state = broken checkpointing!
❌ "Schema evolution needs MongoDB-specific expectations" → Implement schema evolution correctly for MongoDB, then tests pass!
❌ "Dedupe tests need configuration" → Add the configuration! Don't skip tests!
❌ "Some tests need adaptations" → Make the adaptations! Document what's different and why!
ALL tests must pass or be explicitly skipped with documented rationale approved by maintainers.
Agent says: "The 7 failures are specific edge cases - advanced scenarios, not core functionality"
Reality:
If you don't support a mode: don't declare it in your SpecificationExtension, and skip its tests with documented rationale.
If you claim to support it (in SpecificationExtension): the corresponding tests must pass.
Agent says: "The connector works for normal use cases"
Reality:
The rule: If supportedSyncModes includes OVERWRITE, then testTruncate() must pass.
Truncate/Overwrite Mode:
- DestinationSyncMode.OVERWRITE in SpecificationExtension

Generation ID Tracking:

State Messages:

Schema Evolution:

Deduplication:
- DestinationSyncMode.APPEND_DEDUP

None of these are "edge cases" - they're core Airbyte features!
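As one concrete illustration of why generation ID tracking is core functionality: truncate/overwrite syncs are typically implemented by writing new records under a higher _airbyte_generation_id and deleting older generations once the sync commits. A minimal sketch of that cleanup SQL (buildTruncateCleanupSql is a hypothetical helper, not a CDK API; the column name follows Airbyte's metadata convention):

```kotlin
// Hypothetical helper: after a successful truncate sync, remove rows
// belonging to earlier generations so only the new data remains.
fun buildTruncateCleanupSql(table: String, minGenerationId: Long): String =
    "DELETE FROM $table WHERE _airbyte_generation_id < $minGenerationId"
```

If generation IDs aren't written correctly, this delete either wipes the new data or leaves the old data behind, which is exactly what testTruncate() catches.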
```shell
# Test append mode
$ ./gradlew :destination-{db}:integrationTest --tests "*BasicFunctionalityTest.testAppend"

# Test dedupe mode
$ ./gradlew :destination-{db}:integrationTest --tests "*BasicFunctionalityTest.testDedupe"

# Test schema evolution
$ ./gradlew :destination-{db}:integrationTest --tests "*BasicFunctionalityTest.testAppendSchemaEvolution"

# Run the full suite
$ ./gradlew :destination-{db}:integrationTest --tests "*BasicFunctionalityTest"
```
Expected: All enabled tests pass
Time: 5-15 minutes (depending on database and data volume)
Cause: DataDumper not converting types correctly
Fix: Check the type conversion in your DataDumper (timestamps, numerics, and booleans are the usual culprits).
Cause: Deduplication not working
Fix: Check your upsertTable() implementation.
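When the dedupe path fails, the MERGE/upsert SQL is the usual suspect: the match must be on the primary key and every non-key column must be updated. A self-contained sketch of generating ANSI-style MERGE SQL (buildUpsertSql is a hypothetical helper; MySQL would use INSERT ... ON DUPLICATE KEY UPDATE and Postgres INSERT ... ON CONFLICT instead):

```kotlin
// Hypothetical helper: builds an ANSI-style MERGE statement for dedupe.
// Adapt the syntax to your dialect's upsert mechanism.
fun buildUpsertSql(
    target: String,
    staging: String,
    primaryKey: List<String>,
    columns: List<String>,
): String {
    val on = primaryKey.joinToString(" AND ") { "t.\"$it\" = s.\"$it\"" }
    val updates = (columns - primaryKey.toSet()).joinToString(", ") { "t.\"$it\" = s.\"$it\"" }
    val cols = columns.joinToString(", ") { "\"$it\"" }
    val vals = columns.joinToString(", ") { "s.\"$it\"" }
    return """
        MERGE INTO $target t USING $staging s ON $on
        WHEN MATCHED THEN UPDATE SET $updates
        WHEN NOT MATCHED THEN INSERT ($cols) VALUES ($vals)
    """.trimIndent()
}
```

If your dialect has no MERGE or upsert at all, a DELETE-then-INSERT inside one transaction is a workable fallback.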
Cause: Schema evolution (ALTER TABLE) not working
Fix: Check your applyChangeset() implementation.
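Schema evolution failures usually mean the changeset is not being translated into the right DDL. A self-contained sketch of that translation step (buildAlterStatements and its argument shapes are hypothetical; your real applyChangeset() will work from CDK types):

```kotlin
// Hypothetical changeset -> DDL translation for schema evolution.
fun buildAlterStatements(
    table: String,
    addedColumns: Map<String, String>, // column name -> SQL type
    removedColumns: List<String>,
): List<String> {
    val stmts = mutableListOf<String>()
    addedColumns.forEach { (name, type) ->
        stmts.add("ALTER TABLE $table ADD COLUMN \"$name\" $type")
    }
    removedColumns.forEach { name ->
        stmts.add("ALTER TABLE $table DROP COLUMN \"$name\"")
    }
    return stmts
}
```

Type changes are trickier than add/drop: some dialects need a new column plus a backfill rather than a direct ALTER COLUMN TYPE.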
Cause: Type mapping issues
Fix: Check ColumnUtils.toDialectType().
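Type-mapping bugs are easiest to spot by writing the mapping out explicitly. An illustrative sketch keyed by type name (the SQL types on the right are examples, not a prescribed mapping; the real ColumnUtils.toDialectType() dispatches on AirbyteType subclasses):

```kotlin
// Illustrative Airbyte -> SQL type mapping keyed by type name.
fun toDialectType(airbyteType: String): String = when (airbyteType) {
    "STRING" -> "VARCHAR"
    "INTEGER" -> "BIGINT"
    "NUMBER" -> "DECIMAL(38, 9)"
    "BOOLEAN" -> "BOOLEAN"
    "DATE" -> "DATE"
    "TIMESTAMP_WITH_TIMEZONE" -> "TIMESTAMP WITH TIME ZONE"
    "TIMESTAMP_WITHOUT_TIMEZONE" -> "TIMESTAMP"
    "OBJECT", "ARRAY", "UNION", "UNKNOWN" -> "JSON" // or VARCHAR if stringifying
    else -> "VARCHAR"
}
```

Whatever you choose here must agree with the allTypesBehavior and schematized*Behavior parameters you pass to the test, or testAllTypes will fail on round-trip comparison.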
```kotlin
// If your DB doesn't support certain features:
// @Test
// override fun testDedupe() {
//     // Skip if no MERGE/UPSERT support yet
// }

@Test
fun testDatabaseSpecificFeature() {
    // Your custom test
}
```
File: destination-snowflake/src/test-integration/.../SnowflakeBasicFunctionalityTest.kt
Parameters:
- unionBehavior = UnionBehavior.PROMOTE_TO_OBJECT (uses VARIANT type)
- schematizedObjectBehavior = PASS_THROUGH (native OBJECT type)
- allTypesBehavior.integerCanBeLarge = true (NUMBER unlimited)

File: destination-clickhouse/src/test-integration/.../ClickhouseBasicFunctionalityTest.kt
Parameters:
- dedupBehavior = SOFT_DELETE (ReplacingMergeTree doesn't support DELETE in MERGE)
- schematizedArrayBehavior = STRINGIFY (no native typed arrays)
- allTypesBehavior.integerCanBeLarge = false (Int64 has limits)

File: destination-mysql/src/test-integration/.../MySQLBasicFunctionalityTest.kt
Parameters:
- unionBehavior = STRINGIFY
- schematizedObjectBehavior = STRINGIFY (JSON type but limited)
- commitDataIncrementally = true

Cause: DataDumper not created in companion object
Fix: Verify createDataDumper() returns {DB}DataDumper instance
Cause: Database not responding or deadlock
Fix:
- Add a per-test timeout, e.g. @Timeout(5, unit = TimeUnit.MINUTES)

Cause: Setup/cleanup issue
Fix: Check DestinationCleaner.cleanup() actually drops tables
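A quick sanity check is to count what survives cleanup in the test schema; a sketch of building that query against information_schema (the helper name is hypothetical):

```kotlin
// Hypothetical helper: query to verify the cleaner left no tables behind.
// Run it after cleanup() and expect a count of zero.
fun buildLeftoverTablesQuery(schema: String): String =
    "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '$schema'"
```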
Cause: Type conversion in DataDumper is wrong
Fix: Add logging to see what database returns:
```kotlin
val value = rs.getObject(i)
println("Column $columnName: value=$value, type=${value?.javaClass}")
```
BasicFunctionalityIntegrationTest is complete when:
Minimum (Phase 8):
Full Feature Set (Phase 13):
Production Ready (Phase 15):
| Task | Time |
|---|---|
| Implement DataDumper | 1-2 hours |
| Implement Cleaner | 30 min |
| Create test class with parameters | 30 min |
| Debug testAppend | 1-2 hours |
| Debug other tests | 2-4 hours |
| Total | 5-9 hours |
Tip: Implement tests incrementally:
BasicFunctionalityIntegrationTest is the gold standard for connector validation but has significant complexity:
Pros:
Cons:
Strategy:
The v2 guide gets you to a working connector without this complexity, but this guide ensures production readiness!