connector-writer/destination/step-by-step/2-database-setup.md
Prerequisites: Complete Setup Phase 2 (Spec Operation) from 1-getting-started.md
After completing this guide, your connector will have:
- TableOperationsClient implementation (all database operations)
- TableOperationsSuite (5 tests passing)
- `--check` operation working

This file contains two phases:
Database Phase 1: TableOperationsClient Implementation
Database Phase 2: Check Operation
- Working `--check` command

Goal: Implement complete database operations interface in one cohesive phase
Why one phase? TableOperationsClient is a single cohesive interface. Implementing all methods together:
What you'll implement:
Checkpoint: All TableOperationsSuite tests passing (5 tests)
File: {DB}BeanFactory.kt
package io.airbyte.integrations.destination.{db}
import io.airbyte.cdk.Operation
import io.airbyte.cdk.command.ConfigurationSpecificationSupplier
import io.airbyte.integrations.destination.{db}.spec.*
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
import javax.sql.DataSource
import com.zaxxer.hikari.HikariDataSource
@Factory
class {DB}BeanFactory {

    // Builds the strongly-typed configuration from the raw spec supplied via
    // the CLI --config file.
    @Singleton
    fun configuration(
        configFactory: {DB}ConfigurationFactory,
        specFactory: ConfigurationSpecificationSupplier<{DB}Specification>,
    ): {DB}Configuration {
        val spec = specFactory.get()
        return configFactory.makeWithoutExceptionHandling(spec)
    }

    // Real connection pool. Only bound for operations that talk to the
    // database (notEquals = "spec"), so --spec never opens a connection.
    @Singleton
    @Requires(property = Operation.PROPERTY, notEquals = "spec")
    fun dataSource(config: {DB}Configuration): DataSource {
        // For JDBC databases:
        return HikariDataSource().apply {
            jdbcUrl = "jdbc:{db}://${config.hostname}:${config.port}/${config.database}"
            username = config.username
            password = config.password
            maximumPoolSize = 10
            connectionTimeout = 30000 // ms
        }
        // For non-JDBC: Create your native client here
        // Example (ClickHouse):
        // return ClickHouseDataSource(
        //     "http://${config.hostname}:${config.port}/${config.database}",
        //     Properties().apply {
        //         setProperty("user", config.username)
        //         setProperty("password", config.password)
        //     }
        // )
    }

    // Inert stand-in bound only for --spec so DI can satisfy the DataSource
    // dependency without any database connection. Nothing is expected to call
    // getConnection() during spec; it returns null by design.
    @Singleton
    @Requires(property = Operation.PROPERTY, value = "spec")
    fun emptyDataSource(): DataSource {
        return object : DataSource {
            override fun getConnection() = null
            override fun getConnection(username: String?, password: String?) = null
            override fun unwrap(iface: Class<*>?) = throw UnsupportedOperationException()
            override fun isWrapperFor(iface: Class<*>?) = false
            override fun getLogWriter() = null
            override fun setLogWriter(out: java.io.PrintWriter?) {}
            override fun setLoginTimeout(seconds: Int) {}
            override fun getLoginTimeout() = 0
            override fun getParentLogger() = throw UnsupportedOperationException()
        }
    }
}
What this does:
- Reads connection settings from the `--config` file
- Real DataSource is used for the `--check` and `--write` operations
- Empty DataSource is used for the `--spec` operation (no connection needed)

Database-specific adjustments:
File: Update build.gradle.kts
dependencies {
    // Existing dependencies...

    // Testcontainers for automated testing (recommended).
    // Keep the core artifact and the database module on the same version.
    testImplementation("org.testcontainers:testcontainers:1.19.0")
    testImplementation("org.testcontainers:{db}:1.19.0") // e.g., postgresql, mysql, clickhouse

    // For databases without specific Testcontainers module:
    // testImplementation("org.testcontainers:jdbc:1.19.0")
}
Check available modules: https://www.testcontainers.org/modules/databases/
File: src/test-integration/kotlin/.../component/{DB}TestConfigFactory.kt
package io.airbyte.integrations.destination.{db}.component
import io.airbyte.integrations.destination.{db}.spec.*
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
import org.testcontainers.containers.{DB}Container // e.g., PostgreSQLContainer
@Factory
@Requires(env = ["component"]) // only active during component tests
class {DB}TestConfigFactory {

