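Distributed execution engines must manage computing resources efficiently. In SeaTunnel's resource management system, the JobMaster requests resources, the ResourceManager tracks worker capacity and allocates slots, and worker nodes run tasks in the allocated slots: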
```text
┌──────────────────────────────────────────────────────────────┐
│  JobMaster                                                   │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐    │
│  │ Request Resources                                    │    │
│  │ • Calculate required slots                           │    │
│  │ • Specify resource profiles (CPU, memory)            │    │
│  │ • Apply tag filters (optional)                       │    │
│  └──────────────────────────────────────────────────────┘    │
└───────────────────────────────┬──────────────────────────────┘
                                │
                                ▼
┌──────────────────────────────────────────────────────────────┐
│  ResourceManager                                             │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐    │
│  │ Worker Registry                                      │    │
│  │ • WorkerProfile (per worker)                         │    │
│  │   - Total resources                                  │    │
│  │   - Available resources                              │    │
│  │   - Assigned slots                                   │    │
│  │   - Unassigned slots                                 │    │
│  └──────────────────────────────────────────────────────┘    │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐    │
│  │ Allocation Strategies                                │    │
│  │ • RandomStrategy                                     │    │
│  │ • SlotRatioStrategy                                  │    │
│  │ • SystemLoadStrategy                                 │    │
│  └──────────────────────────────────────────────────────┘    │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐    │
│  │ Slot Management                                      │    │
│  │ • Allocate slots                                     │    │
│  │ • Release slots                                      │    │
│  │ • Track slot usage                                   │    │
│  └──────────────────────────────────────────────────────┘    │
└───────────────────────────────┬──────────────────────────────┘
                                │
                                ▼
┌──────────────────────────────────────────────────────────────┐
│  Worker Nodes                                                │
│                                                              │
│  Worker 1        Worker 2        Worker N                    │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐                │
│  │ Slot 1   │    │ Slot 1   │    │ Slot 1   │                │
│  │ Slot 2   │    │ Slot 2   │    │ Slot 2   │                │
│  │ ...      │    │ ...      │    │ ...      │                │
│  └──────────┘    └──────────┘    └──────────┘                │
└──────────────────────────────────────────────────────────────┘
```
A slot is the fundamental unit of resource allocation. Each slot is described by a SlotProfile:
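```java
public class SlotProfile {
    // Unique slot identifier
    private final int slotID;
    // Worker address where this slot resides
    private final Address worker;
    // Resource capacity of this slot
    private final ResourceProfile resourceProfile;
    // Sequence string (the last constructor argument in the example below, e.g. "seq-1")
    private final String sequence;
}
```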
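Key properties: each slot has a unique ID, resides on exactly one worker, and has a fixed resource capacity (all fields are final).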
Example:
```java
SlotProfile slot =
        new SlotProfile(
                new Address("worker-1", 5801), // worker address
                1001, // slot ID
                new ResourceProfile(CPU.of(1), Memory.of(512 * 1024 * 1024L)), // 1 core, 512 MiB heap
                "seq-1"); // sequence
```
A ResourceProfile describes either a resource requirement or a resource capacity.
```java
public class ResourceProfile {
    private final CPU cpu;
    private final Memory heapMemory;
}

public class CPU {
    private final int core; // Number of CPU cores
}

public class Memory {
    private final long bytes; // Heap memory in bytes
}
```
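Usage: the same type describes both what a task requests and what a slot or worker offers. WorkerProfile, for example, stores its total and its unassigned resources as ResourceProfile values.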
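The following minimal, self-contained sketch makes the requirement-versus-capacity idea concrete. The records mirror the fields shown above. However, canSatisfy and minus are hypothetical helpers written for illustration, not SeaTunnel's API. A capacity satisfies a request only if it is at least as large in both CPU cores and heap bytes.

```java
public class ResourceFitSketch {
    // Hypothetical, simplified stand-ins for the CPU, Memory and ResourceProfile classes above.
    record CPU(int core) {}

    record Memory(long bytes) {}

    record ResourceProfile(CPU cpu, Memory heapMemory) {
        // Hypothetical helper: true if this capacity covers the request in every dimension.
        boolean canSatisfy(ResourceProfile request) {
            return cpu.core() >= request.cpu().core()
                    && heapMemory.bytes() >= request.heapMemory().bytes();
        }

        // Hypothetical helper: capacity left after reserving the request (check canSatisfy first).
        ResourceProfile minus(ResourceProfile request) {
            return new ResourceProfile(
                    new CPU(cpu.core() - request.cpu().core()),
                    new Memory(heapMemory.bytes() - request.heapMemory().bytes()));
        }
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        ResourceProfile free = new ResourceProfile(new CPU(4), new Memory(2048 * mib));
        ResourceProfile request = new ResourceProfile(new CPU(1), new Memory(512 * mib));

        System.out.println(free.canSatisfy(request)); // true
        ResourceProfile left = free.minus(request);
        System.out.println(left.cpu().core() + " cores, " + left.heapMemory().bytes() / mib + " MiB"); // 3 cores, 1536 MiB
    }
}
```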
A WorkerProfile represents a worker node's resources and slot inventory:
```java
public class WorkerProfile {
    // Worker address
    private final Address address;
    // Total resources (all slots combined)
    private final ResourceProfile profile;
    // Currently available resources
    private final ResourceProfile unassignedResource;
    // Slots assigned to jobs
    private final SlotProfile[] assignedSlots;
    // Slots available for assignment
    private final SlotProfile[] unassignedSlots;
    // Worker attributes (used by job-level tag_filter)
    private final Map<String, String> attributes;
    // Optional system load info (for SystemLoadStrategy)
    private final SystemLoadInfo systemLoadInfo;
}
```
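The three bookkeeping fields (assigned slots, unassigned slots, and unassigned resources) must change together. The sketch below is a hypothetical, simplified model, not SeaTunnel's WorkerProfile. Slots carry only an ID and a heap size in MiB, assignAny and release are invented names, and the real engine may update these fields differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class WorkerSlotsSketch {
    // Hypothetical simplified slot: ID plus heap size in MiB.
    record Slot(int slotId, long heapMiB) {}

    // Hypothetical stand-in for WorkerProfile's slot bookkeeping.
    static class Worker {
        final List<Slot> assignedSlots = new ArrayList<>();
        final List<Slot> unassignedSlots = new ArrayList<>();
        long unassignedHeapMiB;

        Worker(List<Slot> slots) {
            unassignedSlots.addAll(slots);
            unassignedHeapMiB = slots.stream().mapToLong(Slot::heapMiB).sum();
        }

        // Move one free slot to the assigned list and shrink the free capacity.
        Optional<Slot> assignAny() {
            if (unassignedSlots.isEmpty()) {
                return Optional.empty();
            }
            Slot slot = unassignedSlots.remove(0);
            assignedSlots.add(slot);
            unassignedHeapMiB -= slot.heapMiB();
            return Optional.of(slot);
        }

        // Reverse of assignAny(): the slot becomes available again.
        boolean release(Slot slot) {
            if (!assignedSlots.remove(slot)) {
                return false; // not currently assigned on this worker
            }
            unassignedSlots.add(slot);
            unassignedHeapMiB += slot.heapMiB();
            return true;
        }
    }

    public static void main(String[] args) {
        Worker w = new Worker(List.of(new Slot(1, 512), new Slot(2, 512)));
        Slot s = w.assignAny().orElseThrow();
        System.out.println(w.assignedSlots.size() + " assigned, " + w.unassignedHeapMiB + " MiB free"); // 1 assigned, 512 MiB free
        w.release(s);
        System.out.println(w.unassignedSlots.size() + " unassigned, " + w.unassignedHeapMiB + " MiB free"); // 2 unassigned, 1024 MiB free
    }
}
```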
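Lifecycle: a worker's WorkerProfile is registered and refreshed through heartbeats (heartbeat). It is dropped when the worker leaves the cluster (memberRemoved). The ResourceManager interface covers these events together with resource requests and releases: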
```java
public interface ResourceManager {
    /**
     * Apply for resources (called by JobMaster)
     */
    CompletableFuture<List<SlotProfile>> applyResources(
            long jobId,
            List<ResourceProfile> resourceProfiles,
            Map<String, String> tagFilter)
            throws NoEnoughResourceException;

    /**
     * Release resources (called by JobMaster after task completion)
     */
    CompletableFuture<Void> releaseResources(long jobId, List<SlotProfile> slots);

    /**
     * Worker heartbeat (called by TaskExecutionService)
     */
    void heartbeat(WorkerProfile workerProfile);

    /**
     * Handle worker removal (failure or graceful shutdown)
     */
    void memberRemoved(MembershipServiceEvent event);
}
```
```java
public abstract class AbstractResourceManager implements ResourceManager {
    // Registered workers
    protected final ConcurrentMap<Address, WorkerProfile> registerWorker;
    // Worker selection strategy (RandomStrategy / SlotRatioStrategy / SystemLoadStrategy)
    protected final SlotAllocationStrategy slotAllocationStrategy;

    @Override
    public CompletableFuture<List<SlotProfile>> applyResources(
            long jobId,
            List<ResourceProfile> resourceProfiles,
            Map<String, String> tagFilter)
            throws NoEnoughResourceException {
        // 1. Filter workers by tagFilter (match worker attributes)
        Map<Address, WorkerProfile> candidates = filterWorkerByTag(tagFilter);
        // 2. For each requested profile, select a worker by strategy and pick an unassigned slot
        //    (actual slot selection/marking is implementation-defined)
        return requestSlots(jobId, resourceProfiles, candidates, slotAllocationStrategy);
    }
}
```
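In SeaTunnel Engine / Zeta, allocation typically consists of three steps:

1. Filter the registered workers by tag_filter.
2. For each requested profile, select a worker with the configured SlotAllocationStrategy.
3. Assign one of that worker's unassigned slots.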
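The following minimal sketch shows the per-slot selection loop and assumes the candidates have already been filtered by tag. The Worker class, the allocate method, and InsufficientSlotsException (a stand-in for NoEnoughResourceException) are hypothetical. The all-or-nothing rollback is an assumption made for the sketch, not documented engine behavior. The strategy is passed as a plain function so that any selection rule can be plugged in.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class AllocationFlowSketch {
    // Hypothetical simplified worker: a name and a count of unassigned slots.
    static final class Worker {
        final String name;
        int freeSlots;

        Worker(String name, int freeSlots) {
            this.name = name;
            this.freeSlots = freeSlots;
        }
    }

    // Hypothetical stand-in for NoEnoughResourceException.
    static final class InsufficientSlotsException extends Exception {
        InsufficientSlotsException(String message) {
            super(message);
        }
    }

    // For each requested slot: keep candidates that still have a free slot, let the
    // strategy choose one, and take a slot from it. Assumption: all-or-nothing, so
    // slots taken earlier in the request are returned if a later one cannot be placed.
    static List<String> allocate(
            List<Worker> candidates, int requested, Function<List<Worker>, Optional<Worker>> strategy)
            throws InsufficientSlotsException {
        List<Worker> taken = new ArrayList<>();
        for (int i = 0; i < requested; i++) {
            List<Worker> withFreeSlot = new ArrayList<>();
            for (Worker w : candidates) {
                if (w.freeSlots > 0) {
                    withFreeSlot.add(w);
                }
            }
            Optional<Worker> chosen = strategy.apply(withFreeSlot);
            if (chosen.isEmpty()) {
                taken.forEach(w -> w.freeSlots++); // roll back the partial allocation
                throw new InsufficientSlotsException(
                        "requested " + requested + " slots, placed " + taken.size());
            }
            chosen.get().freeSlots--;
            taken.add(chosen.get());
        }
        List<String> placements = new ArrayList<>();
        taken.forEach(w -> placements.add(w.name));
        return placements;
    }

    public static void main(String[] args) throws Exception {
        List<Worker> workers = List.of(new Worker("worker-1", 3), new Worker("worker-2", 1));
        // Example strategy: prefer the worker with the most free slots.
        Function<List<Worker>, Optional<Worker>> mostFree =
                ws -> ws.stream().max(Comparator.comparingInt(w -> w.freeSlots));
        System.out.println(allocate(workers, 2, mostFree)); // [worker-1, worker-1]
        try {
            allocate(workers, 5, mostFree);
        } catch (InsufficientSlotsException e) {
            System.out.println(e.getMessage()); // requested 5 slots, placed 2
        }
    }
}
```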
RandomStrategy randomly selects a worker from the available candidates.
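```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class RandomStrategy implements SlotAllocationStrategy {
    @Override
    public Optional<WorkerProfile> selectWorker(List<WorkerProfile> availableWorkers) {
        // Shuffles in place, so the caller must pass a mutable list
        Collections.shuffle(availableWorkers);
        // Returns an empty Optional when there are no candidates
        return availableWorkers.stream().findFirst();
    }
}
```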
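SlotRatioStrategy selects the worker with the lowest slot usage ratio (assigned slots divided by total slots). It therefore prefers the worker with the largest share of free slots, which is not necessarily the worker with the most free slots in absolute terms.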
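The following hypothetical sketch shows ratio-based selection. It uses a simplified Worker record instead of WorkerProfile and assumes that usage ratio = assigned slots / total slots. The example shows how the ratio and the absolute number of free slots can disagree.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class SlotRatioSketch {
    // Hypothetical simplified worker view: assigned and total slot counts.
    record Worker(String name, int assignedSlots, int totalSlots) {
        // Fraction of slots in use; a worker with no slots is treated as full (assumption).
        double usageRatio() {
            return totalSlots == 0 ? 1.0 : (double) assignedSlots / totalSlots;
        }
    }

    // Select the candidate whose slots are least used, proportionally.
    static Optional<Worker> selectWorker(List<Worker> candidates) {
        return candidates.stream().min(Comparator.comparingDouble(Worker::usageRatio));
    }

    public static void main(String[] args) {
        List<Worker> workers = List.of(
                new Worker("worker-1", 2, 2),  // ratio 1.00, 0 free slots
                new Worker("worker-2", 1, 2),  // ratio 0.50, 1 free slot
                new Worker("worker-3", 6, 8)); // ratio 0.75, 2 free slots
        // worker-2 wins even though worker-3 has more free slots in absolute terms.
        System.out.println(selectWorker(workers).map(Worker::name).orElse("none")); // worker-2
    }
}
```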
SystemLoadStrategy selects the worker with the lowest system load, based on the load information that each worker reports in its heartbeat.
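The exact load metrics and weighting used by SystemLoadStrategy are not described here. This sketch therefore assumes two hypothetical heartbeat fields, CPU and memory usage expressed as fractions, combined with equal weights. Only the selection rule, which picks the minimum score, comes from the description above.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class SystemLoadSketch {
    // Hypothetical heartbeat-reported load; both values are fractions in [0, 1].
    record Worker(String name, double cpuUsage, double memoryUsage) {
        // Assumption: CPU and memory usage weighted equally.
        double loadScore() {
            return 0.5 * cpuUsage + 0.5 * memoryUsage;
        }
    }

    // Select the candidate with the lowest combined load score.
    static Optional<Worker> selectWorker(List<Worker> candidates) {
        return candidates.stream().min(Comparator.comparingDouble(Worker::loadScore));
    }

    public static void main(String[] args) {
        List<Worker> workers = List.of(
                new Worker("worker-1", 0.9, 0.4),  // score 0.65
                new Worker("worker-2", 0.3, 0.7),  // score 0.50
                new Worker("worker-3", 0.5, 0.6)); // score 0.55
        System.out.println(selectWorker(workers).map(Worker::name).orElse("none")); // worker-2
    }
}
```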
Data Locality:
```hocon
env {
  # Job-level worker attribute filter (full key/value match)
  tag_filter = {
    zone = "us-west-1"
  }
}
```
Resource Specialization:
```hocon
env {
  tag_filter = {
    resource = "gpu"
  }
}
```
Multi-Tenancy:
```hocon
env {
  job.name = "tenant-a-job"
  tag_filter = {
    tenant = "a"
  }
}
```
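The engine matches env.tag_filter against each worker's attributes using a full key/value match: a worker qualifies only if it has every key in the filter with exactly the same value. If no worker matches, resource allocation fails.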
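The following sketch implements the matching rule as described: every key/value pair in the filter must appear in the worker's attributes with the same value. The names matches and filterWorkers are hypothetical. Under this rule an empty filter matches every worker, which is an assumption about how the engine treats a missing tag_filter.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TagFilterSketch {
    // A worker matches when every key/value pair in the filter appears in its attributes.
    static boolean matches(Map<String, String> attributes, Map<String, String> tagFilter) {
        return tagFilter.entrySet().stream()
                .allMatch(e -> e.getValue().equals(attributes.get(e.getKey())));
    }

    // Returns the (sorted) names of matching workers; an empty result means allocation fails.
    static List<String> filterWorkers(
            Map<String, Map<String, String>> workers, Map<String, String> tagFilter) {
        return workers.entrySet().stream()
                .filter(w -> matches(w.getValue(), tagFilter))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> workers = Map.of(
                "worker-1", Map.of("zone", "us-west-1", "resource", "gpu"),
                "worker-2", Map.of("zone", "us-west-1"),
                "worker-3", Map.of("zone", "us-east-1"));
        System.out.println(filterWorkers(workers, Map.of("zone", "us-west-1"))); // [worker-1, worker-2]
        System.out.println(filterWorkers(workers, Map.of("zone", "us-west-1", "resource", "gpu"))); // [worker-1]
        System.out.println(filterWorkers(workers, Map.of("tenant", "a"))); // []
    }
}
```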
```mermaid
sequenceDiagram
    participant JM as JobMaster
    participant RM as ResourceManager
    participant Worker as Worker Node
    JM->>JM: Generate PhysicalPlan
    JM->>JM: Calculate required resources
    JM->>RM: applyResources(profiles, tags)
    RM->>RM: Filter workers by tags
    RM->>RM: Select workers by strategy
    RM->>RM: Allocate slots
    RM-->>JM: Return SlotProfiles
    JM->>JM: Assign slots to PhysicalVertices
    loop For each task
        JM->>Worker: DeployTaskOperation(task, slot)
        Worker->>Worker: Execute task in slot
        Worker-->>JM: ACK
    end
```
```mermaid
sequenceDiagram
    participant JM as JobMaster
    participant RM as ResourceManager
    JM->>RM: applyResources(100 slots)
    RM->>RM: Check available slots
    Note over RM: Only 50 slots available
    RM-->>JM: NoEnoughResourceException
    JM->>JM: Retry with backoff
    Note over JM: Wait for resources to free up
    JM->>RM: applyResources(100 slots)
    RM-->>JM: Success (after resources freed)
```
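The retry loop in the diagram can be sketched as follows. Exponential backoff with a cap is one common choice and is assumed here. The attempt limit, the delays, and InsufficientSlotsException (standing in for NoEnoughResourceException) are hypothetical, because the diagram does not specify the engine's actual retry policy.

```java
import java.util.concurrent.Callable;

public class RetryWithBackoffSketch {
    // Hypothetical stand-in for NoEnoughResourceException.
    static final class InsufficientSlotsException extends Exception {}

    // Calls `attempt` until it succeeds or maxAttempts is reached. After each
    // InsufficientSlotsException the delay doubles, capped at maxDelayMs.
    static <T> T retry(Callable<T> attempt, int maxAttempts, long initialDelayMs, long maxDelayMs)
            throws Exception {
        long delayMs = initialDelayMs;
        for (int attemptNo = 1; ; attemptNo++) {
            try {
                return attempt.call();
            } catch (InsufficientSlotsException e) {
                if (attemptNo >= maxAttempts) {
                    throw e; // give up and surface the failure to the caller
                }
                Thread.sleep(delayMs); // wait for resources to free up
                delayMs = Math.min(delayMs * 2, maxDelayMs);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated applyResources: only 50 of 100 slots are free until the third call.
        Callable<String> applyResources = () -> {
            calls[0]++;
            int freeSlots = calls[0] < 3 ? 50 : 100;
            if (freeSlots < 100) {
                throw new InsufficientSlotsException();
            }
            return "allocated 100 slots";
        };
        String result = retry(applyResources, 5, 10, 100);
        System.out.println(result + " after " + calls[0] + " attempts"); // allocated 100 slots after 3 attempts
    }
}
```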
```mermaid
sequenceDiagram
    participant Task as SeaTunnelTask
    participant JM as JobMaster
    participant RM as ResourceManager
    Task->>Task: Task completes/fails
    Task->>JM: Task finished
    JM->>RM: releaseResources(slots)
    RM->>RM: Mark slots as unassigned
    RM->>RM: Update WorkerProfile
    Note over RM: Slots available for new allocations
```
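Detection: the ResourceManager is notified through memberRemoved when a worker leaves the cluster, whether through failure or graceful shutdown.

Recovery: the ResourceManager removes the worker from its registry and notifies the JobMaster of every job that held a slot on it: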
```java
@Override
public void memberRemoved(MembershipServiceEvent event) {
    Address failedWorker = event.getMember().getAddress();
    // 1. Remove worker from registry
    WorkerProfile failed = registerWorker.remove(failedWorker);
    if (failed == null) {
        return; // worker was never registered or has already been removed
    }
    // 2. Notify JobMasters of slot losses
    //    (getJobIdForSlot / getJobMaster / notifySlotLost are simplified, illustrative helpers)
    for (SlotProfile slot : failed.getAssignedSlots()) {
        long jobId = getJobIdForSlot(slot);
        JobMaster jobMaster = getJobMaster(jobId);
        // 3. Trigger job failover
        jobMaster.notifySlotLost(slot);
    }
}
```
JobMaster Response:
High Availability:
Recovery:
Example (config/seatunnel.yaml, SeaTunnel Engine / Zeta):
```yaml
seatunnel:
  engine:
    slot-service:
      dynamic-slot: true
      slot-num: 16 # only takes effect when dynamic-slot is false
      slot-allocate-strategy: RANDOM # RANDOM / SLOT_RATIO / SYSTEM_LOAD
```
Cluster-Level:
Per-Worker:
Per-Job:
Resource Dashboard Example:
```text
Cluster Resources:
  Workers: 10 (all healthy)
  Total Slots: 20
  Available Slots: 8
  Utilization: 60%

Top Resource Consumers:
  job-123: 6 slots (mysql-cdc → elasticsearch)
  job-456: 4 slots (kafka → jdbc)
  job-789: 2 slots (file → s3)

Worker Distribution:
  worker-1: 2/2 slots (100%)
  worker-2: 1/2 slots (50%)
  worker-3: 2/2 slots (100%)
  ...
```
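The dashboard figures are internally consistent, and a few lines of arithmetic confirm it. Ten workers with 2 slots each give 20 slots. The listed jobs use 6 + 4 + 2 = 12 slots, which leaves 8 free and a utilization of 12 / 20 = 60%. The helper name is hypothetical, and integer division truncates the result to a whole percent.

```java
import java.util.Map;

public class UtilizationSketch {
    // Utilization = used slots / total slots, as a truncated integer percentage.
    static int utilizationPercent(int usedSlots, int totalSlots) {
        return totalSlots == 0 ? 0 : usedSlots * 100 / totalSlots;
    }

    public static void main(String[] args) {
        int totalSlots = 10 * 2; // 10 workers, 2 slots each
        Map<String, Integer> slotsByJob = Map.of("job-123", 6, "job-456", 4, "job-789", 2);
        int usedSlots = slotsByJob.values().stream().mapToInt(Integer::intValue).sum();
        System.out.println("Available Slots: " + (totalSlots - usedSlots)); // Available Slots: 8
        System.out.println("Utilization: " + utilizationPercent(usedSlots, totalSlots) + "%"); // Utilization: 60%
    }
}
```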
Slot sizing (slots per worker, heap per slot, etc.) depends on workload characteristics and deployment constraints. Avoid treating formulas in architecture docs as mandatory defaults.
Use RandomStrategy when:
Use SlotRatioStrategy when:
Use SystemLoadStrategy when:
Data Locality:
```hocon
env {
  # Match worker attributes, e.g., zone=us-west-1a
  tag_filter = {
    zone = "us-west-1a"
  }
}
```
Resource Isolation:
```hocon
env {
  job.name = "critical-job"
  tag_filter = {
    priority = "high"
  }
}
```