Upgrading from agents to workers significantly enhances the experience of deploying flows by simplifying the specification of each flow's infrastructure and runtime environment.
<Note> This guide is for users who are upgrading from agents (a deployment pattern specific to `prefect>2.0,<3.0`) to workers. If you are new to Prefect, we recommend starting with the [Prefect Quickstart](/v3/get-started/quickstart/). </Note>

A worker is the fusion of an agent with an infrastructure block. Like agents, workers poll a work pool for flow runs that are scheduled to start. Like infrastructure blocks, workers are typed. They work with only one kind of infrastructure, and they specify the default configuration for jobs submitted to that infrastructure.
Accordingly, workers are not a drop-in replacement for agents. Using workers requires deploying flows differently. In particular, deploying a flow with a worker does not involve specifying an infrastructure block. Instead, infrastructure configuration is specified on the work pool and passed to each worker that polls work from that pool.
The Python deployment experience with `.deploy()` and the alternative deployment experience with `prefect.yaml` are more flexible and easier to use than block and agent-based deployments.
Deployment CLI and Python SDK:
prefect deployment build <entrypoint>/prefect deployment apply --> prefect deploy
Prefect now automatically detects flows in your repo and provides a wizard to guide you through setting required attributes for your deployments.
Deployment.build_from_flow --> flow.deploy
Configuring remote flow code storage:
storage blocks --> pull action
When using the YAML-based deployment API, you can configure a pull action in your prefect.yaml
file to specify how to retrieve flow code for your deployments. You can use configuration from your
existing storage blocks to define your pull action through templating.
When using the Python deployment API, you can pass any storage block to the flow.deploy method to
specify how to retrieve flow code for your deployment.
Configuring flow run infrastructure:
infrastructure blocks --> typed work pool
Default infrastructure config is now set on the typed work pool, and can be overwritten by individual deployments.
Managing multiple deployments:
Create and/or update many deployments at once through a `prefect.yaml` file or use the `deploy` function.
You can set storage blocks as the pull action in a prefect.yaml file.
Infrastructure blocks have configuration fields similar to typed work pools.
Deployment-level infrastructure overrides operate in much the same way.
`infra_overrides` --> `job_variables`
The processes for starting an agent and starting a worker in your environment are virtually identical.
prefect agent start --pool <work pool name> --> prefect worker start --pool <work pool name>
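
To make the override relationship concrete, the sketch below models a work pool's default job configuration and a deployment's `job_variables` as plain dictionaries. It is a conceptual illustration only, not Prefect's implementation: the `resolve_job_config` helper is hypothetical, and the `image` and `pull_policy` values are placeholders borrowed from the examples in this guide.

```python
from typing import Any, Dict, Optional

# Conceptual model only: plain dicts stand in for a work pool's defaults
# and a deployment's job_variables. This is not Prefect's implementation.


def resolve_job_config(
    pool_defaults: Dict[str, Any],
    job_variables: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return the pool defaults with deployment-level overrides applied on top."""
    resolved = dict(pool_defaults)        # copy so the pool defaults stay untouched
    resolved.update(job_variables or {})  # deployment values win on conflicts
    return resolved


if __name__ == "__main__":
    # Hypothetical defaults set once on a typed work pool.
    pool_defaults = {"image": "my-repo/my-image:latest", "pull_policy": "Always"}

    # A deployment overrides only what it needs (formerly `infra_overrides`).
    print(resolve_job_config(pool_defaults, {"pull_policy": "Never"}))

    # A deployment without overrides inherits the pool defaults unchanged.
    print(resolve_job_config(pool_defaults))
```

The point of the model is that defaults live in one place (the work pool) and each deployment states only its differences, which is the same role `infra_overrides` played for infrastructure blocks.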
<Tip> **Worker Helm chart** If you host your agents in a Kubernetes cluster, you can use the Prefect worker Helm chart to host workers in your cluster. </Tip>
If you have existing deployments that use infrastructure blocks, you can quickly upgrade them to be compatible with workers by following these steps:
First, create a work pool. This new work pool replaces your infrastructure block.
You can use the `.publish_as_work_pool` method on any infrastructure block to create a work pool with the same configuration.
{/*
<!-- vale off -->*/}
For example, if you have a KubernetesJob infrastructure block named 'my-k8s-job', you can
create a work pool with the same configuration with this script:
from prefect.infrastructure import KubernetesJob
KubernetesJob.load("my-k8s-job").publish_as_work_pool()
Running this script creates a work pool named 'my-k8s-job' with the same configuration as your infrastructure block.
{/*
<!-- vale on -->*/}
<Tip> **Serving flows** If you are using a `Process` infrastructure block and a `LocalFilesystem` storage block (or aren't using an infrastructure and storage block at all), you can use [`flow.serve`](/v3/deploy/index) to create a deployment without specifying a work pool name or starting a worker. This is a quick way to create a deployment for a flow and manage your deployments if you don't need the dynamic infrastructure creation or configuration offered by workers. </Tip>
Next, start a worker. This worker replaces your agent and polls your new work pool for flow runs to execute.
prefect worker start -p <work pool name>
To deploy your flows to the new work pool, use flow.deploy for a Pythonic deployment
experience or prefect deploy for a YAML-based deployment experience.
If you currently use Deployment.build_from_flow, we recommend using flow.deploy.
If you currently use prefect deployment build and prefect deployment apply, we recommend
using prefect deploy.
flow.deploy

If you have a Python script that uses `Deployment.build_from_flow` to create a deployment, you
can replace it with `flow.deploy`.
You can translate most arguments to `Deployment.build_from_flow` directly to `flow.deploy`,
but you may need to make the following changes:

- Replace `infrastructure` with `work_pool_name`. If you used the `.publish_as_work_pool` method on your infrastructure block, use the name of the created work pool.
- Replace `infra_overrides` with `job_variables`.
- Replace `storage` with a call to `flow.from_source`. `flow.from_source` loads your flow from a remote storage location and makes it deployable. You can pass your existing storage block to the `source` argument of `flow.from_source`.

Below are some examples of how to translate `Deployment.build_from_flow` into `flow.deploy`.
Using agents and Deployment.build_from_flow to deploy a flow from a local file looked like:
from prefect import flow
from prefect.deployments import Deployment

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Python script!")

if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        parameters=dict(name="Marvin"),
    )
When using workers, you can accomplish the same local-storage deployment with flow.deploy:
from pathlib import Path
from prefect import flow

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Python script!")

if __name__ == "__main__":
    my_flow.from_source(
        source=str(Path(__file__).parent),
        entrypoint="example.py:my_flow",
    ).deploy(
        name="my-deployment",
        parameters=dict(name="Marvin"),
        work_pool_name="local",
    )
You can then start a worker to execute scheduled runs, pulling the flow code from example.py:
# starts a worker and creates `local` Process work pool if it doesn't exist
prefect worker start --pool local
If you currently use a storage block to load your flow code but no infrastructure block:
from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import GitHub

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a GitHub repo!")

if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        storage=GitHub.load("demo-repo"),
        parameters=dict(name="Marvin"),
    )
You can use flow.from_source to load your flow from the same location and flow.deploy to
create a deployment:
from prefect import flow
from prefect.blocks.system import Secret
from prefect.runner.storage import GitRepository

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a GitHub repo!")

if __name__ == "__main__":
    flow.from_source(
        source=GitRepository(
            url="https://github.com/me/myrepo.git",
            credentials={"username": "oauth2", "access_token": Secret.load("my-github-pat")},
        ),
        entrypoint="example.py:my_flow",
    ).deploy(
        name="my-deployment",
        parameters=dict(name="Marvin"),
        work_pool_name="local",  # or the name of your work pool
    )
For the code below, you need to create a work pool from your infrastructure block and pass it to
flow.deploy as the work_pool_name argument. You also need to pass your storage block to
flow.from_source as the source argument.
from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import GitHub  # this block class no longer exists
from prefect.infrastructure.kubernetes import KubernetesJob

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a GitHub repo!")

repo = GitHub.load("demo-repo")

if __name__ == "__main__":
    Deployment.build_from_flow(
        my_flow,
        name="my-deployment",
        storage=repo,
        entrypoint="example.py:my_flow",
        infrastructure=KubernetesJob.load("my-k8s-job"),
        infra_overrides=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )
The equivalent deployment code using flow.deploy should look like this:
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/me/myrepo.git",
        entrypoint="example.py:my_flow",
    ).deploy(
        name="my-deployment",
        work_pool_name="my-k8s-job",
        job_variables=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )
{/*
<!-- vale off -->*/}
<Note> When using `flow.from_source().deploy()` with a remote `source` (such as a `GitHub` block or a `str` URL like `https://github.com/me/myrepo.git`), the flow you're deploying doesn't need to be available locally before running your script. See the [SDK reference](https://reference.prefect.io/prefect/#prefect.Flow.from_source) for more info on `from_source`. </Note>
{/*
<!-- vale on -->*/}
If you currently bake your flow code into a Docker image before deploying, you can use the
image argument of flow.deploy to build a Docker image as part of your deployment process:
from prefect import flow

@flow(log_prints=True)
def my_flow(name: str = "world"):
    print(f"Hello {name}! I'm a flow from a Docker image!")

if __name__ == "__main__":
    my_flow.deploy(
        name="my-deployment",
        image="my-repo/my-image:latest",
        work_pool_name="my-k8s-job",
        job_variables=dict(pull_policy="Never"),
        parameters=dict(name="Marvin"),
    )
You can skip a flow.from_source call when building an image with flow.deploy. Prefect
keeps track of the flow's source code location in the image and loads it from that location when the
flow is executed.
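
The argument changes covered in this section can be condensed into a small helper. The sketch below is hypothetical (Prefect does not ship a `translate_build_kwargs` function) and works on plain dictionaries, so it only shows which keyword arguments move where. The placeholder strings stand in for real storage and infrastructure blocks.

```python
from typing import Any, Dict, Tuple

# Hypothetical helper, not part of Prefect. It encodes the argument changes
# described above so they can be read (and tested) in one place.


def translate_build_kwargs(
    old_kwargs: Dict[str, Any], work_pool_name: str
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split `Deployment.build_from_flow` kwargs into kwargs for
    `flow.from_source` and kwargs for `flow.deploy`."""
    deploy_kwargs = dict(old_kwargs)  # work on a copy
    from_source_kwargs: Dict[str, Any] = {}

    # `infrastructure` is replaced by the name of a typed work pool.
    deploy_kwargs.pop("infrastructure", None)
    deploy_kwargs["work_pool_name"] = work_pool_name

    # `infra_overrides` is renamed to `job_variables`.
    if "infra_overrides" in deploy_kwargs:
        deploy_kwargs["job_variables"] = deploy_kwargs.pop("infra_overrides")

    # `storage` becomes the `source` of `flow.from_source`, which also
    # receives the entrypoint (as in the examples above).
    if "storage" in deploy_kwargs:
        from_source_kwargs["source"] = deploy_kwargs.pop("storage")
    if "entrypoint" in deploy_kwargs:
        from_source_kwargs["entrypoint"] = deploy_kwargs.pop("entrypoint")

    return from_source_kwargs, deploy_kwargs


if __name__ == "__main__":
    old = {
        "name": "my-deployment",
        "storage": "<storage block>",                # placeholder value
        "entrypoint": "example.py:my_flow",
        "infrastructure": "<infrastructure block>",  # placeholder value
        "infra_overrides": {"pull_policy": "Never"},
        "parameters": {"name": "Marvin"},
    }
    from_source_kwargs, deploy_kwargs = translate_build_kwargs(
        old, work_pool_name="my-k8s-job"
    )
    print(from_source_kwargs)
    print(deploy_kwargs)
```

With real blocks, the two dictionaries would feed `flow.from_source(**from_source_kwargs).deploy(**deploy_kwargs)`. When you build an image with the `image` argument instead, there is no `storage` to move and the `from_source` call can be skipped.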
prefect deploy

<Warning> With agents, you may have multiple `deployment.yaml` files. But under worker deployment patterns, each repo has a single `prefect.yaml` file located at the root of the repo that contains deployment configuration for all flows in that repo. </Warning>
To set up a new prefect.yaml file for your deployments, run the following command from the root
level of your repo:
prefect deploy
This starts a wizard that guides you through setting up your deployment.
<Note> **For step 4, select `y` on the last prompt to save the configuration for the deployment.** Saving the configuration for your deployment results in a `prefect.yaml` file populated with your first deployment. You can use this YAML file to edit and define multiple deployments for this repo. </Note>
You can add more deployments by editing the `deployments` list in your `prefect.yaml` file,
by continuing to use the deployment creation wizard, or both.
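
As a rough picture of what "adding to the `deployments` list" means, the sketch below models that list as Python data. It is illustrative only: the real file is YAML that you edit by hand or through the wizard, and the `add_deployment` helper and the deployment names are hypothetical. The keys used (`deployments`, `name`, `entrypoint`, `parameters`, `work_pool`, `job_variables`) mirror the corresponding `prefect.yaml` fields.

```python
import json
from typing import Any, Dict, Optional

# Illustrative model of the `deployments` list in prefect.yaml.
# `add_deployment` is a hypothetical helper, not a Prefect API.


def add_deployment(
    config: Dict[str, Any],
    name: str,
    entrypoint: str,
    work_pool_name: str,
    job_variables: Optional[Dict[str, Any]] = None,
    parameters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Append one deployment entry to config["deployments"] and return it."""
    entry: Dict[str, Any] = {
        "name": name,
        "entrypoint": entrypoint,
        "work_pool": {"name": work_pool_name},
    }
    if job_variables:
        entry["work_pool"]["job_variables"] = dict(job_variables)
    if parameters:
        entry["parameters"] = dict(parameters)
    config.setdefault("deployments", []).append(entry)
    return entry


if __name__ == "__main__":
    config: Dict[str, Any] = {}
    add_deployment(
        config,
        name="my-deployment",
        entrypoint="example.py:my_flow",
        work_pool_name="local",
        parameters={"name": "Marvin"},
    )
    add_deployment(
        config,
        name="my-k8s-deployment",  # hypothetical second deployment
        entrypoint="example.py:my_flow",
        work_pool_name="my-k8s-job",
        job_variables={"pull_policy": "Never"},
    )
    # Printed as JSON only to inspect the structure; the real file is YAML.
    print(json.dumps(config, indent=2))
```

Each entry names its own work pool, so a single file can target several pools while still living at the root of the repo.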
For more information on deployments, check out our in-depth guide for deploying flows to work pools.