    // Starts a throwaway database container for the test context.
    // @Primary lets this bean override any production bean of the same type.
    @Singleton
    @Primary
    fun testContainer(): {DB}Container<*> {
        // Example for PostgreSQL:
        val container = PostgreSQLContainer("postgres:15-alpine")
            .withDatabaseName("test")
            .withUsername("test")
            .withPassword("test")
        // Example for MySQL:
        // val container = MySQLContainer("mysql:8.0")
        //     .withDatabaseName("test")
        //     .withUsername("test")
        //     .withPassword("test")
        // Example for ClickHouse:
        // val container = ClickHouseContainer("clickhouse/clickhouse-server:latest")
        //     .withDatabaseName("test")
        //     .withUsername("default")
        //     .withPassword("")
        // start() blocks until the database accepts connections. Cleanup is
        // presumably handled by Testcontainers' resource reaper - confirm for CI.
        container.start()
        return container
    }

    // Derives the connector configuration from the live container
    // (host and mapped port are only known after start()).
    @Singleton
    @Primary
    fun testConfig(container: {DB}Container<*>): {DB}Configuration {
        return {DB}Configuration(
            hostname = container.host,
            port = container.firstMappedPort,
            database = container.databaseName,
            username = container.username,
            password = container.password,
        )
    }
}
Why Testcontainers (recommended)?
Use this approach when:
Prerequisites:
Before running tests, secrets/config.json must exist with valid database credentials.
File: destination-{db}/secrets/config.json
{
"hostname": "your-database-host.example.com",
"port": 5432,
"database": "your_database",
"username": "your_username",
"password": "your_password"
}
⚠️ This file is gitignored - never commit credentials.
TestConfigFactory (reads from secrets file):
File: src/test-integration/kotlin/.../component/{DB}TestConfigFactory.kt
package io.airbyte.integrations.destination.{db}.component
import io.airbyte.cdk.load.component.config.TestConfigLoader.loadTestConfig
import io.airbyte.integrations.destination.{db}.spec.*
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
@Factory
@Requires(env = ["component"]) // only active during component tests
class {DB}TestConfigFactory {

    // Loads a real connector config from the (gitignored) secrets directory,
    // running it through the same spec -> configuration pipeline as production.
    @Singleton
    @Primary
    fun testConfig(): {DB}Configuration {
        return loadTestConfig(
            {DB}Specification::class.java,
            {DB}ConfigurationFactory::class.java,
            "test-instance.json", // or "config.json" in secrets/
        )
    }
}
Alternative: Environment variables (for CI or when you prefer not to use files)
Replace testConfig() with:
@Singleton
@Primary
fun testConfig(): {DB}Configuration {
    // Reads a required environment variable or fails with a clear message,
    // avoiding five copies of the `getenv ?: error` pattern.
    fun env(name: String): String = System.getenv(name) ?: error("$name not set")

    return {DB}Configuration(
        hostname = env("DB_HOSTNAME"),
        port = env("DB_PORT").toInt(), // throws NumberFormatException if non-numeric
        database = env("DB_DATABASE"),
        username = env("DB_USERNAME"),
        password = env("DB_PASSWORD"),
    )
}
Validate infrastructure setup:
$ ./gradlew :destination-{db}:compileKotlin
Expected: BUILD SUCCESSFUL
File: client/{DB}ColumnUtils.kt
package io.airbyte.integrations.destination.{db}.client
import io.airbyte.cdk.load.data.*
import jakarta.inject.Singleton
@Singleton
class {DB}ColumnUtils {

    /**
     * Maps an Airbyte logical type onto this database's DDL type name.
     * Adjust per the database-specific type table in this guide.
     */
    fun toDialectType(type: AirbyteType): String = when (type) {
        BooleanType -> "BOOLEAN"
        IntegerType -> "BIGINT"
        NumberType -> "DECIMAL(38, 9)"
        StringType -> "VARCHAR" // or TEXT
        DateType -> "DATE"
        TimeTypeWithTimezone -> "TIME WITH TIME ZONE"
        TimeTypeWithoutTimezone -> "TIME"
        TimestampTypeWithTimezone -> "TIMESTAMP WITH TIME ZONE"
        TimestampTypeWithoutTimezone -> "TIMESTAMP"
        is ArrayType, ArrayTypeWithoutSchema -> "JSONB" // or TEXT
        is ObjectType, ObjectTypeWithEmptySchema, ObjectTypeWithoutSchema -> "JSONB"
        is UnionType, is UnknownType -> "JSONB" // or VARCHAR as fallback
        else -> "VARCHAR" // safe fallback for types added to the CDK later
    }

