docs/en/architecture/api-design/translation-layer.md
SeaTunnel provides a unified connector API, but jobs must run on different execution engines: Apache Flink, Apache Spark, or SeaTunnel's own Zeta engine. The translation layer bridges this gap: a connector is written once against the engine-independent SeaTunnel API, and a per-engine adapter maps it onto that engine's native source and sink interfaces:
```
┌──────────────────────────────────────────────────────────────┐
│                     SeaTunnel API Layer                      │
│           (Engine-Independent Connector Interface)           │
│                                                              │
│   SeaTunnelSource     SeaTunnelSink     SeaTunnelTransform   │
└──────────────────────────────────────────────────────────────┘
                              │
                              │ Translation Layer
               ┌──────────────┼──────────────┐
               ▼              ▼              ▼
    ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
    │  Flink Adapter   │ │  Spark Adapter   │ │  Zeta (Native)   │
    │                  │ │                  │ │                  │
    │  FlinkSource     │ │  SparkSource     │ │  Direct          │
    │  FlinkSink       │ │  SparkSink       │ │  Execution       │
    └──────────────────┘ └──────────────────┘ └──────────────────┘
             │                    │                    │
             ▼                    ▼                    ▼
    ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
    │  Apache Flink    │ │  Apache Spark    │ │ SeaTunnel Engine │
    │  Runtime         │ │  Runtime         │ │      (Zeta)      │
    └──────────────────┘ └──────────────────┘ └──────────────────┘
```
FlinkSource adapts a SeaTunnelSource to Flink's `Source` interface:

```java
public class FlinkSource<T, SplitT extends SourceSplit, StateT>
        implements Source<T, SplitWrapper<SplitT>, EnumeratorStateWrapper<StateT>> {

    // Wrapped SeaTunnel source
    private final SeaTunnelSource<T, SplitT, StateT> seaTunnelSource;

    public FlinkSource(SeaTunnelSource<T, SplitT, StateT> seaTunnelSource) {
        this.seaTunnelSource = seaTunnelSource;
    }

    @Override
    public Boundedness getBoundedness() {
        // Map SeaTunnel's Boundedness onto Flink's identically named enum
        return seaTunnelSource.getBoundedness()
                        == org.apache.seatunnel.api.source.Boundedness.BOUNDED
                ? Boundedness.BOUNDED
                : Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SourceReader<T, SplitWrapper<SplitT>> createReader(
            SourceReaderContext readerContext) throws Exception {
        // Create SeaTunnel reader with adapted context
        org.apache.seatunnel.api.source.SourceReader<T, SplitT> seaTunnelReader =
                seaTunnelSource.createReader(new FlinkSourceReaderContext(readerContext));
        // Wrap in Flink adapter
        return new FlinkSourceReader<>(seaTunnelReader, readerContext);
    }

    @Override
    public SplitEnumerator<SplitWrapper<SplitT>, EnumeratorStateWrapper<StateT>> createEnumerator(
            SplitEnumeratorContext<SplitWrapper<SplitT>> context) throws Exception {
        // Create SeaTunnel enumerator with adapted context
        SourceSplitEnumerator<SplitT, StateT> seaTunnelEnumerator =
                seaTunnelSource.createEnumerator(new FlinkSourceSplitEnumeratorContext<>(context));
        // Wrap in Flink adapter
        return new FlinkSourceEnumerator<>(seaTunnelEnumerator, context);
    }

    @Override
    public SimpleVersionedSerializer<SplitWrapper<SplitT>> getSplitSerializer() {
        // Adapt SeaTunnel serializer to Flink serializer
        return new FlinkSimpleVersionedSerializer<>(seaTunnelSource.getSplitSerializer());
    }

    // restoreEnumerator() and getEnumeratorCheckpointSerializer() omitted for brevity
}
```
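For orientation, here is a minimal sketch of how such a wrapped source plugs into Flink's DataStream API like any native connector. The wiring is illustrative, not taken from SeaTunnel itself: `MySplit`/`MyState` stand in for the connector's concrete split and state types, and `mySeaTunnelSource` for an already-configured SeaTunnelSource.

```java
// Assumed wiring: the adapter behaves like any Flink Source.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkSource<SeaTunnelRow, MySplit, MyState> flinkSource =
        new FlinkSource<>(mySeaTunnelSource);

env.fromSource(flinkSource, WatermarkStrategy.noWatermarks(), "SeaTunnel Source")
        .print();

env.execute("seatunnel-on-flink");
```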
FlinkSourceReader forwards Flink's reader lifecycle to the wrapped SeaTunnel reader:

```java
public class FlinkSourceReader<T, SplitT extends SourceSplit>
        implements SourceReader<T, SplitWrapper<SplitT>> {

    private final org.apache.seatunnel.api.source.SourceReader<T, SplitT> seaTunnelReader;
    private final SourceReaderContext flinkContext;

    public FlinkSourceReader(
            org.apache.seatunnel.api.source.SourceReader<T, SplitT> seaTunnelReader,
            SourceReaderContext flinkContext) {
        this.seaTunnelReader = seaTunnelReader;
        this.flinkContext = flinkContext;
    }

    @Override
    public void start() {
        // Delegate to SeaTunnel reader
        try {
            seaTunnelReader.open();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to open SeaTunnel reader", e);
        }
    }

    @Override
    public InputStatus pollNext(ReaderOutput<T> output) {
        try {
            // Adapt output collector
            CollectorAdapter<T> collector = new CollectorAdapter<>(output);
            // Poll from SeaTunnel reader
            seaTunnelReader.pollNext(collector);
            return collector.hasRecords()
                    ? InputStatus.MORE_AVAILABLE
                    : InputStatus.NOTHING_AVAILABLE;
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to poll from SeaTunnel reader", e);
        }
    }

    @Override
    public void addSplits(List<SplitWrapper<SplitT>> splits) {
        // Unwrap and delegate
        List<SplitT> unwrappedSplits = splits.stream()
                .map(SplitWrapper::getSplit)
                .collect(Collectors.toList());
        seaTunnelReader.addSplits(unwrappedSplits);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        try {
            seaTunnelReader.notifyCheckpointComplete(checkpointId);
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to notify checkpoint complete", e);
        }
    }

    @Override
    public List<SplitWrapper<SplitT>> snapshotState(long checkpointId) {
        try {
            // Wrap SeaTunnel splits so Flink can checkpoint them
            return seaTunnelReader.snapshotState(checkpointId).stream()
                    .map(SplitWrapper::new)
                    .collect(Collectors.toList());
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to snapshot state", e);
        }
    }

    // isAvailable(), notifyNoMoreSplits(), and close() omitted for brevity
}
```
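The `CollectorAdapter` used in `pollNext` is not shown above. A minimal sketch, assuming SeaTunnel's `Collector` contract is `collect` plus a checkpoint-lock accessor, could look like the following (illustrative, not the shipped implementation):

```java
// Bridges SeaTunnel's Collector to Flink's ReaderOutput and remembers
// whether the last poll emitted anything.
public class CollectorAdapter<T> implements org.apache.seatunnel.api.source.Collector<T> {

    private final ReaderOutput<T> output;
    private boolean emitted;

    public CollectorAdapter(ReaderOutput<T> output) {
        this.output = output;
    }

    @Override
    public void collect(T record) {
        output.collect(record);
        emitted = true;
    }

    // Simplification: a real adapter would expose the reader's actual checkpoint lock
    public Object getCheckpointLock() {
        return this;
    }

    public boolean hasRecords() {
        return emitted;
    }
}
```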
FlinkSourceEnumerator does the same for split enumeration:

```java
public class FlinkSourceEnumerator<SplitT extends SourceSplit, StateT>
        implements SplitEnumerator<SplitWrapper<SplitT>, EnumeratorStateWrapper<StateT>> {

    private final SourceSplitEnumerator<SplitT, StateT> seaTunnelEnumerator;
    private final SplitEnumeratorContext<SplitWrapper<SplitT>> flinkContext;

    public FlinkSourceEnumerator(
            SourceSplitEnumerator<SplitT, StateT> seaTunnelEnumerator,
            SplitEnumeratorContext<SplitWrapper<SplitT>> flinkContext) {
        this.seaTunnelEnumerator = seaTunnelEnumerator;
        this.flinkContext = flinkContext;
    }

    @Override
    public void start() {
        try {
            seaTunnelEnumerator.open();
            // run() discovers splits and assigns them through the context. If
            // discovery is long-running, it should be scheduled asynchronously
            // (e.g. via flinkContext.callAsync) rather than invoked inline.
            seaTunnelEnumerator.run();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to start enumerator", e);
        }
    }

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        // Delegate to SeaTunnel enumerator
        seaTunnelEnumerator.handleSplitRequest(subtaskId);
    }

    @Override
    public void addSplitsBack(List<SplitWrapper<SplitT>> splits, int subtaskId) {
        // Unwrap and delegate
        List<SplitT> unwrappedSplits = splits.stream()
                .map(SplitWrapper::getSplit)
                .collect(Collectors.toList());
        seaTunnelEnumerator.addSplitsBack(unwrappedSplits, subtaskId);
    }

    @Override
    public void addReader(int subtaskId) {
        seaTunnelEnumerator.registerReader(subtaskId);
    }

    @Override
    public EnumeratorStateWrapper<StateT> snapshotState(long checkpointId) {
        try {
            StateT state = seaTunnelEnumerator.snapshotState(checkpointId);
            return new EnumeratorStateWrapper<>(state);
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to snapshot enumerator state", e);
        }
    }

    // close() omitted for brevity
}
```
FlinkSourceReaderContext adapts Flink's reader context to SeaTunnel's `SourceReader.Context` contract:

```java
public class FlinkSourceReaderContext
        implements org.apache.seatunnel.api.source.SourceReader.Context {

    private final SourceReaderContext flinkContext;

    public FlinkSourceReaderContext(SourceReaderContext flinkContext) {
        this.flinkContext = flinkContext;
    }

    @Override
    public int getIndexOfSubtask() {
        return flinkContext.getIndexOfSubtask();
    }

    @Override
    public void sendSplitRequest() {
        // Forward the request to the enumerator through Flink's context
        flinkContext.sendSplitRequest();
    }

    @Override
    public void sendSourceEventToEnumerator(SourceEvent event) {
        flinkContext.sendSourceEventToCoordinator(new SourceEventWrapper(event));
    }

    // remaining Context methods omitted for brevity
}
```
FlinkSourceSplitEnumeratorContext does the reverse mapping on the enumerator side:

```java
public class FlinkSourceSplitEnumeratorContext<SplitT extends SourceSplit>
        implements SourceSplitEnumerator.Context<SplitT> {

    private final SplitEnumeratorContext<SplitWrapper<SplitT>> flinkContext;

    public FlinkSourceSplitEnumeratorContext(
            SplitEnumeratorContext<SplitWrapper<SplitT>> flinkContext) {
        this.flinkContext = flinkContext;
    }

    @Override
    public int currentParallelism() {
        return flinkContext.currentParallelism();
    }

    @Override
    public Set<Integer> registeredReaders() {
        return flinkContext.registeredReaders().keySet();
    }

    @Override
    public void assignSplit(int subtaskId, List<SplitT> splits) {
        // Wrap and delegate
        List<SplitWrapper<SplitT>> wrappedSplits = splits.stream()
                .map(SplitWrapper::new)
                .collect(Collectors.toList());
        flinkContext.assignSplits(new SplitsAssignment<>(
                Collections.singletonMap(subtaskId, wrappedSplits)));
    }

    @Override
    public void signalNoMoreSplits(int subtaskId) {
        flinkContext.signalNoMoreSplits(subtaskId);
    }

    @Override
    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
        flinkContext.sendEventToSourceReader(subtaskId, new SourceEventWrapper(event));
    }
}
```
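Both contexts wrap SeaTunnel events in a `SourceEventWrapper` before handing them to Flink. Since Flink's `SourceEvent` is just a serializable marker interface, a thin envelope suffices; the sketch below is illustrative, and the real class may carry more:

```java
// Carries a SeaTunnel SourceEvent across Flink's coordinator/reader RPC.
public class SourceEventWrapper implements org.apache.flink.api.connector.source.SourceEvent {

    private final org.apache.seatunnel.api.source.SourceEvent sourceEvent;

    public SourceEventWrapper(org.apache.seatunnel.api.source.SourceEvent sourceEvent) {
        this.sourceEvent = sourceEvent;
    }

    public org.apache.seatunnel.api.source.SourceEvent getSourceEvent() {
        return sourceEvent;
    }
}
```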
FlinkSink adapts a SeaTunnelSink to Flink's two-phase-commit `Sink` interface:

```java
public class FlinkSink<IN, CommitInfoT, WriterStateT, AggregatedCommitInfoT>
        implements Sink<IN, CommitInfoT, WriterStateT, AggregatedCommitInfoT> {

    private final SeaTunnelSink<IN, WriterStateT, CommitInfoT, AggregatedCommitInfoT> seaTunnelSink;

    public FlinkSink(
            SeaTunnelSink<IN, WriterStateT, CommitInfoT, AggregatedCommitInfoT> seaTunnelSink) {
        this.seaTunnelSink = seaTunnelSink;
    }

    @Override
    public SinkWriter<IN, CommitInfoT, WriterStateT> createWriter(
            InitContext context, List<WriterStateT> states) throws IOException {
        // Create SeaTunnel writer with adapted context, restoring from state if any
        org.apache.seatunnel.api.sink.SinkWriter<IN, CommitInfoT, WriterStateT> seaTunnelWriter =
                states.isEmpty()
                        ? seaTunnelSink.createWriter(new FlinkSinkWriterContext(context))
                        : seaTunnelSink.restoreWriter(new FlinkSinkWriterContext(context), states);
        // Wrap in Flink adapter
        return new FlinkSinkWriter<>(seaTunnelWriter);
    }

    @Override
    public Optional<Committer<CommitInfoT>> createCommitter() throws IOException {
        return seaTunnelSink.createCommitter().map(FlinkCommitter::new);
    }

    @Override
    public Optional<GlobalCommitter<CommitInfoT, AggregatedCommitInfoT>> createGlobalCommitter()
            throws IOException {
        return seaTunnelSink.createAggregatedCommitter().map(FlinkGlobalCommitter::new);
    }

    @Override
    public Optional<SimpleVersionedSerializer<CommitInfoT>> getCommittableSerializer() {
        return seaTunnelSink.getCommitInfoSerializer().map(FlinkSimpleVersionedSerializer::new);
    }

    @Override
    public Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer() {
        return seaTunnelSink.getWriterStateSerializer().map(FlinkSimpleVersionedSerializer::new);
    }
}
```
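The `FlinkSinkWriterContext` referenced in `createWriter` adapts Flink's `InitContext` to SeaTunnel's `SinkWriter.Context`. A minimal sketch, assuming the context only needs the subtask index (metrics and event plumbing elided):

```java
public class FlinkSinkWriterContext implements org.apache.seatunnel.api.sink.SinkWriter.Context {

    private final Sink.InitContext initContext;

    public FlinkSinkWriterContext(Sink.InitContext initContext) {
        this.initContext = initContext;
    }

    @Override
    public int getIndexOfSubtask() {
        return initContext.getSubtaskId();
    }

    // remaining Context methods omitted
}
```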
FlinkSinkWriter delegates writes and checkpoint hooks to the SeaTunnel writer:

```java
public class FlinkSinkWriter<IN, CommitInfoT, WriterStateT>
        implements SinkWriter<IN, CommitInfoT, WriterStateT> {

    private final org.apache.seatunnel.api.sink.SinkWriter<IN, CommitInfoT, WriterStateT> seaTunnelWriter;
    private long checkpointId = 1; // advanced whenever Flink snapshots the writer

    public FlinkSinkWriter(
            org.apache.seatunnel.api.sink.SinkWriter<IN, CommitInfoT, WriterStateT> seaTunnelWriter) {
        this.seaTunnelWriter = seaTunnelWriter;
    }

    @Override
    public void write(IN element, Context context) throws IOException {
        // Delegate to SeaTunnel writer
        seaTunnelWriter.write(element);
    }

    @Override
    public List<CommitInfoT> prepareCommit(boolean flush) throws IOException {
        Optional<CommitInfoT> commitInfo = seaTunnelWriter.prepareCommit(checkpointId);
        return commitInfo.map(Collections::singletonList)
                .orElse(Collections.emptyList());
    }

    @Override
    public List<WriterStateT> snapshotState(long checkpointId) throws IOException {
        this.checkpointId = checkpointId + 1; // subsequent commits belong to the next epoch
        return seaTunnelWriter.snapshotState(checkpointId);
    }

    @Override
    public void close() throws Exception {
        seaTunnelWriter.close();
    }
}
```
Note: Spark 2.4 and Spark 3.x use different datasource APIs. SeaTunnel maintains separate Spark translation modules/adapters per Spark major version, so the exact adapter types and lifecycle hooks may differ.
SparkSource adapts a SeaTunnelSource to Spark's `DataSourceReader` interface (shown here against the Spark 2.4 DataSource V2 API):

```java
public class SparkSource<T, SplitT extends SourceSplit, StateT>
        implements DataSourceReader {

    private final SeaTunnelSource<T, SplitT, StateT> seaTunnelSource;

    public SparkSource(SeaTunnelSource<T, SplitT, StateT> seaTunnelSource) {
        this.seaTunnelSource = seaTunnelSource;
    }

    @Override
    public StructType readSchema() {
        // Convert SeaTunnel schema to Spark schema
        CatalogTable catalogTable = seaTunnelSource.getProducedCatalogTables().get(0);
        return SparkTypeConverter.convert(catalogTable.getTableSchema());
    }

    @Override
    public List<InputPartition<InternalRow>> planInputPartitions() {
        try {
            // Create enumerator and generate all splits up front (batch planning)
            SourceSplitEnumerator<SplitT, StateT> enumerator =
                    seaTunnelSource.createEnumerator(new SparkEnumeratorContext<>());
            enumerator.open();
            enumerator.run();
            // Collect the splits the enumerator assigned via the context
            // (helper not shown; see the SparkEnumeratorContext sketch below)
            List<SplitT> splits = collectAllSplits(enumerator);
            // Wrap each split as a Spark InputPartition
            return splits.stream()
                    .map(split -> new SparkInputPartition<>(seaTunnelSource, split))
                    .collect(Collectors.toList());
        } catch (Exception e) {
            throw new RuntimeException("Failed to plan input partitions", e);
        }
    }
}
```
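`SparkEnumeratorContext` and `collectAllSplits` are not shown above. One plausible sketch (assumed, not the actual SeaTunnel implementation) is a planning-time context that buffers every assignment so it can be drained afterwards:

```java
// Hypothetical planning-time context: record split assignments instead of
// dispatching them to readers (none exist yet during planInputPartitions).
public class SparkEnumeratorContext<SplitT extends SourceSplit>
        implements SourceSplitEnumerator.Context<SplitT> {

    private final List<SplitT> assigned = new ArrayList<>();

    @Override
    public int currentParallelism() {
        return 1; // splits are planned up front in a single pass
    }

    @Override
    public Set<Integer> registeredReaders() {
        return Collections.singleton(0);
    }

    @Override
    public void assignSplit(int subtaskId, List<SplitT> splits) {
        assigned.addAll(splits); // buffer instead of dispatching
    }

    @Override
    public void signalNoMoreSplits(int subtaskId) {
        // no-op: planning finishes when run() returns
    }

    @Override
    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
        // no readers exist during planning
    }

    public List<SplitT> drainSplits() {
        return new ArrayList<>(assigned);
    }
}
```

With such a context, `collectAllSplits` would simply return whatever the context buffered during `enumerator.run()`.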
```java
public class SparkInputPartition<T, SplitT extends SourceSplit>
        implements InputPartition<InternalRow> {

    // InputPartition instances are serialized and shipped to executors,
    // so both fields must be serializable
    private final SeaTunnelSource<T, SplitT, ?> seaTunnelSource;
    private final SplitT split;

    public SparkInputPartition(SeaTunnelSource<T, SplitT, ?> seaTunnelSource, SplitT split) {
        this.seaTunnelSource = seaTunnelSource;
        this.split = split;
    }

    @Override
    public InputPartitionReader<InternalRow> createPartitionReader() {
        // Create SeaTunnel reader on the executor
        org.apache.seatunnel.api.source.SourceReader<T, SplitT> seaTunnelReader;
        try {
            seaTunnelReader = seaTunnelSource.createReader(new SparkReaderContext());
        } catch (Exception e) {
            throw new RuntimeException("Failed to create reader", e);
        }
        // Wrap in Spark adapter
        return new SparkPartitionReader<>(seaTunnelReader, split);
    }
}
```
```java
public class SparkPartitionReader<T, SplitT extends SourceSplit>
        implements InputPartitionReader<InternalRow> {

    private final org.apache.seatunnel.api.source.SourceReader<T, SplitT> seaTunnelReader;
    private final Queue<InternalRow> buffer = new LinkedList<>();

    public SparkPartitionReader(
            org.apache.seatunnel.api.source.SourceReader<T, SplitT> reader,
            SplitT split) {
        this.seaTunnelReader = reader;
        try {
            seaTunnelReader.open();
            // A partition reader owns exactly one split
            seaTunnelReader.addSplits(Collections.singletonList(split));
        } catch (Exception e) {
            throw new RuntimeException("Failed to open reader", e);
        }
    }

    @Override
    public boolean next() throws IOException {
        if (!buffer.isEmpty()) {
            return true;
        }
        // Poll from SeaTunnel reader
        try {
            seaTunnelReader.pollNext(new Collector<T>() {
                @Override
                public void collect(T record) {
                    // Convert to Spark InternalRow (row-level converter,
                    // distinct from the schema converter shown later)
                    buffer.offer(SparkTypeConverter.convert(record));
                }
            });
            // NOTE: returning false ends the partition; a production reader
            // needs an explicit end-of-split signal, since a single poll may
            // legitimately yield nothing while data remains
            return !buffer.isEmpty();
        } catch (Exception e) {
            throw new IOException("Failed to poll next", e);
        }
    }

    @Override
    public InternalRow get() {
        return buffer.poll();
    }

    @Override
    public void close() throws IOException {
        try {
            seaTunnelReader.close();
        } catch (Exception e) {
            throw new IOException("Failed to close reader", e);
        }
    }
}
```
```java
public class SparkSink<IN, WriterStateT, CommitInfoT>
        implements DataSourceWriter {

    private final SeaTunnelSink<IN, WriterStateT, CommitInfoT, ?> seaTunnelSink;

    public SparkSink(SeaTunnelSink<IN, WriterStateT, CommitInfoT, ?> seaTunnelSink) {
        this.seaTunnelSink = seaTunnelSink;
    }

    @Override
    public DataWriterFactory<InternalRow> createWriterFactory() {
        return new SparkDataWriterFactory<>(seaTunnelSink);
    }

    @Override
    public boolean useCommitCoordinator() {
        // Use Spark's commit coordinator only if the sink actually has a committer
        try {
            return seaTunnelSink.createCommitter().isPresent();
        } catch (IOException e) {
            throw new RuntimeException("Failed to create committer", e);
        }
    }

    @Override
    public void commit(WriterCommitMessage[] messages) {
        try {
            Optional<SinkCommitter<CommitInfoT>> committerOpt = seaTunnelSink.createCommitter();
            if (committerOpt.isPresent()) {
                // Extract commit infos from the executor-side messages
                List<CommitInfoT> commitInfos = Arrays.stream(messages)
                        .map(msg -> ((SparkCommitMessage<CommitInfoT>) msg).getCommitInfo())
                        .collect(Collectors.toList());
                // Commit; a non-empty return value means some commits need retry
                List<CommitInfoT> failed = committerOpt.get().commit(commitInfos);
                if (!failed.isEmpty()) {
                    throw new IOException("Some commits failed: " + failed);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to commit", e);
        }
    }

    @Override
    public void abort(WriterCommitMessage[] messages) {
        try {
            Optional<SinkCommitter<CommitInfoT>> committerOpt = seaTunnelSink.createCommitter();
            if (committerOpt.isPresent()) {
                List<CommitInfoT> commitInfos = Arrays.stream(messages)
                        .map(msg -> ((SparkCommitMessage<CommitInfoT>) msg).getCommitInfo())
                        .collect(Collectors.toList());
                committerOpt.get().abort(commitInfos);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to abort", e);
        }
    }
}
```
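The `SparkCommitMessage` cast above implies a message type that ferries commit info from executors to the driver. A sketch (the real class may differ):

```java
// WriterCommitMessage is a serializable marker interface in Spark's DataSource V2
// API, so the wrapper only needs to hold the SeaTunnel commit info.
public class SparkCommitMessage<CommitInfoT> implements WriterCommitMessage {

    private final CommitInfoT commitInfo;

    public SparkCommitMessage(CommitInfoT commitInfo) {
        this.commitInfo = commitInfo;
    }

    public CommitInfoT getCommitInfo() {
        return commitInfo;
    }
}
```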
Helper adapters bridge the serializer contracts. FlinkSimpleVersionedSerializer wraps a SeaTunnel `Serializer`:

```java
public class FlinkSimpleVersionedSerializer<T>
        implements SimpleVersionedSerializer<T> {

    private final org.apache.seatunnel.api.serialization.Serializer<T> seaTunnelSerializer;

    public FlinkSimpleVersionedSerializer(
            org.apache.seatunnel.api.serialization.Serializer<T> seaTunnelSerializer) {
        this.seaTunnelSerializer = seaTunnelSerializer;
    }

    @Override
    public int getVersion() {
        // Delegate to SeaTunnel serializer
        return seaTunnelSerializer.getVersion();
    }

    @Override
    public byte[] serialize(T obj) throws IOException {
        return seaTunnelSerializer.serialize(obj);
    }

    @Override
    public T deserialize(int version, byte[] serialized) throws IOException {
        // SeaTunnel serializers handle versioning internally, so the
        // Flink-supplied version is not needed here
        return seaTunnelSerializer.deserialize(serialized);
    }
}
```
SparkTypeConverter maps SeaTunnel table schemas and data types onto Spark SQL types. Note that SeaTunnel and Spark both define `DecimalType`, `ArrayType`, and `MapType`, so the SeaTunnel types are fully qualified here to avoid the name clash:

```java
public class SparkTypeConverter {

    public static StructType convert(TableSchema schema) {
        List<StructField> fields = new ArrayList<>();
        for (Column column : schema.getColumns()) {
            fields.add(new StructField(
                    column.getName(),
                    convertDataType(column.getDataType()),
                    column.isNullable(),
                    Metadata.empty()));
        }
        return new StructType(fields.toArray(new StructField[0]));
    }

    private static DataType convertDataType(SeaTunnelDataType<?> seaTunnelType) {
        switch (seaTunnelType.getSqlType()) {
            case TINYINT:
                return DataTypes.ByteType;
            case SMALLINT:
                return DataTypes.ShortType;
            case INT:
                return DataTypes.IntegerType;
            case BIGINT:
                return DataTypes.LongType;
            case FLOAT:
                return DataTypes.FloatType;
            case DOUBLE:
                return DataTypes.DoubleType;
            case DECIMAL:
                org.apache.seatunnel.api.table.type.DecimalType decimalType =
                        (org.apache.seatunnel.api.table.type.DecimalType) seaTunnelType;
                return DataTypes.createDecimalType(
                        decimalType.getPrecision(),
                        decimalType.getScale());
            case STRING:
                return DataTypes.StringType;
            case BOOLEAN:
                return DataTypes.BooleanType;
            case DATE:
                return DataTypes.DateType;
            case TIMESTAMP:
                return DataTypes.TimestampType;
            case BYTES:
                return DataTypes.BinaryType;
            case ARRAY:
                org.apache.seatunnel.api.table.type.ArrayType<?, ?> arrayType =
                        (org.apache.seatunnel.api.table.type.ArrayType<?, ?>) seaTunnelType;
                return DataTypes.createArrayType(
                        convertDataType(arrayType.getElementType()));
            case MAP:
                org.apache.seatunnel.api.table.type.MapType<?, ?> mapType =
                        (org.apache.seatunnel.api.table.type.MapType<?, ?>) seaTunnelType;
                return DataTypes.createMapType(
                        convertDataType(mapType.getKeyType()),
                        convertDataType(mapType.getValueType()));
            default:
                throw new UnsupportedOperationException(
                        "Unsupported type: " + seaTunnelType);
        }
    }
}
```
Translation overhead depends on connector implementations, serialization, and type conversion complexity. Prefer measuring in your own workload rather than relying on fixed numbers.
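When measuring, isolate the conversion hot path. Below is a rough micro-measurement sketch; `convertToSparkRow` is the conversion under test (as in the example that follows), and for rigorous numbers a harness like JMH, which handles warmup and dead-code elimination, is preferable:

```java
// Rough timing of per-record conversion cost; not a substitute for JMH.
static long nanosPerConversion(List<SeaTunnelRow> sample, int iterations) {
    long sink = 0; // accumulate a result so the JIT cannot elide the loop
    long start = System.nanoTime();
    for (int i = 0; i < iterations; i++) {
        for (SeaTunnelRow record : sample) {
            InternalRow row = convertToSparkRow(record);
            sink += row.numFields();
        }
    }
    long elapsed = System.nanoTime() - start;
    System.out.println("checksum: " + sink); // keep 'sink' observable
    return elapsed / ((long) iterations * sample.size());
}
```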
Batch Type Conversion:

```java
// ❌ BAD: convert one record at a time inside the hot path
public void collect(SeaTunnelRow record) {
    InternalRow sparkRow = convertToSparkRow(record);
    output.collect(sparkRow);
}

// ✅ GOOD: convert in batches to amortize per-record overhead
public void collect(List<SeaTunnelRow> records) {
    InternalRow[] sparkRows = batchConvertToSparkRows(records);
    for (InternalRow row : sparkRows) {
        output.collect(row);
    }
}
```
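One way to get the batched path without changing the per-record `Collector` contract is to buffer inside the adapter and flush in chunks. A sketch with an assumed batch size, reusing the `batchConvertToSparkRows` helper from above:

```java
// Hypothetical buffering collector: accumulate rows, convert and emit in chunks.
public class BatchingCollector implements org.apache.seatunnel.api.source.Collector<SeaTunnelRow> {

    private static final int BATCH_SIZE = 1024; // assumed threshold
    private final List<SeaTunnelRow> buffer = new ArrayList<>(BATCH_SIZE);
    private final Consumer<InternalRow> downstream;

    public BatchingCollector(Consumer<InternalRow> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void collect(SeaTunnelRow record) {
        buffer.add(record);
        if (buffer.size() >= BATCH_SIZE) {
            flush();
        }
    }

    // Simplification; a real collector would expose the reader's checkpoint lock
    public Object getCheckpointLock() {
        return this;
    }

    // Must also be called once at end-of-input to drain the tail of the buffer
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        for (InternalRow row : batchConvertToSparkRows(buffer)) {
            downstream.accept(row);
        }
        buffer.clear();
    }
}
```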
Avoid Unnecessary Wrapping:

```java
// If the split type is already serializable, skip the wrapper
public class SplitWrapper<T> {

    private final T split;

    public SplitWrapper(T split) {
        this.split = split;
    }

    public T getSplit() {
        return split;
    }

    // Lazy wrapping: only pay the wrapping cost when serialization requires it
    // (directSerialize / wrapAndSerialize are illustrative helpers, not shown)
    public byte[] serialize() {
        if (split instanceof Serializable) {
            return directSerialize(split); // no wrapping overhead
        } else {
            return wrapAndSerialize(split); // fallback
        }
    }
}
```
Problem: some engine features have no SeaTunnel equivalent.

Example: Flink's `WatermarkStrategy`:

```java
// A Flink-specific watermark strategy cannot be expressed in the SeaTunnel API
WatermarkStrategy<T> watermarkStrategy = WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(5));
```

Workaround: provide engine-specific configuration that only the matching adapter interprets:

```hocon
source {
  Kafka {
    # SeaTunnel config
    topic = "my_topic"

    # Engine-specific config (honored by the Flink adapter only)
    flink.watermark.strategy = "bounded-out-of-orderness"
    flink.watermark.max-out-of-orderness = "5s"
  }
}
```
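On the adapter side, such engine-scoped keys can be recognized and mapped onto native constructs, while other engines simply ignore them. A hypothetical sketch (the option keys follow the config above; the helper name and `Map`-based config access are illustrative, not a documented SeaTunnel contract):

```java
// Hypothetical: translate "flink.watermark.*" options into a WatermarkStrategy.
static <T> WatermarkStrategy<T> watermarkStrategyFrom(Map<String, String> options) {
    String strategy = options.getOrDefault("flink.watermark.strategy", "none");
    if ("bounded-out-of-orderness".equals(strategy)) {
        // "5s" -> ISO-8601 "PT5S" for Duration.parse
        String raw = options.getOrDefault("flink.watermark.max-out-of-orderness", "5s");
        Duration maxOutOfOrderness = Duration.parse("PT" + raw.toUpperCase());
        return WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness);
    }
    return WatermarkStrategy.noWatermarks();
}
```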
Problem: the engines' type systems don't fully align.

Example: Spark has `TimestampType`, while Flink distinguishes `TimestampType` from `LocalZonedTimestampType`.

Workaround: use the least common denominator. SeaTunnel defines a generic `TIMESTAMP` type, and the translation layer maps it to the appropriate engine type, possibly steered by configuration.
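Concretely, on Flink the mapping could be steered by a flag. A hypothetical sketch, where `localZoned` stands in for whatever engine-specific option selects the timestamp semantics:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SqlType;

// SeaTunnel's generic TIMESTAMP can target either Flink timestamp flavor.
static DataType toFlinkTimestamp(SeaTunnelDataType<?> type, boolean localZoned) {
    if (type.getSqlType() != SqlType.TIMESTAMP) {
        throw new IllegalArgumentException("Expected TIMESTAMP, got " + type.getSqlType());
    }
    return localZoned
            ? DataTypes.TIMESTAMP_LTZ(6) // LocalZonedTimestampType
            : DataTypes.TIMESTAMP(6);    // TimestampType
}
```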
Do keep connector logic behind the engine-neutral SeaTunnel API; don't let engine-specific types or behavior leak into connector code. Verify both by running the same tests on every engine.
Test on All Engines:

```java
@RunWith(Parameterized.class)
public class ConnectorTest {

    @Parameters(name = "{0}")
    public static Collection<Object[]> engines() {
        return Arrays.asList(new Object[][] {
                {"flink"},
                {"spark"},
                {"seatunnel"}
        });
    }

    private final String engine;

    // JUnit injects each engine name through the constructor
    public ConnectorTest(String engine) {
        this.engine = engine;
    }

    @Test
    public void testExactlyOnce() {
        // Run the same job on each engine and verify identical results
        runJobOnEngine(engine, jobConfig);
        verifyResults();
    }
}
```
The translation layer and API sources live in:

- `seatunnel-translation/seatunnel-translation-flink/`
- `seatunnel-translation/seatunnel-translation-spark/`
- `seatunnel-api/src/main/java/org/apache/seatunnel/api/`