plans/2026-01-06-custom-deployment-sdks.md
Create a CLI command that generates a typed Python SDK from workspace deployments, enabling the improvement shown below.
Before (current state):

```python
from prefect.deployments import run_deployment

# No autocomplete, no type checking, runtime errors for typos
run_deployment(
    name="my-etl-flow/production",  # Easy to typo
    parameters={"sorce": "s3://bucket"},  # Typo not caught until runtime  # codespell:ignore sorce
)
```
After (with generated SDK):

```python
from my_sdk import deployments

# IDE autocomplete, type checking, errors caught immediately
deployments.from_name("my-etl-flow/production").with_options(
    tags=["production"],  # Run options via method
).with_infra(
    memory="8Gi",  # Job variables typed per work pool
).run(
    source="s3://bucket",  # Flow params are direct kwargs
    batch_size=100,  # Typos caught by type checker
)
```
Type safety: The `from_name()` method uses `@overload` decorators so each deployment name returns the correctly-typed class with its specific parameters.
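For illustration, assuming a generated module named `my_sdk` containing the classes shown later in this plan, the overloads surface both name typos and per-deployment return types at type-check time:

```python
from my_sdk import deployments

# pyright narrows the return type via the matching Literal overload:
d = deployments.from_name("my-etl-flow/production")  # typed as _MyEtlFlowProduction

# A misspelled name matches no Literal overload and is not a member of
# DeploymentName, so the type checker rejects it before runtime:
deployments.from_name("my-etl-flow/prodction")  # type checker error
```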
```bash
prefect sdk generate --output ./my_sdk.py [--flow NAME] [--deployment NAME]
```
Options:
| Flag | Description |
|---|---|
| `--output, -o` | Output file path (required) |
| `--flow, -f` | Filter to specific flow (multiple allowed) |
| `--deployment, -d` | Filter to specific deployment (multiple allowed) |
The SDK file contains:
- `DeploymentName` Literal type - All deployment names for autocomplete: `Literal["flow/deploy", ...]`
- `{WorkPoolName}JobVariables` TypedDicts for typed `with_infra()` kwargs
- Deployment classes with `with_options()`, `with_infra()`, `run()`, and `run_async()` methods
- `deployments` namespace - With `from_name()` method using `@overload` for type-safe dispatch

Usage: `deployments.from_name("flow/deploy").with_options(tags=["prod"]).with_infra(memory="8Gi").run(param=val)`
Important: All type information comes from server-side metadata (JSON Schema stored with deployments and work pools). The generator does not inspect flow source code—it works entirely from the Prefect API.
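For example, the parameter schema stored with a deployment might look like the following (a hypothetical schema matching the example flow above):

```python
# Hypothetical parameter_openapi_schema as returned by the Prefect API for
# "my-etl-flow"; the generator derives run() signatures from this alone.
parameter_openapi_schema = {
    "type": "object",
    "properties": {
        "source": {"type": "string"},
        "batch_size": {"type": "integer", "default": 100},
        "full_refresh": {"type": "boolean", "default": False},
    },
    "required": ["source"],
}
```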
We chose to use direct kwargs for flow parameters, run options, and job variables rather than TypedDict parameters:
```python
# ✅ Chosen approach: direct kwargs
deployments.from_name("my-flow/prod").with_options(
    flow_run_name="nightly",
    tags=["production"],
).run(
    source="s3://bucket",
    batch_size=100,
)
```

```python
# ❌ Rejected: TypedDict parameters
deployments.from_name("my-flow/prod").run(
    parameters={"source": "s3://bucket", "batch_size": 100},
    options={"flow_run_name": "nightly", "tags": ["production"]},
)
```
Rationale:
- Direct kwargs match existing Prefect patterns like `@flow(retries=3)` and `task.with_options(timeout=60)`

```
Public Interface (CLI only)
└── prefect sdk generate

Private Implementation (src/prefect/_sdk/)
├── Schema conversion (JSON Schema → TypedDict)
├── Name utilities (safe Python identifiers)
├── Data models (internal representation)
├── API fetching (deployment/work pool data)
├── Template rendering (Jinja2)
└── Generator orchestration
```
Design Decision: All implementation is in a private _sdk module. Users interact only via CLI with no public Python API.
Outcome: Utility that converts JSON Schema to Python TypedDict definitions
Handles:
| JSON Schema | Python Type | Notes |
|---|---|---|
| `{"type": "string"}` | `str` | |
| `{"type": "integer"}` | `int` | |
| `{"type": "number"}` | `float` | |
| `{"type": "boolean"}` | `bool` | |
| `{"type": "null"}` | `None` | |
| `{"type": "array", "items": {...}}` | `list[T]` | Recursive item type |
| `{"type": "object", "additionalProperties": true}` | `dict[str, Any]` | Generic dict |
| `{"type": "object", "additionalProperties": {"type": "string"}}` | `dict[str, str]` | Typed dict values |
| `{"type": "object", "properties": {...}}` | Nested TypedDict | Generate inline or named |
| `{"anyOf": [{"type": "T"}, {"type": "null"}]}` | `T \| None` | Nullable pattern |
| `{"anyOf": [{"type": "string"}, {"type": "integer"}]}` | `str \| int` | Union types |
| `{"anyOf": [...3+ types...]}` | `T1 \| T2 \| T3` | Multi-type union |
| `{"enum": ["A", "B", "C"], "type": "string"}` | `Literal["A", "B", "C"]` | String enums |
| `{"enum": [1, 2, 3], "type": "integer"}` | `Literal[1, 2, 3]` | Integer enums |
| `{"$ref": "#/definitions/Foo"}` | Resolved type | See reference resolution |
| `{"prefixItems": [...], "type": "array"}` | `tuple[T1, T2, ...]` | Fixed-length tuples |
| No `type` key present | `Any` | Non-Pydantic user classes |
| `{}` (empty schema) | `Any` | Permissive fallback |
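A minimal sketch of the conversion for the simpler rows above (the function name is illustrative; `$ref` resolution, `prefixItems`, and nested TypedDict generation are omitted):

```python
from typing import Any

_PRIMITIVES = {
    "string": "str",
    "integer": "int",
    "number": "float",
    "boolean": "bool",
    "null": "None",
}

def json_schema_to_type(schema: dict[str, Any]) -> str:
    """Render a JSON Schema fragment as a Python type annotation string."""
    if not schema:
        return "Any"  # empty schema: permissive fallback
    if "enum" in schema:
        return f"Literal[{', '.join(repr(v) for v in schema['enum'])}]"
    if "anyOf" in schema:
        return " | ".join(json_schema_to_type(s) for s in schema["anyOf"])
    t = schema.get("type")
    if t in _PRIMITIVES:
        return _PRIMITIVES[t]
    if t == "array":
        items = schema.get("items")
        return f"list[{json_schema_to_type(items)}]" if items else "list[Any]"
    if t == "object":
        extra = schema.get("additionalProperties")
        if isinstance(extra, dict):
            return f"dict[str, {json_schema_to_type(extra)}]"
        return "dict[str, Any]"
    return "Any"  # no type key: non-Pydantic user classes
```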
Reference Resolution (`$ref` and `definitions`):
- Schemas use `"definitions": {"Foo": {...}}` with `"$ref": "#/definitions/Foo"` pointers
- Pydantic v2 emits `"$defs"` instead of `"definitions"` - support both
- Circular references resolve to `Any` with a warning

Required vs Optional Logic:
- A field is required (no `NotRequired`) if:
  - It appears in the `"required"` array, AND
  - There is no `"default"` key in its property definition
- A field is optional (`NotRequired[T]`) if:
  - It is absent from the `"required"` array, OR
  - It has a `"default"` key (regardless of the required list)

This matches how Prefect's server validates parameters (actions.py:287-304).
Status:
Phase 1 Implementation Notes (deviations from plan):
- **Objects with properties return `dict[str, Any]`, not nested TypedDict**
  - Plan: `{"type": "object", "properties": {...}}` → Nested TypedDict
  - Implementation: returns `dict[str, Any]` instead
- **Circular reference handling is more nuanced**
  - Plan: resolve to `Any` with a warning
  - Implementation: direct circular refs raise `CircularReferenceError`. Indirect circular refs (objects with recursive properties) return `dict[str, Any]` because object conversion doesn't traverse properties. `extract_fields_from_schema()` catches `CircularReferenceError` and emits `Any` with a warning.
- **Union flattening uses bracket/quote-aware splitting** (see the sketch after this list)
  - Uses a `_split_union_top_level()` helper that respects `[]` brackets and `'"` quotes
  - Naive splitting on `|` corrupts types like `list[str | int]` or `Literal['a | b']`
- **Enum formatting uses `repr()` instead of manual escaping**
  - Uses `repr()` for proper handling of control characters, unicode, and quotes
  - `repr()` correctly handles all edge cases (newlines, tabs, backslashes, etc.)
- **Float enum values are supported**
- **100% test coverage achieved**
  - `uv run pytest tests/_sdk/ --cov=src/prefect/_sdk --cov-report=term-missing`
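A sketch of the bracket/quote-aware splitter referenced above (illustrative, not the exact implementation):

```python
def _split_union_top_level(annotation: str) -> list[str]:
    """Split a rendered union on '|' only at bracket/quote depth zero.

    Naive annotation.split('|') would corrupt types like
    list[str | int] or Literal['a | b'].
    """
    parts: list[str] = []
    depth = 0
    quote: str | None = None
    current = ""
    for ch in annotation:
        if quote:
            if ch == quote:
                quote = None  # closing quote ends the quoted region
        elif ch in "'\"":
            quote = ch
        elif ch == "[":
            depth += 1
        elif ch == "]":
            depth -= 1
        elif ch == "|" and depth == 0:
            parts.append(current.strip())
            current = ""
            continue
        current += ch
    parts.append(current.strip())
    return parts
```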
Outcome: Utilities for converting arbitrary names to valid Python identifiers, plus internal data models

Naming Conversion:
| Input | Identifier | Class Name |
|---|---|---|
| `my-flow` | `my_flow` | `MyFlow` |
| `123-start` | `_123_start` | `_123Start` |
| `class` | `class_` | `Class_` |
| `my_flow` | `my_flow` | `MyFlow` |
| `café-data` | `caf_data` | `CafData` |
| `🚀-deploy` | `_deploy` | `Deploy` |
Conversion Rules:
- Replace invalid characters with underscores
- Prefix identifiers that start with a digit with an underscore
- Append an underscore to Python keywords (`class` → `class_`)
- Names that reduce to nothing become `_unnamed`
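A rough sketch of these rules (illustrative; it follows the implemented Unicode behavior described in the Phase 2 notes below and omits reserved-name and collision handling):

```python
import keyword
import re
import unicodedata

def safe_identifier(name: str) -> str:
    """Sketch of the conversion rules, not the real module."""
    chars: list[str] = []
    for ch in unicodedata.normalize("NFKD", name):
        if re.fullmatch(r"[0-9a-zA-Z_]", ch):
            chars.append(ch)
        elif unicodedata.combining(ch) or ch.isalpha():
            continue  # drop accents split off by NFKD, and letters with no ASCII form
        else:
            chars.append("_")  # punctuation/symbols mark a word boundary
    ident = re.sub(r"_+", "_", "".join(chars)).strip("_") or "_unnamed"
    if ident[0].isdigit():
        ident = f"_{ident}"  # identifiers cannot start with a digit
    if keyword.iskeyword(ident):
        ident += "_"  # e.g. class -> class_
    return ident
```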
Reserved Names (must be avoided via suffix):

| Context | Reserved Names |
|---|---|
| Flow identifiers | `flows` (conflicts with root namespace) |
| Deployment identifiers | `run`, `run_async` (conflicts with methods) |
| TypedDict class names | Generated class names in same file |
Collision Resolution:
- Duplicate identifiers get numeric suffixes: `_2`, `_3`, etc.
- Example: `my-flow` and `my_flow` both become `my_flow` → the second becomes `my_flow_2`

Data Models:
- `WorkPoolInfo` - Work pool name, type, variables schema (JSON Schema dict)
- `DeploymentInfo` - Deployment name, flow name, parameter schema, work pool reference
- `FlowInfo` - Flow name with list of deployments
- `SDKData` - Complete data needed for generation (flows, work pools, metadata)
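As a sketch, these internal models could be plain dataclasses along the following lines (field names beyond those listed above are assumptions):

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class WorkPoolInfo:
    name: str
    pool_type: str  # see Phase 2 notes: renamed from `type`
    variables_schema: dict[str, Any]  # JSON Schema for job variables

@dataclass
class DeploymentInfo:
    name: str
    flow_name: str
    parameter_schema: dict[str, Any]
    work_pool_name: str | None = None

@dataclass
class FlowInfo:
    name: str
    deployments: list[DeploymentInfo] = field(default_factory=list)

@dataclass
class SDKData:
    flows: list[FlowInfo]
    work_pools: list[WorkPoolInfo]
```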
Status:

Phase 2 Implementation Notes (deviations from plan):
- **Unicode handling differs from plan**
  - Word boundaries are preserved (`a—b` → `a_b`, not `ab`) while accented characters normalize to ASCII (`é` → `e`)
  - Result: `café-data` → `cafe_data` (not `caf_data`), `🚀-deploy` → `deploy` (not `_deploy`)
- **Class names don't get an underscore suffix for keywords**
  - Plan: `class` → `Class_`
  - Implementation: `class` → `Class`; `Class` is a valid identifier, and PascalCase naturally avoids keywords
- **Expanded reserved names beyond plan**
  - Plan: Flow=`{flows}`, Deployment=`{run, run_async}`
  - Implementation: Flow=`{flows, deployments, DeploymentName}`, Deployment=`{run, run_async, with_options, with_infra}`, Module=`{all}`
- **Reserved names stored in normalized form**
  - Stored as `"all"`, not `"__all__"`, since `safe_identifier()` normalizes before checking
  - Otherwise `safe_identifier("__all__", ..., "module")` would return `"all"` (not avoided)
- **`WorkPoolInfo.type` renamed to `pool_type`**
  - Plan: `WorkPoolInfo` has a `type` field
  - Implementation: named `pool_type` to avoid shadowing the builtin `type`
- **`SDKData.deployment_names` is derived, not stored**
  - Plan: `SDKData` has `deployment_names` as a stored field
  - Implementation: derived from `flows`
- **Deterministic ordering added**
  - `deployment_names` and `all_deployments()` return sorted results
- **Additional `SDKData` convenience methods**
  - `all_deployments()`, `flow_count`, `deployment_count`, `work_pool_count`
- **`SDKGenerationMetadata.api_url` added**
  - Metadata now records an `api_url` field
- **German ß limitation**
  - `straße` → `strae`

Outcome: Jinja2 template that produces valid, type-safe Python code
This is the core of the feature—it defines exactly what users receive when they run the generator.
The output file has 5 distinct sections, generated in this order:
```
┌─────────────────────────────────────────────────────────────┐
│ 1. MODULE HEADER                                            │
│    - Docstring with generation metadata                     │
│    - Imports (typing, TypedDict, NotRequired, overload)     │
│    - TYPE_CHECKING block for FlowRun import                 │
├─────────────────────────────────────────────────────────────┤
│ 2. DEPLOYMENT NAME LITERAL                                  │
│    - DeploymentName = Literal["flow/deploy", ...]           │
│    - Enables autocomplete for all deployment names          │
├─────────────────────────────────────────────────────────────┤
│ 3. WORK POOL TYPEDDICTS                                     │
│    - {WorkPoolName}JobVariables TypedDict                   │
├─────────────────────────────────────────────────────────────┤
│ 4. DEPLOYMENT CLASSES                                       │
│    - One class per deployment                               │
│    - with_options() for run config (tags, scheduling, etc.) │
│    - with_infra() for job variables (typed per work pool)   │
│    - run()/run_async() returns PrefectFlowRunFuture         │
├─────────────────────────────────────────────────────────────┤
│ 5. DEPLOYMENT NAMESPACE                                     │
│    - Single `deployments` class with from_name() method     │
│    - @overload per deployment for type-safe return types    │
│    - This is the public entry point                         │
└─────────────────────────────────────────────────────────────┘
```
Generated metadata helps users understand the SDK's origin and freshness:
"""
Prefect SDK - Auto-generated typed client for workspace deployments.
Generated at: 2026-01-06T14:30:00Z
Prefect version: 3.2.0
Workspace: my-workspace (if available)
This file was auto-generated by `prefect sdk generate`.
Do not edit manually - regenerate after deployment changes.
"""
from datetime import datetime
from typing import TYPE_CHECKING, Any, Iterable, Literal, overload
from typing_extensions import NotRequired, TypedDict
if TYPE_CHECKING:
from prefect.futures import PrefectFlowRunFuture
Python 3.10+ Compatibility:
- `NotRequired` is imported from `typing_extensions` (not `typing`) for Python 3.10 support
- `TypedDict` from `typing_extensions` has better feature support than `typing.TypedDict`
- `Literal` from `typing` (available since 3.8) for deployment name types
- `overload` from `typing` for type-safe `from_name()` dispatch

Import design:
- `PrefectFlowRunFuture` under `TYPE_CHECKING` avoids circular imports and heavy import chains
- Runtime dependencies are only `typing_extensions` and `prefect` (already a dependency)

A `Literal` type containing all deployment names enables IDE autocomplete:
```python
DeploymentName = Literal[
    "my-etl-flow/production",
    "my-etl-flow/staging",
    "data-sync/daily",
]
```
Design decisions:
- Names use the `flow-name/deployment-name` format (matches `run_deployment()`)
- An empty workspace generates `DeploymentName = Literal[""]` (placeholder, will fail at runtime)

Each work pool gets a TypedDict for its job variables:
```python
class KubernetesPoolJobVariables(TypedDict, total=False):
    """Job variables for work pool: kubernetes-pool"""

    image: NotRequired[str]
    namespace: NotRequired[str]
    cpu_request: NotRequired[str]
    memory_request: NotRequired[str]
```
Design decisions:
- `total=False` with explicit `NotRequired` gives the clearest IDE hints
- Naming: `{WorkPoolName}JobVariables` (PascalCase)

Each deployment gets a class with `with_options()`, `with_infra()`, `run()`, and `run_async()` methods:
```python
class _MyEtlFlowProduction:
    """
    Deployment: my-etl-flow/production
    Work Pool: kubernetes-pool
    """

    def __init__(self) -> None:
        self._options: dict[str, Any] = {}

    def _copy_with_options(self, options: dict[str, Any]) -> "_MyEtlFlowProduction":
        """Create a new instance with the given options dict."""
        new = _MyEtlFlowProduction()
        new._options = options
        return new

    def with_options(
        self,
        *,
        tags: Iterable[str] | None = None,
        idempotency_key: str | None = None,
        work_queue_name: str | None = None,
        as_subflow: bool | None = None,
        scheduled_time: datetime | None = None,
        flow_run_name: str | None = None,
    ) -> "_MyEtlFlowProduction":
        """Create a new deployment handle with updated run options.

        Returns a new instance with merged options (does not mutate self).
        This matches the behavior of Flow.with_options() and Task.with_options().

        Note: timeout and poll_interval are not included here - use the
        PrefectFlowRunFuture.result(timeout=...) method for waiting.
        """
        new_options = self._options.copy()
        if tags is not None:
            new_options["tags"] = tags
        if idempotency_key is not None:
            new_options["idempotency_key"] = idempotency_key
        if work_queue_name is not None:
            new_options["work_queue_name"] = work_queue_name
        if as_subflow is not None:
            new_options["as_subflow"] = as_subflow
        if scheduled_time is not None:
            new_options["scheduled_time"] = scheduled_time
        if flow_run_name is not None:
            new_options["flow_run_name"] = flow_run_name
        return self._copy_with_options(new_options)

    def with_infra(
        self,
        *,
        # Typed kwargs from work pool's job variables schema
        image: str | None = None,
        namespace: str | None = None,
        cpu_request: str | None = None,
        memory: str | None = None,
    ) -> "_MyEtlFlowProduction":
        """Create a new deployment handle with updated job variables.

        Returns a new instance with merged options (does not mutate self).
        Job variable types are derived from the work pool schema.
        """
        new_options = self._options.copy()
        job_variables = new_options.get("job_variables", {}).copy()
        if image is not None:
            job_variables["image"] = image
        if namespace is not None:
            job_variables["namespace"] = namespace
        if cpu_request is not None:
            job_variables["cpu_request"] = cpu_request
        if memory is not None:
            job_variables["memory"] = memory
        if job_variables:
            new_options["job_variables"] = job_variables
        return self._copy_with_options(new_options)

    def run(
        self,
        source: str,  # Required flow param
        batch_size: int = 100,  # Optional flow param with default
        full_refresh: bool = False,
    ) -> "PrefectFlowRunFuture":
        """Run the my-etl-flow/production deployment synchronously.

        Returns a PrefectFlowRunFuture that can be used to:
        - Get the flow_run_id immediately
        - Call .result() to wait for completion and get the result
        - Call .state to check current state
        """
        from prefect.deployments import run_deployment
        from prefect.futures import PrefectFlowRunFuture

        # Only pass parameters that differ from their schema defaults
        parameters: dict[str, Any] = {"source": source}
        if batch_size != 100:
            parameters["batch_size"] = batch_size
        if full_refresh:
            parameters["full_refresh"] = full_refresh
        flow_run = run_deployment(
            name="my-etl-flow/production",
            parameters=parameters,
            **self._options,
        )
        return PrefectFlowRunFuture(flow_run_id=flow_run.id)

    async def run_async(
        self,
        source: str,
        batch_size: int = 100,
        full_refresh: bool = False,
    ) -> "PrefectFlowRunFuture":
        """Run the my-etl-flow/production deployment asynchronously.

        Returns a PrefectFlowRunFuture that can be used to:
        - Get the flow_run_id immediately
        - Call .result() to wait for completion and get the result
        - Call .state to check current state
        """
        from prefect.deployments import run_deployment
        from prefect.futures import PrefectFlowRunFuture

        # Only pass parameters that differ from their schema defaults
        parameters: dict[str, Any] = {"source": source}
        if batch_size != 100:
            parameters["batch_size"] = batch_size
        if full_refresh:
            parameters["full_refresh"] = full_refresh
        flow_run = await run_deployment(
            name="my-etl-flow/production",
            parameters=parameters,
            **self._options,
        )
        return PrefectFlowRunFuture(flow_run_id=flow_run.id)
```
Design decisions:
- Class names: `_{FlowName}{DeploymentName}` (underscore prefix = private)
- `with_options()` for run config - tags, scheduling, idempotency (timeout/polling handled by the future)
- `with_infra()` for job variables - typed kwargs derived from the work pool schema
- Immutable builder pattern matches `Flow.with_options()` / `Task.with_options()` behavior
- Typed signatures on `run()`/`run_async()` - enables IDE autocomplete and type checking
- `with_options()` and `with_infra()` params are keyword-only (after `*`) and optional
- Lazy `from prefect.deployments import run_deployment` inside methods
- `run()` and `run_async()` return `PrefectFlowRunFuture` for non-blocking access to the flow run ID and eventual result

Flow parameter handling:
- Required params (in the `required` array, no default) → required kwargs
- Optional params (not in `required` or has a default) → kwargs with defaults
- Defaults come from the schema's `default` field when present
- Only params that differ from their defaults are passed to `run_deployment()`

Edge cases:
- Deployment with no work pool → no `with_infra()` method generated
- Work pool with an empty job variables schema → no `with_infra()` method generated
- Deployment with an empty parameter schema → `run()`/`run_async()` have no params

The single public entry point with type-safe `from_name()` method:
```python
class deployments:
    """
    Access deployments by name.

    Usage:
        from my_sdk import deployments

        flow_run = deployments.from_name("my-etl-flow/production").with_options(
            tags=["ad-hoc"],
        ).with_infra(
            memory="8Gi",
        ).run(
            source="s3://bucket",
            batch_size=100,
        )

    Available deployments:
    - my-etl-flow/production
    - my-etl-flow/staging
    - data-sync/daily
    """

    @overload
    @staticmethod
    def from_name(name: Literal["my-etl-flow/production"]) -> _MyEtlFlowProduction: ...

    @overload
    @staticmethod
    def from_name(name: Literal["my-etl-flow/staging"]) -> _MyEtlFlowStaging: ...

    @overload
    @staticmethod
    def from_name(name: Literal["data-sync/daily"]) -> _DataSyncDaily: ...

    @staticmethod
    def from_name(name: DeploymentName) -> _MyEtlFlowProduction | _MyEtlFlowStaging | _DataSyncDaily:
        """Get a deployment by name.

        Args:
            name: The deployment name in "flow-name/deployment-name" format.

        Returns:
            A deployment object with run() and run_async() methods.

        Raises:
            KeyError: If the deployment name is not found.
        """
        _deployments: dict[str, Any] = {
            "my-etl-flow/production": _MyEtlFlowProduction(),
            "my-etl-flow/staging": _MyEtlFlowStaging(),
            "data-sync/daily": _DataSyncDaily(),
        }
        return _deployments[name]


__all__ = ["deployments", "DeploymentName"]
```
Design decisions:
- Class is named `deployments` (not `Deployments`) for natural usage
- `from_name()` is a `@staticmethod` - no instance needed
- `@overload` for each deployment enables type-safe return types
- Each overload accepts `Literal["exact-name"]` for precise typing
- `__all__` exports `deployments` and `DeploymentName` for user convenience

The renderer takes SDKData and produces the output file:
```
render_sdk(data: SDKData, output_path: Path) -> None
```
Responsibilities:
Template context includes:
- `generation_time` - ISO timestamp
- `prefect_version` - Current Prefect version
- `workspace_name` - Workspace name if available
- `module_name` - Output file stem (for docstring examples)
- `work_pools` - Dict of work pool data with converted TypedDict fields
- `deployments` - List of deployment data with parameter schemas
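A minimal sketch of the renderer, assuming a packaged template at `prefect/_sdk/templates/sdk.py.jinja` (the template path and the exact context keys are assumptions):

```python
import importlib.resources
from datetime import datetime, timezone
from pathlib import Path

import jinja2

import prefect

def render_sdk(data: "SDKData", output_path: Path) -> None:
    """Sketch: load the packaged template and write the rendered SDK file."""
    template_text = (
        importlib.resources.files("prefect._sdk") / "templates" / "sdk.py.jinja"
    ).read_text()
    template = jinja2.Environment(keep_trailing_newline=True).from_string(template_text)
    output_path.write_text(
        template.render(
            generation_time=datetime.now(timezone.utc).isoformat(),
            prefect_version=prefect.__version__,
            module_name=output_path.stem,
            work_pools=data.work_pools,
            deployments=data.all_deployments(),
        )
    )
```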
Schema Edge Cases:

| Scenario | Behavior |
|---|---|
| `parameter_openapi_schema` is `None` | No flow param kwargs, only `*, options` |
| `parameter_openapi_schema` is `{}` | No flow param kwargs, only `*, options` |
| `parameter_openapi_schema` is `{"type": "object", "properties": {}}` | No flow param kwargs, only `*, options` |
| `base_job_template["variables"]` missing | Use base RunOptions, no `job_variables` field |
| `base_job_template["variables"]["properties"]` is `{}` | Generate `{WorkPoolName}RunOptions` without `job_variables` |
| Schema has circular `$ref` | Emit `Any` type with generation warning |
| Schema uses `$defs` instead of `definitions` | Support both (Pydantic v2 compatibility) |
| Property has no `type` key | Emit `Any` type |
| `definitions` key missing from schema | Use empty dict as fallback |
Naming Edge Cases:
| Scenario | Behavior |
|---|---|
| Deployment class name conflicts | Append numeric suffix (`_MyEtlFlowProduction2`) |
| Deployment name contains quotes | Escape in `Literal` type string |
| Very long deployment names | Class name truncated, full name in `Literal` |
| Name is entirely non-ASCII | Use `_unnamed` with suffix if needed |
| Name is empty string | Use `_unnamed` with suffix if needed |
| Name is Python keyword | Append underscore (`class` → `class_`) |
Structural Edge Cases:
| Scenario | Behavior |
|---|---|
| Deployment with no work pool | No with_infra() method generated |
| No deployments match filter | Error with helpful message |
| Zero deployments in workspace | Error with helpful message |
| Deployments of same flow have different schemas | Use first deployment's schema, log warning |
General rules:
- No work pool → no `with_infra()` method generated
- Empty job variables schema → no `with_infra()` method generated
- Empty parameter schema → `run()`/`run_async()` have no params
- Single deployment → no `@overload` in the `deployments` class

Template:
Renderer:
Verification:
- Generated file parses as valid Python (`ast.parse`)
- The full chain `deployments.from_name().with_options().with_infra().run()` type-checks
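For example, syntactic validity can be asserted directly with the standard library:

```python
import ast
from pathlib import Path

# Raises SyntaxError if the generated file is not valid Python.
ast.parse(Path("my_sdk.py").read_text())
```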
Phase 3 Implementation Notes (deviations from plan):
- **Single deployment doesn't use `@overload`**
  - Plan: `@overload` in the `deployments` class
  - Implementation: a single deployment skips `@overload` (pyright requires 2+ overloads)
- **`run()` uses `cast()` for return type**
  - Plan: `run()` returns `FlowRun` directly from `run_deployment()`
  - Implementation: `cast("FlowRun", run_deployment(...))`
  - Reason: the `@async_dispatch` decorator on `run_deployment` makes pyright think it returns a union type
- **Work pool TypedDicts use `total=False` without `NotRequired`**
  - Plan: `NotRequired[T]` for optional fields
  - Implementation: `total=False` on the TypedDict class (all fields optional)
  - Since every job variable is optional, `total=False` is cleaner
- **Tests: 275 tests total, 47 new renderer tests**
  - `uv run pytest tests/_sdk/ -v`
- **Return type is `PrefectFlowRunFuture[Any]`**
  - Plan: `run()` and `run_async()` return `PrefectFlowRunFuture`
  - Implementation: `PrefectFlowRunFuture[Any]` with explicit type parameter
- **Conditional imports based on SDK content**
  - `cast` only when deployments exist, `overload` only when 2+ deployments, `TypedDict` only when work pools exist
- **TYPE_CHECKING imports restored**
  - `FlowRun` and `PrefectFlowRunFuture` imported under `TYPE_CHECKING`
- **Schema descriptions included in docstrings**
- **Template locals use an sdk prefix**
  - Plan: `parameters`, `options`, etc.
  - Implementation: `_sdk_params`, `_sdk_options`, `_sdk_job_vars`, etc.
- **Reserved names simplified to only `self`**
  - Plan: `run`, `run_async`, `with_options`, `with_infra` reserved for the deployment context
  - Implementation: only `self` is reserved for the deployment context
  - Reason: a flow parameter named `self` would break signatures
- **Work pools sorted by name**
- **Template loaded via `importlib.resources`**
  - Uses `importlib.resources.files()` for template loading

Outcome: API integration and orchestration layer
Prerequisites:
Fetcher Responsibilities:
Generator Responsibilities:
Error Handling Strategy:
| Error | Behavior |
|---|---|
| Not authenticated | Error: "Not authenticated. Run `prefect cloud login` or configure `PREFECT_API_URL`." |
| API unreachable | Error: "Could not connect to Prefect API at {url}. Check your configuration." |
| No deployments found | Error: "No deployments found in workspace." |
| No deployments match filter | Error: "No deployments matched filters. Found N deployments total." |
| Work pool fetch fails | Warning, continue without `job_variables` typing for affected deployments |
| Invalid schema in deployment | Warning, use `Any` type for affected parameters |
| Single deployment fetch fails | Warning, skip deployment, continue with others |
Partial Failure Policy: The generator should be resilient. Individual deployment or work pool failures should log warnings but not abort the entire generation. Only fail if:
Status:
Outcome: Public `prefect sdk generate` command
User Feedback:
File Handling:
Example Output (Success):

```
Fetching deployments...
⚠ Warning: Could not fetch work pool 'old-pool' - job_variables will be untyped for affected deployments

SDK generated successfully!
Flows: 3
Deployments: 12
Work pools: 2
Output: /path/to/my_sdk.py

Usage:
from my_sdk import deployments
```
Example Output (No Deployments):

```
Error: No deployments found in workspace.

Make sure you have deployed at least one flow:
prefect deploy
```
Status:
- `prefect sdk generate --help` shows documentation
- Unit tests pass (`uv run pytest tests/_sdk/`)
- CLI tests pass (`uv run pytest tests/cli/test_sdk.py`)
- Type checks pass (`uv run pyright src/prefect/_sdk/ src/prefect/cli/sdk.py`)
- Lint passes (`uv run ruff check src/prefect/_sdk/ src/prefect/cli/sdk.py`)
- Generated output passes `pyright --strict`
- Generated output is valid Python (`ast.parse()` succeeds)
- Filtering works via the `--flow` and `--deployment` flags

In Scope:
Out of Scope:
- Concurrent API fetching (e.g., `asyncio.gather` for work pool fetches)

String Escaping in Generated Code:
The Jinja2 template must handle user-controlled strings (deployment names, flow names, descriptions) safely:
| Context | Handling |
|---|---|
| Docstrings | Escape or strip triple quotes (`"""` → `'''` or filtered) |
| String literals | Escape quotes, backslashes |
| Identifiers | Already sanitized by naming utilities |
Example Problem:

```python
# If flow description contains: """Hello"""
class _MyFlow:
    """
    Flow: my-flow
    Description: """Hello"""  # SYNTAX ERROR!
    """
```
Solution: Strip or escape problematic characters in user-provided text before template rendering. The naming utilities handle identifiers; a separate text sanitizer handles docstring content.
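A minimal sketch of such a sanitizer (the function name is an assumption):

```python
def sanitize_docstring_text(text: str) -> str:
    """Make user-provided text safe to embed inside a generated docstring."""
    # Escape backslashes first so escape sequences in user text survive
    # literally, then neutralize triple quotes that would end the docstring.
    return text.replace("\\", "\\\\").replace('"""', "'''")
```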
Compatibility:
- Generated code calls only `run_deployment()`, which is a stable public API

Recommendation: Regenerate the SDK whenever deployments change (per the generated file's header note), for example when deployments are added, removed, or renamed, or when parameter or job-variable schemas change.