    /**
     * Renders one column declaration: "name" TYPE [NOT NULL].
     *
     * Embedded double quotes in [name] are doubled so an unusual or hostile
     * column name cannot break out of the quoted identifier (the original
     * interpolated the raw name unescaped).
     */
    fun formatColumn(name: String, type: AirbyteType, nullable: Boolean): String {
        val safeName = name.replace("\"", "\"\"")
        val typeDecl = toDialectType(type)
        val nullableDecl = if (nullable) "" else " NOT NULL"
        return "\"$safeName\" $typeDecl$nullableDecl"
    }
}
Database-specific type mapping:
| Database | String | Integer | Number | JSON | Timestamp | Nullable |
|---|---|---|---|---|---|---|
| Postgres | TEXT | BIGINT | DECIMAL(38,9) | JSONB | TIMESTAMPTZ | NULL suffix |
| MySQL | VARCHAR(65535) | BIGINT | DECIMAL(38,9) | JSON | TIMESTAMP | NULL suffix |
| Snowflake | VARCHAR | NUMBER(38,0) | FLOAT | VARIANT | TIMESTAMP_TZ | NULL suffix |
| ClickHouse | String | Int64 | Decimal(38,9) | String | DateTime64(3) | Nullable() wrapper |
| BigQuery | STRING | INT64 | NUMERIC | JSON | TIMESTAMP | nullable field |
Adjust toDialectType() for your database - Use table above as reference.
Validate compilation:
$ ./gradlew :destination-{db}:compileKotlin
File: client/{DB}SqlGenerator.kt
Why implement all operations now? SQL generation is pure logic with no I/O. All methods follow the same pattern (string building + logging). Natural to write together.
package io.airbyte.integrations.destination.{db}.client
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.table.ColumnNameMapping
import io.airbyte.cdk.load.schema.model.TableName
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
private val log = KotlinLogging.logger {}

/**
 * Logs the trimmed SQL at INFO level and returns the receiver unchanged,
 * so it can be chained inline on any generated statement.
 */
