docs/v3/advanced/submit-flows-directly-to-dynamic-infrastructure.mdx
<Warning>
This feature is currently in beta. While we encourage you to try it out and provide feedback, be aware that the API may change in future releases, potentially including breaking changes.
</Warning>
Prefect allows you to submit workflows directly to different infrastructure types without requiring a deployment. This enables you to dynamically choose where your workflows run based on their requirements.

Submitting workflows directly to dynamic infrastructure means you can select the execution environment at runtime, without creating and maintaining a deployment for each target.
Direct submission of workflows is currently supported for the following infrastructures:
| Infrastructure | Required Package | Decorator |
|---|---|---|
| Docker | `prefect-docker` | `@docker` |
| Kubernetes | `prefect-kubernetes` | `@kubernetes` |
| AWS ECS | `prefect-aws` | `@ecs` |
| Google Cloud Run | `prefect-gcp` | `@cloud_run` |
| Google Vertex AI | `prefect-gcp` | `@vertex_ai` |
| Azure Container Instances | `prefect-azure` | `@azure_container_instance` |
Each package can be installed with pip, for example:

```bash
pip install prefect-docker
```
Before submitting workflows to specific infrastructure, you need a work pool for each target infrastructure type and a storage location configured for that work pool.
Create work pools for each infrastructure type using the Prefect CLI:

```bash
prefect work-pool create NAME --type WORK_POOL_TYPE
```
For detailed information on creating and configuring work pools, refer to the work pools documentation.
To enable Prefect to run workflows in remote infrastructure, work pools need an associated storage location to store serialized versions of submitted workflows and results from workflow runs.
Configure storage for your work pools using one of the supported storage types:
<CodeGroup>
```bash S3
prefect work-pool storage configure s3 WORK_POOL_NAME \
  --bucket BUCKET_NAME \
  --aws-credentials-block-name BLOCK_NAME
```

```bash GCS
prefect work-pool storage configure gcs WORK_POOL_NAME \
  --bucket BUCKET_NAME \
  --gcp-credentials-block-name BLOCK_NAME
```

```bash Azure Blob Storage
prefect work-pool storage configure azure-blob-storage WORK_POOL_NAME \
  --container CONTAINER_NAME \
  --azure-blob-storage-credentials-block-name BLOCK_NAME
```
</CodeGroup>
To allow Prefect to upload and download serialized workflows, you can create a block containing credentials with permission to access your configured storage location.
If a credentials block is not provided, Prefect will use the default credentials (for example, a local profile or an IAM role) as determined by the corresponding cloud provider.
You can inspect your storage configuration using:

```bash
prefect work-pool storage inspect WORK_POOL_NAME
```
When using the @docker decorator with a local Docker engine, you can use volume mounts to share data between your Docker container and host machine.
Here's an example:

```python
from prefect import flow
from prefect.filesystems import LocalFileSystem
from prefect_docker.experimental import docker

result_storage = LocalFileSystem(basepath="/tmp/results")
result_storage.save("result-storage", overwrite=True)

@docker(
    work_pool="above-ground",
    volumes=["/tmp/results:/tmp/results"],
)
@flow(result_storage=result_storage)
def run_in_docker(name: str):
    return f"Hello, {name}!"

print(run_in_docker("world"))  # prints "Hello, world!"
```
<Note>
To use local storage, ensure that the `LocalFileSystem` block's `basepath` matches the container path specified in the volume mount.
</Note>
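As a quick sanity check, you can verify that a result-storage basepath is actually a mount target before submitting. This is a hypothetical helper (`basepath_is_mounted` is not part of Prefect) that parses the `host:container` volume strings shown above:

```python
def basepath_is_mounted(basepath: str, volumes: list[str]) -> bool:
    """Check whether a basepath matches the container side of any volume mount."""
    # Each volume string is "host_path:container_path".
    container_paths = {v.split(":", 1)[1] for v in volumes}
    return basepath in container_paths

print(basepath_is_mounted("/tmp/results", ["/tmp/results:/tmp/results"]))  # True
print(basepath_is_mounted("/tmp/other", ["/tmp/results:/tmp/results"]))    # False
```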
An infrastructure-bound flow supports three execution modes: direct calling, .submit(), and .submit_to_work_pool(). Each mode targets a different use case depending on whether you need blocking or non-blocking execution and whether the submitting machine has direct access to the target infrastructure.
| Method | Blocking | Requires local infrastructure access | Requires a running worker |
|---|---|---|---|
| Direct call | Yes | Yes | No |
| `.submit()` | No | Yes | No |
| `.submit_to_work_pool()` | No | No | Yes |
Calling an infrastructure-bound flow directly submits it to remote infrastructure and blocks until the run completes. Prefect spins up a temporary local worker to create the infrastructure and monitor the run.
```python
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes

@kubernetes(work_pool="olympic")
@flow
def my_remote_flow(name: str):
    print(f"Hello {name}!")

@flow
def my_flow():
    # Blocks until my_remote_flow completes on Kubernetes
    my_remote_flow("Marvin")

my_flow()
```
When you run this code on your machine, `my_flow` executes locally, while `my_remote_flow` is submitted to run in a Kubernetes job. The call blocks until the Kubernetes job finishes.
### `.submit()`

Use `.submit()` when you want to submit a flow to remote infrastructure without blocking the caller. Like a direct call, `.submit()` spins up a temporary local worker to create the infrastructure, but it returns a `PrefectFlowRunFuture` immediately so you can continue running other work.

Use `.submit()` when you want to run several infrastructure-bound flows concurrently, or when the caller has other work to do while a remote run is in progress.
```python
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes

@kubernetes(work_pool="olympic")
@flow
def train_model(dataset: str):
    print(f"Training on {dataset}")
    return {"accuracy": 0.95}

@flow
def orchestrator():
    # Submit two training jobs without waiting
    future_a = train_model.submit(dataset="dataset-a")
    future_b = train_model.submit(dataset="dataset-b")

    # Retrieve results when needed
    result_a = future_a.result()
    result_b = future_b.result()
    print(f"Results: {result_a}, {result_b}")

orchestrator()
```
In this example, both training jobs are submitted to Kubernetes concurrently. The orchestrator flow continues executing and only blocks when it calls .result() on each future.
### `.submit_to_work_pool()`

Use `.submit_to_work_pool()` when you want to submit a flow to remote infrastructure but the submitting machine does not have direct access to create that infrastructure. Instead of spinning up a local worker, this method creates a flow run and places it in the work pool for an already-running worker to pick up.

Use `.submit_to_work_pool()` when the submitting machine lacks the credentials or network access needed to create the target infrastructure, and a worker is already polling the work pool.
```python
from prefect import flow
from prefect_aws.experimental import ecs

@ecs(work_pool="my-ecs-pool")
@flow
def process_data(source: str):
    print(f"Processing {source}")
    return {"rows": 1000}

# Submit to the work pool for an existing worker to execute
future = process_data.submit_to_work_pool(source="s3://my-bucket/data.csv")

# Retrieve the result once the worker completes the run
result = future.result()
print(result)
```
Before calling `.submit_to_work_pool()`, start a worker that polls the target work pool:

```bash
prefect worker start --pool my-ecs-pool
```
### `PrefectFlowRunFuture`

Both `.submit()` and `.submit_to_work_pool()` return a `PrefectFlowRunFuture`. Use this object to check the status of the flow run, wait for it to finish, or retrieve the result.
```python
future = my_flow.submit(name="Marvin")

# Check the current state without blocking
print(future.state)

# Block until the run completes
future.wait()

# Retrieve the result (blocks if the run is still in progress)
result = future.result()
```
Parameters passed to infrastructure-bound flows are serialized with cloudpickle to allow them to be transported to the destination infrastructure.
<Note>
Most Python objects can be serialized with cloudpickle, but objects like database connections cannot be. For parameters that cannot be serialized, create the object inside your infrastructure-bound workflow.
</Note>
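The restriction is easy to see with the standard-library `pickle` module and a SQLite connection (cloudpickle behaves the same way for objects that wrap OS-level resources; this sketch uses `pickle` so it stays dependency-free):

```python
import pickle
import sqlite3

# Plain data round-trips cleanly, so it is safe to pass as a flow parameter.
params = {"dataset": "s3://bucket/data.csv", "batch_size": 128}
assert pickle.loads(pickle.dumps(params)) == params

# A live connection wraps OS-level state and cannot be serialized.
conn = sqlite3.connect(":memory:")
try:
    pickle.dumps(conn)
    serializable = True
except TypeError:
    serializable = False
finally:
    conn.close()

print(f"Connection serializable: {serializable}")  # Connection serializable: False
```

Creating the connection inside the flow body sidesteps the problem entirely, because the object is then constructed on the destination infrastructure rather than shipped to it.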
You can override the default configuration by providing additional kwargs to the infrastructure decorator:
```python
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes

@kubernetes(
    work_pool="my-kubernetes-pool",
    namespace="custom-namespace"
)
@flow
def custom_namespace_flow():
    pass
```
Any kwargs passed to the infrastructure decorator will override the corresponding default value in the base job template for the specified work pool.
When a flow runs on remote infrastructure, your code is serialized and sent to the execution environment. However, non-Python files such as configuration files, data files, or model artifacts are not included by default. Use the include_files parameter on any infrastructure decorator to bundle additional files alongside your flow.
```python
from prefect import flow
from prefect_docker.experimental import docker

@docker(
    work_pool="my-pool",
    include_files=["config.yaml", "data/"]
)
@flow
def my_flow():
    import yaml

    with open("config.yaml") as f:
        config = yaml.safe_load(f)
    print(config)
```
The include_files parameter accepts a list of relative paths and glob patterns. Paths are resolved relative to the directory containing the flow file.
| Pattern | Description |
|---|---|
| `"config.yaml"` | A single file |
| `"data/"` | All files in a directory (recursive) |
| `"*.yaml"` | Glob pattern matching files in the flow directory |
| `"data/**/*.csv"` | Recursive glob pattern |
| `"!*.test.py"` | Negation pattern to exclude previously matched files |
Patterns are processed in order. Negation patterns (prefixed with !) remove files already matched by earlier patterns:
```python
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes

@kubernetes(
    work_pool="my-pool",
    include_files=["*.json", "!fixtures/*.json"]
)
@flow
def process_json():
    ...
```
This example includes all JSON files except those in the fixtures/ directory.
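The ordered include/negate semantics can be modeled with the standard library. This is a simplified sketch of the behavior described above, not Prefect's actual implementation (`resolve_includes` is a hypothetical helper; note that `fnmatch`'s `*` also crosses `/`, which is close enough for illustration):

```python
from fnmatch import fnmatch

def resolve_includes(patterns: list[str], candidate_files: list[str]) -> list[str]:
    """Apply include and '!'-negation patterns in order (simplified model)."""
    selected: set[str] = set()
    for pattern in patterns:
        if pattern.startswith("!"):
            # Negation: drop files already matched by earlier patterns.
            selected -= {f for f in selected if fnmatch(f, pattern[1:])}
        else:
            selected |= {f for f in candidate_files if fnmatch(f, pattern)}
    return sorted(selected)

files = ["a.json", "b.json", "fixtures/x.json", "src/app.py"]
print(resolve_includes(["*.json", "!fixtures/*.json"], files))  # ['a.json', 'b.json']
```

Because patterns are processed in order, a negation only removes files that an earlier pattern already selected; a negation listed first has nothing to remove.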
### `.prefectignore`

If a `.prefectignore` file exists in the flow file's directory or at the project root (detected via `pyproject.toml`), its patterns are applied to filter out matching files. The `.prefectignore` file uses gitignore-style syntax:

```
# .prefectignore
*.log
tmp/
__pycache__/
```
Files that match a .prefectignore pattern are excluded from the bundle even if they match an include_files pattern.
Certain common directories and file types are always excluded from directory and glob collection, even without a `.prefectignore` file:

- `__pycache__/`, `*.pyc`, `*.pyo`
- `.git/`, `.hg/`, `.svn/`
- `node_modules/`, `.venv/`, `venv/`
- `.idea/`, `.vscode/`
- `.DS_Store`, `Thumbs.db`

Hidden files and directories (names starting with `.`) are also excluded when collecting directories.
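A simplified sketch of how such default exclusions might be applied when walking a directory (illustrative only; the names mirror the defaults listed above, and `is_excluded` is not Prefect's internal code):

```python
from pathlib import PurePosixPath

DEFAULT_EXCLUDED_DIRS = {"__pycache__", ".git", ".hg", ".svn",
                         "node_modules", ".venv", "venv", ".idea", ".vscode"}
DEFAULT_EXCLUDED_FILES = {".DS_Store", "Thumbs.db"}
DEFAULT_EXCLUDED_SUFFIXES = {".pyc", ".pyo"}

def is_excluded(path: str) -> bool:
    """Return True if a relative path hits one of the default exclusions."""
    p = PurePosixPath(path)
    if any(part in DEFAULT_EXCLUDED_DIRS for part in p.parts[:-1]):
        return True
    if p.name in DEFAULT_EXCLUDED_FILES or p.suffix in DEFAULT_EXCLUDED_SUFFIXES:
        return True
    # Hidden files and directories (names starting with ".") are skipped too.
    return any(part.startswith(".") for part in p.parts)

print(is_excluded("src/__pycache__/mod.pyc"))  # True
print(is_excluded("data/input.csv"))           # False
```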
By default, Prefect uses uv run to execute bundle upload and execution commands. If your execution environment already has the required dependencies installed—for example, a custom Docker image, a Poetry-managed environment, or a system-level Python interpreter—you can override the default launcher to skip the uv run wrapper entirely.
You can configure launchers at two levels: per flow, with the `launcher` parameter on an infrastructure decorator, or per work pool, with launcher flags during storage configuration.
### The `launcher` parameter on an infrastructure decorator

Pass the `launcher` parameter to any infrastructure decorator to override the command prefix used for both bundle upload and execution:
```python
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes

@kubernetes(
    work_pool="my-kubernetes-pool",
    launcher=["python"],
)
@flow
def my_flow():
    print("Running with system Python!")
```
The `launcher` parameter accepts either:

- a `list[str]` that applies to both upload and execution (for example, `["python"]` or `["poetry", "run", "python"]`)
- a `dict` with `"upload"` and/or `"execution"` keys when you need different launchers for each phase:

```python
from prefect import flow
from prefect_docker.experimental import docker

@docker(
    work_pool="my-docker-pool",
    launcher={
        "upload": ["python"],
        "execution": ["poetry", "run", "python"],
    },
)
@flow
def my_flow():
    print("Different launchers for upload and execution!")
```
When an execution launcher override is provided, Prefect skips both the uv run wrapper and the uv pip freeze dependency snapshot. Ensure your execution environment has all required dependencies pre-installed. If you only override the upload launcher, Prefect still captures the dependency snapshot and uses uv run at execution time.
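Conceptually, a launcher is just a command prefix that replaces the default `uv run` wrapper. A minimal sketch of how a shared launcher plus phase-specific overrides could resolve to final command prefixes (`resolve_launcher` is a hypothetical helper, not Prefect's API; the `["uv", "run"]` default mirrors the wrapper described above):

```python
def resolve_launcher(launcher, phase: str) -> list:
    """Resolve a launcher setting to the command prefix for a given phase."""
    default = ["uv", "run"]  # the default wrapper
    if launcher is None:
        return default
    if isinstance(launcher, dict):
        # Unset phases fall back to the default wrapper.
        return launcher.get(phase, default)
    # A flat list applies to both phases.
    return list(launcher)

print(resolve_launcher(["python"], "execution"))  # ['python']
launcher = {"execution": ["poetry", "run", "python"]}
print(resolve_launcher(launcher, "upload"))       # ['uv', 'run']
print(resolve_launcher(launcher, "execution"))    # ['poetry', 'run', 'python']
```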
You can also override the launcher with `.with_options()`:

```python
custom_flow = my_flow.with_options(launcher=["python3.12"])
```
Configure a launcher for all flows that use a work pool by passing launcher flags during storage configuration:
```bash
prefect work-pool storage configure s3 my-pool \
  --bucket my-bucket \
  --aws-credentials-block-name my-creds \
  --launcher python
```
This sets python as the launcher for both upload and execution steps on the work pool. To pass additional arguments to the launcher executable, use --launcher-arg (repeatable):
```bash
prefect work-pool storage configure s3 my-pool \
  --bucket my-bucket \
  --aws-credentials-block-name my-creds \
  --launcher python \
  --launcher-arg -X \
  --launcher-arg utf8
```
To configure different launchers for the upload and execution phases, use the --upload-launcher and --execution-launcher flags:
```bash
prefect work-pool storage configure s3 my-pool \
  --bucket my-bucket \
  --aws-credentials-block-name my-creds \
  --upload-launcher python \
  --execution-launcher poetry \
  --execution-launcher-arg run \
  --execution-launcher-arg python
```
You can also combine --launcher with phase-specific overrides. The shared launcher serves as the base, and phase-specific flags replace the executable for that phase:
```bash
prefect work-pool storage configure gcs my-pool \
  --bucket my-bucket \
  --gcp-credentials-block-name my-creds \
  --launcher python \
  --execution-launcher poetry \
  --execution-launcher-arg run \
  --execution-launcher-arg python
```
In this example, upload uses `python` while execution uses `poetry run python`.
After configuring a launcher, verify the storage settings with:
```bash
prefect work-pool storage inspect my-pool
```
Use --output json for machine-readable output that includes the launcher configuration for both upload and execution steps:
```bash
prefect work-pool storage inspect my-pool --output json
```