docs/en/architecture/fault-tolerance/checkpoint-mechanism.md
Distributed data processing systems face critical challenges for fault tolerance:
SeaTunnel's checkpoint mechanism aims to:
SeaTunnel's checkpoint is based on the Chandy-Lamport distributed snapshot algorithm:
Key Idea: Insert special markers (barriers) into data streams. When a task receives barrier:
Result: Globally consistent snapshot without pausing entire system.
Reference: "Distributed Snapshots: Determining Global States of Distributed Systems" (Chandy & Lamport, 1985)
┌─────────────────────────────────────────────────────────────────┐
│ JobMaster (per job) │
│ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ CheckpointCoordinator (per pipeline) │ │
│ │ │ │
│ │ • Trigger checkpoint (periodic/manual) │ │
│ │ • Generate checkpoint ID │ │
│ │ • Track pending checkpoints │ │
│ │ • Collect task acknowledgements │ │
│ │ • Persist completed checkpoints │ │
│ │ • Cleanup old checkpoints │ │
│ └───────────────────────────────────────────────────────┘ │
│ │ │
│ │ (Trigger Barrier) │
│ ▼ │
└─────────────────────────────────────────────────────────────────┘
│
│ (CheckpointBarrier)
▼
┌─────────────────────────────────────────────────────────────────┐
│ Worker Nodes │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ SourceTask 1 │ │ SourceTask 2 │ │ SourceTask N │ │
│ │ │ │ │ │ │ │
│ │ 1. Receive │ │ 1. Receive │ │ 1. Receive │ │
│ │ Barrier │ │ Barrier │ │ Barrier │ │
│ │ 2. Snapshot │ │ 2. Snapshot │ │ 2. Snapshot │ │
│ │ State │ │ State │ │ State │ │
│ │ 3. ACK │ │ 3. ACK │ │ 3. ACK │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ │ (Barrier Propagation) │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Transform 1 │ │ Transform 2 │ │ Transform N │ │
│ │ │ │ │ │ │ │
│ │ 1. Receive │ │ 1. Receive │ │ 1. Receive │ │
│ │ Barrier │ │ Barrier │ │ Barrier │ │
│ │ 2. Snapshot │ │ 2. Snapshot │ │ 2. Snapshot │ │
│ │ State │ │ State │ │ State │ │
│ │ 3. ACK │ │ 3. ACK │ │ 3. ACK │ │
│ │ 4. Forward │ │ 4. Forward │ │ 4. Forward │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ SinkTask 1 │ │ SinkTask 2 │ │ SinkTask N │ │
│ │ │ │ │ │ │ │
│ │ 1. Receive │ │ 1. Receive │ │ 1. Receive │ │
│ │ Barrier │ │ Barrier │ │ Barrier │ │
│ │ 2. Prepare │ │ 2. Prepare │ │ 2. Prepare │ │
│ │ Commit │ │ Commit │ │ Commit │ │
│ │ 3. Snapshot │ │ 3. Snapshot │ │ 3. Snapshot │ │
│ │ State │ │ State │ │ State │ │
│ │ 4. ACK │ │ 4. ACK │ │ 4. ACK │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
│ (All ACKs received)
▼
┌─────────────────────────────────────────────────────────────────┐
│ CheckpointStorage │
│ (HDFS / S3 / Local / OSS) │
│ │
│ CompletedCheckpoint { │
│ checkpointId: 123 │
│ taskStates: { │
│ SourceTask-1: { splits: [...], offsets: [...] } │
│ SinkTask-1: { commitInfo: XidInfo(...) } │
│ ... │
│ } │
│ } │
└─────────────────────────────────────────────────────────────────┘
/**
 * Per-pipeline coordinator: triggers checkpoints, tracks pending ones,
 * collects task ACKs, and persists/cleans up completed checkpoints.
 */
public class CheckpointCoordinator {
    // Checkpoint ID generator (monotonically increasing IDs)
    private final CheckpointIDCounter checkpointIdCounter;
    // Checkpoint execution plan (which tasks receive the trigger barrier)
    private final CheckpointPlan checkpointPlan;
    // Pending checkpoints (in progress), keyed by checkpoint ID
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    // Completed checkpoints (success); Long, not String — checkpoint IDs are
    // numeric (long) everywhere else in this document
    private final ArrayDeque<Long> completedCheckpointIds;
    // Latest completed checkpoint
    private CompletedCheckpoint latestCompletedCheckpoint;
    // Checkpoint storage
    private final CheckpointStorage checkpointStorage;
    // Configuration — all millisecond durations use long for consistency
    private final long checkpointInterval; // Trigger interval (ms)
    private final long checkpointTimeout; // Timeout (ms)
    private final long minPauseBetweenCheckpoints; // Min pause (ms)
}
Represents in-progress checkpoint.
/**
 * Represents an in-progress checkpoint: tracks which tasks still owe an ACK
 * and accumulates their snapshotted state until all have acknowledged.
 */
