{/*
This page is automatically generated via the generate_example_pages.py script. Any changes to this page will be overwritten.
*/}
<a href="https://github.com/PrefectHQ/prefect/blob/main/examples/per_worker_task_concurrency.py" target="_blank">View on GitHub</a>
When a worker runs multiple flow runs concurrently, those flow runs share the worker machine's resources: CPU, memory, GPU, and local software. For some tasks, you need to cap how many copies run at once so they don't overload these resources.
The problem: Starting the worker with `--limit 1` forces entire flow runs to execute sequentially. Often, though, only specific tasks need limits, and the other tasks could safely overlap.

The solution: Use Global Concurrency Limits (GCLs) with worker-specific names. GCLs are coordinated by the Prefect server, so they work across the separate subprocesses that each flow run executes in.
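Consider a pipeline that processes images through an ML model in three steps: download the image (network-bound), run ML inference (GPU-bound), and save the results (fast).

Without limits, if 5 flow runs reach the ML step simultaneously, they would all try to load the model into GPU memory, which can exhaust it and crash the runs. With per-worker limits, only 1 or 2 run at a time while the others wait.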
```python
import os
import time

from prefect import flow, get_run_logger, task
from prefect.concurrency.sync import concurrency


def get_worker_id() -> str:
    """
    Get worker identity from environment.

    Set this when starting the worker:
        WORKER_ID=gpu-1 prefect worker start --pool ml-pool
    """
    return os.getenv("WORKER_ID", "default")
```
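With the `"default"` fallback, every worker started without `WORKER_ID` resolves to the same limit name, `gpu:default`, so those machines would share one limit instead of each getting its own. A minimal sketch of an alternative, assuming the machine's hostname is a stable and unique identifier; the helper names `get_worker_id_with_fallback` and `gpu_limit_name` are hypothetical and not part of Prefect:

```python
import os
import socket


def get_worker_id_with_fallback() -> str:
    """Hypothetical variant of get_worker_id().

    Prefers the WORKER_ID environment variable. If it is unset or blank,
    falls back to the hostname (assumed unique per machine) so workers on
    different machines never silently share one limit name.
    """
    worker_id = os.getenv("WORKER_ID", "").strip()
    return worker_id or socket.gethostname()


def gpu_limit_name(worker_id: str) -> str:
    # Same naming convention as this example: "gpu:<worker id>"
    return f"gpu:{worker_id}"


if __name__ == "__main__":
    print(gpu_limit_name(get_worker_id_with_fallback()))
```

If you adopt a fallback like this, the GCL must exist under the resulting name (for example `gpu:<hostname>`), otherwise the task acquires a limit you never configured.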
These tasks don't contend for limited resources, so they run freely.
```python
@task
def download_image(image_id: int) -> dict:
    """Download an image from storage. Network-bound, no local resource contention."""
    logger = get_run_logger()
    logger.info(f"Image {image_id}: downloading...")
    time.sleep(1)  # simulate download
    return {"image_id": image_id, "path": f"/tmp/image_{image_id}.jpg"}


@task
def save_results(data: dict) -> str:
    """Save processed results. Fast operation, no limits needed."""
    logger = get_run_logger()
    logger.info(f"Image {data['image_id']}: saving results...")
    time.sleep(0.5)  # simulate writing results
    return f"processed-{data['image_id']}"
```
This task uses a local resource (GPU) that can only handle limited concurrent usage. The limit is scoped to this worker so each machine has independent limits.
```python
@task
def run_ml_model(data: dict) -> dict:
    """
    Run image through ML model.

    GPU memory is limited: only 1-2 can run at once per worker machine.
    Uses a Global Concurrency Limit scoped to this worker's identity.
    """
    logger = get_run_logger()
    worker_id = get_worker_id()
    image_id = data["image_id"]

    # Limit key includes worker ID: each worker has its own limit
    with concurrency(f"gpu:{worker_id}", occupy=1):
        logger.info(f"Image {image_id}: running ML inference (GPU)...")
        time.sleep(3)  # simulate model inference
        return {**data, "predictions": [0.9, 0.1]}


@flow(log_prints=True)
def process_image(image_id: int = 1) -> str:
    """
    Process an image through the ML pipeline.

    When multiple instances run concurrently on the same worker:
    - download and save tasks overlap freely
    - run_ml_model tasks are limited by the per-worker GPU concurrency limit
    """
    logger = get_run_logger()
    logger.info(f"Processing image {image_id} on worker '{get_worker_id()}'")

    image = download_image(image_id)
    predictions = run_ml_model(image)
    result = save_results(predictions)
    return result
```
Each worker machine needs its own limit. The limit value controls how many ML tasks can run simultaneously on that machine.
```bash
# GPU machine 1: allow 2 concurrent ML tasks
prefect gcl create gpu:gpu-1 --limit 2

# GPU machine 2: allow 2 concurrent ML tasks
prefect gcl create gpu:gpu-2 --limit 2

# Create the work pool, then deploy every deployment defined in prefect.yaml
prefect work-pool create ml-pool --type process
prefect deploy --all
```
Each worker needs a unique `WORKER_ID` that matches the suffix of its GCL name (`gpu-1` for `gpu:gpu-1`):

```bash
# Machine 1
WORKER_ID=gpu-1 prefect worker start --pool ml-pool --limit 10

# Machine 2
WORKER_ID=gpu-2 prefect worker start --pool ml-pool --limit 10
```
The `--limit 10` flag allows up to 10 concurrent flow runs per worker, but the GCL ensures that only 2 of them are in the ML step at any time.

```bash
# Trigger 20 flow runs of the deployment
for i in {1..20}; do
  prefect deployment run process-image/process-image --param image_id=$i --timeout 0
done
```
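With 10 concurrent flow runs on a worker, the download and save tasks overlap freely, but only 2 runs at a time can be in the ML step; the rest wait for a slot. Flow runs aren't blocked entirely, only the resource-intensive step is limited. This keeps throughput high while protecting the GPU from overload.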
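To estimate the gain over running the worker with `--limit 1`, here is a back-of-the-envelope model using the example's simulated durations (1 s download, 3 s inference, 0.5 s save) for 10 flow runs on one worker. It is idealized: it assumes all runs start at the same moment and ignores scheduling and limit-acquisition overhead, so real timings will be somewhat longer. The function names are illustrative only.

```python
import math

# Simulated durations (seconds) taken from the example's time.sleep() calls
DOWNLOAD_S = 1.0
ML_S = 3.0
SAVE_S = 0.5


def makespan_sequential(n_runs: int) -> float:
    """Worker started with --limit 1: flow runs execute one after another."""
    return n_runs * (DOWNLOAD_S + ML_S + SAVE_S)


def makespan_with_gcl(n_runs: int, gpu_slots: int) -> float:
    """Idealized model of the per-worker GCL pattern.

    Assumes all n_runs start together (worker --limit >= n_runs), downloads
    overlap fully, the ML step runs in waves of gpu_slots, and the last save
    starts as soon as the last wave finishes. Earlier saves overlap with
    later ML waves, so only the final save adds to the total.
    """
    waves = math.ceil(n_runs / gpu_slots)
    return DOWNLOAD_S + waves * ML_S + SAVE_S


print(makespan_sequential(10))   # 45.0
print(makespan_with_gcl(10, 2))  # 16.5
```

Under these assumptions the GPU is still never asked to run more than 2 inferences at once, yet the batch finishes in roughly a third of the fully sequential time.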
- GCLs are server-coordinated: the Prefect server tracks who holds what limit, so it doesn't matter that flow runs are separate processes.
- Worker-specific names: by including `worker_id` in the limit name, each worker machine has independent limits. GPU-1's limit doesn't affect GPU-2.
- Selective application: only the tasks that need limits acquire them. Everything else runs at full concurrency.
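The same pattern works for any local resource constraint, such as CPU-heavy work, limited memory, or local software that can only run a few instances at once. Just change the limit name and value to match your constraint.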
```python
if __name__ == "__main__":
    # Run a single flow run directly for a quick local test
    process_image(image_id=1)
```
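When a machine has several constrained local resources, each one gets its own limit per worker. A small, hypothetical helper that prints the `prefect gcl create` commands for every resource and worker pair, following this page's `<resource>:<worker id>` naming; the `license` resource and its limit of 1 are illustrative assumptions:

```python
def gcl_create_commands(worker_ids: list[str], limits: dict[str, int]) -> list[str]:
    """Build one `prefect gcl create` command per (worker, resource) pair.

    Hypothetical helper; limit names follow the "<resource>:<worker id>"
    convention used throughout this example.
    """
    return [
        f"prefect gcl create {resource}:{worker_id} --limit {limit}"
        for worker_id in worker_ids
        for resource, limit in limits.items()
    ]


# Hypothetical resources: 2 GPU slots and 1 license seat per machine
for cmd in gcl_create_commands(["gpu-1", "gpu-2"], {"gpu": 2, "license": 1}):
    print(cmd)
```

A task that needs the license would then acquire `concurrency(f"license:{worker_id}", occupy=1)` the same way `run_ml_model` acquires its GPU limit.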