Back to Prefect

runner

docs/v3/api-ref/python/prefect-runner-runner.mdx

3.6.30.dev312.8 KB
Original Source

prefect.runner.runner

Runners are responsible for managing the execution of all deployments.

When creating a deployment using either flow.serve or the serve utility, they also will poll for scheduled runs.

Example: ```python import time from prefect import flow, serve

@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)


@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return


if __name__ == "__main__":
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")

    # serve generates a Runner instance
    serve(slow_deploy, fast_deploy)
```

Classes

ProcessMapEntry <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L168" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Runner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L173" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

aadd_deployment <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L343" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
aadd_deployment(self, deployment: 'RunnerDeployment') -> UUID

Registers the deployment with the Prefect API and will monitor for work once the runner is started. Async version.

Args:

  • deployment: A deployment for the runner to register.

aadd_flow <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L379" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
aadd_flow(self, flow: Flow[Any, Any], name: Optional[str] = None, interval: Optional[Union[Iterable[Union[int, float, datetime.timedelta]], int, float, datetime.timedelta]] = None, cron: Optional[Union[Iterable[str], str]] = None, rrule: Optional[Union[Iterable[str], str]] = None, paused: Optional[bool] = None, schedule: Optional[Schedule] = None, schedules: Optional['FlexibleScheduleList'] = None, concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, parameters: Optional[dict[str, Any]] = None, triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, description: Optional[str] = None, tags: Optional[List[str]] = None, version: Optional[str] = None, enforce_parameter_schema: bool = True, entrypoint_type: EntrypointType = EntrypointType.FILE_PATH) -> UUID

Provides a flow to the runner to be run based on the provided configuration. Async version.

Will create a deployment for the provided flow and register the deployment with the runner.

Args:

  • flow: A flow for the runner to run.
  • name: The name to give the created deployment. Will default to the name of the runner.
  • interval: An interval on which to execute the current flow. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds.
  • cron: A cron schedule of when to execute runs of this flow.
  • rrule: An rrule schedule of when to execute runs of this flow.
  • paused: Whether or not to set the created deployment as paused.
  • schedule: A schedule object defining when to execute runs of this deployment. Used to provide additional scheduling options like timezone or parameters.
  • schedules: A list of schedule objects defining when to execute runs of this flow. Used to define multiple schedules or additional scheduling options like timezone.
  • concurrency_limit: The maximum number of concurrent runs of this flow to allow.
  • triggers: A list of triggers that should kick of a run of this flow.
  • parameters: A dictionary of default parameter values to pass to runs of this flow.
  • description: A description for the created deployment. Defaults to the flow's description if not provided.
  • tags: A list of tags to associate with the created deployment for organizational purposes.
  • version: A version for the created deployment. Defaults to the flow's version.
  • entrypoint_type: Type of entrypoint to use for the deployment. When using a module path entrypoint, ensure that the module will be importable in the execution environment.

add_deployment <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L364" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
add_deployment(self, deployment: 'RunnerDeployment') -> UUID

Registers the deployment with the Prefect API and will monitor for work once the runner is started.

Args:

  • deployment: A deployment for the runner to register.

add_flow <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L477" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
add_flow(self, flow: Flow[Any, Any], name: Optional[str] = None, interval: Optional[Union[Iterable[Union[int, float, datetime.timedelta]], int, float, datetime.timedelta]] = None, cron: Optional[Union[Iterable[str], str]] = None, rrule: Optional[Union[Iterable[str], str]] = None, paused: Optional[bool] = None, schedule: Optional[Schedule] = None, schedules: Optional['FlexibleScheduleList'] = None, concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, parameters: Optional[dict[str, Any]] = None, triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, description: Optional[str] = None, tags: Optional[List[str]] = None, version: Optional[str] = None, enforce_parameter_schema: bool = True, entrypoint_type: EntrypointType = EntrypointType.FILE_PATH) -> UUID

Provides a flow to the runner to be run based on the provided configuration.

Will create a deployment for the provided flow and register the deployment with the runner.