public class PendingCheckpoint {
    private final long checkpointId;
    private final CheckpointType checkpointType; // CHECKPOINT or SAVEPOINT
    private final long triggerTimestamp;
    // Tasks that haven't acknowledged yet
    private final Set<Long> notYetAcknowledgedTasks;
    // Collected action states (from task ACKs)
    private final Map<ActionStateKey, ActionState> actionStates;
    // Task statistics (records processed, bytes, etc.)
    private final Map<Long, TaskStatistics> taskStatistics;
    // Future completed when all tasks ACK
    private final CompletableFuture<CompletedCheckpoint> completableFuture;

    /**
     * Called when a task acknowledges the checkpoint.
     *
     * @param taskId     the acknowledging task
     * @param states     the subtask states captured by that task's snapshot
     * @param statistics the task's counters at snapshot time
     */
    public void acknowledgeTask(long taskId, List<ActionSubtaskState> states,
                                TaskStatistics statistics) {
        // Guard against duplicate/late/unknown ACKs: without this check a second
        // ACK from the same task could overwrite collected state or re-trigger
        // completion after the checkpoint already finished.
        if (!notYetAcknowledgedTasks.remove(taskId)) {
            return;
        }
        // Collect states
        for (ActionSubtaskState state : states) {
            actionStates.computeIfAbsent(state.getKey(), k -> new ActionState())
                    .putSubtaskState(state);
        }
        // Collect statistics
        taskStatistics.put(taskId, statistics);
        // Check if all tasks acknowledged
        if (notYetAcknowledgedTasks.isEmpty()) {
            completeCheckpoint();
        }
    }

    // Builds the immutable CompletedCheckpoint and resolves the future
    // that the coordinator waits on.
    private void completeCheckpoint() {
        CompletedCheckpoint completed = new CompletedCheckpoint(
                checkpointId, actionStates, taskStatistics, System.currentTimeMillis()
        );
        completableFuture.complete(completed);
    }
}
Persisted checkpoint data.
/** Immutable record of a successfully completed checkpoint, as persisted to storage. */
public class CompletedCheckpoint implements Serializable {
    private final long checkpointId;
    // Per-action state collected from task ACKs, keyed by (pipelineId, actionId)
    private final Map<ActionStateKey, ActionState> taskStates;
    // Per-task counters (records processed, bytes, etc.) captured at checkpoint time
    private final Map<Long, TaskStatistics> taskStatistics;
    // Wall-clock time (ms) when the final ACK was received
    private final long completedTimestamp;
}
/** Aggregated state of one action (operator) across all of its parallel subtasks. */
public class ActionState implements Serializable {
    private final ActionStateKey key; // (pipelineId, actionId)
    private final Map<Integer, ActionSubtaskState> subtaskStates; // keyed by subtask index
}
/**
 * Serialized state of a single subtask of an action.
 * NOTE(review): snippets later in this document construct this class with a
 * List&lt;byte[]&gt; and with an extra commit-info argument — the real class likely
 * holds multiple state blobs; confirm against the actual SeaTunnel source.
 */
public class ActionSubtaskState implements Serializable {
    private final int subtaskIndex;
    private final byte[] state; // Serialized state
}
Abstraction for checkpoint persistence.
/**
 * Abstraction over the backend that persists completed checkpoints
 * (e.g. a local filesystem or a Hadoop FileSystem such as HDFS/S3A).
 */
public interface CheckpointStorage {
    /**
     * Store completed checkpoint
     *
     * @param checkpoint fully acknowledged checkpoint to persist
     * @throws IOException if the backend write fails
     */
    void storeCheckpoint(CompletedCheckpoint checkpoint) throws IOException;
    /**
     * Get latest checkpoint
     *
     * @return most recent completed checkpoint, or empty if none exists
     * @throws IOException if the backend read fails
     */
    Optional<CompletedCheckpoint> getLatestCheckpoint() throws IOException;
    /**
     * Get specific checkpoint by ID
     *
     * @return the checkpoint with that ID, or empty if it does not exist
     * @throws IOException if the backend read fails
     */
    Optional<CompletedCheckpoint> getCheckpoint(long checkpointId) throws IOException;
    /**
     * Delete old checkpoint
     *
     * @throws IOException if the backend delete fails
     */
    void deleteCheckpoint(long checkpointId) throws IOException;
}
Implementations:
- `LocalFileStorage`: Local file system (testing)
- `HdfsStorage`: Hadoop FileSystem-based backend; can work with HDFS/S3A/etc. depending on Hadoop configuration

