docs/v3/advanced/generate-custom-sdk.mdx
The `prefect sdk generate` command creates a typed Python file from your deployments. This gives you IDE autocomplete and static type checking when triggering deployment runs programmatically.
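To make the typing benefit concrete, here is a hypothetical sketch of the kind of typed parameter model a generated file could contain. This is illustrative only: the actual output of `prefect sdk generate` will look different, and the class and field names below are assumptions.

```python
# Hypothetical sketch only: the kind of typed parameters a generated SDK
# file could expose so that editors and type checkers can validate calls.
# Real generated code will differ.
from dataclasses import dataclass


@dataclass
class EtlRunParameters:
    """Illustrative parameter model for a deployment of my-etl-flow."""

    source: str
    batch_size: int = 100


# A type checker can now flag a missing `source` or a misspelled field.
params = EtlRunParameters(source="s3://my-bucket/data")
```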
Generate a typed SDK for all deployments in your workspace:

```bash
prefect sdk generate --output ./my_sdk.py
```

Generate an SDK for specific flows:

```bash
prefect sdk generate --output ./my_sdk.py --flow my-etl-flow
```

Generate an SDK for specific deployments:

```bash
prefect sdk generate --output ./my_sdk.py --deployment my-flow/production
```

Combine multiple filters:

```bash
prefect sdk generate --output ./my_sdk.py \
  --flow etl-flow \
  --flow data-sync \
  --deployment analytics/daily
```
The generated SDK provides a `deployments.from_name()` method that returns a typed deployment object:

```python
from my_sdk import deployments

# Get a deployment by name
deployment = deployments.from_name("my-etl-flow/production")

# Run with parameters
future = deployment.run(
    source="s3://my-bucket/data",
    batch_size=100,
)

# Get the flow run ID immediately
print(f"Started flow run: {future.flow_run_id}")

# Wait for completion and get the result
result = future.result()
```
Use `with_options()` to set tags, scheduling, and other run configuration:

```python
from datetime import datetime, timedelta

from my_sdk import deployments

future = deployments.from_name("my-etl-flow/production").with_options(
    tags=["manual", "production"],
    idempotency_key="daily-run-2024-01-15",
    scheduled_time=datetime.now() + timedelta(hours=1),
    flow_run_name="custom-run-name",
).run(
    source="s3://bucket",
)
```
Available options:

- `tags`: Tags to apply to the flow run
- `idempotency_key`: Unique key to prevent duplicate runs
- `work_queue_name`: Override the work queue
- `as_subflow`: Run as a subflow of the current flow
- `scheduled_time`: Schedule the run for a future time
- `flow_run_name`: Custom name for the flow run

Use `with_infra()` to override work pool job variables:
```python
from my_sdk import deployments

future = deployments.from_name("my-etl-flow/production").with_infra(
    image="my-registry/my-image:latest",
    cpu_request="2",
    memory="8Gi",
).run(
    source="s3://bucket",
)
```
The available job variables depend on your work pool type. The generated SDK provides type hints for the options available on each deployment's work pool.
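As an illustration of what per-pool typing might look like, here is a hypothetical sketch that models Kubernetes-style job variables as a `TypedDict`. The field names mirror the example above but are assumptions, not the SDK's actual generated types; the real types depend on your work pool's base job template.

```python
# Hypothetical sketch: work-pool job variables modeled as a TypedDict so a
# type checker can catch misspelled or mistyped overrides. The real
# generated types depend on your work pool.
from typing import TypedDict


class KubernetesJobVariables(TypedDict, total=False):
    image: str
    cpu_request: str
    memory: str


overrides: KubernetesJobVariables = {
    "image": "my-registry/my-image:latest",
    "memory": "8Gi",
}
```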
In an async context, use `run_async()`:

```python
import asyncio

from my_sdk import deployments


async def trigger_deployment():
    future = await deployments.from_name("my-etl-flow/production").run_async(
        source="s3://bucket",
    )
    result = await future.result()
    return result


# Run it
result = asyncio.run(trigger_deployment())
```
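When many runs need to be triggered at once, the same `run_async()` call can be fanned out with `asyncio.gather`. The sketch below uses small stand-in classes in place of the generated module so it is self-contained; with a real generated SDK you would call `deployments.from_name(...).run_async(...)` instead of the stub.

```python
# Pattern sketch: fan out several run_async() calls concurrently.
# _StubDeployment and _StubFuture stand in for the generated SDK so this
# example runs on its own.
import asyncio


class _StubFuture:
    def __init__(self, source: str) -> None:
        self.source = source

    async def result(self) -> str:
        return f"processed {self.source}"


class _StubDeployment:
    async def run_async(self, **params) -> _StubFuture:
        return _StubFuture(params["source"])


async def trigger_many(deployment, sources):
    # Start all runs concurrently, then await each result.
    futures = await asyncio.gather(
        *(deployment.run_async(source=s) for s in sources)
    )
    return [await f.result() for f in futures]


results = asyncio.run(trigger_many(_StubDeployment(), ["a", "b"]))
```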
You can chain `with_options()` and `with_infra()` on the same deployment before calling `run()`:

```python
from my_sdk import deployments

future = (
    deployments.from_name("my-etl-flow/production")
    .with_options(tags=["production"])
    .with_infra(memory="8Gi")
    .run(source="s3://bucket", batch_size=100)
)
```
The SDK is generated from server-side metadata, so it can fall out of date as your workspace changes. Regenerate it when you add or remove deployments, change deployment parameters, or move a deployment to a different work pool. The generate command overwrites the existing file:

```bash
prefect sdk generate --output ./my_sdk.py
```