connector-writer/destination/preflight-checklist.md
Summary: Essential database knowledge required before implementing a destination connector. Complete this checklist to ensure you have the information needed for the 4 core components (SQL Generator, Database Client, Insert Buffer, Column Utilities).
Time to complete: 2-4 hours of research for unfamiliar database
For SQL Generator + Database Client
jdbc:db://host:port/database or custom URI?Quick validation:
// Can you write this code?
val connection = /* create connection */
connection.execute("SELECT 1")
connection.close()
Quick validation:
# Can you connect with test credentials?
{database-cli} -h localhost -u testuser -p testpass -d testdb -c "SELECT 1"
For SQL Generator: createNamespace(), namespaceExists()
schema.table or database.table or just table?CREATE SCHEMA, CREATE DATABASE, or N/A?information_schema, system catalog, or API?DROP SCHEMA CASCADE? Restrictions?Quick validation:
-- Can you write these queries?
CREATE SCHEMA test_schema;
SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'test_schema';
DROP SCHEMA test_schema CASCADE;
For SQL Generator: createTable(), dropTable(), tableExists(), countTable()
DROP TABLE IF EXISTS?SELECT COUNT(*)? Performance considerations?DESCRIBE, information_schema.columns, client API?Quick validation:
-- Can you write these queries?
CREATE TABLE test_table (id BIGINT, name VARCHAR);
SELECT table_name FROM information_schema.tables WHERE table_name = 'test_table';
SELECT COUNT(*) FROM test_table;
SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'test_table';
DROP TABLE IF EXISTS test_table;
ALTER TABLE RENAME? SWAP WITH? EXCHANGE TABLES?Quick validation:
-- Can you atomically swap tables?
CREATE TABLE old_table (id INT);
CREATE TABLE new_table (id INT);
ALTER TABLE old_table SWAP WITH new_table; -- Or your DB's syntax
For Column Utilities: toDialectType()
Quick validation:
-- Can you create table with all these types?
CREATE TABLE type_test (
str_col VARCHAR,
int_col BIGINT,
dec_col DECIMAL(38,9),
bool_col BOOLEAN,
date_col DATE,
time_col TIME WITH TIME ZONE,
ts_col TIMESTAMP WITH TIME ZONE,
json_col JSONB,
arr_col JSONB, -- or native array type
bin_col BYTEA
);
NULL/NOT NULL suffix? Nullable() wrapper?Quick validation:
CREATE TABLE nullable_test (
nullable_col VARCHAR,
not_null_col VARCHAR NOT NULL
);
Required types:
CREATE TABLE airbyte_test (
_airbyte_raw_id VARCHAR NOT NULL, -- UUID as string
_airbyte_extracted_at TIMESTAMP NOT NULL, -- Extraction time
_airbyte_meta JSONB NOT NULL, -- Metadata
_airbyte_generation_id BIGINT -- Generation tracking
);
For Insert Buffer: accumulate(), flush()
INSERT INTO t VALUES (...), (...)? Row limit?COPY FROM, LOAD DATA, bulk API?Quick validation:
-- Can you insert multiple rows at once?
INSERT INTO test_table (id, name) VALUES
(1, 'Alice'),
(2, 'Bob'),
(3, 'Charlie');
-- Or does your DB prefer COPY/bulk API?
COPY test_table FROM '/path/to/data.csv' WITH (FORMAT CSV);
Decision needed:
For SQL Generator: upsertTable()
Quick validation:
-- Test 1: Can you upsert with MERGE?
MERGE INTO target USING source ON target.pk = source.pk
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...;
-- Test 2: Or INSERT ON CONFLICT?
INSERT INTO target VALUES (...)
ON CONFLICT (pk) DO UPDATE SET ...;
-- Test 3: Or manual approach?
DELETE FROM target WHERE pk IN (SELECT pk FROM source);
INSERT INTO target SELECT * FROM source;
Quick validation:
-- Can you deduplicate with window function?
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY pk
ORDER BY updated_at DESC
) AS rn
FROM table
) WHERE rn = 1;
For SQL Generator: alterTable(), Client: discoverSchema(), computeSchema()
Quick validation:
CREATE TABLE schema_test (id INT);
-- Add column
ALTER TABLE schema_test ADD COLUMN name VARCHAR;
-- Drop column
ALTER TABLE schema_test DROP COLUMN name;
-- Change type
ALTER TABLE schema_test ALTER COLUMN id TYPE BIGINT;
-- Change nullable
ALTER TABLE schema_test ALTER COLUMN id DROP NOT NULL;
information_schema.columns? DESCRIBE TABLE? Client API?Quick validation:
-- Can you introspect schema?
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = 'schema_test';
For SQL Generator: All methods
Quick validation:
-- What happens here?
CREATE TABLE MyTable (MyColumn INT);
SELECT * FROM mytable; -- Does this work?
SELECT * FROM "MyTable"; -- Does this preserve case?
Quick validation:
-- Can you use reserved keywords with quoting?
CREATE TABLE "order" ("user" VARCHAR);
For Database Client: Error classification
Quick validation:
// Can you classify errors?
try {
connection.execute("INSERT INTO t VALUES (1, 1)") // Duplicate PK
connection.execute("INSERT INTO t VALUES (1, 2)") // Duplicate PK
} catch (e: SQLException) {
println("SQL State: ${e.sqlState}") // 23505?
println("Error code: ${e.errorCode}")
println("Message: ${e.message}")
}
For all phases
{database}:latest? Specific version?Answer these to assess readiness:
| Question | Your Answer | Needed For |
|---|---|---|
| JDBC driver or native client? | Phase 1 | |
| Connection string format? | Phase 1 | |
| CREATE SCHEMA or CREATE DATABASE? | Phase 2 | |
| CREATE TABLE syntax? | Phase 3 | |
| String type (VARCHAR/TEXT/STRING)? | Phase 3 | |
| Integer type (INT/BIGINT/NUMBER)? | Phase 3 | |
| JSON type (JSONB/JSON/VARIANT)? | Phase 3 | |
| Timestamp type with timezone? | Phase 3 | |
| Has MERGE or INSERT ON CONFLICT? | Phase 9 | |
| Has window functions (ROW_NUMBER)? | Phase 9 | |
| Has ALTER TABLE ADD/DROP COLUMN? | Phase 8 | |
| System catalog for introspection? | Phase 8 | |
| Atomic table swap/exchange? | Phase 6 | |
| Optimal batch size for inserts? | Phase 5 |
Use this to document your findings:
# {Database} Connector Preflight Research
## 1. Connection & Client
- **Driver:** [JDBC / Native / HTTP API]
- **Maven coordinates:** `group:artifact:version`
- **Connection string:** `protocol://host:port/database?options`
- **Auth method:** [username/password / API key / other]
- **Connection pooling:** [HikariCP / built-in / manual]
## 2. Namespaces
- **Concept:** [Schema / Database / None]
- **Create:** `CREATE SCHEMA name` or `CREATE DATABASE name`
- **Check exists:** `SELECT ... FROM information_schema.schemata`
- **Drop:** `DROP SCHEMA name CASCADE`
## 3. Tables
- **Create:** `CREATE TABLE schema.table (cols...)`
- **Check exists:** `SELECT ... FROM information_schema.tables`
- **Drop:** `DROP TABLE IF EXISTS schema.table`
- **Describe:** `DESCRIBE TABLE` or `SELECT ... FROM information_schema.columns`
- **Swap:** `ALTER TABLE ... SWAP WITH ...` or recreation needed
## 4. Type Mapping
| Airbyte Type | Database Type | Notes |
|--------------|---------------|-------|
| String | VARCHAR / TEXT | Length limit? |
| Integer | BIGINT | Range? |
| Number | DECIMAL(38,9) | Precision? |
| Boolean | BOOLEAN | Or TINYINT? |
| Date | DATE | Format? |
| Timestamp+TZ | TIMESTAMPTZ | Precision? |
| JSON | JSONB | Or JSON / TEXT? |
| Array | JSONB | Or native array? |
## 5. Bulk Insert
- **Best method:** [Multi-row INSERT / COPY / Staging / Bulk API]
- **Batch size:** [1000 / 10000 / custom]
- **Compression:** [GZIP / LZ4 / None]
- **Example:** `COPY table FROM file WITH (...)`
## 6. Upsert
- **Method:** [MERGE / INSERT ON CONFLICT / REPLACE / Manual DELETE+INSERT]
- **Syntax:** `MERGE INTO ... USING ... ON ... WHEN MATCHED ...`
- **Window functions:** [ROW_NUMBER OVER supported? YES/NO]
## 7. Schema Evolution
- **ADD COLUMN:** `ALTER TABLE t ADD COLUMN c type`
- **DROP COLUMN:** `ALTER TABLE t DROP COLUMN c`
- **CHANGE TYPE:** `ALTER TABLE t ALTER COLUMN c TYPE newtype` or temp column approach
- **Discover schema:** `SELECT ... FROM information_schema.columns`
## 8. Identifiers
- **Case:** [Lowercase / Uppercase / Preserve]
- **Quoting:** [" / ` / [] ]
- **Reserved keywords:** [List common ones]
- **Max length:** [63 / 255 / other]
## 9. Errors
- **SQL State codes:** [23505 = unique violation, etc.]
- **Exception type:** [SQLException / DatabaseException / custom]
- **Permission errors:** [Pattern in error message]
## 10. Testing
- **Testcontainers:** `{database}Container("{db}:latest")`
- **Or local:** [Installation command]
- **Default port:** [5432 / 3306 / etc.]
Before starting Phase 1, can you answer YES to all?
If you answer YES to any, plan workarounds:
| Database Familiarity | Research Time | Total Implementation |
|---|---|---|
| Expert (daily use) | 30 min | 3-4 days |
| Familiar (used before) | 2-3 hours | 4-5 days |
| New (never used) | 4-8 hours | 5-7 days |
| Exotic (limited docs) | 8-16 hours | 7-10 days |
Recommendation: Spend the research time upfront. It pays off during implementation.
If you can answer all Critical questions: → Proceed to Phase 0 (Scaffolding) in step-by-step-guide.md
If you're missing Important knowledge: → Research those areas first - they're needed by Phase 6-9
If you're missing Nice to Have knowledge: → Start implementation anyway - research these as needed during development
If you hit Red Flags: → Review implementation-reference.md for alternative approaches → Consider asking in Airbyte community if workarounds exist
Complete these research tasks (2-4 hours):
Output: Research document with all findings
Then: Proceed to step-by-step-guide.md Phase 0
Estimated effort to implement: 3-7 days depending on database familiarity