Note: S3 and OSS support are provided through Hadoop FileSystem configuration (e.g., `fs.s3a.impl`) rather than separate `CheckpointStorage` implementations.
sequenceDiagram
participant Timer as Periodic Timer
participant Coord as CheckpointCoordinator
participant Plan as CheckpointPlan
Timer->>Coord: Trigger (every 60s)
Coord->>Coord: Generate checkpointId (123)
Coord->>Coord: Check conditions
Note over Coord: • Min pause elapsed?<br/>• Max concurrent not exceeded?<br/>• Previous checkpoint complete?
Coord->>Coord: Create PendingCheckpoint(123)
Coord->>Plan: Get starting tasks
loop For each starting task
Coord->>Task: Send CheckpointBarrierTriggerOperation(123)
end
Coord->>Coord: Start timeout timer (10 minutes)
Trigger Conditions:
sequenceDiagram
participant Coord as Coordinator
participant Source as SourceTask
participant Transform as TransformTask
participant Sink as SinkTask
Coord->>Source: Trigger barrier(123)
Source->>Source: Receive barrier
Source->>Source: snapshotState() → splits, offsets
Source->>Coord: ACK(state)
Source->>Transform: Forward barrier(123)
Transform->>Transform: Receive barrier
Transform->>Transform: snapshotState() → transform state
Transform->>Coord: ACK(state)
Transform->>Sink: Forward barrier(123)
Sink->>Sink: Receive barrier
Sink->>Sink: prepareCommit(checkpointId) → commitInfo
Sink->>Sink: snapshotState() → writer state
Sink->>Coord: ACK(commitInfo + state)
Coord->>Coord: All ACKs received
Coord->>Coord: Create CompletedCheckpoint
Barrier Flow Rules:
Barrier Alignment (for tasks with multiple inputs):
// Task with 2 inputs
Input 1: ──data──data──[barrier-123]──data──data──
│ Wait!
Input 2: ──data──data──data──data──[barrier-123]──
│
▼
Both barriers received, snapshot state
Each task type snapshots different state:
SourceTask:
/**
 * Handles a checkpoint barrier on a source task: snapshot local reader state,
 * ACK the coordinator, then propagate the barrier downstream. The barrier is
 * forwarded only AFTER the local snapshot — that ordering is what makes the
 * resulting global cut consistent (Chandy-Lamport).
 */
@Override
public void triggerBarrier(long checkpointId) {
    // 1. Snapshot SourceReader state (splits + offsets)
    List<byte[]> states = sourceFlowLifeCycle.snapshotState(checkpointId);
    // 2. Create ActionSubtaskState
    ActionSubtaskState state = new ActionSubtaskState(subtaskIndex, states);
    // 3. Send ACK to coordinator
    sendAcknowledgement(checkpointId, Collections.singletonList(state));
    // 4. Forward barrier downstream
    forwardBarrierToDownstream(checkpointId);
}
TransformTask:
/**
 * Handles a checkpoint barrier on a transform task: snapshot (usually empty)
 * transform state, ACK the coordinator, then forward the barrier to the sink.
 * Same snapshot-before-forward ordering as the source task.
 */
@Override
public void triggerBarrier(long checkpointId) {
    // 1. Snapshot Transform state (usually stateless, empty state)
    List<byte[]> states = transformFlowLifeCycle.snapshotState(checkpointId);
    // 2. Create ActionSubtaskState
    ActionSubtaskState state = new ActionSubtaskState(subtaskIndex, states);
    // 3. Send ACK
    sendAcknowledgement(checkpointId, Collections.singletonList(state));
    // 4. Forward barrier
    forwardBarrierToDownstream(checkpointId);
}
SinkTask:
/**
 * Handles a checkpoint barrier at the end of the pipeline: runs the first
 * phase of the two-phase commit (prepareCommit) and snapshots writer state,
 * then ACKs. No forwarding — the sink has no downstream. The commit info is
 * carried in the ACK so the coordinator can commit after the checkpoint
 * completes globally.
 */
