prefect.deployments.runner

Objects for creating and configuring deployments for flows using serve functionality.

Example:

```python
import time

from prefect import flow, serve

@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)


@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return


if __name__ == "__main__":
    # to_deployment creates RunnerDeployment instances
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")

    serve(slow_deploy, fast_deploy)
```

Functions

adeploy <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L1426" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
adeploy(*deployments: RunnerDeployment) -> List[UUID]
```

Deploy the provided list of deployments to dynamic infrastructure via a work pool.

By default, calling this function will build a Docker image for the deployments, push it to a registry, and create each deployment via the Prefect API that will run the corresponding flow on the given schedule.

If you want to use an existing image, you can pass build=False to skip building and pushing an image.

Args:

  • *deployments: A list of deployments to deploy.
  • work_pool_name: The name of the work pool to use for these deployments. Defaults to the value of PREFECT_DEFAULT_WORK_POOL_NAME.
  • image: The name of the Docker image to build, including the registry and repository. Pass a DockerImage instance to customize the Dockerfile used and build arguments.
  • build: Whether or not to build a new image for the flow. If False, the provided image will be used as-is and pulled at runtime.
  • push: Whether or not to push the built image to a registry.
  • print_next_steps_message: Whether or not to print a message with next steps after deploying the deployments.

Returns:

  • A list of deployment IDs for the created/updated deployments.

Examples:

Deploy a group of flows to a work pool:

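```python
import asyncio

from prefect import flow
from prefect.deployments.runner import adeploy

@flow(log_prints=True)
def local_flow():
    print("I'm a locally defined flow!")

if __name__ == "__main__":
    # Build the deployments first, then hand them to the async adeploy.
    deployments = [
        local_flow.to_deployment(name="example-deploy-local-flow"),
        flow.from_source(
            source="https://github.com/org/repo.git",
            entrypoint="flows.py:my_flow",
        ).to_deployment(
            name="example-deploy-remote-flow",
        ),
    ]
    # adeploy is a coroutine function, so it must run inside an event loop.
    asyncio.run(
        adeploy(
            *deployments,
            work_pool_name="my-work-pool",
            image="my-registry/my-image:dev",
        )
    )
```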

deploy <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L1665" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
deploy(*deployments: RunnerDeployment) -> List[UUID]
```

Deploy the provided list of deployments to dynamic infrastructure via a work pool.

By default, calling this function will build a Docker image for the deployments, push it to a registry, and create each deployment via the Prefect API that will run the corresponding flow on the given schedule.

If you want to use an existing image, you can pass build=False to skip building and pushing an image.

Args:

  • *deployments: A list of deployments to deploy.
  • work_pool_name: The name of the work pool to use for these deployments. Defaults to the value of PREFECT_DEFAULT_WORK_POOL_NAME.
  • image: The name of the Docker image to build, including the registry and repository. Pass a DockerImage instance to customize the Dockerfile used and build arguments.
  • build: Whether or not to build a new image for the flow. If False, the provided image will be used as-is and pulled at runtime.
  • push: Whether or not to push the built image to a registry.
  • print_next_steps_message: Whether or not to print a message with next steps after deploying the deployments.

Returns:

  • A list of deployment IDs for the created/updated deployments.
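
How the build and push flags described above combine can be sketched in plain Python. This is only an illustration of the documented behavior, not Prefect's implementation: `plan_image_steps` is a hypothetical helper, and the defaults of `True` are assumed from the statement that an image is built and pushed by default.

```python
from typing import List


def plan_image_steps(image: str, build: bool = True, push: bool = True) -> List[str]:
    """List the image-related steps implied by the build and push flags.

    Hypothetical helper for illustration; it does not call Docker or Prefect.
    """
    if not build:
        # build=False skips both building and pushing; the image is used as-is.
        return [f"use existing image {image}"]
    steps = [f"build {image}"]
    if push:
        steps.append(f"push {image}")
    return steps


for build, push in [(True, True), (True, False), (False, True)]:
    print(build, push, plan_image_steps("my-registry/my-image:dev", build, push))
```

Note that in this sketch `push` has no effect once `build` is False, which matches the statement that build=False skips both building and pushing.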

Examples:

Deploy a group of flows to a work pool:

```python
from prefect import deploy, flow

@flow(log_prints=True)
def local_flow():
    print("I'm a locally defined flow!")

if __name__ == "__main__":
    deploy(
        local_flow.to_deployment(name="example-deploy-local-flow"),
        flow.from_source(
            source="https://github.com/org/repo.git",
            entrypoint="flows.py:my_flow",
        ).to_deployment(
            name="example-deploy-remote-flow",
        ),
        work_pool_name="my-work-pool",
        image="my-registry/my-image:dev",
    )
```

Classes

DeploymentApplyError <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L148" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when an error occurs while applying a deployment.

RunnerDeployment <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L154" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

A Prefect RunnerDeployment definition, used for specifying and building deployments.

Attributes:

  • name: A name for the deployment (required).
  • version: An optional version for the deployment; defaults to the flow's version.
  • description: An optional description of the deployment; defaults to the flow's description.
  • tags: An optional list of tags to associate with this deployment; note that tags are used only for organizational purposes. For delegating work to workers, see work_queue_name.
  • schedule: A schedule to run this deployment on, once registered.
  • parameters: A dictionary of parameter values to pass to runs created from this deployment.
  • path: The path to the working directory for the workflow, relative to remote storage or, if stored on a local filesystem, an absolute path.
  • entrypoint: The path to the entrypoint for the workflow, always relative to the path.
  • parameter_openapi_schema: The parameter schema of the flow, including defaults.
  • enforce_parameter_schema: Whether or not the Prefect API should enforce the parameter schema for this deployment.
  • work_pool_name: The name of the work pool to use for this deployment.
  • work_queue_name: The name of the work queue to use for this deployment's scheduled runs. If not provided the default work queue for the work pool will be used.
  • job_variables: Settings used to override the values specified in the default base job template of the chosen work pool. Refer to the base job template of the chosen work pool for available settings.
  • _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.
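
The idea that job_variables override defaults from the work pool's base job template can be pictured as a dictionary merge. The sketch below is an assumption-laden illustration, not Prefect's code: `resolve_job_settings` is a hypothetical helper, the keys are made up, and a shallow key-by-key override is assumed.

```python
from typing import Any, Dict


def resolve_job_settings(
    template_defaults: Dict[str, Any], job_variables: Dict[str, Any]
) -> Dict[str, Any]:
    """Return the template defaults with job_variables applied on top.

    Hypothetical helper: assumes a shallow merge where a job variable
    replaces the default stored under the same key.
    """
    # Later entries win, so job_variables replace matching defaults.
    return {**template_defaults, **job_variables}


# Made-up defaults standing in for a work pool's base job template.
template_defaults = {"image": "example/image:latest", "env": {}, "cpu": 1}
job_variables = {"image": "my-registry/my-image:dev", "env": {"LOG_LEVEL": "DEBUG"}}

resolved = resolve_job_settings(template_defaults, job_variables)
print(resolved)
```

Keys that job_variables does not mention, such as `cpu` here, keep their template default.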

Methods:

aapply <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L688" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
aapply(self, schedules: Optional[List[dict[str, Any]]] = None, work_pool_name: Optional[str] = None, image: Optional[str] = None, version_info: Optional[VersionInfo] = None) -> UUID
```

Registers this deployment with the API and returns the deployment's ID.

Args:

  • work_pool_name: The name of the work pool to use for this deployment.
  • image: The registry, name, and tag of the Docker image to use for this deployment. Only used when the deployment is deployed to a work pool.
  • version_info: The version information to use for the deployment.

Returns: The ID of the created deployment.

afrom_storage <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L1147" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
afrom_storage(cls, storage: RunnerStorage, entrypoint: str, name: str, flow_name: Optional[str] = None, interval: Optional[Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]] = None, cron: Optional[Union[Iterable[str], str]] = None, rrule: Optional[Union[Iterable[str], str]] = None, paused: Optional[bool] = None, schedule: Optional[Schedule] = None, schedules: Optional['FlexibleScheduleList'] = None, concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, parameters: Optional[dict[str, Any]] = None, triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, description: Optional[str] = None, tags: Optional[List[str]] = None, version: Optional[str] = None, version_type: Optional[VersionType] = None, enforce_parameter_schema: bool = True, work_pool_name: Optional[str] = None, work_queue_name: Optional[str] = None, job_variables: Optional[dict[str, Any]] = None, _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None) -> 'RunnerDeployment'
```

Create a RunnerDeployment from a flow located at a given entrypoint and stored in a local storage location.

Args:

  • entrypoint: The path to a file containing a flow and the name of the flow function in the format ./path/to/file.py:flow_func_name, or a module path to a flow function in the format module.path.flow_func_name.
  • name: A name for the deployment
  • flow_name: The name of the flow to deploy
  • storage: A storage object to use for retrieving flow code. If not provided, a URL must be provided.
  • interval: An interval on which to execute the current flow. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds.
  • cron: A cron schedule of when to execute runs of this flow.
  • rrule: An rrule schedule of when to execute runs of this flow.
  • paused: Whether or not the deployment is paused.
  • schedule: A schedule object defining when to execute runs of this deployment. Used to provide additional scheduling options like timezone or parameters.
  • schedules: A list of schedule objects defining when to execute runs of this deployment. Used to provide additional scheduling options like timezone or parameters.
  • triggers: A list of triggers that should kick off a run of this flow.
  • parameters: A dictionary of default parameter values to pass to runs of this flow.
  • description: A description for the created deployment. Defaults to the flow's description if not provided.
  • tags: A list of tags to associate with the created deployment for organizational purposes.
  • version: A version for the created deployment. Defaults to the flow's version.
  • version_type: The type of version information to use for the deployment. The version type will be inferred if not provided.
  • enforce_parameter_schema: Whether or not the Prefect API should enforce the parameter schema for this deployment.
  • work_pool_name: The name of the work pool to use for this deployment.
  • work_queue_name: The name of the work queue to use for this deployment's scheduled runs. If not provided the default work queue for the work pool will be used.
  • job_variables: Settings used to override the values specified in the default base job template of the chosen work pool. Refer to the base job template of the chosen work pool for available settings.
  • _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.

apply <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L734" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
apply(self, schedules: Optional[List[dict[str, Any]]] = None, work_pool_name: Optional[str] = None, image: Optional[str] = None, version_info: Optional[VersionInfo] = None) -> UUID
```

Registers this deployment with the API and returns the deployment's ID.

Args:

  • work_pool_name: The name of the work pool to use for this deployment.
  • image: The registry, name, and tag of the Docker image to use for this deployment. Only used when the deployment is deployed to a work pool.
  • version_info: The version information to use for the deployment.

Returns: The ID of the created deployment.

entrypoint_type <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L292" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
entrypoint_type(self) -> EntrypointType
```

from_entrypoint <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L1043" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
from_entrypoint(cls, entrypoint: str, name: str, flow_name: Optional[str] = None, interval: Optional[Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]] = None, cron: Optional[Union[Iterable[str], str]] = None, rrule: Optional[Union[Iterable[str], str]] = None, paused: Optional[bool] = None, schedule: Optional[Schedule] = None, schedules: Optional['FlexibleScheduleList'] = None, concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, parameters: Optional[dict[str, Any]] = None, triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, description: Optional[str] = None, tags: Optional[List[str]] = None, version: Optional[str] = None, enforce_parameter_schema: bool = True, work_pool_name: Optional[str] = None, work_queue_name: Optional[str] = None, job_variables: Optional[dict[str, Any]] = None, _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None) -> 'RunnerDeployment'
```

Configure a deployment for a given flow located at a given entrypoint.

Args:

  • entrypoint: The path to a file containing a flow and the name of the flow function in the format ./path/to/file.py:flow_func_name.
  • name: A name for the deployment
  • flow_name: The name of the flow to deploy
  • interval: An interval on which to execute the current flow. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds.
  • cron: A cron schedule of when to execute runs of this flow.
  • rrule: An rrule schedule of when to execute runs of this flow.
  • paused: Whether or not to set this deployment as paused.
  • schedules: A list of schedule objects defining when to execute runs of this deployment. Used to define multiple schedules or additional scheduling options like timezone.
  • triggers: A list of triggers that should kick off a run of this flow.
  • parameters: A dictionary of default parameter values to pass to runs of this flow.
  • description: A description for the created deployment. Defaults to the flow's description if not provided.
  • tags: A list of tags to associate with the created deployment for organizational purposes.
  • version: A version for the created deployment. Defaults to the flow's version.
  • enforce_parameter_schema: Whether or not the Prefect API should enforce the parameter schema for this deployment.
  • work_pool_name: The name of the work pool to use for this deployment.
  • work_queue_name: The name of the work queue to use for this deployment's scheduled runs. If not provided the default work queue for the work pool will be used.
  • job_variables: Settings used to override the values specified in the default base job template of the chosen work pool. Refer to the base job template of the chosen work pool for available settings.
  • _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.
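
The entrypoint format from the argument list above, a file path and a function name joined by a colon, can be taken apart with a few lines of plain Python. `split_entrypoint` is a hypothetical helper written for illustration; it is not how Prefect parses entrypoints internally.

```python
from typing import Tuple


def split_entrypoint(entrypoint: str) -> Tuple[str, str]:
    """Split '<path>:<function>' into (file path, function name).

    Hypothetical helper for illustration only.
    """
    # Split on the last colon so a path that itself contains a colon
    # (for example a Windows drive letter) stays intact.
    path, sep, func_name = entrypoint.rpartition(":")
    if not sep or not path or not func_name:
        raise ValueError(f"Expected '<path>:<function>', got {entrypoint!r}")
    return path, func_name


print(split_entrypoint("./path/to/file.py:flow_func_name"))
print(split_entrypoint("flows.py:my_flow"))
```

A string without a colon, or with nothing on one side of it, raises ValueError in this sketch.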

from_flow <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L892" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
from_flow(cls, flow: 'Flow[..., Any]', name: str, interval: Optional[Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]] = None, cron: Optional[Union[Iterable[str], str]] = None, rrule: Optional[Union[Iterable[str], str]] = None, paused: Optional[bool] = None, schedule: Optional[Schedule] = None, schedules: Optional['FlexibleScheduleList'] = None, concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, parameters: Optional[dict[str, Any]] = None, triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, description: Optional[str] = None, tags: Optional[List[str]] = None, version: Optional[str] = None, version_type: Optional[VersionType] = None, enforce_parameter_schema: bool = True, work_pool_name: Optional[str] = None, work_queue_name: Optional[str] = None, job_variables: Optional[dict[str, Any]] = None, entrypoint_type: EntrypointType = EntrypointType.FILE_PATH, _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None) -> 'RunnerDeployment'
```

Configure a deployment for a given flow.

Args:

  • flow: A flow function to deploy
  • name: A name for the deployment
  • interval: An interval on which to execute the current flow. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds.
  • cron: A cron schedule of when to execute runs of this flow.
  • rrule: An rrule schedule of when to execute runs of this flow.
  • paused: Whether or not to set this deployment as paused.
  • schedule: A schedule object defining when to execute runs of this deployment. Used to provide additional scheduling options like timezone or parameters.
  • schedules: A list of schedule objects defining when to execute runs of this deployment. Used to define multiple schedules or additional scheduling options like timezone.
  • concurrency_limit: The maximum number of concurrent runs this deployment will allow.
  • triggers: A list of triggers that should kick off a run of this flow.
  • parameters: A dictionary of default parameter values to pass to runs of this flow.
  • description: A description for the created deployment. Defaults to the flow's description if not provided.
  • tags: A list of tags to associate with the created deployment for organizational purposes.
  • version: A version for the created deployment. Defaults to the flow's version.
  • version_type: The type of version information to use for the deployment.
  • enforce_parameter_schema: Whether or not the Prefect API should enforce the parameter schema for this deployment.
  • work_pool_name: The name of the work pool to use for this deployment.
  • work_queue_name: The name of the work queue to use for this deployment's scheduled runs. If not provided the default work queue for the work pool will be used.
  • job_variables: Settings used to override the values specified in the default base job template of the chosen work pool. Refer to the base job template of the chosen work pool for available settings.
  • _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.
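
The interval rule from the argument list above (a bare number is read as seconds, and the signature also accepts an iterable of values) can be sketched with the standard library. `normalize_interval` is a hypothetical helper for illustration and is not part of Prefect.

```python
from datetime import timedelta
from typing import Iterable, List, Union

IntervalLike = Union[int, float, timedelta]


def normalize_interval(
    interval: Union[IntervalLike, Iterable[IntervalLike]],
) -> List[timedelta]:
    """Return a list of timedeltas; bare numbers are read as seconds.

    Hypothetical helper for illustration only.
    """
    # Treat a single value as a one-element collection.
    if isinstance(interval, (int, float, timedelta)):
        interval = [interval]
    return [
        value if isinstance(value, timedelta) else timedelta(seconds=value)
        for value in interval
    ]


print(normalize_interval(45))
print(normalize_interval([30, timedelta(minutes=5)]))
```

Under this reading, interval=45 and interval=timedelta(seconds=45) describe the same schedule.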

from_storage <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L1288" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
from_storage(cls, storage: RunnerStorage, entrypoint: str, name: str, flow_name: Optional[str] = None, interval: Optional[Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]] = None, cron: Optional[Union[Iterable[str], str]] = None, rrule: Optional[Union[Iterable[str], str]] = None, paused: Optional[bool] = None, schedule: Optional[Schedule] = None, schedules: Optional['FlexibleScheduleList'] = None, concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, parameters: Optional[dict[str, Any]] = None, triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, description: Optional[str] = None, tags: Optional[List[str]] = None, version: Optional[str] = None, version_type: Optional[VersionType] = None, enforce_parameter_schema: bool = True, work_pool_name: Optional[str] = None, work_queue_name: Optional[str] = None, job_variables: Optional[dict[str, Any]] = None, _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None) -> 'RunnerDeployment'
```

Create a RunnerDeployment from a flow located at a given entrypoint and stored in a local storage location.

Args:

  • entrypoint: The path to a file containing a flow and the name of the flow function in the format ./path/to/file.py:flow_func_name, or a module path to a flow function in the format module.path.flow_func_name.
  • name: A name for the deployment
  • flow_name: The name of the flow to deploy
  • storage: A storage object to use for retrieving flow code. If not provided, a URL must be provided.
  • interval: An interval on which to execute the current flow. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds.
  • cron: A cron schedule of when to execute runs of this flow.
  • rrule: An rrule schedule of when to execute runs of this flow.
  • paused: Whether or not the deployment is paused.
  • schedule: A schedule object defining when to execute runs of this deployment. Used to provide additional scheduling options like timezone or parameters.
  • schedules: A list of schedule objects defining when to execute runs of this deployment. Used to provide additional scheduling options like timezone or parameters.
  • triggers: A list of triggers that should kick off a run of this flow.
  • parameters: A dictionary of default parameter values to pass to runs of this flow.
  • description: A description for the created deployment. Defaults to the flow's description if not provided.
  • tags: A list of tags to associate with the created deployment for organizational purposes.
  • version: A version for the created deployment. Defaults to the flow's version.
  • version_type: The type of version information to use for the deployment. The version type will be inferred if not provided.
  • enforce_parameter_schema: Whether or not the Prefect API should enforce the parameter schema for this deployment.
  • work_pool_name: The name of the work pool to use for this deployment.
  • work_queue_name: The name of the work queue to use for this deployment's scheduled runs. If not provided the default work queue for the work pool will be used.
  • job_variables: Settings used to override the values specified in the default base job template of the chosen work pool. Refer to the base job template of the chosen work pool for available settings.
  • _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.

full_name <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L296" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
full_name(self) -> str
```

reconcile_paused <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L347" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
reconcile_paused(cls, values: dict[str, Any]) -> dict[str, Any]
```

reconcile_schedules <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L352" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
reconcile_schedules(cls, values: dict[str, Any]) -> dict[str, Any]
```

validate_automation_names <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L321" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
validate_automation_names(self)
```

Ensure that each trigger has a name for its automation if none is provided.
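
The behavior described above, giving every unnamed trigger a fallback automation name, might look roughly like the following. Everything here is a stand-in: `Trigger` and `fill_automation_names` are hypothetical, and the `<deployment>__automation_<index>` naming scheme is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Trigger:
    """Hypothetical stand-in for a Prefect trigger object."""

    name: Optional[str] = None


def fill_automation_names(deployment_name: str, triggers: List[Trigger]) -> List[Trigger]:
    """Give each unnamed trigger a fallback name; leave named triggers alone."""
    for index, trigger in enumerate(triggers):
        if trigger.name is None:
            # Assumed naming scheme, chosen only for this sketch.
            trigger.name = f"{deployment_name}__automation_{index}"
    return triggers


triggers = fill_automation_names("sleeper", [Trigger(), Trigger(name="on-failure")])
print([trigger.name for trigger in triggers])
```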

validate_deployment_parameters <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L330" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
validate_deployment_parameters(self) -> Self
```

Update the parameter schema to mark frozen parameters as readonly.
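
Marking parameters as read-only in an OpenAPI-style schema can be sketched as below. This is a simplified illustration under stated assumptions: `mark_readonly` is a hypothetical helper, the frozen parameters are passed in as a plain list of names, and the standard `readOnly` schema keyword is assumed to be the flag in question.

```python
import copy
from typing import Any, Dict, Iterable


def mark_readonly(schema: Dict[str, Any], frozen_names: Iterable[str]) -> Dict[str, Any]:
    """Return a copy of the schema with the named properties flagged readOnly.

    Hypothetical helper for illustration only.
    """
    # Work on a copy so the caller's schema is left untouched.
    updated = copy.deepcopy(schema)
    properties = updated.get("properties", {})
    for name in frozen_names:
        if name in properties:
            properties[name]["readOnly"] = True
    return updated


schema = {
    "type": "object",
    "properties": {
        "sleep": {"type": "integer", "default": 60},
        "label": {"type": "string"},
    },
}
updated = mark_readonly(schema, ["sleep"])
print(updated["properties"]["sleep"])
```

Names that do not appear in the schema are ignored in this sketch.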

validate_name <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments/runner.py#L315" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
validate_name(cls, value: str) -> str
```