.tasks/core/JOB-003-parallel-task-execution.md
Enable jobs to spawn parallel tasks on the task-system's multi-threaded worker pool, dramatically improving performance for I/O-bound operations like file copying, thumbnail generation, and media indexing.
Key Design: Jobs access TaskDispatcher via JobContext (similar to ctx.library(), ctx.networking_service()), allowing them to spawn child tasks that execute in parallel across available worker threads.
Expected Impact: 4-10x faster file operations depending on concurrency level.
Current job system runs each job as a single task, processing work sequentially. For operations like copying 100 files, this leaves CPU cores idle and storage I/O underutilized.
- Current: 100 files × 500 ms each = 50 seconds (sequential)
- Target: 100 files ÷ 10 workers × 500 ms = 5 seconds (10x faster)
JobManager creates JobExecutor with TaskDispatcher
↓
JobContext exposes ctx.task_dispatcher()
↓
Job spawns parallel tasks via dispatcher
↓
Tasks execute on multi-threaded worker pool
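The flow above can be sketched end-to-end with plain threads. Everything here (`MiniDispatcher`, closure-based tasks, `copy_all`) is a hypothetical stand-in for the real async `TaskDispatcher`, shown only to illustrate the dispatch-many-then-collect-handles shape:

```rust
use std::thread;

// Hypothetical stand-in for TaskDispatcher: one thread per task instead of a
// pooled async runtime, but the same "dispatch many, collect handles" shape.
struct MiniDispatcher;

impl MiniDispatcher {
    fn dispatch_many<T, F>(&self, tasks: Vec<F>) -> Vec<thread::JoinHandle<T>>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        tasks.into_iter().map(thread::spawn).collect()
    }
}

// "Job" side: build one task per file, dispatch them all, then await handles.
fn copy_all(sizes: Vec<u64>) -> u64 {
    let tasks: Vec<_> = sizes
        .into_iter()
        .map(|n| move || n) // each task "copies" n bytes and reports the count
        .collect();
    MiniDispatcher
        .dispatch_many(tasks)
        .into_iter()
        .map(|h| h.join().expect("task panicked"))
        .sum()
}
```

The job never touches threads directly; it only builds tasks and waits on handles, which is the division of labor the real design aims for.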
Why via Context, not Job storage? Jobs are serialized for checkpoint/resume; a dispatcher handle stored on the job struct would have to survive (de)serialization, whereas the context is rebuilt fresh for every run. This is the same reason `ctx.library()` and `ctx.networking_service()` live on the context.
Enable jobs to access the task dispatcher via context.
Changes:
- Add `task_dispatcher` field to `JobExecutorState`
- Update `JobExecutor::new()` to accept a dispatcher parameter
- Add `task_dispatcher` field to `JobContext`
- Add `ctx.task_dispatcher()` accessor method
- Update `JobManager::dispatch()` to pass the dispatcher
- Update the `#[derive(Job)]` macro if needed

Files:
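The changes above amount to threading one borrowed reference through the executor into the context. A minimal sketch of the accessor, with `TaskDispatcher` as an empty stand-in type (the real one lives in the task-system crate):

```rust
// Stand-in for the real dispatcher type.
struct TaskDispatcher;

// Sketch of JobContext after the change: it borrows the dispatcher from the
// executor state, mirroring the existing ctx.library()-style accessors.
struct JobContext<'a> {
    task_dispatcher: &'a TaskDispatcher,
}

impl<'a> JobContext<'a> {
    // Jobs call this to spawn child tasks; they never own the dispatcher.
    fn task_dispatcher(&self) -> &TaskDispatcher {
        self.task_dispatcher
    }
}
```

Borrowing rather than cloning keeps ownership with the executor, so the dispatcher's lifetime is tied to the job run and nothing leaks into serialized job state.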
Acceptance Criteria:
- Jobs can call `ctx.task_dispatcher()` and get a valid dispatcher

Migrate FileCopyJob to use parallel execution.
Changes:
- Create `CopyFileTask` implementing `Task<JobError>`
- Update `FileCopyJob::run()` to use `dispatcher.dispatch_many()`

Files:
Acceptance Criteria:
Document the pattern for other developers.
Deliverables:
Apply pattern to other I/O-bound jobs:
Add centralized resource limits to prevent system overload.
Features:
- `LimitedTaskDispatcher` wrapper with semaphores

Note: Resource limiting is deferred to a later phase, after the parallel execution concept is proven.
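What the deferred `LimitedTaskDispatcher` would enforce can be sketched without the real dispatcher. In this hypothetical, a pre-filled token channel acts as a counting semaphore: a task must receive a permit before running and returns it when done, capping concurrency at `max_parallel` (`run_limited` and its types are illustrative, not part of the real API):

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

// Runs num_tasks fake tasks with at most max_parallel running at once.
// Returns (tasks completed, peak observed concurrency).
fn run_limited(num_tasks: usize, max_parallel: usize) -> (usize, usize) {
    let (permit_tx, permit_rx) = mpsc::channel::<()>();
    for _ in 0..max_parallel {
        permit_tx.send(()).unwrap(); // pre-fill the permits
    }
    let permit_rx = Arc::new(Mutex::new(permit_rx));
    let state = Arc::new(Mutex::new((0usize, 0usize))); // (running, peak)

    let handles: Vec<_> = (0..num_tasks)
        .map(|_| {
            let permit_rx = Arc::clone(&permit_rx);
            let permit_tx = permit_tx.clone();
            let state = Arc::clone(&state);
            thread::spawn(move || {
                // Block until a permit is free.
                permit_rx.lock().unwrap().recv().unwrap();
                {
                    let mut s = state.lock().unwrap();
                    s.0 += 1;
                    s.1 = s.1.max(s.0);
                }
                thread::sleep(Duration::from_millis(2)); // simulated file I/O
                state.lock().unwrap().0 -= 1;
                permit_tx.send(()).unwrap(); // hand the permit back
            })
        })
        .collect();

    let done = handles.into_iter().map(|h| h.join().unwrap()).count();
    let peak = state.lock().unwrap().1;
    (done, peak)
}
```

The real wrapper would use an async semaphore around `dispatch()` rather than threads and channels, but the invariant is the same: permits issued never exceed the configured limit, so peak concurrency cannot either.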
```rust
#[async_trait]
impl JobHandler for FileCopyJob {
    async fn run(&mut self, ctx: JobContext<'_>) -> JobResult<Self::Output> {
        // Get dispatcher from context
        let dispatcher = ctx.task_dispatcher();

        // Create parallel copy tasks for files not yet copied
        let tasks: Vec<_> = self.sources.paths.iter()
            .enumerate()
            .filter(|(idx, _)| !self.completed_indices.contains(idx))
            .map(|(idx, source)| CopyFileTask {
                id: TaskId::new_v4(),
                index: idx,
                source: source.clone(),
                destination: self.destination.clone(),
                options: self.options.clone(),
            })
            .collect();

        // Remember each task's original source index: after filtering,
        // a handle's position no longer matches the file's position.
        let indices: Vec<usize> = tasks.iter().map(|t| t.index).collect();

        // Dispatch all tasks - task system handles distribution
        let handles = dispatcher.dispatch_many(tasks).await?;

        // Wait for completion and track progress
        for (completed, (index, handle)) in indices.into_iter().zip(handles).enumerate() {
            ctx.check_interrupt().await?;
            match handle.await {
                Ok(TaskStatus::Done(_)) => {
                    self.completed_indices.push(index);
                    ctx.progress(/* ... */);
                }
                Ok(TaskStatus::Error(e)) => {
                    // Handle individual task failure (log, collect, or abort)
                }
                _ => {}
            }
            // Persist state periodically so a crash resumes mid-batch
            if (completed + 1) % 10 == 0 {
                ctx.checkpoint().await?;
            }
        }

        Ok(FileCopyOutput { /* ... */ })
    }
}
```
```rust
struct CopyFileTask {
    id: TaskId,
    index: usize,
    source: SdPath,
    destination: SdPath,
    options: CopyOptions,
}

#[async_trait]
impl Task<JobError> for CopyFileTask {
    fn id(&self) -> TaskId { self.id }

    async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, JobError> {
        // Bail out early if the parent job was paused or cancelled
        interrupter.try_check_interrupt()?;

        // Execute copy strategy
        let strategy = CopyStrategyRouter::select_strategy(/* ... */).await;
        let bytes_copied = strategy.execute_simple(/* ... */).await?;

        Ok(ExecStatus::Done(/* output */))
    }
}
```
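The `filter` on `completed_indices` in `FileCopyJob::run()` is what makes the job resumable: only indices not already recorded as complete get new tasks. The same selection in isolation (`remaining_indices` is an illustrative helper, not part of the real API):

```rust
// Which source indices still need a CopyFileTask after a resume?
fn remaining_indices(total: usize, completed: &[usize]) -> Vec<usize> {
    (0..total).filter(|i| !completed.contains(i)).collect()
}
```

A job resumed after copying files 0 and 2 of 5 would spawn tasks only for indices 1, 3, and 4, which is why tracking the original `index` on each task (rather than the handle's position) matters.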
File Copy (100 files, 1MB each, SSD):
Real-world (Mixed sizes, 10GB total):
This replaces the over-engineered approach from JOB_TASK_COMPOSITION_API.md. Key insight: jobs are orchestrators, tasks are workers. Jobs don't need special task types; they just need the ability to spawn standard tasks.