@Override
public void triggerBarrier(long checkpointId) {
    // 1. Prepare commit (TWO-PHASE COMMIT)
    Optional<CommitInfoT> commitInfo = sinkWriter.prepareCommit(checkpointId);
    // 2. Snapshot writer state
    List<StateT> writerStates = sinkWriter.snapshotState(checkpointId);
    // 3. Create ActionSubtaskState (includes both commit info and state)
    ActionSubtaskState state = new ActionSubtaskState(
        subtaskIndex,
        serialize(writerStates),
        commitInfo.orElse(null)
    );
    // 4. Send ACK (NO forwarding - end of pipeline)
    sendAcknowledgement(checkpointId, Collections.singletonList(state));
}
sequenceDiagram
participant Coord as CheckpointCoordinator
participant Pending as PendingCheckpoint
participant Storage as CheckpointStorage
participant Committer as SinkCommitter
participant Tasks as All Tasks
Pending->>Pending: All tasks ACKed
Pending->>Coord: notifyCheckpointComplete()
Coord->>Coord: Create CompletedCheckpoint
Coord->>Storage: Persist checkpoint
Storage-->>Coord: Success
Coord->>Committer: commit(commitInfos)
Committer-->>Coord: Success
Coord->>Tasks: notifyCheckpointComplete(123)
Tasks->>Tasks: Cleanup resources
Coord->>Storage: Delete old checkpoints
Completion Steps:
The `CompletedCheckpoint` is built from the `PendingCheckpoint`:

// CheckpointCoordinator
/**
 * Arms a one-shot timer that aborts checkpoint {@code checkpointId} if it has
 * not completed within {@code timeoutMs} milliseconds, then triggers failure
 * handling (possibly a job failover).
 */
private void startCheckpointTimeout(long checkpointId, long timeoutMs) {
    scheduledExecutor.schedule(() -> {
        PendingCheckpoint pending = pendingCheckpoints.get(checkpointId);
        if (pending != null && !pending.isCompleted()) {
            // The message expects a count ("{} tasks"); passing the Set itself
            // would print its contents instead of how many remain.
            LOG.warn("Checkpoint {} timeout after {}ms, {} tasks not yet acknowledged",
                checkpointId, timeoutMs, pending.getNotYetAcknowledgedTasks().size());
            // Fail checkpoint
            pending.abort();
            pendingCheckpoints.remove(checkpointId);
            // Trigger job failover if needed
            handleCheckpointFailure(checkpointId);
        }
    }, timeoutMs, TimeUnit.MILLISECONDS);
}
Timeout Handling:
sequenceDiagram
participant JM as JobMaster
participant Storage as CheckpointStorage
participant Source as SourceTask
participant Sink as SinkTask
JM->>Storage: getLatestCheckpoint()
Storage-->>JM: CompletedCheckpoint(123)
JM->>JM: Extract states per task
JM->>Source: Deploy with NotifyTaskRestoreOperation
activate Source
Source->>Source: restoreState(splits, offsets)
Source->>Source: Seek to checkpointed offset
Source-->>JM: Ready
deactivate Source
JM->>Sink: Deploy with NotifyTaskRestoreOperation
activate Sink
Sink->>Sink: restoreWriter(writerState)
Sink->>Sink: Restore uncommitted transactions
Sink-->>JM: Ready
deactivate Sink
JM->>Source: Start execution
JM->>Sink: Start execution
Restore Steps:
1. Load the latest `CompletedCheckpoint` from storage.
2. Deploy each task with a `NotifyTaskRestoreOperation` containing its state.

