The job system powers long-running operations in Spacedrive. It provides automatic persistence, progress tracking, and graceful interruption handling for tasks like indexing, file processing, and sync operations.
Jobs execute asynchronously with minimal boilerplate. They persist their state to survive crashes and resume where they left off. The system integrates with Spacedrive's task executor for efficient resource usage.
A job represents a resumable unit of work. Jobs report progress, handle interruptions, and maintain state across executions. The system manages job lifecycles automatically.
<Note> Jobs are library-scoped. Each library maintains its own job database and execution queue. </Note>

Jobs transition through defined states during execution:
<Steps>
<Step title="Queued"> Job created and waiting for execution. Initial state after dispatch. </Step>
<Step title="Running"> Job actively executing. Progress updates flow to subscribers. </Step>
<Step title="Paused"> Job interrupted but resumable. State persisted to database. </Step>
<Step title="Completed"> Job finished successfully. Moved to history table. </Step>
</Steps>

Failed or cancelled jobs cannot resume. The system distinguishes between recoverable interruptions and permanent failures.
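The lifecycle above can be read as a small state machine. The following is a minimal sketch, not the actual Spacedrive type: the enum name `JobState`, the `Failed` and `Cancelled` variants, and the exact transition table are assumptions inferred from the prose.

```rust
/// Hypothetical model of the job states described above (not the real Spacedrive enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobState {
    Queued,
    Running,
    Paused,
    Completed,
    Failed,
    Cancelled,
}

impl JobState {
    /// Only paused jobs can be picked up again.
    pub fn is_resumable(self) -> bool {
        matches!(self, JobState::Paused)
    }

    /// Terminal states never transition again.
    pub fn is_terminal(self) -> bool {
        matches!(
            self,
            JobState::Completed | JobState::Failed | JobState::Cancelled
        )
    }

    /// Returns true if moving from `self` to `next` is allowed in this sketch.
    /// The transition table is an assumption, not taken from the source code.
    pub fn can_transition_to(self, next: JobState) -> bool {
        use JobState::*;
        match (self, next) {
            (Queued, Running) | (Queued, Cancelled) => true,
            (Running, Paused) | (Running, Completed) => true,
            (Running, Failed) | (Running, Cancelled) => true,
            (Paused, Running) | (Paused, Cancelled) => true,
            _ => false,
        }
    }
}
```

In this model a failed or cancelled job is terminal, which matches the statement that such jobs cannot resume.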
The job system consists of several interconnected parts:
Job Manager coordinates all job operations. It maintains the job database, tracks running jobs, and handles lifecycle transitions. Located at core/src/infra/job/manager.rs.
Job Registry enables automatic job discovery. Jobs register themselves at compile time using the derive macro. The registry creates jobs dynamically from saved state. See core/src/infra/job/registry.rs.
Job Context provides the execution environment. Jobs access the database, report progress, and interact with services through the context. Implementation in core/src/infra/job/context.rs.
Job Executor bridges jobs with the task system. It manages interruption signals and checkpoint operations. Found at core/src/infra/job/executor.rs.
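To illustrate how a registry can recreate jobs dynamically from saved state, here is a simplified sketch. It registers constructors at runtime rather than at compile time, and it stores state as plain text instead of a serialized payload. `StoredJob`, `Registry`, `ProcessFiles`, and `restore_process_files` are hypothetical names for illustration only.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a job that can be rebuilt from saved state.
pub trait StoredJob {
    fn name(&self) -> &'static str;
    fn describe(&self) -> String;
}

struct ProcessFiles {
    processed: usize,
}

impl StoredJob for ProcessFiles {
    fn name(&self) -> &'static str {
        "process_files"
    }
    fn describe(&self) -> String {
        format!("process_files at {}", self.processed)
    }
}

/// Constructor signature: rebuild a job from its saved state.
type Constructor = fn(&str) -> Option<Box<dyn StoredJob>>;

#[derive(Default)]
pub struct Registry {
    constructors: HashMap<&'static str, Constructor>,
}

impl Registry {
    pub fn register(&mut self, name: &'static str, ctor: Constructor) {
        self.constructors.insert(name, ctor);
    }

    /// Looks up the constructor by job name and rebuilds the job.
    pub fn create(&self, name: &str, state: &str) -> Option<Box<dyn StoredJob>> {
        let ctor = self.constructors.get(name)?;
        ctor(state)
    }
}

/// Saved state here is just the processed count as text (a simplification).
fn restore_process_files(state: &str) -> Option<Box<dyn StoredJob>> {
    let processed = state.trim().parse().ok()?;
    Some(Box::new(ProcessFiles { processed }))
}

pub fn demo_registry() -> Registry {
    let mut registry = Registry::default();
    registry.register("process_files", restore_process_files);
    registry
}
```

Keying constructors by job name is what lets a manager resume a job after a restart knowing only the name and the persisted state.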
Jobs implement two traits: Job for metadata and JobHandler for execution logic.
use sd_task_system::TaskId;
use serde::{Serialize, Deserialize};
use std::path::PathBuf;
#[derive(Serialize, Deserialize, Debug)]
struct ProcessFilesJob {
location_id: i32,
files: Vec<PathBuf>,
processed: usize,
}
#[typetag::serde(name = "process_files")]
impl Job for ProcessFilesJob {
const NAME: &'static str = "process_files";
const VERSION: u32 = 1;
const IS_RESUMABLE: bool = true;
}
#[async_trait::async_trait]
impl JobHandler for ProcessFilesJob {
    async fn run(&mut self, ctx: &JobContext) -> Result<Vec<i32>, JobError> {
        // Skip already processed files when resuming
        let files_to_process = &self.files[self.processed..];
        for (idx, file) in files_to_process.iter().enumerate() {
            // Check for interruption
            if ctx.check_interrupted().await? {
                return Err(JobError::Interrupted);
            }
            // Process file
            process_file(file, ctx).await?;
            // Update progress
            self.processed += 1;
            ctx.report_count_progress(self.processed, self.files.len()).await;
            // Save a checkpoint after every 100 files
            if (idx + 1) % 100 == 0 {
                ctx.checkpoint().await?;
            }
        }
        Ok(vec![])
    }
}
Jobs communicate progress through the context. The system supports multiple progress types:
// Count-based progress
ctx.report_count_progress(current, total).await;
// Percentage progress
ctx.report_percentage_progress(0.75).await;
// Bytes processed
ctx.report_bytes_progress(processed_bytes, total_bytes).await;
// Custom structured data
ctx.report_structured_progress("phase", json!({
"stage": "analyzing",
"files_found": 1500
})).await;
Progress updates are throttled automatically. The system batches updates to limit database write overhead.
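One common way to throttle updates is to forward at most one report per time interval and keep only the latest value in between. The sketch below shows that idea under stated assumptions: `ProgressThrottle`, its methods, and the interval-based policy are hypothetical and may differ from what the job system actually does.

```rust
use std::time::{Duration, Instant};

/// Hypothetical throttle: forwards at most one update per `min_interval`.
pub struct ProgressThrottle {
    min_interval: Duration,
    last_emit: Option<Instant>,
    pending: Option<(usize, usize)>,
}

impl ProgressThrottle {
    pub fn new(min_interval: Duration) -> Self {
        Self {
            min_interval,
            last_emit: None,
            pending: None,
        }
    }

    /// Records an update and returns it if enough time has passed since the
    /// last emit. Otherwise the update replaces any older pending one.
    pub fn report(
        &mut self,
        now: Instant,
        current: usize,
        total: usize,
    ) -> Option<(usize, usize)> {
        self.pending = Some((current, total));
        let due = match self.last_emit {
            None => true,
            Some(last) => now.duration_since(last) >= self.min_interval,
        };
        if due {
            self.last_emit = Some(now);
            self.pending.take()
        } else {
            None
        }
    }

    /// Flushes the latest pending update, for example when the job finishes.
    pub fn flush(&mut self) -> Option<(usize, usize)> {
        self.pending.take()
    }
}
```

Passing `now` explicitly keeps the sketch deterministic. A final `flush` ensures the last reported value is not lost when the job ends between intervals.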
Jobs distinguish between recoverable and permanent errors:
// Recoverable - job can resume
if network_unavailable() {
return Err(JobError::Interrupted);
}
// Permanent failure - job cannot resume
if corrupt_data() {
return Err(JobError::Critical(
"Data corruption detected".into()
));
}
// Non-critical errors accumulate
ctx.report_non_critical_error("Skipped locked file").await;
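The distinction above suggests how a run's result could map to the job's final state. This is a sketch under assumptions: the local `JobError` enum mirrors only the two variants used in the examples, and `Outcome` and `classify` are hypothetical names, not part of the job system's API.

```rust
/// Hypothetical mirror of the two error kinds used in the examples above.
#[derive(Debug)]
pub enum JobError {
    Interrupted,
    Critical(String),
}

/// Hypothetical summary of where a job ends up after a run.
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Completed { warnings: usize },
    Paused,
    Failed(String),
}

/// Maps the result of a run to the state the job ends up in.
/// Non-critical errors do not change the outcome; they are only counted.
pub fn classify(result: Result<(), JobError>, non_critical: &[String]) -> Outcome {
    match result {
        Ok(()) => Outcome::Completed {
            warnings: non_critical.len(),
        },
        // Recoverable: state is kept so the job can resume later.
        Err(JobError::Interrupted) => Outcome::Paused,
        // Permanent: the job cannot resume.
        Err(JobError::Critical(message)) => Outcome::Failed(message),
    }
}
```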
The job manager provides typed and dynamic dispatch methods:
// Typed dispatch
let handle = manager.dispatch(ProcessFilesJob {
location_id: 1,
files: vec![path1, path2],
processed: 0,
}).await?;
// Dynamic dispatch by name
let handle = manager.dispatch_by_name(
"process_files",
json!({
"location_id": 1,
"files": ["path1", "path2"],
"processed": 0
})
).await?;
// Wait for completion
let result = handle.wait().await?;
Job handles provide status monitoring and progress streaming:
// Subscribe to status changes
let mut status_rx = handle.status();
while let Ok(status) = status_rx.recv().await {
match status {
JobStatus::Running => println!("Job started"),
JobStatus::Completed => break,
_ => {}
}
}
// Stream progress updates
let mut progress_rx = handle.progress();
while let Ok(progress) = progress_rx.recv().await {
if let Some(count) = progress.count {
println!("Processed {}/{}", count.current, count.total);
}
}
Jobs persist to a dedicated SQLite database (jobs.db) with three tables:
Jobs specify versions for schema evolution:
impl Job for DataMigrationJob {
const VERSION: u32 = 2; // Increment when schema changes
}
The registry validates versions during resumption. Incompatible versions fail to load.
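A version check at resume time might look like the following sketch. `SavedJob`, `LoadError`, and `validate_version` are hypothetical, and the strict equality rule is an assumption. The real registry may apply a different compatibility policy.

```rust
/// Hypothetical record of a persisted job, as it might be read back from storage.
pub struct SavedJob {
    pub name: String,
    pub version: u32,
    pub state: Vec<u8>,
}

#[derive(Debug, PartialEq, Eq)]
pub enum LoadError {
    VersionMismatch { saved: u32, current: u32 },
}

/// Accepts the saved state only when its version matches the running code.
pub fn validate_version(saved: &SavedJob, current: u32) -> Result<&[u8], LoadError> {
    if saved.version == current {
        Ok(saved.state.as_slice())
    } else {
        Err(LoadError::VersionMismatch {
            saved: saved.version,
            current,
        })
    }
}
```

Rejecting mismatched versions up front avoids deserializing state into a struct whose layout has changed.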
The system supports WASM-based extension jobs:
manager.dispatch_extension_job(
extension_id,
job_name,
job_data
).await?;
Extensions run in isolated contexts with limited capabilities.
The job system optimizes for throughput and resumability:
Jobs integrate with core Spacedrive systems:
Task System: Jobs execute as tasks with configurable priority. The executor handles work distribution across threads.
Event System: State changes emit events for UI updates. Subscribe to JOB_MANAGER_EVENTS for notifications.
Action System: User actions spawn jobs with audit context. The system tracks who initiated operations.
Library System: Each library maintains independent job state. Jobs cannot access cross-library data.
Process items in chunks for efficiency:
const BATCH_SIZE: usize = 1000;
// Resume from the last checkpointed batch instead of starting over
for (batch_idx, chunk) in items.chunks(BATCH_SIZE).enumerate().skip(self.batch_idx) {
    if ctx.check_interrupted().await? {
        return Err(JobError::Interrupted);
    }
    process_batch(chunk).await?;
    ctx.report_count_progress(
        batch_idx * BATCH_SIZE + chunk.len(),
        items.len()
    ).await;
    // Record the next batch before checkpointing so the saved state is current
    self.batch_idx = batch_idx + 1;
    ctx.checkpoint().await?;
}
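The resume behaviour of the batch pattern can be checked with a small synchronous model. This sketch replaces the async context with a `budget` parameter that simulates an interruption after a number of batches. `BatchJob`, `next_batch`, and `budget` are hypothetical names.

```rust
/// Hypothetical in-memory job used to illustrate resuming from a batch index.
pub struct BatchJob {
    pub items: Vec<u32>,
    pub next_batch: usize,
    pub sum: u64,
}

const BATCH_SIZE: usize = 3;

impl BatchJob {
    /// Processes batches until done or until `budget` batches have run.
    /// Returns true when all items are processed, false when interrupted.
    pub fn run(&mut self, mut budget: usize) -> bool {
        let batches = self
            .items
            .chunks(BATCH_SIZE)
            .enumerate()
            .skip(self.next_batch);
        for (batch_idx, chunk) in batches {
            if budget == 0 {
                // Interrupted: `next_batch` already points at this batch.
                return false;
            }
            self.sum += chunk.iter().map(|&v| u64::from(v)).sum::<u64>();
            // This is the state a checkpoint would persist.
            self.next_batch = batch_idx + 1;
            budget -= 1;
        }
        true
    }
}
```

With ten items and a batch size of three, a first run limited to two batches stops with `next_batch == 2`. A second run skips those batches and processes only the remaining two.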
Split complex jobs into phases:
match self.phase {
Phase::Discovery => {
let items = discover_items(ctx).await?;
self.items = items;
self.phase = Phase::Processing;
ctx.checkpoint().await?;
}
Phase::Processing => {
process_items(&mut self.items, ctx).await?;
self.phase = Phase::Cleanup;
ctx.checkpoint().await?;
}
Phase::Cleanup => {
cleanup_resources(ctx).await?;
}
}
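A phased job typically drives the match in a loop until it reaches a final phase, so that a resumed job continues from whichever phase was last checkpointed. The following synchronous sketch assumes an extra `Done` variant and counts checkpoints instead of persisting them. `PhasedJob` and its fields are hypothetical.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    Discovery,
    Processing,
    Cleanup,
    Done, // assumed terminal marker, not present in the example above
}

/// Hypothetical job that records its phase so it can resume mid-way.
pub struct PhasedJob {
    pub phase: Phase,
    pub items: Vec<String>,
    pub processed: usize,
    pub checkpoints: usize,
}

impl PhasedJob {
    pub fn new() -> Self {
        Self {
            phase: Phase::Discovery,
            items: Vec::new(),
            processed: 0,
            checkpoints: 0,
        }
    }

    /// Runs the current phase, advances to the next, and records a checkpoint.
    pub fn step(&mut self) {
        match self.phase {
            Phase::Discovery => {
                self.items = vec!["a".to_string(), "b".to_string()];
                self.phase = Phase::Processing;
            }
            Phase::Processing => {
                self.processed = self.items.len();
                self.phase = Phase::Cleanup;
            }
            Phase::Cleanup => {
                self.items.clear();
                self.phase = Phase::Done;
            }
            Phase::Done => return,
        }
        self.checkpoints += 1;
    }

    /// Drives the phases until the job is done.
    pub fn run(&mut self) {
        while self.phase != Phase::Done {
            self.step();
        }
    }
}
```

Because the phase is part of the job's state, a job restored in `Processing` skips discovery and reuses the items saved with it.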
Spawn dependent jobs (feature in development):
let child_ids = ctx.spawn_children(vec![
AnalyzeFileJob { path: file1 },
AnalyzeFileJob { path: file2 },
]).await?;
ctx.wait_for_children(child_ids).await?;
Enable file-based logging for troubleshooting:
std::env::set_var("SD_JOBS_FILE_LOG", "1");
Logs are written to .spacedrive/jobs/{job_id}.log with detailed execution traces.
Monitor job metrics through the context:
let metrics = ctx.get_metrics();
println!("Execution time: {}s", metrics.elapsed_seconds);
println!("Memory used: {}MB", metrics.memory_mb);
The job system provides the foundation for reliable background processing in Spacedrive. Its resumable design ensures operations complete despite interruptions, while the progress system keeps users informed of ongoing work.