scientific-skills/dnanexus-integration/references/job-execution.md
Jobs are the fundamental execution units on DNAnexus. When an applet or app runs, a job is created and executed on a worker node in an isolated Linux environment with constant API access.
Jobs can be created in several ways: initially by users or automated systems, by directly launching an executable (app or applet), or spawned by parent jobs for parallel processing and sub-workflows.
Basic execution:
import dxpy
# Run an applet
job = dxpy.DXApplet("applet-xxxx").run({
"input1": {"$dnanexus_link": "file-yyyy"},
"input2": "parameter_value"
})
print(f"Job ID: {job.get_id()}")
Using command line:
dx run applet-xxxx -i input1=file-yyyy -i input2="value"
# Run an app by name
job = dxpy.DXApp(name="my-app").run({
"reads": {"$dnanexus_link": "file-xxxx"},
"quality_threshold": 30
})
job = dxpy.DXApplet("applet-xxxx").run(
applet_input={
"input_file": {"$dnanexus_link": "file-yyyy"}
},
project="project-zzzz", # Output project
folder="/results", # Output folder
name="My Analysis Job", # Job name
instance_type="mem2_hdd2_x4", # Override instance type
priority="high" # Job priority
)
job = dxpy.DXJob("job-xxxx")
state = job.describe()["state"]
# States: idle, waiting_on_input, runnable, running, done, failed, terminated
print(f"Job state: {state}")
Using command line:
dx watch job-xxxx
# Block until job completes
job.wait_on_done()
# Check if successful
if job.describe()["state"] == "done":
output = job.describe()["output"]
print(f"Job completed: {output}")
else:
print("Job failed")
job = dxpy.DXJob("job-xxxx")
# Wait for completion
job.wait_on_done()
# Get outputs
output = job.describe()["output"]
output_file_id = output["result_file"]["$dnanexus_link"]
# Download result
dxpy.download_dxfile(output_file_id, "result.txt")
Create references to job outputs before they complete:
# Launch first job
job1 = dxpy.DXApplet("applet-1").run({"input": "..."})
# Launch second job using output reference
job2 = dxpy.DXApplet("applet-2").run({
"input": dxpy.dxlink(job1.get_output_ref("output_name"))
})
Command line:
dx watch job-xxxx --get-streams
Programmatically:
import sys
# Get job logs
job = dxpy.DXJob("job-xxxx")
log = dxpy.api.job_get_log(job.get_id())
for log_entry in log["loglines"]:
print(log_entry)
@dxpy.entry_point('main')
def main(input_files):
    """Parent entry point: scatter one subjob per input file, then gather.

    Args:
        input_files: List of input file references to process in parallel.

    Returns:
        dict with key "all_results": a list of job-based object references
        to each subjob's "processed_file" output. The references resolve
        once the subjobs finish, so this entry point returns without
        blocking on them.
    """
    # Scatter: launch one subjob per input file, all running in parallel.
    subjobs = []
    for input_file in input_files:
        subjob = dxpy.new_dxjob(
            fn_input={"file": input_file},
            fn_name="process_file"
        )
        subjobs.append(subjob)

    # Gather: collect output references (not values) from each subjob;
    # the platform substitutes the real outputs when the subjobs complete.
    results = []
    for subjob in subjobs:
        result = subjob.get_output_ref("processed_file")
        results.append(result)
    return {"all_results": results}
@dxpy.entry_point('process_file')
def process_file(file):
    """Subjob entry point: process a single input file.

    Args:
        file: Input file reference passed by the parent job via fn_input.
            (Note: the parameter name shadows the ``file`` builtin, but it
            must match the app's input spec, so it is kept as-is.)

    Returns:
        dict with key "processed_file": the processed output file.
    """
    # Placeholder for real per-file processing. The actual implementation
    # must produce ``output_file`` (e.g. via dxpy.upload_local_file)
    # before returning.
    # ...
    return {"processed_file": output_file}
# Scatter: Process items in parallel
scatter_jobs = []
for item in items:
job = dxpy.new_dxjob(
fn_input={"item": item},
fn_name="process_item"
)
scatter_jobs.append(job)
# Gather: Combine results
gather_job = dxpy.new_dxjob(
fn_input={
"results": [job.get_output_ref("result") for job in scatter_jobs]
},
fn_name="combine_results"
)
Workflows combine multiple apps/applets into multi-step pipelines.
# Create workflow
workflow = dxpy.new_dxworkflow(
name="My Analysis Pipeline",
project="project-xxxx"
)
# Add stages
stage1 = workflow.add_stage(
dxpy.DXApplet("applet-1"),
name="Quality Control",
folder="/qc"
)
stage2 = workflow.add_stage(
dxpy.DXApplet("applet-2"),
name="Alignment",
folder="/alignment"
)
# Connect stages
stage2.set_input("reads", stage1.get_output_ref("filtered_reads"))
# Close workflow
workflow.close()
# Run workflow
analysis = workflow.run({
"stage-xxxx.input1": {"$dnanexus_link": "file-yyyy"}
})
# Monitor analysis (collection of jobs)
analysis.wait_on_done()
# Get workflow outputs
outputs = analysis.describe()["output"]
Using command line:
dx run workflow-xxxx -i stage-1.input=file-yyyy
Jobs run in a workspace project with cloned input data:
The job is granted CONTRIBUTE permission to its workspace and VIEW access to the source projects. Jobs cannot start until their input data objects reach the closed state. Output objects must reach the closed state before workspace cleanup.
Created → Waiting on Input → Runnable → Running → Done/Failed
States:
idle: Job created but not yet queued
waiting_on_input: Waiting for input data objects to close
runnable: Ready to run, waiting for resources
running: Currently executing
done: Completed successfully
failed: Execution failed
terminated: Manually stopped
job = dxpy.DXJob("job-xxxx")
job.wait_on_done()
desc = job.describe()
if desc["state"] == "failed":
print(f"Job failed: {desc.get('failureReason', 'Unknown')}")
print(f"Failure message: {desc.get('failureMessage', '')}")
# Rerun failed job
new_job = dxpy.DXApplet(desc["applet"]).run(
desc["originalInput"],
project=desc["project"]
)
# Stop a running job
job = dxpy.DXJob("job-xxxx")
job.terminate()
Using command line:
dx terminate job-xxxx
Specify computational resources:
# Run with specific instance type
job = dxpy.DXApplet("applet-xxxx").run(
{"input": "..."},
instance_type="mem3_ssd1_v2_x8" # 8 cores, high memory, SSD
)
Common instance types:
mem1_ssd1_v2_x4 - 4 cores, standard memory
mem2_ssd1_v2_x8 - 8 cores, high memory
mem3_ssd1_v2_x16 - 16 cores, very high memory
mem1_ssd1_v2_x36 - 36 cores for parallel workloads
Set maximum execution time:
job = dxpy.DXApplet("applet-xxxx").run(
{"input": "..."},
timeout="24h" # Maximum runtime
)
job = dxpy.DXApplet("applet-xxxx").run(
{"input": "..."},
tags=["experiment1", "batch2", "production"]
)
job = dxpy.DXApplet("applet-xxxx").run(
{"input": "..."},
properties={
"experiment": "exp001",
"sample": "sample1",
"batch": "batch2"
}
)
# Find jobs by tag
jobs = dxpy.find_jobs(
project="project-xxxx",
tags=["experiment1"],
describe=True
)
for job in jobs:
print(f"{job['describe']['name']}: {job['id']}")