docs/v3/api-ref/python/prefect-runner-runner.mdx
prefect.runner.runner
Runners are responsible for managing the execution of all deployments.
When a deployment is created with either flow.serve or the serve utility,
the runner also polls for scheduled runs.
Example:
```python
import time
from prefect import flow, serve


@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)


@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return


if __name__ == "__main__":
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")

    # serve generates a Runner instance
    serve(slow_deploy, fast_deploy)
```
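The interval=45 argument in the example above is a bare number. The interval argument documented for add_flow in this reference says that a number is interpreted as seconds and that a timedelta object is also accepted. A minimal sketch of that normalization, using only the standard library, might look like the following. The to_timedelta helper is hypothetical, written for illustration, and is not part of Prefect's API.

```python
from datetime import timedelta
from typing import Union

IntervalLike = Union[int, float, timedelta]


def to_timedelta(interval: IntervalLike) -> timedelta:
    """Normalize an interval to a timedelta (hypothetical helper, not Prefect API).

    Bare numbers are treated as seconds; timedelta objects pass through unchanged.
    """
    if isinstance(interval, timedelta):
        return interval
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(interval, bool) or not isinstance(interval, (int, float)):
        raise TypeError(f"unsupported interval type: {type(interval).__name__}")
    return timedelta(seconds=interval)


if __name__ == "__main__":
    print(to_timedelta(45))                    # 0:00:45
    print(to_timedelta(timedelta(minutes=5)))  # 0:05:00
```

Under this reading, interval=45 and interval=timedelta(seconds=45) would describe the same schedule.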
ProcessMapEntry <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L168" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Runner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L173" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Methods:
aadd_deployment <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L343" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
aadd_deployment(self, deployment: 'RunnerDeployment') -> UUID
Registers the deployment with the Prefect API and will monitor for work once the runner is started. Async version.
Args:
deployment: A deployment for the runner to register.
aadd_flow <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L379" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
aadd_flow(self, flow: Flow[Any, Any], name: Optional[str] = None, interval: Optional[Union[Iterable[Union[int, float, datetime.timedelta]], int, float, datetime.timedelta]] = None, cron: Optional[Union[Iterable[str], str]] = None, rrule: Optional[Union[Iterable[str], str]] = None, paused: Optional[bool] = None, schedule: Optional[Schedule] = None, schedules: Optional['FlexibleScheduleList'] = None, concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, parameters: Optional[dict[str, Any]] = None, triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, description: Optional[str] = None, tags: Optional[List[str]] = None, version: Optional[str] = None, enforce_parameter_schema: bool = True, entrypoint_type: EntrypointType = EntrypointType.FILE_PATH) -> UUID
Provides a flow to the runner to be run based on the provided configuration. Async version.
Will create a deployment for the provided flow and register the deployment with the runner.
Args:
flow: A flow for the runner to run.
name: The name to give the created deployment. Will default to the name of the runner.
interval: An interval on which to execute the current flow. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds.
cron: A cron schedule of when to execute runs of this flow.
rrule: An rrule schedule of when to execute runs of this flow.
paused: Whether or not to set the created deployment as paused.
schedule: A schedule object defining when to execute runs of this deployment. Used to provide additional scheduling options like timezone or parameters.
schedules: A list of schedule objects defining when to execute runs of this flow. Used to define multiple schedules or additional scheduling options like timezone.
concurrency_limit: The maximum number of concurrent runs of this flow to allow.
triggers: A list of triggers that should kick off a run of this flow.
parameters: A dictionary of default parameter values to pass to runs of this flow.
description: A description for the created deployment. Defaults to the flow's description if not provided.
tags: A list of tags to associate with the created deployment for organizational purposes.
version: A version for the created deployment. Defaults to the flow's version.
entrypoint_type: Type of entrypoint to use for the deployment. When using a module path entrypoint, ensure that the module will be importable in the execution environment.
add_deployment <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L364" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
add_deployment(self, deployment: 'RunnerDeployment') -> UUID
Registers the deployment with the Prefect API and will monitor for work once the runner is started.
Args:
deployment: A deployment for the runner to register.
add_flow <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L477" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
add_flow(self, flow: Flow[Any, Any], name: Optional[str] = None, interval: Optional[Union[Iterable[Union[int, float, datetime.timedelta]], int, float, datetime.timedelta]] = None, cron: Optional[Union[Iterable[str], str]] = None, rrule: Optional[Union[Iterable[str], str]] = None, paused: Optional[bool] = None, schedule: Optional[Schedule] = None, schedules: Optional['FlexibleScheduleList'] = None, concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, parameters: Optional[dict[str, Any]] = None, triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, description: Optional[str] = None, tags: Optional[List[str]] = None, version: Optional[str] = None, enforce_parameter_schema: bool = True, entrypoint_type: EntrypointType = EntrypointType.FILE_PATH) -> UUID
Provides a flow to the runner to be run based on the provided configuration.
Will create a deployment for the provided flow and register the deployment with the runner.
Args:
flow: A flow for the runner to run.
name: The name to give the created deployment. Will default to the name of the runner.
interval: An interval on which to execute the current flow. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds.
cron: A cron schedule of when to execute runs of this flow.
rrule: An rrule schedule of when to execute runs of this flow.
paused: Whether or not to set the created deployment as paused.
schedule: A schedule object defining when to execute runs of this deployment. Used to provide additional scheduling options like timezone or parameters.
schedules: A list of schedule objects defining when to execute runs of this flow. Used to define multiple schedules or additional scheduling options like timezone.
concurrency_limit: The maximum number of concurrent runs of this flow to allow.
triggers: A list of triggers that should kick off a run of this flow.
parameters: A dictionary of default parameter values to pass to runs of this flow.
description: A description for the created deployment. Defaults to the flow's description if not provided.
tags: A list of tags to associate with the created deployment for organizational purposes.
version: A version for the created deployment. Defaults to the flow's version.
entrypoint_type: Type of entrypoint to use for the deployment. When using a module path entrypoint, ensure that the module will be importable in the execution environment.
astop <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L676" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
astop(self) -> None
Stops the runner's polling cycle. Async version.
cancel_all <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L673" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
cancel_all(self) -> None
execute_bundle <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L798" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
execute_bundle(self, bundle: SerializedBundle, cwd: Path | str | None = None, env: dict[str, str | None] | None = None) -> None
Executes a bundle in a subprocess.
Deprecated: Use execute_bundle() from prefect._experimental.bundles.execute
instead.
execute_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L701" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
execute_flow_run(self, flow_run_id: UUID, entrypoint: str | None = None, command: str | None = None, cwd: Path | str | None = None, env: dict[str, str | None] | None = None, task_status: anyio.abc.TaskStatus[int] = anyio.TASK_STATUS_IGNORED, stream_output: bool = True) -> anyio.abc.Process | multiprocessing.context.SpawnProcess | None
Executes a single flow run with the given ID.
Deprecated: Use FlowRunExecutorContext with EngineCommandStarter instead.
Execution will wait to monitor for cancellation requests. Exits once the flow run process has exited.
Returns:
execute_in_background <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L662" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
execute_in_background(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> 'concurrent.futures.Future[Any]'
Executes a function in the background.
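The signature above returns a concurrent.futures.Future. As a rough illustration of that pattern, and not of the Runner's actual implementation (which this page does not show), the sketch below submits a function to a standard-library thread pool and hands back the resulting Future. The run_in_background helper, the add function, and the pool size are assumptions made for the example.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable

# A small shared pool; the size is arbitrary for this illustration.
_pool = ThreadPoolExecutor(max_workers=2)


def run_in_background(
    func: Callable[..., Any], *args: Any, **kwargs: Any
) -> "Future[Any]":
    """Schedule func(*args, **kwargs) on a worker thread and return its Future."""
    return _pool.submit(func, *args, **kwargs)


def add(a: int, b: int) -> int:
    """Trivial workload used to demonstrate the call."""
    return a + b


if __name__ == "__main__":
    future = run_in_background(add, 2, b=3)
    # result() blocks until the background call finishes.
    print(future.result(timeout=5))  # 5
```

The caller keeps the Future and decides when, or whether, to wait on it, which is what makes the call non-blocking.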
handle_sigterm <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L579" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
handle_sigterm(self, *args: Any, **kwargs: Any) -> None
Gracefully shuts down the runner when a SIGTERM is received.
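One common way to shut down gracefully on SIGTERM is to have the signal handler set a flag that the main loop checks, rather than exiting on the spot. The sketch below shows that general pattern with the standard library only. The GracefulStopper class is hypothetical and does not describe how the Runner itself handles the signal.

```python
import signal
import threading
from types import FrameType
from typing import Optional


class GracefulStopper:
    """Records that a stop was requested so a loop can exit cleanly."""

    def __init__(self) -> None:
        self._stop = threading.Event()

    def handle_sigterm(self, signum: int, frame: Optional[FrameType]) -> None:
        # Only set a flag here; the main loop decides when to wind down.
        self._stop.set()

    @property
    def stop_requested(self) -> bool:
        return self._stop.is_set()

    def install(self) -> None:
        # signal.signal must be called from the main thread.
        signal.signal(signal.SIGTERM, self.handle_sigterm)


if __name__ == "__main__":
    stopper = GracefulStopper()
    # A real program would call stopper.install() once at startup.
    # Here the handler is invoked directly to simulate signal delivery.
    stopper.handle_sigterm(signal.SIGTERM, None)
    print(stopper.stop_requested)  # True
```

Keeping the handler this small avoids doing real work inside a signal handler; cleanup happens in the loop that observes the flag.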
has_slots_available <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L1356" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
has_slots_available(self) -> bool
Determine whether the flow run limit has been reached.
Returns:
bool: True if the limit has not been reached, False otherwise.
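A slot check of this kind can be pictured as comparing the number of in-flight runs against a fixed limit. The sketch below is a simplified stand-in written for this page. SlotTracker, its limit argument, and its acquire and release methods are hypothetical and do not describe the Runner's internals.

```python
import threading


class SlotTracker:
    """Tracks how many of a fixed number of run slots are in use."""

    def __init__(self, limit: int) -> None:
        if limit < 1:
            raise ValueError("limit must be at least 1")
        self._limit = limit
        self._in_use = 0
        self._lock = threading.Lock()

    def has_slots_available(self) -> bool:
        # True while fewer than `limit` slots are in use.
        with self._lock:
            return self._in_use < self._limit

    def acquire(self) -> bool:
        """Claim a slot; return False instead of blocking when none are free."""
        with self._lock:
            if self._in_use >= self._limit:
                return False
            self._in_use += 1
            return True

    def release(self) -> None:
        """Give a slot back once its run has finished."""
        with self._lock:
            if self._in_use == 0:
                raise RuntimeError("release() called with no slots in use")
            self._in_use -= 1


if __name__ == "__main__":
    slots = SlotTracker(limit=2)
    slots.acquire()
    slots.acquire()
    print(slots.has_slots_available())  # False
    slots.release()
    print(slots.has_slots_available())  # True
```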
last_polled <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L294" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
last_polled(self) -> datetime.datetime | None
last_polled <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L300" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
last_polled(self, value: datetime.datetime | None) -> None
reschedule_current_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L1149" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
reschedule_current_flow_runs(self) -> None
Reschedules all flow runs that are currently running.
Deprecated: SIGTERM rescheduling is now handled inline by the CLI execute path.
This should only be called when the runner is shutting down, because it kills all child processes and short-circuits the crash detection logic.
start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L588" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
start(self, run_once: bool = False, webserver: Optional[bool] = None) -> None
Starts a runner.
The runner will begin monitoring for and executing any scheduled work for all added flows.
Args:
run_once: If True, the runner will run through one query loop and then exit.
webserver: A boolean for whether to start a webserver for this runner. If provided, overrides the default on the runner.
Examples:
Initialize a Runner, add two flows, and serve them by starting the Runner:
```python
import asyncio
from prefect import flow, Runner


@flow
def hello_flow(name):
    print(f"hello {name}")


@flow
def goodbye_flow(name):
    print(f"goodbye {name}")


if __name__ == "__main__":
    runner = Runner(name="my-runner")

    # Will be runnable via the API
    runner.add_flow(hello_flow)

    # Run on a cron schedule (uses the cron argument documented for add_flow)
    runner.add_flow(goodbye_flow, cron="0 * * * *")

    asyncio.run(runner.start())
```
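The run_once flag described in the arguments above can be pictured as a switch on a polling loop: make one pass, then exit. The sketch below is a simplified, synchronous illustration using only the standard library. The names poll_loop, fetch_scheduled, execute, and poll_interval are hypothetical, and the Runner's own loop is not shown on this page and is likely more involved.

```python
import threading
from typing import Callable, Iterable, List


def poll_loop(
    fetch_scheduled: Callable[[], Iterable[str]],
    execute: Callable[[str], None],
    stop_event: threading.Event,
    run_once: bool = False,
    poll_interval: float = 1.0,
) -> int:
    """Poll for scheduled work until stopped; return the number of passes made."""
    passes = 0
    while not stop_event.is_set():
        # One pass: fetch whatever is due and execute each item.
        for run_id in fetch_scheduled():
            execute(run_id)
        passes += 1
        if run_once:
            break
        # wait() returns early if a stop is requested mid-sleep.
        stop_event.wait(poll_interval)
    return passes


if __name__ == "__main__":
    executed: List[str] = []
    passes = poll_loop(
        fetch_scheduled=lambda: ["run-1", "run-2"],
        execute=executed.append,
        stop_event=threading.Event(),
        run_once=True,
    )
    print(passes, executed)  # 1 ['run-1', 'run-2']
```

Without run_once, the same loop keeps polling until something sets stop_event, which plays the role that stop and astop play for the Runner's polling cycle.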
stop <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L697" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
stop(self)
Stops the runner's polling cycle.