fun String.andLog(): String = also { sql -> log.info { sql.trim() } }
@Singleton
class {DB}SqlGenerator(
    private val columnUtils: {DB}ColumnUtils,
) {
    // ========================================
    // NAMESPACE OPERATIONS
    // ========================================

    /** DDL creating the namespace (schema/database) if it does not already exist. */
    fun createNamespace(namespace: String): String {
        // Postgres/MySQL: CREATE SCHEMA
        return "CREATE SCHEMA IF NOT EXISTS ${namespace.quote()}".andLog()
        // Or for databases with CREATE DATABASE:
        // return "CREATE DATABASE IF NOT EXISTS ${namespace.quote()}".andLog()
    }

    /** Query that returns at least one row iff the namespace exists. */
    fun namespaceExists(namespace: String): String {
        // Double embedded single quotes so the namespace cannot break out of
        // the string literal (SQL-injection hardening; the original
        // interpolated the raw value).
        val literal = namespace.replace("'", "''")
        // Postgres/MySQL:
        return """
            SELECT schema_name
            FROM information_schema.schemata
            WHERE schema_name = '$literal'
        """.trimIndent().andLog()
        // ClickHouse:
        // return """
        //     SELECT name
        //     FROM system.databases
        //     WHERE name = '$literal'
        // """.trimIndent().andLog()
        // Or query your DB's system catalog
    }

    // ========================================
    // TABLE OPERATIONS
    // ========================================

    /**
     * CREATE TABLE DDL: the four Airbyte metadata columns first, then the
     * stream's user columns with names translated through [columnMapping].
     * [replace] emits CREATE OR REPLACE - verify your database supports it.
     */
    fun createTable(
        stream: DestinationStream,
        tableName: TableName,
        columnMapping: ColumnNameMapping,
        replace: Boolean
    ): String {
        val replaceClause = if (replace) "OR REPLACE " else ""
        val columnDeclarations = stream.schema.asColumns()
            // Metadata columns are declared explicitly below; skip duplicates.
            .filter { (name, _) -> name !in AIRBYTE_META_COLUMNS }
            .map { (name, type) ->
                // A missing mapping is a programming error; fail with context
                // instead of a bare NPE from `!!`.
                val mappedName = checkNotNull(columnMapping[name]) {
                    "No column name mapping for '$name'"
                }
                columnUtils.formatColumn(mappedName, type.type, type.nullable)
            }
            .joinToString(",\n    ")
        return """
            CREATE ${replaceClause}TABLE ${fullyQualifiedName(tableName)} (
                "_airbyte_raw_id" VARCHAR NOT NULL,
                "_airbyte_extracted_at" TIMESTAMP NOT NULL,
                "_airbyte_meta" JSONB NOT NULL,
                "_airbyte_generation_id" BIGINT,
                $columnDeclarations
            )
        """.trimIndent().andLog()
    }

    fun dropTable(tableName: TableName): String {
        return "DROP TABLE IF EXISTS ${fullyQualifiedName(tableName)}".andLog()
    }

    /** COUNT(*) aliased as `count` so the client can read it by name. */
    fun countTable(tableName: TableName): String {
        return """
            SELECT COUNT(*) AS count
            FROM ${fullyQualifiedName(tableName)}
        """.trimIndent().andLog()
    }

    /** Reads the generation id from an arbitrary row (LIMIT 1). */
    fun getGenerationId(tableName: TableName): String {
        return """
            SELECT "_airbyte_generation_id" AS generation_id
            FROM ${fullyQualifiedName(tableName)}
            LIMIT 1
        """.trimIndent().andLog()
    }

    // ========================================
    // HELPER METHODS
    // ========================================

    /**
     * Quotes an identifier, doubling embedded quote characters so an
     * identifier containing the quote char cannot terminate quoting early.
     */
    private fun String.quote(): String {
        // Postgres/Snowflake: double quotes
        return "\"${replace("\"", "\"\"")}\""
        // MySQL: backticks
        // return "`${replace("`", "``")}`"
        // SQL Server: square brackets
        // return "[$this]"
    }

    private fun fullyQualifiedName(tableName: TableName): String {
        return "${tableName.namespace.quote()}.${tableName.name.quote()}"
    }

    companion object {
        // Airbyte metadata columns declared explicitly in createTable().
        private val AIRBYTE_META_COLUMNS = setOf(
            "_airbyte_raw_id",
            "_airbyte_extracted_at",
            "_airbyte_meta",
            "_airbyte_generation_id"
        )
    }
}
Key points:
Validate SQL strings (manual check):
# Copy-paste generated SQL to your database console
# Verify syntax is correct for your database
Validate compilation:
$ ./gradlew :destination-{db}:compileKotlin
File: client/{DB}AirbyteClient.kt
Why implement all operations now? Client just delegates to SqlGenerator + executes. Straightforward pattern repeated for each method.
package io.airbyte.integrations.destination.{db}.client
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.component.TableOperationsClient
import io.airbyte.cdk.load.component.TableSchemaEvolutionClient
import io.airbyte.cdk.load.table.ColumnNameMapping
import io.airbyte.cdk.load.schema.model.TableName
import io.airbyte.integrations.destination.{db}.spec.{DB}Configuration
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import java.sql.SQLException
import javax.sql.DataSource
private val log = KotlinLogging.logger {}
@Singleton
class {DB}AirbyteClient(
    private val dataSource: DataSource,
    private val sqlGenerator: {DB}SqlGenerator,
    private val config: {DB}Configuration,
) : TableOperationsClient, TableSchemaEvolutionClient {

    // ========================================
    // NAMESPACE OPERATIONS
    // ========================================

    override suspend fun createNamespace(namespace: String) {
        execute(sqlGenerator.createNamespace(namespace))
    }

    /** True when the namespace-exists catalog query returns at least one row. */
    override suspend fun namespaceExists(namespace: String): Boolean {
        return dataSource.connection.use { connection ->
            connection.createStatement().use { statement ->
                statement.executeQuery(sqlGenerator.namespaceExists(namespace)).use { rs ->
                    rs.next() // Returns true if namespace exists
                }
            }
        }
    }

    // ========================================
    // TABLE OPERATIONS
    // ========================================

    override suspend fun createTable(
        stream: DestinationStream,
        tableName: TableName,
        columnNameMapping: ColumnNameMapping,
        replace: Boolean
    ) {
        execute(sqlGenerator.createTable(stream, tableName, columnNameMapping, replace))
    }

    override suspend fun dropTable(tableName: TableName) {
        execute(sqlGenerator.dropTable(tableName))
    }

    // Existence is probed via COUNT: null means the query failed (table absent).
    override suspend fun tableExists(table: TableName): Boolean = countTable(table) != null

    /** Row count, or null when the table does not exist (the COUNT query fails). */
    override suspend fun countTable(tableName: TableName): Long? =
        try {
            dataSource.connection.use { connection ->
                connection.createStatement().use { statement ->
                    statement.executeQuery(sqlGenerator.countTable(tableName)).use { rs ->
                        if (rs.next()) rs.getLong("count") else 0L
                    }
                }
            }
        } catch (e: SQLException) {
            log.debug(e) { "Table ${tableName.toPrettyString()} does not exist. Returning null." }
            null // Expected - table doesn't exist
        }

    /**
     * Generation id of an arbitrary row, or 0 when the table is empty, the
     * stored value is SQL NULL, or the query fails.
     */
    override suspend fun getGenerationId(tableName: TableName): Long =
        try {
            dataSource.connection.use { connection ->
                connection.createStatement().use { statement ->
                    statement.executeQuery(sqlGenerator.getGenerationId(tableName)).use { rs ->
                        if (rs.next()) {
                            // JDBC getLong() returns a primitive (0 for SQL NULL),
                            // so the original `?: 0L` elvis was dead code. Use
                            // wasNull() to make the NULL -> 0 mapping explicit.
                            val id = rs.getLong("generation_id")
                            if (rs.wasNull()) 0L else id
                        } else {
                            0L
                        }
                    }
                }
            }
        } catch (e: SQLException) {
            log.debug(e) { "Failed to retrieve generation ID, returning 0" }
            0L
        }

    // ========================================
    // HELPER METHODS
    // ========================================

    /** Executes a single DDL/DML statement on a pooled connection. */
    private fun execute(sql: String) {
        dataSource.connection.use { connection ->
            connection.createStatement().use { statement ->
                statement.execute(sql)
            }
        }
    }

    // ========================================
    // STUB METHODS (Implement in later phases)
    // ========================================

    override suspend fun copyTable(
        columnMapping: ColumnNameMapping,
        source: TableName,
        target: TableName
    ) = TODO("Phase 11: Implement copyTable")

    override suspend fun overwriteTable(
        source: TableName,
        target: TableName
    ) = TODO("Phase 10: Implement overwriteTable")

    override suspend fun upsertTable(
        stream: DestinationStream,
        columnMapping: ColumnNameMapping,
        source: TableName,
        target: TableName
    ) = TODO("Phase 13: Implement upsertTable")

    override suspend fun discoverSchema(tableName: TableName) =
        TODO("Phase 12: Implement discoverSchema")

    override fun computeSchema(
        stream: DestinationStream,
        columnMapping: ColumnNameMapping
    ) = TODO("Phase 12: Implement computeSchema")

    override suspend fun ensureSchemaMatches(
        stream: DestinationStream,
        tableName: TableName,
        columnMapping: ColumnNameMapping
    ) = TODO("Phase 12: Implement ensureSchemaMatches")

    override suspend fun applyChangeset(
        stream: DestinationStream,
        columnMapping: ColumnNameMapping,
        tableName: TableName,
        expectedColumns: Collection<Pair<String, io.airbyte.cdk.load.table.ColumnType>>,
        changeset: io.airbyte.cdk.load.table.ColumnChangeset
    ) = TODO("Phase 12: Implement applyChangeset")
}
Key points:
- Use `log.debug { }` (lambda form), not `log.debug("")`

