docs/en/architecture/api-design/source-architecture.md
Data sources in distributed systems present several challenges: data must be discovered and divided into splits for parallel reading, work must be balanced across readers, and progress must survive failures. SeaTunnel's Source API addresses these with a coordinator-side split enumerator and worker-side readers:
┌──────────────────────────────────────────────────────────────┐
│ Coordinator (master/coordinator side) │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ SourceSplitEnumerator<SplitT, StateT> │ │
│ │ │ │
│ │ • Discover/generate splits in run() (impl-defined) │ │
│ │ • Assign splits to readers │ │
│ │ • Handle reader registration │ │
│ │ • Handle split requests │ │
│ │ • Reclaim splits from failed readers │ │
│ │ • Snapshot enumerator state │ │
│ │ • Send/receive custom events │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
└────────────────────────────┼───────────────────────────────────┘
│ (Split Assignment)
▼
┌──────────────────────────────────────────────────────────────┐
│ TaskExecutionService (Worker Side) │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ SourceReader<T, SplitT> │ │
│ │ │ │
│ │ • Receive assigned splits │ │
│ │ • Read data from splits │ │
│ │ • Emit records downstream │ │
│ │ • Snapshot reader state (split progress) │ │
│ │ • Handle split completion │ │
│ │ • Send/receive custom events │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
└────────────────────────────┼───────────────────────────────────┘
│
▼
SeaTunnelRow
(to Transform/Sink)
The top-level interface that serves as a factory for creating readers and enumerators.
public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends Serializable>
extends Serializable {
/**
* Get source boundedness (BOUNDED for batch, UNBOUNDED for streaming)
*/
Boundedness getBoundedness();
/**
* Create SourceReader (called on worker)
*/
SourceReader<T, SplitT> createReader(SourceReader.Context readerContext) throws Exception;
/**
* Split serializer used for network transfer and checkpointing.
*/
Serializer<SplitT> getSplitSerializer();
/**
* Create SourceSplitEnumerator (called on master)
*/
SourceSplitEnumerator<SplitT, StateT> createEnumerator(
SourceSplitEnumerator.Context<SplitT> enumeratorContext) throws Exception;
/**
* Restore SourceSplitEnumerator from checkpoint (called on master)
*/
SourceSplitEnumerator<SplitT, StateT> restoreEnumerator(
SourceSplitEnumerator.Context<SplitT> enumeratorContext,
StateT checkpointState) throws Exception;
/**
* Enumerator-state serializer used for checkpointing.
*/
Serializer<StateT> getEnumeratorStateSerializer();
/**
* Get output schema (CatalogTable list, supports multi-table)
*/
List<CatalogTable> getProducedCatalogTables();
}
Key Methods:
- getBoundedness(): Indicates if the source is bounded (batch) or unbounded (stream)
- createReader(): Factory for reader instances (one per worker task)
- createEnumerator(): Factory for the enumerator (single instance on master)
- restoreEnumerator(): Restore the enumerator from checkpoint state
- getProducedCatalogTables(): Defines the output schema (supports multi-table)
- getSplitSerializer() / getEnumeratorStateSerializer(): Split/enumerator-state serializers for network transfer and checkpointing

Represents a partitionable unit of data.
public interface SourceSplit extends Serializable {
/**
* Unique identifier for this split
*/
String splitId();
}
Implementation Examples:
// File-based split
public class FileSplit implements SourceSplit {
private final String splitId;
private final String filePath;
private final long startOffset;
private final long length;
}
// JDBC-based split (query range)
public class JdbcSourceSplit implements SourceSplit {
private final String splitId;
private final String query;
private final Object[] queryParams;
}
// Kafka-based split (partition)
public class KafkaSourceSplit implements SourceSplit {
private final String splitId;
private final String topic;
private final int partition;
private final long startOffset;
}
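The field-only snippets above omit constructors and the required splitId() implementation. Below is a fuller sketch of the file split (field names as above; the id scheme is an assumption): deriving the id deterministically from the path and byte range means the same split always gets the same id across restarts, which matters for checkpoint/recovery bookkeeping.

```java
import java.io.Serializable;

// Minimal local stand-in for the SourceSplit interface shown above,
// so this sketch compiles on its own.
interface SourceSplit extends Serializable {
    String splitId();
}

// A complete file split: the id is derived deterministically from the
// file path and byte range rather than generated randomly.
class FileSplit implements SourceSplit {
    private final String filePath;
    private final long startOffset;
    private final long length;

    FileSplit(String filePath, long startOffset, long length) {
        this.filePath = filePath;
        this.startOffset = startOffset;
        this.length = length;
    }

    @Override
    public String splitId() {
        return filePath + "_" + startOffset + "_" + length;
    }

    String getFilePath() { return filePath; }
    long getStartOffset() { return startOffset; }
    long getLength() { return length; }
}
```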
Design Notes:
sequenceDiagram
participant Coord as Coordinator
participant Enum as SourceSplitEnumerator
participant Worker as TaskExecutionService
participant Reader as SourceReader
Coord->>Enum: createEnumerator(context)
Enum->>Enum: open()
Worker->>Reader: createReader(context)
Reader->>Reader: open()
Coord->>Enum: registerReader(subtaskId)
Enum->>Enum: run() (discover/generate splits, impl-defined)
Reader->>Enum: context.sendSplitRequest()
Enum->>Enum: handleSplitRequest(subtaskId)
Enum->>Reader: assignSplit(splits)
Reader->>Reader: addSplits(splits)
Reader->>Reader: pollNext(collector)
Reader->>Worker: collect(record)
sequenceDiagram
participant CP as CheckpointCoordinator
participant Enum as SourceSplitEnumerator
participant Reader as SourceReader
CP->>Reader: triggerBarrier(checkpointId)
Reader->>Reader: snapshotState(checkpointId)
Reader->>CP: ack(readerState)
CP->>Enum: snapshotState(checkpointId)
Enum->>Enum: snapshot enumerator state
Enum->>CP: ack(enumeratorState)
CP->>CP: All acks received
CP->>CP: Persist checkpoint
sequenceDiagram
participant Coord as Coordinator
participant Enum as SourceSplitEnumerator
participant OldReader as Failed Reader
participant NewReader as New Reader
OldReader->>OldReader: [Failure]
Coord->>Enum: addSplitsBack(splits, subtaskId)
Enum->>Enum: Mark splits as pending
Coord->>NewReader: Deploy on new worker
NewReader->>NewReader: Restore from checkpoint (reader state)
Coord->>Enum: registerReader(subtaskId)
Enum->>NewReader: assignSplit(recovered splits)
NewReader->>NewReader: Resume from checkpointed offset
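The reassignment flow in this diagram can be modeled with a toy enumerator. The method names below mirror the real API (handleSplitRequest, addSplitsBack, currentUnassignedSplitSize), but the class itself is a standalone stand-in, not a SeaTunnel type:

```java
import java.util.*;

// Toy model of split recovery: splits move from a pending queue to a
// per-reader assigned map; when a reader fails, its splits are reclaimed
// into the pending queue and handed out again on the next request.
class RecoveryDemoEnumerator {
    private final Deque<String> pendingSplits = new ArrayDeque<>();
    private final Map<Integer, List<String>> assigned = new HashMap<>();

    void addSplits(Collection<String> splitIds) {
        pendingSplits.addAll(splitIds);
    }

    // Mirrors handleSplitRequest(): hand one pending split to the reader.
    Optional<String> handleSplitRequest(int subtaskId) {
        String split = pendingSplits.poll();
        if (split == null) return Optional.empty();
        assigned.computeIfAbsent(subtaskId, k -> new ArrayList<>()).add(split);
        return Optional.of(split);
    }

    // Mirrors addSplitsBack(): reclaim a failed reader's splits.
    void addSplitsBack(int subtaskId) {
        List<String> reclaimed = assigned.remove(subtaskId);
        if (reclaimed != null) pendingSplits.addAll(reclaimed);
    }

    int currentUnassignedSplitSize() {
        return pendingSplits.size();
    }
}
```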
The enumerator runs on the master side and coordinates split assignment.
public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT>
extends AutoCloseable, CheckpointListener {
/**
* Called when enumerator starts
*/
void open();
/**
* Executes split discovery and background coordination logic.
*
* Note: run() and snapshotState() may be invoked concurrently by different threads.
*/
void run() throws Exception;
/**
* Add a split back to the enumerator for reassignment (typically after reader failure).
*/
void addSplitsBack(List<SplitT> splits, int subtaskId);
/**
* Current number of unassigned splits.
*/
int currentUnassignedSplitSize();
/**
* Called when a reader requests more splits.
*/
void handleSplitRequest(int subtaskId);
/**
* Called when a reader registers.
*/
void registerReader(int subtaskId);
/**
* Snapshot enumerator state for checkpoint
*/
StateT snapshotState(long checkpointId) throws Exception;
/**
* Handle custom event from reader
*/
default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}
/**
* Close enumerator
*/
void close() throws IOException;
/**
* Context for interacting with framework
*/
interface Context<SplitT extends SourceSplit> {
int currentParallelism();
Set<Integer> registeredReaders();
void assignSplit(int subtaskId, List<SplitT> splits);
void signalNoMoreSplits(int subtaskId);
void sendEventToSourceReader(int subtaskId, SourceEvent event);
}
}
Key Responsibilities:
Implementation Example:
public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
private final Queue<JdbcSourceSplit> pendingSplits = new LinkedList<>();
private final Set<String> assignedSplits = new HashSet<>();
private final Context<JdbcSourceSplit> context;
@Override
public void run() throws Exception {
// Discover splits by querying database metadata
List<JdbcSourceSplit> splits = generateSplitsByPartition();
pendingSplits.addAll(splits);
}
@Override
public void handleSplitRequest(int subtaskId) {
// Assign next available split
JdbcSourceSplit split = pendingSplits.poll();
if (split != null) {
context.assignSplit(subtaskId, Collections.singletonList(split));
assignedSplits.add(split.splitId());
} else {
context.signalNoMoreSplits(subtaskId);
}
}
@Override
public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
// Reclaim splits from failed reader
pendingSplits.addAll(splits);
splits.forEach(split -> assignedSplits.remove(split.splitId()));
}
@Override
public JdbcSourceState snapshotState(long checkpointId) {
// Save remaining splits and assignment info
return new JdbcSourceState(new ArrayList<>(pendingSplits), assignedSplits);
}
}
The reader runs on workers and performs actual data reading.
public interface SourceReader<T, SplitT extends SourceSplit>
extends AutoCloseable, CheckpointListener {
/**
* Called when reader starts
*/
void open() throws Exception;
/**
* Poll next batch of records (non-blocking or timeout)
*/
void pollNext(Collector<T> output) throws Exception;
/**
* Snapshot reader state for checkpoint (typically the current splits/positions).
*/
List<SplitT> snapshotState(long checkpointId) throws Exception;
/**
* Add newly assigned splits.
*/
void addSplits(List<SplitT> splits);
/**
* Signal no more splits will be assigned.
*/
void handleNoMoreSplits();
/**
* Handle custom event from enumerator
*/
default void handleSourceEvent(SourceEvent sourceEvent) {}
/**
* Close reader
*/
void close() throws IOException;
/**
* Context for interacting with framework
*/
interface Context {
int getIndexOfSubtask();
Boundedness getBoundedness();
void signalNoMoreElement();
void sendSplitRequest();
void sendSourceEventToEnumerator(SourceEvent sourceEvent);
}
}
Key Responsibilities:
Implementation Example:
public class JdbcSourceReader implements SourceReader<SeaTunnelRow, JdbcSourceSplit> {
private final SourceReader.Context context; // Injected via constructor (omitted here)
private final Queue<JdbcSourceSplit> pendingSplits = new LinkedList<>();
private JdbcSourceSplit currentSplit;
private ResultSet currentResultSet;
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
if (currentResultSet == null) {
// Fetch next split
currentSplit = pendingSplits.poll();
if (currentSplit == null) {
context.sendSplitRequest(); // Request more splits
return;
}
// Execute query for current split
currentResultSet = executeQuery(currentSplit);
}
// Read a batch of rows, stopping at BATCH_SIZE or end of result set
int count = 0;
boolean hasMore = true;
while (count < BATCH_SIZE && (hasMore = currentResultSet.next())) {
SeaTunnelRow row = convertToRow(currentResultSet);
output.collect(row);
count++;
}
// Split completed: do NOT call next() again here, as that would silently skip a row
if (!hasMore) {
currentResultSet.close();
currentResultSet = null;
currentSplit = null;
}
}
@Override
public void addSplits(List<JdbcSourceSplit> splits) {
pendingSplits.addAll(splits);
}
@Override
public List<JdbcSourceSplit> snapshotState(long checkpointId) {
// Per the interface, reader state is the list of in-flight splits;
// the current split should carry its read position so recovery can resume
List<JdbcSourceSplit> state = new ArrayList<>();
if (currentSplit != null) {
state.add(currentSplit); // ideally updated with the current row offset
}
state.addAll(pendingSplits);
return state;
}
}
Allows enumerator and reader to exchange custom messages.
public interface SourceEvent extends Serializable {
// Marker interface: concrete event types carry their own payload
}
// Example: Reader notifies enumerator of discovered partitions
public class PartitionDiscoveredEvent implements SourceEvent {
private final List<String> newPartitions;
}
// Example: Enumerator notifies reader of configuration change
public class ConfigChangeEvent implements SourceEvent {
private final Map<String, String> newConfig;
}
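A minimal round trip over this event channel looks like the sketch below: a reader reports newly discovered partitions and the enumerator folds them into its backlog. SourceEvent is re-declared locally so the sketch compiles standalone; the demo class and accessor names are illustrative, not SeaTunnel types:

```java
import java.io.Serializable;
import java.util.*;

// Local re-declaration of the marker interface shown above.
interface SourceEvent extends Serializable {}

// Reader-side payload: partitions the reader noticed that the enumerator
// does not yet know about.
class PartitionDiscoveredEvent implements SourceEvent {
    private final List<String> newPartitions;

    PartitionDiscoveredEvent(List<String> newPartitions) {
        this.newPartitions = newPartitions;
    }

    List<String> newPartitions() { return newPartitions; }
}

// Enumerator side: mirrors handleSourceEvent(subtaskId, event) by merging
// reported partitions into the known set (duplicates are ignored).
class EventDemoEnumerator {
    private final Set<String> knownPartitions = new LinkedHashSet<>();

    void handleSourceEvent(int subtaskId, SourceEvent event) {
        if (event instanceof PartitionDiscoveredEvent) {
            knownPartitions.addAll(((PartitionDiscoveredEvent) event).newPartitions());
        }
    }

    Set<String> knownPartitions() { return knownPartitions; }
}
```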
Use Cases:
API Interfaces:
Example Implementations:
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/

Pros:
Cons:
Mitigation:
Coarse-grained splits (few large splits): less assignment and checkpoint overhead, but weaker load balancing and more rework when a split must be re-read after failure.
Fine-grained splits (many small splits): better load balancing and finer recovery granularity, but more enumerator coordination and larger checkpoint state.
Guideline: Choose split granularity based on source capabilities, expected parallelism, and checkpoint/recovery cost.
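As an illustration of this granularity knob, the sketch below carves a numeric key range into a configurable number of range splits; the class and method names are hypothetical, and splitCount is the tuning parameter discussed above (larger values give finer-grained splits):

```java
import java.util.*;

// Carve an inclusive key range [min, max] into roughly equal range splits.
class RangeSplitter {
    static List<long[]> split(long min, long max, int splitCount) {
        List<long[]> splits = new ArrayList<>();
        long total = max - min + 1;
        // Ceiling division so the last split absorbs any remainder.
        long chunk = Math.max(1, (total + splitCount - 1) / splitCount);
        for (long lo = min; lo <= max; lo += chunk) {
            long hi = Math.min(lo + chunk - 1, max);
            splits.add(new long[] {lo, hi}); // inclusive [lo, hi] range
        }
        return splits;
    }
}
```

Each range would then become one SourceSplit (for JDBC, a WHERE clause over the key column), so split count directly controls parallelism and recovery granularity.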
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
// Read batch instead of single record
for (int i = 0; i < BATCH_SIZE && hasNext(); i++) {
output.collect(readNextRow());
}
}
Benefits:
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
// Return immediately if no data available
if (!hasNext()) {
return; // Framework will call again later
}
output.collect(readNextRow());
}
Benefits:
public class JdbcSourceReader {
private final HikariDataSource dataSource; // Connection pool
@Override
public void pollNext(Collector<SeaTunnelRow> output) {
try (Connection conn = dataSource.getConnection()) {
// Reuse pooled connections
}
}
}
public class CustomEnumerator implements SourceSplitEnumerator<...> {
@Override
public void handleSplitRequest(int subtaskId) {
// Custom logic: assign splits based on data locality
JdbcSourceSplit split = findClosestSplit(subtaskId);
context.assignSplit(subtaskId, Collections.singletonList(split));
}
private JdbcSourceSplit findClosestSplit(int subtaskId) {
// Check worker location and prefer a split on the same rack/region
WorkerLocation location = getWorkerLocation(subtaskId);
JdbcSourceSplit local = pendingSplits.stream()
.filter(split -> split.location().equals(location))
.findFirst()
.orElse(null);
// Note: orElse(pendingSplits.poll()) would be wrong here, since orElse()
// evaluates eagerly and would remove an unrelated split from the queue
if (local != null) {
pendingSplits.remove(local);
return local;
}
return pendingSplits.poll(); // No local split: fall back to any pending split
}
}
public class KafkaSourceSplitEnumerator {
@Override
public void run() throws Exception {
// Discover initial partitions
discoverPartitions();
// Periodically check for new partitions
scheduledExecutor.scheduleAtFixedRate(
this::discoverPartitions,
60, 60, TimeUnit.SECONDS
);
}
private void discoverPartitions() {
List<TopicPartition> newPartitions = kafkaAdmin.listPartitions();
// Assign new partitions to readers
assignNewPartitions(newPartitions);
}
}
1. Split Sizing
2. State Management
3. Error Handling
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
try {
// Read data
} catch (TransientException e) {
// Retry transient errors
Thread.sleep(1000);
retry();
} catch (FatalException e) {
// Fatal errors should propagate
throw e;
}
}
4. Resource Management
@Override
public void close() throws IOException {
// Always close resources
if (resultSet != null) resultSet.close();
if (connection != null) connection.close();
if (dataSource != null) dataSource.close();
}
1. Blocking pollNext()
// ❌ BAD: Blocks indefinitely
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
while (true) {
Record record = queue.take(); // Blocks until data available
output.collect(record);
}
}
// ✅ GOOD: Non-blocking or bounded wait
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
Record record = queue.poll(100, TimeUnit.MILLISECONDS); // Returns null on timeout
if (record != null) {
output.collect(record);
}
}
2. Large State
// ❌ BAD: Buffer entire split in state
public class BadReaderState {
private List<SeaTunnelRow> bufferedRows; // May be huge!
}
// ✅ GOOD: Only track offset
public class GoodReaderState {
private long currentOffset; // Small and efficient
}
3. Forgetting to Request Splits
// ❌ BAD: Reader never gets splits
public void pollNext(Collector<SeaTunnelRow> output) {
if (pendingSplits.isEmpty()) {
return; // Oops, should request more splits!
}
}
// ✅ GOOD: Explicitly request splits
public void pollNext(Collector<SeaTunnelRow> output) {
if (pendingSplits.isEmpty()) {
context.sendSplitRequest();
return;
}
}
1. Enable Debug Logging
private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceReader.class);
public void pollNext(Collector<SeaTunnelRow> output) {
LOG.debug("Polling split: {}, offset: {}", currentSplit.splitId(), currentOffset);
// ...
}
2. Track Metrics
public class JdbcSourceReader {
private long recordsRead = 0;
private long bytesRead = 0;
public void pollNext(Collector<SeaTunnelRow> output) {
SeaTunnelRow row = readRow();
recordsRead++;
bytesRead += row.getBytesSize();
output.collect(row);
}
}
3. Test Split Reassignment
// Simulate reader failure to test split recovery
@Test
public void testSplitReassignment() {
// Assign splits to reader 0
enumerator.handleSplitRequest(0);
// Simulate reader 0 failure
enumerator.addSplitsBack(assignedSplits, 0);
// New reader 1 should get those splits
enumerator.registerReader(1);
enumerator.handleSplitRequest(1);
// Verify the reclaimed splits were handed out again
assertThat(enumerator.currentUnassignedSplitSize()).isZero();
}