docs/en/architecture/api-design/sink-architecture.md
Writing data to external systems in distributed environments presents critical challenges:
- Exactly-once delivery: retries after a failure must not duplicate records
- Atomic visibility: partially written data must never become visible to readers
- Coordinated commits: parallel writers must agree on when a checkpoint's data is final
SeaTunnel's Sink API addresses these with a layered design that separates writing from committing:
┌────────────────────────────────────────────────────────────────┐
│ TaskExecutionService (Worker Side)                             │
│                                                                │
│  ┌────────────────────────────────────────────────────────┐    │
│  │ SinkWriter<IN, CommitInfoT, StateT>                    │    │
│  │                                                        │    │
│  │ • Receive records from upstream                        │    │
│  │ • Buffer and write data                                │    │
│  │ • Produce commit info at checkpoint boundary           │    │
│  │ • Snapshot writer state                                │    │
│  │ • Cleanup/rollback on failure (engine-dependent)       │    │
│  └────────────────────────────────────────────────────────┘    │
└──────────────────────────────┬─────────────────────────────────┘
                               │ (CommitInfo)
                               ▼
┌────────────────────────────────────────────────────────────────┐
│ Coordinator Side (control plane, engine-dependent)             │
│                                                                │
│  ┌────────────────────────────────────────────────────────┐    │
│  │ SinkCommitter<CommitInfoT> (Optional)                  │    │
│  │                                                        │    │
│  │ • Receive commit infos from multiple writers           │    │
│  │ • Commit each writer's changes independently           │    │
│  │ • Retry failed commits                                 │    │
│  │ • Must be idempotent                                   │    │
│  └────────────────────────────────────────────────────────┘    │
│                            │                                   │
│                            │ (Optional: AggregatedCommitInfo)  │
│                            ▼                                   │
│  ┌────────────────────────────────────────────────────────┐    │
│  │ SinkAggregatedCommitter<CommitInfoT,                   │    │
│  │                         AggregatedCommitInfoT>         │    │
│  │ (Optional)                                             │    │
│  │                                                        │    │
│  │ • Aggregate commit infos from all writers              │    │
│  │ • Perform single global commit operation               │    │
│  │ • Single-threaded, global coordinator                  │    │
│  └────────────────────────────────────────────────────────┘    │
└──────────────────────────────┬─────────────────────────────────┘
                               │
                               ▼
                        External Data Sink
                (Database / File / Message Queue)
SeaTunnelSink is the top-level interface. It acts as a factory for creating writers and committers.
public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
extends Serializable {
/**
* Create SinkWriter (called on worker)
*/
SinkWriter<IN, CommitInfoT, StateT> createWriter(SinkWriter.Context context)
throws IOException;
/**
* Restore SinkWriter from checkpoint (called on worker)
*/
default SinkWriter<IN, CommitInfoT, StateT> restoreWriter(
SinkWriter.Context context,
List<StateT> states) throws IOException {
return createWriter(context);
}
/**
* Serializer for writer state (optional).
*/
default Optional<Serializer<StateT>> getWriterStateSerializer() {
return Optional.empty();
}
/**
* Create SinkCommitter (optional, trigger location depends on execution engine)
*/
default Optional<SinkCommitter<CommitInfoT>> createCommitter() throws IOException {
return Optional.empty();
}
/**
* Serializer for commit info (optional).
*/
default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
return Optional.empty();
}
/**
* Create SinkAggregatedCommitter (optional).
*/
default Optional<SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>>
createAggregatedCommitter() throws IOException {
return Optional.empty();
}
/**
* Serializer for aggregated commit info (optional).
*/
default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
return Optional.empty();
}
/**
 * Get the catalog table that this sink writes to (optional).
 */
default Optional<CatalogTable> getWriteCatalogTable() {
return Optional.empty();
}
}
Key Design Points:
- The sink itself is a pure factory: it creates writers on workers and committers on the coordinator side
- Both committers are optional; omitting them yields a plain at-least-once writer, while implementing them enables exactly-once commits
- Writer state and commit info cross process boundaries, so serializers must be provided for any custom types (see the sketch below)
- restoreWriter allows a writer to resume from checkpointed state after a failure
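Because commit info and writer state are shipped between workers and the coordinator as bytes, each custom type needs a Serializer. A minimal sketch, assuming SeaTunnel's Serializer<T> exposes byte[] serialize(T) / T deserialize(byte[]) as the factory methods above imply, and reusing the FileWriterState from the file sink example further below (a single long field):
public class FileWriterStateSerializer implements Serializer<FileWriterState> {
    @Override
    public byte[] serialize(FileWriterState state) throws IOException {
        // Fixed-width encoding of the single long field
        return ByteBuffer.allocate(Long.BYTES).putLong(state.getBytesWritten()).array();
    }
    @Override
    public FileWriterState deserialize(byte[] serialized) throws IOException {
        return new FileWriterState(ByteBuffer.wrap(serialized).getLong());
    }
}
The sink wires it up by overriding getWriterStateSerializer() to return Optional.of(new FileWriterStateSerializer()).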
Normal flow: writers prepare at the checkpoint barrier, and the committer commits only after the checkpoint has been persisted.
sequenceDiagram
participant CP as CheckpointCoordinator
participant Writer1 as SinkWriter 1
participant Writer2 as SinkWriter 2
participant Committer as SinkCommitter
participant Sink as External Sink
Writer1->>Writer1: write(record)
Writer2->>Writer2: write(record)
CP->>Writer1: triggerBarrier(checkpointId)
CP->>Writer2: triggerBarrier(checkpointId)
Writer1->>Writer1: prepareCommit(checkpointId)
Writer1->>CP: ack(commitInfo1)
Writer2->>Writer2: prepareCommit(checkpointId)
Writer2->>CP: ack(commitInfo2)
CP->>CP: All writers acked
CP->>CP: Persist checkpoint
CP->>Committer: commit([commitInfo1, commitInfo2])
Committer->>Sink: Commit writer1 changes
Committer->>Sink: Commit writer2 changes
Committer->>CP: ack()
Note over Writer1,Writer2: Framework may notify checkpoint completion for cleanup (engine-dependent)
Failure flow: recovery replays from the last checkpoint, and commits are retried idempotently.
sequenceDiagram
participant CP as CheckpointCoordinator
participant Writer as SinkWriter
participant Committer as SinkCommitter
participant Sink as External Sink
Writer->>Writer: prepareCommit(checkpointId)
Writer->>CP: ack(commitInfo)
CP->>Writer: [Failure - writer crashes]
CP->>CP: Checkpoint fails
CP->>CP: Restore from previous checkpoint
CP->>Writer: restoreWriter(previousState)
Writer->>Writer: Replay records from checkpoint
Writer->>Writer: prepareCommit(checkpointId)
Writer->>CP: ack(commitInfo)
CP->>Committer: commit([commitInfo])
Committer->>Sink: Commit (idempotent)
Committer-->>Sink: [Commit fails due to network]
Committer->>Committer: Retry
Committer->>Sink: Commit (idempotent)
Sink-->>Committer: Success
Note over Writer,Committer: Framework may notify checkpoint completion for cleanup (engine-dependent)
The SinkWriter runs on workers and performs the actual data writing.
public interface SinkWriter<IN, CommitInfoT, StateT> {
/**
* Write single record
*/
void write(IN element) throws IOException;
/**
* Prepare commit info during checkpoint.
*
* Guideline: do not make data externally visible in this phase.
*/
Optional<CommitInfoT> prepareCommit(long checkpointId) throws IOException;
/**
* Abort prepared commit if checkpoint fails
*/
void abortPrepare();
/**
* Snapshot writer state for checkpoint
*/
List<StateT> snapshotState(long checkpointId) throws IOException;
/**
* Close writer
*/
void close() throws IOException;
/**
* Context for interacting with framework
*/
interface Context {
int getIndexOfSubtask();
MetricsContext getMetricsContext();
}
}
Critical Requirements:
- prepareCommit(checkpointId) should not make data externally visible (the commit happens in SinkCommitter / SinkAggregatedCommitter)
- prepareCommit(checkpointId) returns commit info that will be passed to the committer
- snapshotState() must capture all uncommitted writes
- abortPrepare() is only used by Spark when prepareCommit(...) fails by throwing an exception

Implementation Example (JDBC with XA Transactions):
public class JdbcExactlyOnceSinkWriter implements SinkWriter<SeaTunnelRow, XidInfo, Void> {
    private final XAConnection xaConnection;
    private final XAResource xaResource;
    private final Connection connection;
    private final PreparedStatement statement;
    private final List<Xid> pendingXids = new ArrayList<>();
    private Xid currentXid; // XA transaction open for the current checkpoint interval
@Override
public void write(SeaTunnelRow element) throws IOException {
try {
// Start XA transaction if needed
if (currentXid == null) {
currentXid = generateXid();
xaResource.start(currentXid, XAResource.TMNOFLAGS);
}
// Execute INSERT (buffered in transaction)
setParameters(statement, element);
statement.executeUpdate();
} catch (SQLException e) {
throw new IOException("Failed to write record", e);
}
}
@Override
public Optional<XidInfo> prepareCommit(long checkpointId) throws IOException {
if (currentXid == null) {
return Optional.empty(); // No data written
}
try {
// End XA transaction
xaResource.end(currentXid, XAResource.TMSUCCESS);
// Prepare XA transaction (FIRST PHASE - no side effects yet)
xaResource.prepare(currentXid);
// Return XID for committer
XidInfo xidInfo = new XidInfo(currentXid);
pendingXids.add(currentXid);
currentXid = null;
return Optional.of(xidInfo);
} catch (XAException e) {
throw new IOException("Failed to prepare XA transaction", e);
}
}
@Override
public void abortPrepare() {
// Rollback prepared transaction
if (currentXid != null) {
try {
xaResource.rollback(currentXid);
} catch (XAException e) {
LOG.error("Failed to rollback XA transaction", e);
}
}
}
@Override
public List<Void> snapshotState(long checkpointId) {
// For XA, state is managed by database
return Collections.emptyList();
}
}
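The writer above calls generateXid() without showing it. One possible sketch, with a trivial Xid implementation (the format id, the subtaskIndex field, and all names here are illustrative, not SeaTunnel's actual scheme):
private static final int FORMAT_ID = 0x5EA;
private int xidCounter = 0;

private Xid generateXid() {
    // What matters is uniqueness per (writer subtask, transaction); the exact layout does not
    byte[] gtrid = ("seatunnel-writer-" + subtaskIndex).getBytes(StandardCharsets.UTF_8);
    byte[] bqual = ByteBuffer.allocate(Integer.BYTES).putInt(xidCounter++).array();
    return new SimpleXid(FORMAT_ID, gtrid, bqual);
}

/** Trivial javax.transaction.xa.Xid holder. */
static final class SimpleXid implements Xid {
    private final int formatId;
    private final byte[] gtrid;
    private final byte[] bqual;
    SimpleXid(int formatId, byte[] gtrid, byte[] bqual) {
        this.formatId = formatId;
        this.gtrid = gtrid;
        this.bqual = bqual;
    }
    @Override public int getFormatId() { return formatId; }
    @Override public byte[] getGlobalTransactionId() { return gtrid; }
    @Override public byte[] getBranchQualifier() { return bqual; }
}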
Implementation Example (File Sink with Atomic Rename):
public class FileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileWriterState> {
private final String tempFilePath;
private final String finalFilePath;
private final OutputStream outputStream;
private long bytesWritten = 0;
@Override
public void write(SeaTunnelRow element) throws IOException {
// Write to temporary file
byte[] bytes = serialize(element);
outputStream.write(bytes);
bytesWritten += bytes.length;
}
@Override
public Optional<FileCommitInfo> prepareCommit(long checkpointId) throws IOException {
// Flush and close the temp file (no rename yet!); a new temp file is opened for the next checkpoint
outputStream.flush();
outputStream.close();
// Return commit info for committer to rename file
return Optional.of(new FileCommitInfo(tempFilePath, finalFilePath));
}
@Override
public void abortPrepare() {
// Delete temporary file
new File(tempFilePath).delete();
}
@Override
public List<FileWriterState> snapshotState(long checkpointId) {
// Save current write position
return Collections.singletonList(new FileWriterState(bytesWritten));
}
}
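FileCommitInfo is just a serializable carrier from writer to committer; a sketch matching the getters used by the committer below (constructor and field names assumed):
public class FileCommitInfo implements Serializable {
    private final String tempFilePath;
    private final String finalFilePath;

    public FileCommitInfo(String tempFilePath, String finalFilePath) {
        this.tempFilePath = tempFilePath;
        this.finalFilePath = finalFilePath;
    }

    public String getTempFilePath() { return tempFilePath; }
    public String getFinalFilePath() { return finalFilePath; }
}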
The SinkCommitter runs on the coordinator side (exact placement is engine-dependent) and commits the changes reported by multiple writers.
public interface SinkCommitter<CommitInfoT> extends Closeable {
/**
* Commit multiple commit infos (from multiple writers or retries)
* MUST be idempotent - may be called multiple times with same commitInfo
*/
List<CommitInfoT> commit(List<CommitInfoT> commitInfos) throws IOException;
/**
* Abort commit infos (optional)
*/
default void abort(List<CommitInfoT> commitInfos) throws IOException {}
/**
* Close committer
*/
void close() throws IOException;
}
Critical Requirements:
- commit() MUST be idempotent (calling it twice with the same commitInfo must be safe)

Implementation Example (JDBC XA Committer):
public class JdbcSinkCommitter implements SinkCommitter<XidInfo> {
private final XADataSource xaDataSource;
@Override
public List<XidInfo> commit(List<XidInfo> commitInfos) throws IOException {
List<XidInfo> failed = new ArrayList<>();
for (XidInfo xidInfo : commitInfos) {
try {
XAConnection xaConn = xaDataSource.getXAConnection();
XAResource xaResource = xaConn.getXAResource();
// SECOND PHASE: Commit prepared transaction
xaResource.commit(xidInfo.getXid(), false);
xaConn.close();
} catch (XAException e) {
if (e.errorCode == XAException.XAER_NOTA) {
// Transaction already committed (idempotent)
LOG.info("XA transaction already committed: {}", xidInfo.getXid());
} else {
// Commit failed, will retry
LOG.error("Failed to commit XA transaction: {}", xidInfo.getXid(), e);
failed.add(xidInfo);
}
}
}
return failed; // Framework will retry failed commits
}
@Override
public void abort(List<XidInfo> commitInfos) {
// Rollback prepared transactions
for (XidInfo xidInfo : commitInfos) {
try {
XAConnection xaConn = xaDataSource.getXAConnection();
xaConn.getXAResource().rollback(xidInfo.getXid());
xaConn.close();
} catch (Exception e) {
LOG.error("Failed to rollback XA transaction", e);
}
}
}
}
Implementation Example (File Committer with Atomic Rename):
public class FileSinkCommitter implements SinkCommitter<FileCommitInfo> {
private final FileSystem fileSystem;
@Override
public List<FileCommitInfo> commit(List<FileCommitInfo> commitInfos) {
List<FileCommitInfo> failed = new ArrayList<>();
for (FileCommitInfo commitInfo : commitInfos) {
try {
Path tempPath = new Path(commitInfo.getTempFilePath());
Path finalPath = new Path(commitInfo.getFinalFilePath());
// Atomic rename (commit)
if (fileSystem.exists(finalPath)) {
// File already committed (idempotent)
LOG.info("File already exists, skipping: {}", finalPath);
fileSystem.delete(tempPath, false); // Clean up temp file
} else {
boolean success = fileSystem.rename(tempPath, finalPath);
if (!success) {
failed.add(commitInfo);
}
}
} catch (IOException e) {
LOG.error("Failed to commit file: {}", commitInfo, e);
failed.add(commitInfo);
}
}
return failed;
}
}
The SinkAggregatedCommitter performs a single global commit on behalf of all writers.
public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>
extends Closeable {
/**
* Combine commit infos from multiple writers into single aggregated info
*/
AggregatedCommitInfoT combine(List<CommitInfoT> commitInfos);
/**
* Commit aggregated info (single global operation)
* MUST be idempotent
*/
List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfos)
throws IOException;
/**
* Abort aggregated commit infos
*/
default void abort(List<AggregatedCommitInfoT> aggregatedCommitInfos) throws IOException {}
/**
* Restore committer state from checkpoint
*/
default void restoreCommit(List<AggregatedCommitInfoT> aggregatedCommitInfos)
throws IOException {}
/**
* Close committer
*/
void close() throws IOException;
}
Use Cases: sinks that need one atomic, table-level commit instead of independent per-writer commits, e.g., registering every written file in a single Hive metastore transaction or publishing one table snapshot.
Implementation Example (Hive Sink):
public class HiveAggregatedCommitter
        implements SinkAggregatedCommitter<HiveWriteInfo, HiveCommitInfo> {
    private final HiveMetastoreClient hiveMetastore; // hypothetical wrapper around the metastore API
    private final String tableName;

    public HiveAggregatedCommitter(HiveMetastoreClient hiveMetastore, String tableName) {
        this.hiveMetastore = hiveMetastore;
        this.tableName = tableName;
    }
@Override
public HiveCommitInfo combine(List<HiveWriteInfo> commitInfos) {
// Collect all written files across all writers
List<String> allFiles = new ArrayList<>();
for (HiveWriteInfo writeInfo : commitInfos) {
allFiles.addAll(writeInfo.getWrittenFiles());
}
return new HiveCommitInfo(allFiles);
}
@Override
public List<HiveCommitInfo> commit(List<HiveCommitInfo> aggregatedCommitInfos) {
List<HiveCommitInfo> failed = new ArrayList<>();
for (HiveCommitInfo commitInfo : aggregatedCommitInfos) {
try {
// Single global commit for entire table
hiveMetastore.beginTransaction();
for (String file : commitInfo.getAllFiles()) {
hiveMetastore.addPartitionFile(tableName, file);
}
hiveMetastore.commitTransaction(); // Global atomic commit
} catch (Exception e) {
LOG.error("Failed to commit to Hive", e);
hiveMetastore.rollbackTransaction();
failed.add(commitInfo);
}
}
return failed;
}
}
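For orientation, this is roughly how the engine drives an aggregated committer at each checkpoint (an illustrative call sequence, not engine code; the writeInfo variables are assumed):
// Gather per-writer infos, combine once, commit once
List<HiveWriteInfo> perWriterInfos = Arrays.asList(writeInfo1, writeInfo2, writeInfo3);
HiveCommitInfo aggregated = committer.combine(perWriterInfos);
List<HiveCommitInfo> failed = committer.commit(Collections.singletonList(aggregated));
if (!failed.isEmpty()) {
    // The engine retries failed aggregated commits, which is why commit must be idempotent
}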
API Interfaces:
Example Implementations:
- seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/
- seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/
- seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/

Pros:
Cons:
When to Use:
When Not to Use:
Two-Tier (Writer → Committer): each writer's changes are committed independently; a good fit when the external system supports per-writer transactions (e.g., JDBC XA).
Three-Tier (Writer → Committer → AggregatedCommitter): commit infos from all writers are combined into one global commit; a good fit for table-level publishing (e.g., Hive partition registration).
Batching amortizes per-record overhead by writing many records per round trip:
public class BatchSinkWriter {
    private final PreparedStatement statement; // prepared INSERT, reused across batches
    private final List<SeaTunnelRow> batch = new ArrayList<>();
    private static final int BATCH_SIZE = 1000;

    @Override
    public void write(SeaTunnelRow element) throws IOException {
        batch.add(element);
        if (batch.size() >= BATCH_SIZE) {
            flushBatch();
        }
    }

    private void flushBatch() throws IOException {
        try {
            for (SeaTunnelRow row : batch) {
                setParameters(statement, row);
                statement.addBatch();
            }
            statement.executeBatch(); // write the entire batch in a single round trip
            batch.clear();
        } catch (SQLException e) {
            throw new IOException("Failed to flush batch", e);
        }
    }
}
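A batched writer must also drain any partial batch at the checkpoint boundary so that nothing stays buffered when commit info is produced; a sketch (this at-least-once variant carries no commit info):
@Override
public Optional<CommitInfo> prepareCommit(long checkpointId) throws IOException {
    if (!batch.isEmpty()) {
        flushBatch();
    }
    return Optional.empty();
}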
Benefits: fewer network round trips, higher throughput, and lower per-record overhead.
Asynchronous writing overlaps I/O with record processing; prepareCommit must then wait for every in-flight write before producing commit info:
public class AsyncSinkWriter {
    private final ExecutorService executorService = Executors.newFixedThreadPool(4);
    private final BlockingQueue<CompletableFuture<Void>> pendingWrites = new LinkedBlockingQueue<>();

    @Override
    public void write(SeaTunnelRow element) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // Async write operation
            actualWrite(element);
        }, executorService);
        pendingWrites.add(future);
    }

    @Override
    public Optional<CommitInfo> prepareCommit(long checkpointId) throws IOException {
        // Wait for all pending writes; surface any async failure as a checkpoint failure
        try {
            for (CompletableFuture<Void> future : pendingWrites) {
                future.join();
            }
        } catch (CompletionException e) {
            throw new IOException("Async write failed", e);
        }
        pendingWrites.clear();
        return Optional.of(createCommitInfo());
    }
}
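An unbounded queue of in-flight writes can exhaust memory. A simple backpressure sketch using a Semaphore (the limit of 1024 is illustrative):
private final Semaphore inFlight = new Semaphore(1024);

public void write(SeaTunnelRow element) throws IOException {
    try {
        inFlight.acquire(); // blocks when too many writes are outstanding
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while waiting for a write slot", e);
    }
    CompletableFuture<Void> future = CompletableFuture
            .runAsync(() -> actualWrite(element), executorService)
            .whenComplete((result, error) -> inFlight.release());
    pendingWrites.add(future);
}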
Connection pooling avoids paying connection setup for every record:
public class JdbcSinkWriter {
    private final HikariDataSource dataSource;
    private final String sql;

    @Override
    public void write(SeaTunnelRow element) throws IOException {
        // Borrow a pooled connection; try-with-resources returns it to the pool
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            setParameters(stmt, element);
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new IOException("Failed to write record", e);
        }
    }
}
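Pool sizing matters once writers run in parallel; a minimal HikariCP configuration sketch (URL, credentials, and sizes are illustrative):
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://host:3306/db");
config.setUsername("user");
config.setPassword("password");
config.setMaximumPoolSize(4);        // per writer subtask
config.setConnectionTimeout(30_000); // fail fast instead of stalling the pipeline
HikariDataSource dataSource = new HikariDataSource(config);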
Idempotent writes let replayed records overwrite rather than duplicate. Database upserts are the usual tool:
// INSERT ON DUPLICATE KEY UPDATE (MySQL)
String sql = "INSERT INTO table (id, name) VALUES (?, ?) " +
"ON DUPLICATE KEY UPDATE name = VALUES(name)";
// MERGE INTO (Oracle, SQL Server)
String sql = "MERGE INTO table USING (SELECT ? as id, ? as name FROM dual) src " +
"ON (table.id = src.id) " +
"WHEN MATCHED THEN UPDATE SET table.name = src.name " +
"WHEN NOT MATCHED THEN INSERT (id, name) VALUES (src.id, src.name)";
Message queues can deduplicate as well; Kafka does so through its idempotent producer:
public class KafkaSinkWriter {
    private final KafkaProducer<String, String> producer;
    private final String topic;

    @Override
    public void write(SeaTunnelRow element) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                topic,
                element.getField(0).toString(), // key keeps all versions of one entity in the same partition
                element.toString()
        );
        // With enable.idempotence=true, the broker deduplicates retried sends per partition
        producer.send(record);
    }
}
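Idempotence must be switched on when the producer is created; a minimal configuration sketch (broker address illustrative):
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // dedup retried sends per partition
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);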
A committer can also make itself idempotent by tracking completed commits explicitly, e.g., in a commit table:
public class JdbcCommitter {
    private final Connection connection; // for the commit-tracking table
    private final XAResource xaResource; // for committing prepared XA transactions

    @Override
    public List<XidInfo> commit(List<XidInfo> commitInfos) throws IOException {
        List<XidInfo> failed = new ArrayList<>();
        for (XidInfo xidInfo : commitInfos) {
            String xidString = xidInfo.getXid().toString();
            try {
                // Check if already committed
                if (checkCommitTable(xidString)) {
                    LOG.info("XID already committed: {}", xidString);
                    continue; // Idempotent: skip replayed commit info
                }
                // Commit the prepared transaction
                xaResource.commit(xidInfo.getXid(), false);
                // Record the commit
                insertCommitTable(xidString, System.currentTimeMillis());
            } catch (Exception e) {
                LOG.error("Failed to commit XID: {}", xidString, e);
                failed.add(xidInfo);
            }
        }
        return failed; // the framework retries these
    }
}
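The two helpers can be plain JDBC against a small tracking table (the table name seatunnel_commits and its schema are assumptions):
private boolean checkCommitTable(String xid) throws SQLException {
    try (PreparedStatement stmt = connection.prepareStatement(
            "SELECT 1 FROM seatunnel_commits WHERE xid = ?")) {
        stmt.setString(1, xid);
        try (ResultSet rs = stmt.executeQuery()) {
            return rs.next();
        }
    }
}

private void insertCommitTable(String xid, long committedAt) throws SQLException {
    try (PreparedStatement stmt = connection.prepareStatement(
            "INSERT INTO seatunnel_commits (xid, committed_at) VALUES (?, ?)")) {
        stmt.setString(1, xid);
        stmt.setLong(2, committedAt);
        stmt.executeUpdate();
    }
}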
1. Choose Appropriate Commit Level
// Simple sink: Writer only (at-least-once)
public class SimpleSink implements SeaTunnelSink<...> {
SinkWriter createWriter(...) { return new SimpleWriter(); }
// No committer - data written directly
}
// Transactional sink: Writer + Committer (exactly-once)
public class TransactionalSink implements SeaTunnelSink<...> {
SinkWriter createWriter(...) { return new TransactionalWriter(); }
Optional<SinkCommitter> createCommitter() { return Optional.of(new Committer()); }
}
// Table sink: Writer + Committer + AggregatedCommitter
public class TableSink implements SeaTunnelSink<...> {
SinkWriter createWriter(...) { return new TableWriter(); }
Optional<SinkCommitter> createCommitter() { return Optional.of(new Committer()); }
Optional<SinkAggregatedCommitter> createAggregatedCommitter() {
return Optional.of(new AggregatedCommitter());
}
}
2. Proper State Management
public class StatefulSinkWriter {
private long recordsWritten = 0;
private long bytesWritten = 0;
@Override
public List<WriterState> snapshotState(long checkpointId) {
return Collections.singletonList(
new WriterState(recordsWritten, bytesWritten)
);
}
public StatefulSinkWriter restoreState(List<WriterState> states) {
if (!states.isEmpty()) {
WriterState state = states.get(0);
this.recordsWritten = state.getRecordsWritten();
this.bytesWritten = state.getBytesWritten();
}
return this;
}
}
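For the saved state to actually be used, the sink must override restoreWriter; a sketch, assuming StatefulSinkWriter implements SinkWriter<SeaTunnelRow, CommitInfo, WriterState>:
@Override
public SinkWriter<SeaTunnelRow, CommitInfo, WriterState> restoreWriter(
        SinkWriter.Context context, List<WriterState> states) throws IOException {
    // Recreate the writer, then replay the checkpointed counters into it
    return new StatefulSinkWriter().restoreState(states);
}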
3. Resource Management
@Override
public void close() throws IOException {
    // Close in reverse order of creation; JDBC close() throws SQLException, not IOException
    try {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    } catch (SQLException e) {
        throw new IOException("Failed to close JDBC resources", e);
    } finally {
        if (dataSource != null) dataSource.close();
    }
}
1. Side Effects in prepareCommit(checkpointId)
// ❌ BAD: Actual commit in prepareCommit(checkpointId)
public Optional<CommitInfo> prepareCommit(long checkpointId) {
connection.commit(); // WRONG! This is a side effect!
return Optional.of(new CommitInfo());
}
// ✅ GOOD: Only prepare, no side effects
public Optional<CommitInfo> prepareCommit(long checkpointId) {
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid); // Prepare only, no commit yet
return Optional.of(new XidInfo(xid));
}
2. Non-Idempotent Commit
// ❌ BAD: Direct INSERT (not idempotent)
public List<CommitInfo> commit(List<CommitInfo> commitInfos) {
    for (CommitInfo info : commitInfos) {
        executeInsert(info); // May fail if called twice!
    }
    return Collections.emptyList();
}
// ✅ GOOD: UPSERT (idempotent)
public List<CommitInfo> commit(List<CommitInfo> commitInfos) {
    for (CommitInfo info : commitInfos) {
        executeUpsert(info); // Safe to call multiple times
    }
    return Collections.emptyList();
}
3. Large State
// ❌ BAD: Buffer all records in state
public class BadWriter {
private List<SeaTunnelRow> bufferedRows = new ArrayList<>(); // May be huge!
public List<State> snapshotState(long checkpointId) {
return Collections.singletonList(new State(bufferedRows));
}
}
// ✅ GOOD: Flush before checkpoint, track metadata only
public class GoodWriter {
private long lastCommittedOffset = 0;
public Optional<CommitInfo> prepareCommit(long checkpointId) {
flushBufferedRows(); // Write to external system
return Optional.of(new CommitInfo(lastCommittedOffset));
}
}
1. Enable XA Transaction Logging
// Log XA operations for debugging
LOG.info("Starting XA transaction: {}", xid);
xaResource.start(xid, XAResource.TMNOFLAGS);
LOG.info("Preparing XA transaction: {}", xid);
xaResource.prepare(xid);
LOG.info("Committing XA transaction: {}", xid);
xaResource.commit(xid, false);
2. Track Commit Progress
public class MonitoredCommitter {
    private final Counter commitAttempts;
    private final Counter commitSuccesses;
    private final Counter commitFailures;

    public MonitoredCommitter(MetricsContext metrics) {
        this.commitAttempts = metrics.counter("commit_attempts");
        this.commitSuccesses = metrics.counter("commit_successes");
        this.commitFailures = metrics.counter("commit_failures");
    }
public List<CommitInfo> commit(List<CommitInfo> commitInfos) {
commitAttempts.inc(commitInfos.size());
List<CommitInfo> failed = new ArrayList<>();
for (CommitInfo info : commitInfos) {
try {
doCommit(info);
commitSuccesses.inc();
} catch (Exception e) {
commitFailures.inc();
failed.add(info);
}
}
return failed;
}
}
3. Test Failure Scenarios
@Test
public void testCheckpointFailureRecovery() {
// Write data
writer.write(row1);
writer.write(row2);
// Prepare commit
Optional<CommitInfo> commitInfo = writer.prepareCommit(checkpointId);
// Simulate checkpoint failure
writer.abortPrepare();
// Verify no data committed
assertFalse(dataExistsInSink());
// Restore and retry
writer.write(row1);
writer.write(row2);
commitInfo = writer.prepareCommit(checkpointId);
// Commit should succeed
committer.commit(Collections.singletonList(commitInfo.get()));
assertTrue(dataExistsInSink());
}
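A companion test for committer idempotency is equally important; a sketch with assumed helpers (countRowsInSink):
@Test
public void testCommitIdempotency() throws IOException {
    writer.write(row1);
    Optional<CommitInfo> info = writer.prepareCommit(checkpointId);
    List<CommitInfo> infos = Collections.singletonList(info.get());
    // Committing the same info twice must leave the sink unchanged after the first call
    committer.commit(infos);
    committer.commit(infos);
    assertEquals(1, countRowsInSink());
}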