Register client in BeanFactory:
File: Update {DB}BeanFactory.kt
// Exposes the concrete client under the TableOperationsClient interface
// so framework consumers can inject it by type.
@Singleton
fun client(
    dataSource: DataSource,
    sqlGenerator: {DB}SqlGenerator,
    config: {DB}Configuration,
): TableOperationsClient = {DB}AirbyteClient(dataSource, sqlGenerator, config)
Validate compilation:
$ ./gradlew :destination-{db}:compileKotlin
Expected: BUILD SUCCESSFUL with no errors
File: src/test-integration/kotlin/.../component/{DB}TestTableOperationsClient.kt
package io.airbyte.integrations.destination.{db}.component
import io.airbyte.cdk.load.component.TestTableOperationsClient
import io.airbyte.cdk.load.data.*
import io.airbyte.cdk.load.schema.model.TableName
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
import java.sql.Date
import java.sql.PreparedStatement
import java.sql.Timestamp
import java.sql.Types
import javax.sql.DataSource
@Requires(env = ["component"]) // test-only helper, never in production context
@Singleton
class {DB}TestTableOperationsClient(
    private val dataSource: DataSource,
) : TestTableOperationsClient {

    // Cheapest possible liveness probe against the database.
    override suspend fun ping() {
        dataSource.connection.use { connection ->
            connection.createStatement().use { statement ->
                statement.executeQuery("SELECT 1")
            }
        }
    }

    // Removes a namespace and everything in it so each test starts clean.
    override suspend fun dropNamespace(namespace: String) {
        dataSource.connection.use { connection ->
            connection.createStatement().use { statement ->
                // Postgres/MySQL:
                statement.execute("DROP SCHEMA IF EXISTS \"${namespace}\" CASCADE")
                // Or for databases with DROP DATABASE:
                // statement.execute("DROP DATABASE IF EXISTS `${namespace}`")
            }
        }
    }

    // Inserts test fixtures row by row via prepared statements.
    // NOTE(review): assumes each record map has a stable iteration order
    // (e.g. LinkedHashMap) so `keys` and `values` line up between the column
    // list and the placeholders - confirm with the suite's fixtures.
    override suspend fun insertRecords(
        table: TableName,
        records: List<Map<String, AirbyteValue>>
    ) {
        if (records.isEmpty()) return
        dataSource.connection.use { connection ->
            records.forEach { record ->
                val columns = record.keys.joinToString(", ") { "\"$it\"" }
                val placeholders = record.keys.joinToString(", ") { "?" }
                val sql = """
                    INSERT INTO "${table.namespace}"."${table.name}" ($columns)
                    VALUES ($placeholders)
                """
                connection.prepareStatement(sql).use { statement ->
                    // JDBC parameter indices are 1-based.
                    record.values.forEachIndexed { index, value ->
                        setParameter(statement, index + 1, value)
                    }
                    statement.executeUpdate()
                }
            }
        }
    }

    // Reads the whole table into memory; SQL NULL columns are omitted
    // from the returned row maps.
    override suspend fun readTable(table: TableName): List<Map<String, Any>> {
        val results = mutableListOf<Map<String, Any>>()
        dataSource.connection.use { connection ->
            val sql = "SELECT * FROM \"${table.namespace}\".\"${table.name}\""
            connection.createStatement().use { statement ->
                val rs = statement.executeQuery(sql)
                val metadata = rs.metaData
                while (rs.next()) {
                    val row = mutableMapOf<String, Any>()
                    for (i in 1..metadata.columnCount) {
                        val columnName = metadata.getColumnName(i)
                        val value = rs.getObject(i)
                        if (value != null) {
                            row[columnName] = value
                        }
                    }
                    results.add(row)
                }
            }
        }
        return results
    }

    // Binds one AirbyteValue to a JDBC parameter slot (1-based index),
    // converting Airbyte value types to their JDBC equivalents.
    private fun setParameter(statement: PreparedStatement, index: Int, value: AirbyteValue) {
        when (value) {
            is StringValue -> statement.setString(index, value.value)
            is IntegerValue -> statement.setLong(index, value.value)
            is NumberValue -> statement.setBigDecimal(index, value.value)
            is BooleanValue -> statement.setBoolean(index, value.value)
            is TimestampValue -> statement.setTimestamp(index, Timestamp.from(value.value))
            is DateValue -> statement.setDate(index, Date.valueOf(value.value))
            is ObjectValue -> statement.setString(index, value.toJson()) // JSON as string
            is ArrayValue -> statement.setString(index, value.toJson()) // JSON as string
            is NullValue -> statement.setNull(index, Types.VARCHAR)
            // Fallback for any value type not handled above.
            else -> statement.setString(index, value.toString())
        }
    }
}
What this does:
For non-JDBC databases: Replace JDBC code with native client API calls.
File: src/test-integration/kotlin/.../component/{DB}TableOperationsTest.kt
Create test file with ALL tests enabled from the start:
package io.airbyte.integrations.destination.{db}.component
import io.airbyte.cdk.load.component.TableOperationsClient
import io.airbyte.cdk.load.component.TableOperationsSuite
import io.airbyte.cdk.load.component.TestTableOperationsClient
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import org.junit.jupiter.api.Test
// Runs the CDK's shared TableOperationsSuite against this connector.
// The suite provides the test bodies; each override exists only so JUnit's
// @Test annotation lands on a concrete method in this class.
@MicronautTest(environments = ["component"])
class {DB}TableOperationsTest(
    override val client: TableOperationsClient,
    override val testClient: TestTableOperationsClient,
) : TableOperationsSuite {

    @Test
    override fun `connect to database`() {
        super.`connect to database`()
    }

    @Test
    override fun `create and drop namespaces`() {
        super.`create and drop namespaces`()
    }

    @Test
    override fun `create and drop tables`() {
        super.`create and drop tables`()
    }

    @Test
    override fun `insert records`() {
        super.`insert records`()
    }

    @Test
    override fun `count table rows`() {
        super.`count table rows`()
    }
}
What each test validates:
Run all component tests:
$ ./gradlew :destination-{db}:componentTest
Expected output:
{DB}TableOperationsTest > connect to database PASSED
{DB}TableOperationsTest > create and drop namespaces PASSED
{DB}TableOperationsTest > create and drop tables PASSED
{DB}TableOperationsTest > insert records PASSED
{DB}TableOperationsTest > count table rows PASSED
SpecTest > spec matches expected PASSED
BUILD SUCCESSFUL in 15s
6 tests, 6 passed
Success criteria (ALL must be true):
If tests FAIL, debug systematically:
Symptom: Connection refused, authentication failed, or timeout
Check:
Fix:
# Verify container is running
$ docker ps | grep {db}
# Check container logs
$ docker logs <container-id>
# Try manual connection
$ {db-cli} -h localhost -p <port> -U test -d test
Symptom: SQL syntax error on CREATE SCHEMA or namespace query
Check:
Fix:
Symptom: SQL syntax error on CREATE TABLE
Check:
Fix:
Symptom: SQL syntax error on INSERT or type conversion error
Check:
Fix:
Symptom: COUNT query fails or returns wrong value
Check:
Fix:
Common issues across all tests:
Issue: "No bean of type [TableOperationsClient]"
Fix: Check BeanFactory has @Singleton fun client(...): TableOperationsClient
Verify: ./gradlew :destination-{db}:compileKotlin succeeds
Issue: "TODO not yet implemented"
Symptom: Test calls TODO() stub method
Fix: Verify you implemented all methods (not just namespace or table, but BOTH)
Check: Client has no TODO() in methods called by tests
Issue: SQL logged but not executed
Symptom: .andLog() works but SQL doesn't run
Fix: Verify execute() method actually calls statement.execute(sql)
Check: Add logging after execute to confirm it ran
✅ Checkpoint: Complete TableOperationsClient implementation
What you've achieved:
What's NOT done yet (later phases):
Goal: Implement --check operation to validate database connection and permissions
Why separate phase? Check is a different component (Checker) with integration test validation. Natural checkpoint after basic operations work.
Checkpoint: --check operation works
File: check/{DB}Checker.kt
package io.airbyte.integrations.destination.{db}.check
import io.airbyte.cdk.load.check.DestinationCheckerV2
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.*
import io.airbyte.cdk.load.table.ColumnNameMapping
import io.airbyte.cdk.load.schema.model.TableName
import io.airbyte.integrations.destination.{db}.client.{DB}AirbyteClient
import io.airbyte.integrations.destination.{db}.spec.{DB}Configuration
import jakarta.inject.Singleton
import kotlinx.coroutines.runBlocking
import java.util.UUID
@Singleton
class {DB}Checker(
    private val client: {DB}AirbyteClient,
    private val config: {DB}Configuration,
) : DestinationCheckerV2 {

    // Validates connectivity and permissions by exercising the same calls a
    // sync will need: create namespace, create table, query, drop.
    override fun check() {
        val testNamespace = config.database
        // Random suffix avoids collisions with concurrent checks or leftovers.
        val testTableName = "_airbyte_connection_test_${UUID.randomUUID()}"
        val tableName = TableName(testNamespace, testTableName)
        runBlocking {
            try {
                // 1. Verify namespace exists or can be created
                client.createNamespace(testNamespace)
                // 2. Create test table with Airbyte metadata columns
                val testStream = createTestStream()
                val columnMapping = ColumnNameMapping(mapOf("test_col" to "test_col"))
                client.createTable(testStream, tableName, columnMapping, replace = false)
                // 3. Verify table was created (count should be 0)
                val count = client.countTable(tableName)
                require(count == 0L) { "Expected empty table, got $count rows" }
            } finally {
                // Always cleanup test table.
                // NOTE(review): if dropTable throws here it will mask the
                // original failure - confirm dropTable tolerates a missing table.
                client.dropTable(tableName)
            }
        }
    }

    // Minimal single-column stream used only to drive createTable().
    private fun createTestStream(): DestinationStream {
        return DestinationStream(
            descriptor = DestinationStream.Descriptor(
                namespace = config.database,
                name = "test"
            ),
            importType = DestinationStream.ImportType.APPEND,
            schema = ObjectType(
                properties = linkedMapOf(
                    "test_col" to FieldType(StringType, nullable = true)
                )
            ),
            generationId = 0,
            minimumGenerationId = 0,
            syncId = 0,
        )
    }
}
What check() does:
Why this validates the connection:
File: src/test-integration/kotlin/.../check/{DB}CheckTest.kt
package io.airbyte.integrations.destination.{db}.check
import io.airbyte.cdk.load.check.CheckIntegrationTest
import io.airbyte.cdk.load.check.CheckTestConfig
import io.airbyte.integrations.destination.{db}.spec.{DB}Specification
import java.nio.file.Path
// Integration test: runs the real --check operation against the config in
// secrets/config.json. Add entries to failConfigFilenamesAndFailureReasons
// later to assert that bad configs fail with the expected message.
class {DB}CheckTest :
    CheckIntegrationTest<{DB}Specification>(
        successConfigFilenames = listOf(
            CheckTestConfig(configPath = Path.of("secrets/config.json")),
        ),
        failConfigFilenamesAndFailureReasons = emptyMap(),
    )
What this test does:
- Runs the `--check` operation with a real config
- Config file location: secrets/config.json (same as manual testing)
Run check integration test:
$ ./gradlew :destination-{db}:integrationTestCheckSuccessConfigs
Expected output:
{DB}CheckTest > testSuccessConfigs[0] PASSED
BUILD SUCCESSFUL in 5s
Run full test suite (regression check):
$ ./gradlew :destination-{db}:integrationTest
Expected tests passing:
- SpecTest > spec matches expected ✅ (from Phase 1)
- {DB}CheckTest > testSuccessConfigs[0] ✅ (new in Phase 3)

Success criteria:
If check test FAILS:
Symptom: "Permission denied" or "Access denied"
Fix: Verify database user has CREATE/DROP privileges
Test: Run CREATE SCHEMA manually with test credentials
Symptom: "Table not created" (count != 0)
Fix: Check CREATE TABLE SQL in logs
Verify: Metadata column types are correct for your database
Symptom: Test table not cleaned up
Fix: Check finally block executes even if test fails
Verify: dropTable() doesn't throw exception
✅ Checkpoint: Database Phase 2 complete - Check operation working
What you've achieved:
Continue to: 3-write-infrastructure.md
What's next:
Current progress: