docs/v3/api-ref/python/prefect-task_runners.mdx
prefect.task_runners
TaskRunner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L62" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Abstract base class for task runners.
A task runner is responsible for submitting tasks to the task run engine running in an execution environment. Submitted tasks are non-blocking and return a future object that can be used to wait for the task to complete and retrieve the result.
Task runners are context managers and should be used in a with block to ensure
proper cleanup of resources.
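As a rough illustration of the pattern described above (non-blocking submission that returns a future, with a `with` block handling cleanup), here is a standard-library sketch. It uses `concurrent.futures.ThreadPoolExecutor` as a stand-in and is not Prefect's implementation; `slow_double` is a hypothetical function.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(x: int) -> int:
    """Hypothetical stand-in for a task body."""
    time.sleep(0.05)
    return x * 2


# The `with` block shuts the pool down on exit, mirroring the cleanup
# that using a task runner as a context manager is meant to guarantee.
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(slow_double, 21)  # returns immediately with a future
    result = future.result()  # blocks until the work has completed

print(result)
```

The caller only blocks at `future.result()`, so several submissions can be in flight before any result is collected.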
Methods:
duplicate <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L84" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>duplicate(self) -> Self
Return a new instance of this task runner with the same configuration.
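A minimal sketch of what a `duplicate` contract like this could look like. `SketchRunner` is a hypothetical class, not part of Prefect, and the sketch assumes that only configuration is copied while runtime state starts fresh, which the description above does not state explicitly.

```python
from typing import Optional


class SketchRunner:
    """Hypothetical runner used only to illustrate a duplicate() contract."""

    def __init__(self, max_workers: Optional[int] = None) -> None:
        self.max_workers = max_workers  # configuration
        self.started = False  # runtime state (assumed not to be copied)

    def duplicate(self) -> "SketchRunner":
        # Build a new instance of the same class from the configuration only.
        return type(self)(max_workers=self.max_workers)


original = SketchRunner(max_workers=4)
original.started = True
clone = original.duplicate()

print(clone.max_workers, clone.started)
```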
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L117" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any | unmapped[Any] | allow_failure[Any]], wait_for: Iterable[PrefectFuture[R]] | None = None) -> PrefectFutureList[F]
Submit multiple tasks to the task run engine.
Args:
task: The task to submit.
parameters: The parameters to use when running the task.
wait_for: A list of futures that the task depends on.
Returns:
A PrefectFutureList[F] of futures for the submitted task runs.
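The `map` signature suggests that each parameter value is either mapped over or wrapped in `unmapped` to be held constant. The following is a rough sketch of how such a parameters dict could expand into one set of arguments per task run, under that assumption. `Unmapped` and `expand_parameters` are hypothetical names, not Prefect's API, and the error handling is a choice made for this sketch.

```python
from typing import Any, Dict, List


class Unmapped:
    """Hypothetical marker for a value held constant across all mapped calls."""

    def __init__(self, value: Any) -> None:
        self.value = value


def expand_parameters(parameters: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn one dict of mapped and unmapped values into one dict per call."""
    mapped = {k: list(v) for k, v in parameters.items() if not isinstance(v, Unmapped)}
    if not mapped:
        raise ValueError("at least one parameter must be iterable")
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) > 1:
        raise ValueError("mapped parameters must all have the same length")
    calls = []
    for i in range(lengths.pop()):
        call = {}
        for key, value in parameters.items():
            # Unmapped values repeat; mapped values are taken element by element.
            call[key] = value.value if isinstance(value, Unmapped) else mapped[key][i]
        calls.append(call)
    return calls


calls = expand_parameters({"x": [1, 2, 3], "y": Unmapped(10)})
print(calls)
```

Each resulting dict would correspond to one `submit` call, which is why `map` returns a list of futures rather than a single one.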
name <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L79" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>name(self) -> str
The name of this task runner.
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L90" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> F
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L100" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> F
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L109" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> F
ThreadPoolTaskRunner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L239" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
A task runner that executes tasks in a separate thread pool.
Attributes:
max_workers: The maximum number of threads to use for executing tasks.
Defaults to PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS or sys.maxsize.
Examples:
Use a thread pool task runner with a flow:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def some_io_bound_task(x: int) -> int:
    # making a query to a database, reading a file, etc.
    return x * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=3))  # use at most 3 threads at a time
def my_io_bound_flow():
    futures = []
    for i in range(10):
        future = some_io_bound_task.submit(i * 100)
        futures.append(future)

    return [future.result() for future in futures]
Use a thread pool task runner as a context manager:
from prefect import task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def some_io_bound_task(x: int) -> int:
    # making a query to a database, reading a file, etc.
    return x * 2

# Use the runner directly
with ThreadPoolTaskRunner(max_workers=2) as runner:
    future1 = runner.submit(some_io_bound_task, {"x": 1})
    future2 = runner.submit(some_io_bound_task, {"x": 2})
    result1 = future1.result()  # 2
    result2 = future2.result()  # 4
Configure max workers via settings:
# Set via environment variable
# export PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS=8

from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner())  # Uses 8 workers from setting
def my_flow():
    ...
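The fallback order implied by the `max_workers` description (an explicit argument, then the setting, then `sys.maxsize`) can be sketched with the standard library alone. `resolve_max_workers` is a hypothetical helper that reads the environment variable directly; Prefect resolves this value through its own settings system, so treat this only as an illustration of the precedence.

```python
import os
import sys
from typing import Mapping, Optional

ENV_VAR = "PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS"


def resolve_max_workers(
    explicit: Optional[int] = None,
    environ: Mapping[str, str] = os.environ,
) -> int:
    """Hypothetical helper: explicit argument, then setting, then sys.maxsize."""
    if explicit is not None:
        return explicit
    raw = environ.get(ENV_VAR)
    if raw:
        return int(raw)
    return sys.maxsize


from_argument = resolve_max_workers(3, environ={ENV_VAR: "8"})
from_setting = resolve_max_workers(environ={ENV_VAR: "8"})
fallback = resolve_max_workers(environ={})

print(from_argument, from_setting, fallback == sys.maxsize)
```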
Methods:
cancel_all <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L442" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>cancel_all(self) -> None
duplicate <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L324" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>duplicate(self) -> 'ThreadPoolTaskRunner[R]'
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L419" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L427" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L434" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L328" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L337" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L345" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]
Submit a task to the task run engine running in a separate thread.
Args:
task: The task to submit.
parameters: The parameters to use when running the task.
wait_for: A list of futures that the task depends on.
Returns:
A PrefectConcurrentFuture[R] for the submitted task run.
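To illustrate what depending on a list of futures can mean in practice, here is a standard-library sketch in which a submitted callable first waits for its upstream futures. `submit_after` and `record` are hypothetical helpers; this is not how Prefect implements `wait_for`, only one way to picture the ordering it expresses.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Iterable, List, Optional

order: List[str] = []


def submit_after(
    pool: ThreadPoolExecutor,
    fn: Callable[..., Any],
    *args: Any,
    wait_for: Optional[Iterable[Future]] = None,
) -> Future:
    """Hypothetical helper: run fn only after every upstream future finishes."""
    upstream = list(wait_for or [])

    def runner() -> Any:
        if upstream:
            wait(upstream)  # block this worker until the dependencies are done
        return fn(*args)

    return pool.submit(runner)


def record(name: str) -> str:
    order.append(name)
    return name


# max_workers exceeds the number of waiting tasks, so a waiting worker
# cannot starve the upstream task it depends on.
with ThreadPoolExecutor(max_workers=4) as pool:
    first = submit_after(pool, record, "extract")
    second = submit_after(pool, record, "load", wait_for=[first])
    second.result()

print(order)
```

Both submissions return immediately; the ordering is enforced inside the worker rather than by the caller.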
ProcessPoolTaskRunner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L658" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
A task runner that executes tasks in a separate process pool.
This task runner uses ProcessPoolExecutor to run tasks in separate processes,
providing true parallelism for CPU-bound tasks and process isolation. Tasks
are executed with proper context propagation and error handling.
Attributes:
max_workers: The maximum number of processes to use for executing tasks.
Defaults to multiprocessing.cpu_count() if PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS is not set.
Examples:
Use a process pool task runner with a flow:
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def compute_heavy_task(n: int) -> int:
    # CPU-intensive computation that benefits from process isolation
    return sum(i ** 2 for i in range(n))

@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def my_flow():
    futures = []
    for i in range(10):
        future = compute_heavy_task.submit(i * 1000)
        futures.append(future)

    return [future.result() for future in futures]
Use a process pool task runner as a context manager:
from prefect import task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def my_task(x: int) -> int:
    return x * 2

# Use the runner directly
with ProcessPoolTaskRunner(max_workers=2) as runner:
    future1 = runner.submit(my_task, {"x": 1})
    future2 = runner.submit(my_task, {"x": 2})
    result1 = future1.result()  # 2
    result2 = future2.result()  # 4
Configure max workers via settings:
# Set via environment variable
# export PREFECT_TASKS_RUNNER_PROCESS_POOL_MAX_WORKERS=8

from prefect import flow
from prefect.task_runners import ProcessPoolTaskRunner

@flow(task_runner=ProcessPoolTaskRunner())  # Uses 8 workers from setting
def my_flow():
    ...
Methods:
cancel_all <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1146" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>cancel_all(self) -> None
duplicate <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L763" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>duplicate(self) -> Self
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1123" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1131" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1138" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectConcurrentFuture[R]]
set_subprocess_message_processor_factories <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L791" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>set_subprocess_message_processor_factories(self, subprocess_message_processor_factories: Iterable[_SubprocessMessageProcessorFactory] | None = None) -> None
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1023" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1032" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1040" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectConcurrentFuture[R]
Submit a task to the task run engine running in a separate process.
Args:
task: The task to submit.
parameters: The parameters to use when running the task.
wait_for: A list of futures that the task depends on.
dependencies: A dictionary of dependencies for the task.
Returns:
A PrefectConcurrentFuture[R] for the submitted task run.
subprocess_message_processor_factories <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L771" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>subprocess_message_processor_factories(self) -> tuple[_SubprocessMessageProcessorFactory, ...]
subprocess_message_processor_factories <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L777" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>subprocess_message_processor_factories(self, subprocess_message_processor_factories: Iterable[_SubprocessMessageProcessorFactory] | None = None) -> None
PrefectTaskRunner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1216" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Methods:
duplicate <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1220" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>duplicate(self) -> 'PrefectTaskRunner[R]'
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1279" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDistributedFuture[R]]
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1287" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDistributedFuture[R]]
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1294" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDistributedFuture[R]]
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1224" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[P, CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectDistributedFuture[R]
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1233" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectDistributedFuture[R]
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_runners.py#L1241" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[P, R | CoroutineType[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectDistributedFuture[R]
Submit a task to the task run engine running in a separate thread.
Args:
task: The task to submit.
parameters: The parameters to use when running the task.
wait_for: A list of futures that the task depends on.
Returns:
A PrefectDistributedFuture[R] for the submitted task run.