Example: JDBC Source Recovery:
public class JdbcSourceReader {
    /**
     * Restores splits and read offsets from checkpointed state so reading
     * resumes where the last completed checkpoint left off.
     */
    @Override
    public void restoreState(List<JdbcSourceState> states) {
        for (JdbcSourceState state : states) {
            JdbcSourceSplit split = state.getSplit();
            long offset = state.getCurrentOffset();
            // Restore split with offset
            pendingSplits.add(split);
            // When processing split, start from offset
            // NOTE(review): illustrative only — `query` is built here but never
            // used; the real reader applies the offset when the split is
            // processed. Also, string-concatenated SQL is shown for brevity;
            // production code should bind the offset as a query parameter.
            String query = split.getQuery() + " OFFSET " + offset;
        }
    }
}
Combination of checkpoint restore + sink two-phase commit ensures exactly-once:
Checkpoint N (completed):
Source offsets: [100, 200, 300]
Sink prepared commits: [XID-1, XID-2, XID-3]
Sink committer commits XID-1, XID-2, XID-3
↓ [Failure]
Recovery from Checkpoint N:
1. Restore source offsets: [100, 200, 300]
2. Sources start reading from offset 100, 200, 300
3. Sink writers restore state (may have uncommitted XIDs)
4. Sink committer retries committing XIDs (idempotent)
Result: Records 0-99, 100-199, 200-299 committed exactly once
Records from 100+ reprocessed but not duplicated (idempotent commit)
env {
# Enable checkpoint
checkpoint.interval = 60000 # Trigger every 60 seconds
# Checkpoint timeout
checkpoint.timeout = 600000 # 10 minutes
# Min pause between checkpoints
min-pause = 10000 # 10 seconds
}
Checkpoint storage is configured on the engine side (e.g., config/seatunnel.yaml under seatunnel.engine.checkpoint.storage), rather than as job-level env options.
Checkpoint Interval:
Trade-offs:
Rule of Thumb: Set interval to tolerable recovery time (data loss window).
Checkpoint Timeout:
Storage Selection (SeaTunnel Engine):
- `localfile` (LocalFileStorage): local filesystem, non-HA
- `hdfs` (HdfsStorage): Hadoop FileSystem-based backend; can work with HDFS/S3A/etc. depending on Hadoop configuration

State snapshot doesn't block data processing:
public class AsyncSnapshotSupport {
    /**
     * Asynchronous snapshot: take a cheap in-memory copy synchronously, then
     * serialize and upload it off the processing thread so data flow is not
     * blocked for the duration of the upload.
     */
    @Override
    public void snapshotState(long checkpointId) {
        // 1. Create snapshot of current state (fast, in-memory copy)
        StateSnapshot snapshot = createSnapshot();
        // 2. Continue data processing (doesn't wait for serialization/upload)
        // ...
        // 3. Async serialize and upload on a dedicated executor
        CompletableFuture.runAsync(() -> {
            byte[] serialized = serialize(snapshot);
            checkpointStorage.upload(checkpointId, serialized);
        }, executorService);
    }
}
Only checkpoint changed state:
// Full checkpoint (first)
Checkpoint 1: State = 1GB → Upload 1GB
// Incremental checkpoints (subsequent)
Checkpoint 2: State = 1.1GB → Upload 100MB (delta)
Checkpoint 3: State = 1.05GB → Upload 0MB (deletion doesn't upload)
Benefits:
Challenges:
Store hot state locally, checkpoint only summary:
// RocksDB local state backend
// RocksDB local state backend: hot state lives on fast local disk; the
// checkpoint only carries a small reference/summary instead of the full state.
class RocksDBStateBackend {
    private final RocksDB rocksDB; // Fast local SSD
    @Override
    public void put(String key, byte[] value) {
        // NOTE(review): getBytes() uses the platform default charset; real code
        // should pass an explicit charset (e.g. StandardCharsets.UTF_8).
        rocksDB.put(key.getBytes(), value); // Local write (fast)
    }
    @Override
    public byte[] snapshotState() {
        // Only checkpoint RocksDB snapshot reference
        // (the bulk of the state stays in local RocksDB files)
        return rocksDB.createCheckpoint().getBytes();
    }
}
1. Keep State Small:
// ❌ BAD: Buffer entire dataset
// (deliberately-bad example: checkpoint size grows with data volume)
class BadSourceReader {
    private List<SeaTunnelRow> bufferedRows = new ArrayList<>(); // May be huge!
    List<State> snapshotState() {
        // Serializes the whole buffer — checkpoint duration and size are
        // proportional to everything read so far.
        return serialize(bufferedRows); // Huge state
    }
}
// ✅ GOOD: Track offset only
class GoodSourceReader {
    private long currentOffset = 0; // Position in the source; enough to resume reading
    List<State> snapshotState() {
        return serialize(currentOffset); // Small state — O(1) regardless of data volume
    }
}
2. Use Efficient Serialization:
Key Metrics:
- `checkpoint_duration`: Time from trigger to completion
- `checkpoint_size`: Size of persisted checkpoint
- `checkpoint_failure_rate`: Percentage of failed checkpoints
- `checkpoint_alignment_duration`: Time spent aligning barriers

Alerting:
- `checkpoint_duration` > threshold (e.g., 5 minutes)
- `checkpoint_failure_rate` > 10%

Problem: Checkpoint timeout
Possible Causes:
Solutions:
Problem: High checkpoint overhead
Possible Causes:
Solutions: