Data integration engines must solve fundamental distributed systems challenges. SeaTunnel Engine (Zeta) is designed as a native execution engine for data synchronization; the table below contrasts it with general-purpose engines:
| Feature | SeaTunnel Zeta | Apache Flink | Apache Spark |
|---|---|---|---|
| Primary Use Case | Data sync, CDC | Stream processing | Batch + ML |
| Resource Model | Slot-based | Slot-based | Executor-based |
| State Backend | Pluggable (HDFS/S3/Local) | RocksDB/Heap | In-memory/Disk |
| Checkpoint | Distributed snapshots | Chandy-Lamport | RDD lineage |
| Operational Complexity | Lower (engine-native) | Higher | Higher |
┌─────────────────────────────────────────────────────────────────┐
│                           Master Node                           │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                    CoordinatorService                     │  │
│  │  • Manages all running jobs                               │  │
│  │  • Job submission and lifecycle management                │  │
│  │  • Maintains job state (IMap)                             │  │
│  │  • Resource manager factory                               │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                  JobMaster (one per job)                  │  │
│  │  • Generates physical execution plan                      │  │
│  │  • Requests resources from ResourceManager                │  │
│  │  • Deploys tasks to workers                               │  │
│  │  • Coordinates checkpoints                                │  │
│  │  • Handles failover and recovery                          │  │
│  └───────────────────────────────────────────────────────────┘  │
│             │                             │                     │
│             │ (Task Deploy)               │ (Resource Request)  │
│             ▼                             ▼                     │
│  ┌─────────────────────┐    ┌────────────────────────────┐      │
│  │  CheckpointManager  │    │      ResourceManager       │      │
│  │   (per pipeline)    │    │  • Slot allocation         │      │
│  └─────────────────────┘    │  • Worker registration     │      │
│                             │  • Load balancing          │      │
│                             └────────────────────────────┘      │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 │ (Hazelcast Cluster)
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                           Worker Nodes                          │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                   TaskExecutionService                    │  │
│  │  • Deploys and executes tasks                             │  │
│  │  • Manages task lifecycle                                 │  │
│  │  • Reports heartbeat                                      │  │
│  │  • Slot resource management                               │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │            SeaTunnelTask (multiple per worker)            │  │
│  │                                                           │  │
│  │      ┌─────────────────────────────────────────────┐      │  │
│  │      │             SourceFlowLifeCycle             │      │  │
│  │      │  • SourceReader                             │      │  │
│  │      │  • SeaTunnelSourceCollector                 │      │  │
│  │      └─────────────────────────────────────────────┘      │  │
│  │                          │                                │  │
│  │                          ▼                                │  │
│  │      ┌─────────────────────────────────────────────┐      │  │
│  │      │           TransformFlowLifeCycle            │      │  │
│  │      │  • Transform chain                          │      │  │
│  │      └─────────────────────────────────────────────┘      │  │
│  │                          │                                │  │
│  │                          ▼                                │  │
│  │      ┌─────────────────────────────────────────────┐      │  │
│  │      │              SinkFlowLifeCycle              │      │  │
│  │      │  • SinkWriter                               │      │  │
│  │      └─────────────────────────────────────────────┘      │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
Centralized service managing all jobs in the cluster.
Responsibilities:
Key Data Structures:
// Running job state (distributed IMap backed by Hazelcast)
IMap<Long, JobInfo> runningJobInfoIMap;
IMap<Long, JobStatus> runningJobStateIMap;
IMap<Long, Long> runningJobStateTimestampsIMap;
// Completed job history
IMap<Long, JobInfo> completedJobInfoIMap;
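The access pattern behind these distributed maps can be shown with a local stand-in. A minimal sketch, assuming plain `ConcurrentHashMap`s in place of Hazelcast `IMap`s; `JobStateStore` and its `JobStatus` enum are illustrative, not engine classes:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the coordinator's job-state bookkeeping. Field names mirror the
// IMaps above; the real engine uses Hazelcast IMaps so the state survives a
// master failover, which a local map of course does not.
class JobStateStore {
    enum JobStatus { CREATED, RUNNING, FINISHED, FAILED }

    final Map<Long, JobStatus> runningJobStateIMap = new ConcurrentHashMap<>();
    final Map<Long, Long> runningJobStateTimestampsIMap = new ConcurrentHashMap<>();

    // Record the new state together with the transition time, so a recovering
    // master can tell how stale an entry is.
    void updateState(long jobId, JobStatus status) {
        runningJobStateIMap.put(jobId, status);
        runningJobStateTimestampsIMap.put(jobId, System.currentTimeMillis());
    }

    JobStatus stateOf(long jobId) {
        return runningJobStateIMap.get(jobId);
    }
}
```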
Code Reference:
Manages a single job's execution lifecycle.
Responsibilities:
Lifecycle:
Created → Initialized → Scheduled → Running → Finished/Failed/Canceled
Key Operations:
- `init()`: Generate physical plan, create checkpoint coordinators
- `run()`: Request resources, deploy tasks, start execution
- `handleFailure()`: Restart failed tasks, restore from checkpoint

Code Reference:
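The lifecycle chain above can be sketched as a guarded state machine. A hedged sketch: `JobMasterLifecycle` and its method bodies are illustrative stand-ins for the real `JobMaster`, and the scheduling and running steps are collapsed into one call.

```java
// Sketch of the JobMaster lifecycle: Created → Initialized → Scheduled →
// Running → Finished/Failed. Each operation checks it is invoked from the
// expected state, as the real lifecycle management must.
class JobMasterLifecycle {
    enum State { CREATED, INITIALIZED, SCHEDULED, RUNNING, FINISHED, FAILED, CANCELED }

    private State state = State.CREATED;

    // init(): generate the physical plan, create checkpoint coordinators
    void init() {
        require(State.CREATED);
        state = State.INITIALIZED;
    }

    // run(): request resources, deploy tasks, start execution
    void run() {
        require(State.INITIALIZED);
        state = State.SCHEDULED; // resources requested, tasks deployed
        state = State.RUNNING;   // execution started
    }

    void finish(boolean ok) {
        require(State.RUNNING);
        state = ok ? State.FINISHED : State.FAILED;
    }

    State state() { return state; }

    private void require(State expected) {
        if (state != expected)
            throw new IllegalStateException("expected " + expected + " but was " + state);
    }
}
```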
Manages worker resources and slot allocation.
Responsibilities:
Slot Allocation Strategies:
// 1. Random: Random selection among available workers
// 2. SlotRatio: Prefer workers with more available slots
// 3. SystemLoad: Prefer workers with lower CPU/memory usage
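As an illustration of the second strategy, here is a sketch of SlotRatio-style selection: prefer the worker with the highest proportion of unassigned slots. The `Worker` record is a simplified stand-in for the engine's `WorkerProfile`, not its actual API.

```java
import java.util.Comparator;
import java.util.List;

// Sketch of a SlotRatio allocation strategy: among workers that still have a
// free slot, pick the one with the largest free-slot ratio.
class SlotRatioStrategy {
    record Worker(String address, int assignedSlots, int unassignedSlots) {
        double freeRatio() {
            int total = assignedSlots + unassignedSlots;
            return total == 0 ? 0.0 : (double) unassignedSlots / total;
        }
    }

    static Worker select(List<Worker> workers) {
        return workers.stream()
                .filter(w -> w.unassignedSlots() > 0) // must have a free slot
                .max(Comparator.comparingDouble(Worker::freeRatio))
                .orElseThrow(() -> new IllegalStateException("no free slots in cluster"));
    }
}
```

The Random and SystemLoad strategies differ only in the comparator: a random pick among eligible workers, or a comparison of reported CPU/memory load.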
Code Reference:
User Config (HOCON)
        │
        ▼
┌───────────────┐
│  LogicalDag   │ • Logical vertices (Source/Transform/Sink)
│               │ • Logical edges (data flow)
│               │ • Parallelism (per vertex)
└───────────────┘
        │ (JobMaster.generatePhysicalPlan())
        ▼
┌───────────────┐
│ PhysicalPlan  │ • List of SubPlan (pipelines)
│               │ • JobImmutableInformation
│               │ • Resource requirements
└───────────────┘
        │
        ▼
┌───────────────┐
│    SubPlan    │ • Pipeline (independent execution unit)
│  (Pipeline)   │ • List of PhysicalVertex
│               │ • CheckpointCoordinator
└───────────────┘
        │
        ▼
┌───────────────┐
│PhysicalVertex │ • TaskGroup (co-located tasks)
│               │ • Assigned SlotProfile
│               │ • ExecutionState
└───────────────┘
        │
        ▼
┌───────────────┐
│   TaskGroup   │ • Multiple SeaTunnelTask instances
│               │ • Shared network buffer
│               │ • Thread pool
└───────────────┘
        │
        ▼
┌───────────────┐
│ SeaTunnelTask │ • Single task execution
│               │ • Source/Transform/Sink lifecycle
│               │ • Task state machine
└───────────────┘
Represents the user's intent in an engine-independent way.
public class LogicalDag {
    private final Map<Long, LogicalVertex> logicalVertexMap;
    private final Set<LogicalEdge> edges;
    private final JobConfig jobConfig;
}

public class LogicalVertex {
    private final long vertexId;
    private final Action action; // SourceAction / TransformChainAction / SinkAction
    private final int parallelism;
}

public class LogicalEdge {
    private final long inputVertexId;
    private final long targetVertexId;
}
Creation:
// From user config
LogicalDag logicalDag = LogicalDagBuilder.build(jobConfig);
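What the builder produces for a simple Source → Transform → Sink job can be sketched with simplified structures. `LogicalDagSketch` and its `Vertex`/`Edge` records are illustrative; they expose public constructors the real classes do not.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal sketch of a LogicalDag: a vertex map keyed by id plus a set of
// directed edges, mirroring the fields shown above.
class LogicalDagSketch {
    record Vertex(long id, String action, int parallelism) {}
    record Edge(long inputVertexId, long targetVertexId) {}

    final Map<Long, Vertex> vertices = new HashMap<>();
    final Set<Edge> edges = new HashSet<>();

    void addVertex(Vertex v) { vertices.put(v.id(), v); }
    void addEdge(long from, long to) { edges.add(new Edge(from, to)); }
}
```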
Represents the actual execution plan with resource allocation.
public class PhysicalPlan {
    private final List<SubPlan> pipelineList;
    private final JobImmutableInformation jobImmutableInformation;
    private final CompletableFuture<JobResult> jobEndFuture;
}

public class SubPlan {
    private final int pipelineId;
    private final List<PhysicalVertex> physicalVertexList;
    private final List<PhysicalVertex> coordinatorVertexList;
    private final CheckpointCoordinator checkpointCoordinator;
}

public class PhysicalVertex {
    private final TaskGroupLocation taskGroupLocation;
    private final TaskGroupDefaultImpl taskGroup;
    private final SlotProfile slotProfile; // Assigned slot
    private final ExecutionState currentExecutionState;
}
Generation:
PhysicalPlan physicalPlan = jobMaster.getPhysicalPlan();
// JobMaster internally:
// 1. Split LogicalDag into pipelines
// 2. Generate PhysicalVertex for each parallel instance
// 3. Create CheckpointCoordinator per pipeline
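Step 1 can be sketched as a connected-components pass over the DAG, treating edges as undirected so that each disconnected source-to-sink chain becomes one pipeline. This is an assumed splitting rule based on the example below; `PipelineSplitter` is illustrative, not engine code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of DAG-to-pipeline splitting: each connected component of the
// (undirected) vertex graph becomes one SubPlan.
class PipelineSplitter {
    static List<Set<Long>> split(Set<Long> vertices, Map<Long, Set<Long>> adjacency) {
        List<Set<Long>> pipelines = new ArrayList<>();
        Set<Long> seen = new HashSet<>();
        for (long v : vertices) {
            if (!seen.add(v)) continue;           // already assigned to a pipeline
            Set<Long> component = new HashSet<>();
            Deque<Long> stack = new ArrayDeque<>(List.of(v));
            while (!stack.isEmpty()) {            // depth-first walk of the component
                long cur = stack.pop();
                component.add(cur);
                for (long n : adjacency.getOrDefault(cur, Set.of()))
                    if (seen.add(n)) stack.push(n);
            }
            pipelines.add(component);
        }
        return pipelines;
    }
}
```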
Jobs are divided into Pipelines (SubPlans) for independent execution:
Example:
# Config with multiple sources/sinks
env { ... }

source {
  MySQL-CDC { table = "orders" }
  Kafka { topic = "events" }
}

transform {
  Sql { query = "SELECT * FROM orders JOIN events ON ..." }
}

sink {
  Elasticsearch { index = "orders" }
  JDBC { table = "events" }
}
Generated Pipelines:
Pipeline 1: MySQL-CDC → Transform → Elasticsearch
Pipeline 2: Kafka → Transform → JDBC
Benefits:
Multiple actions can be fused into a single TaskGroup for efficiency:
Without Fusion:
[Source Task] → Network → [Transform Task] → Network → [Sink Task]
With Fusion:
[TaskGroup: Source → Transform → Sink] (single thread, no network)
Fusion Conditions:
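The fused pattern can be sketched as a single loop in one thread, where the hop between operators is a plain method call rather than a queue or network transfer. `FusedTaskSketch` is illustrative, not the engine's task code.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Sketch of operator fusion: source, transform, and sink execute in the same
// stack frame, so no record serialization or network hop occurs between them.
class FusedTaskSketch {
    static <A, B> List<B> run(Iterator<A> source, Function<A, B> transform, List<B> sink) {
        while (source.hasNext()) {
            A record = source.next();        // source reads
            B out = transform.apply(record); // transform runs in the same thread
            sink.add(out);                   // sink writes immediately
        }
        return sink;
    }
}
```

The unfused version would put a queue (or a network channel) between each pair of stages, paying a serialization and hand-off cost per record.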
[Created]
    │
    ▼
[INIT] ───────────────────────────────────┐
    │                                     │
    ▼                                     │
[WAITING_RESTORE] (if recovering)         │
    │                                     │
    ▼                                     │
[READY_START]                             │
    │                                     │
    ▼                                     │
[STARTING] ─────────────┐                 │
    │                   │                 │
    ▼                   ▼                 ▼
[RUNNING] ──────────> [FAILED] ─────> (Restart)
    │
    ▼
[PREPARE_CLOSE]
    │
    ▼
[CLOSED]
    │
    ▼
[CANCELED] (if job canceled)
State Transitions:
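The diagram above can be encoded as an explicit transition table. This is a sketch that assumes the drawn edges are the only legal moves; `TaskStateMachine` is illustrative, not the engine's implementation.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Sketch of the task state machine: updateState succeeds only for moves the
// diagram allows, so illegal transitions are rejected rather than applied.
class TaskStateMachine {
    enum State { CREATED, INIT, WAITING_RESTORE, READY_START, STARTING,
                 RUNNING, FAILED, PREPARE_CLOSE, CLOSED, CANCELED }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
    static {
        ALLOWED.put(State.CREATED, EnumSet.of(State.INIT));
        ALLOWED.put(State.INIT, EnumSet.of(State.WAITING_RESTORE, State.READY_START, State.FAILED));
        ALLOWED.put(State.WAITING_RESTORE, EnumSet.of(State.READY_START));
        ALLOWED.put(State.READY_START, EnumSet.of(State.STARTING));
        ALLOWED.put(State.STARTING, EnumSet.of(State.RUNNING, State.FAILED));
        ALLOWED.put(State.RUNNING, EnumSet.of(State.FAILED, State.PREPARE_CLOSE));
        ALLOWED.put(State.PREPARE_CLOSE, EnumSet.of(State.CLOSED));
        ALLOWED.put(State.CLOSED, EnumSet.of(State.CANCELED));
    }

    private State current = State.CREATED;

    boolean updateState(State next) {
        if (!ALLOWED.getOrDefault(current, EnumSet.noneOf(State.class)).contains(next))
            return false; // illegal move per the diagram
        current = next;
        return true;
    }

    State current() { return current; }
}
```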
public abstract class SeaTunnelTask implements Runnable {
    private final TaskLocation taskLocation;
    private final TaskExecutionContext executionContext;
    private ExecutionState executionState;

    @Override
    public void run() {
        try {
            init();
            restoreState();      // If recovering
            open();
            while (isRunning()) {
                processData();   // Source: read, Transform: process, Sink: write
                handleBarrier(); // Checkpoint barriers
            }
            close();
        } catch (Exception e) {
            handleException(e);
        }
    }
}
Task Types:
Each task manages component lifecycle through FlowLifeCycle:
// Source task
public class SourceFlowLifeCycle<T> implements FlowLifeCycle {
    private final SourceReader<T, ?> sourceReader;
    private final SeaTunnelSourceCollector collector;

    @Override
    public void open() {
        sourceReader.open();
    }

    @Override
    public void collect() {
        sourceReader.pollNext(collector); // Read data
    }

    @Override
    public void close() {
        sourceReader.close();
    }
}

// Sink task
public class SinkFlowLifeCycle<T> implements FlowLifeCycle {
    private final SinkWriter<T, ?, ?> sinkWriter;

    @Override
    public void collect() {
        T record = inputQueue.poll();
        sinkWriter.write(record); // Write data
    }
}
Each pipeline has an independent checkpoint coordinator.
Responsibilities:
Key Data Structures:
public class CheckpointCoordinator {
    private final CheckpointIDCounter checkpointIdCounter;
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    private final ArrayDeque<String> completedCheckpointIds;
    private final CheckpointStorage checkpointStorage;
}
Checkpoint Flow:
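The acknowledgement half of the flow can be sketched with a minimal `PendingCheckpoint` stand-in: a checkpoint completes once every task in the pipeline has acknowledged its snapshot. This is an assumed shape, not engine code.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of a pending checkpoint: the coordinator triggers it with a fresh id,
// then collects one ack per task; when the ack set drains, the checkpoint is
// complete and becomes the latest restorable point.
class PendingCheckpointSketch {
    final long checkpointId;
    private final Set<Long> notYetAcked;

    PendingCheckpointSketch(long checkpointId, Set<Long> taskIds) {
        this.checkpointId = checkpointId;
        this.notYetAcked = new HashSet<>(taskIds);
    }

    // Called when a task reports its state snapshot is durable.
    // Returns true once all tasks have acked.
    boolean acknowledge(long taskId) {
        notYetAcked.remove(taskId);
        return isComplete();
    }

    boolean isComplete() { return notYetAcked.isEmpty(); }
}
```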
Code Reference:
A special control message that flows with the data stream:
public class Barrier {
    private final long checkpointId;
    private final long timestamp;
    private final CheckpointType type; // CHECKPOINT or SAVEPOINT
}
Barrier Alignment:
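Alignment can be sketched as follows: a task with several input channels snapshots its state only after the barrier for the current checkpoint has arrived on every channel. `BarrierAligner` is an illustrative stand-in; in a real implementation, channels that already delivered their barrier are blocked until alignment completes.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of barrier alignment for a task with N input channels.
class BarrierAligner {
    private final int inputChannels;
    private final Set<Integer> arrived = new HashSet<>();
    private long currentCheckpointId = -1;

    BarrierAligner(int inputChannels) { this.inputChannels = inputChannels; }

    // Returns true when the barrier has arrived on all channels, i.e. the task
    // may snapshot its state and forward the barrier downstream.
    boolean onBarrier(int channel, long checkpointId) {
        if (checkpointId != currentCheckpointId) { // first barrier of a new round
            currentCheckpointId = checkpointId;
            arrived.clear();
        }
        arrived.add(channel);
        return arrived.size() == inputChannels;
    }
}
```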
SlotProfile:
public class SlotProfile {
    private final int slotID;
    private final Address worker;
    private final ResourceProfile resourceProfile; // CPU, memory
}

public class ResourceProfile {
    private final CPU cpu;
    private final Memory heapMemory;
}
WorkerProfile:
public class WorkerProfile {
    private final Address address;
    private final ResourceProfile profile;
    private final ResourceProfile unassignedResource;
    private final SlotProfile[] assignedSlots;
    private final SlotProfile[] unassignedSlots;
    private final Map<String, String> attributes;
}
sequenceDiagram
    participant JM as JobMaster
    participant RM as ResourceManager
    participant Worker as Worker Node

    JM->>RM: applyResources(jobId, resourceProfiles)
    RM->>RM: Select workers (strategy)
    RM->>RM: Allocate slots
    RM->>JM: Return slot profiles
    JM->>Worker: Deploy task (DeployTaskOperation)
    Worker->>Worker: Create SeaTunnelTask
    Worker->>JM: ACK
    JM->>JM: Task running
Assign tasks to specific worker groups:
env {
  # Job-level worker attribute filter (key/value full match)
  tag_filter = {
    zone = "db-zone"
  }
}
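The full-match semantics noted in the comment can be sketched as a containment check over worker attributes: a worker qualifies only if its attribute map contains every key/value pair of the job's filter. `TagFilter` is illustrative, not the engine's matcher.

```java
import java.util.Map;

// Sketch of tag_filter matching against a WorkerProfile's attributes map.
class TagFilter {
    static boolean matches(Map<String, String> tagFilter, Map<String, String> workerAttributes) {
        // Every filter entry must be present with an identical value.
        return tagFilter.entrySet().stream()
                .allMatch(e -> e.getValue().equals(workerAttributes.get(e.getKey())));
    }
}
```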
Usage:
Detection:
Recovery:
Detection:
Recovery:
High Availability:
Recovery:
Alternative: Single global DAG execution
Decision: Divide into pipelines
Benefits:
Drawbacks:
Alternative: Zookeeper, etcd, custom Raft implementation
Decision: Hazelcast IMDG
Benefits:
Drawbacks:
1. Task Fusion:
2. Async Checkpoint:
3. Incremental Checkpoint:
4. Zero-Copy Data Transfer:
- `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/`
- `seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/`
- `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/`