Args:

  • flow: A flow for the runner to run.
  • name: The name to give the created deployment. Will default to the name of the runner.
  • interval: An interval on which to execute the current flow. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds.
  • cron: A cron schedule of when to execute runs of this flow.
  • rrule: An rrule schedule of when to execute runs of this flow.
  • paused: Whether or not to set the created deployment as paused.
  • schedule: A schedule object defining when to execute runs of this deployment. Used to provide additional scheduling options like timezone or parameters.
  • schedules: A list of schedule objects defining when to execute runs of this flow. Used to define multiple schedules or additional scheduling options like timezone.
  • concurrency_limit: The maximum number of concurrent runs of this flow to allow.
  • triggers: A list of triggers that should kick of a run of this flow.
  • parameters: A dictionary of default parameter values to pass to runs of this flow.
  • description: A description for the created deployment. Defaults to the flow's description if not provided.
  • tags: A list of tags to associate with the created deployment for organizational purposes.
  • version: A version for the created deployment. Defaults to the flow's version.
  • entrypoint_type: Type of entrypoint to use for the deployment. When using a module path entrypoint, ensure that the module will be importable in the execution environment.

astop <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L676" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
astop(self) -> None

Stops the runner's polling cycle. Async version.

cancel_all <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L673" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
cancel_all(self) -> None

execute_bundle <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L798" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
execute_bundle(self, bundle: SerializedBundle, cwd: Path | str | None = None, env: dict[str, str | None] | None = None) -> None

Executes a bundle in a subprocess.

Deprecated: Use execute_bundle() from prefect._experimental.bundles.execute instead.

execute_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L701" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
execute_flow_run(self, flow_run_id: UUID, entrypoint: str | None = None, command: str | None = None, cwd: Path | str | None = None, env: dict[str, str | None] | None = None, task_status: anyio.abc.TaskStatus[int] = anyio.TASK_STATUS_IGNORED, stream_output: bool = True) -> anyio.abc.Process | multiprocessing.context.SpawnProcess | None

Executes a single flow run with the given ID.

Deprecated: Use FlowRunExecutorContext with EngineCommandStarter instead.

Execution will wait to monitor for cancellation requests. Exits once the flow run process has exited.

Returns:

  • The flow run process.

execute_in_background <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L662" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
execute_in_background(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> 'concurrent.futures.Future[Any]'

Executes a function in the background.

handle_sigterm <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L579" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_sigterm(self, *args: Any, **kwargs: Any) -> None

Gracefully shuts down the runner when a SIGTERM is received.

has_slots_available <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L1356" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
has_slots_available(self) -> bool

Determine if the flow run limit has been reached.

Returns:

    • bool: True if the limit has not been reached, False otherwise.

last_polled <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L294" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
last_polled(self) -> datetime.datetime | None

last_polled <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L300" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
last_polled(self, value: datetime.datetime | None) -> None

reschedule_current_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L1149" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
reschedule_current_flow_runs(self) -> None

Reschedules all flow runs that are currently running.

Deprecated: SIGTERM rescheduling is now handled inline by the CLI execute path.

This should only be called when the runner is shutting down because it kill all child processes and short-circuit the crash detection logic.

start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L588" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
start(self, run_once: bool = False, webserver: Optional[bool] = None) -> None

Starts a runner.

The runner will begin monitoring for and executing any scheduled work for all added flows.

Args:

  • run_once: If True, the runner will through one query loop and then exit.
  • webserver: a boolean for whether to start a webserver for this runner. If provided, overrides the default on the runner

Examples:

Initialize a Runner, add two flows, and serve them by starting the Runner:

python
import asyncio
from prefect import flow, Runner

@flow
def hello_flow(name):
    print(f"hello {name}")

@flow
def goodbye_flow(name):
    print(f"goodbye {name}")

if __name__ == "__main__"
    runner = Runner(name="my-runner")

    # Will be runnable via the API
    runner.add_flow(hello_flow)

    # Run on a cron schedule
    runner.add_flow(goodbye_flow, schedule={"cron": "0 * * * *"})

    asyncio.run(runner.start())

stop <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L697" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
stop(self)

Stops the runner's polling cycle.