docs/v3/advanced/form-building.mdx
Parameterizing workflows is a critical part of orchestration. It allows you to create contracts between modular workflows in your organization and empower less-technical users to interact with your workflows intuitively.
Pydantic is a library for data validation based on Python type annotations. Prefect uses it to build a parameter schema for your workflow.
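This allows you to validate parameter values against the schema before a run starts, render a user-friendly form in the Prefect UI, and reuse parameter types across workflows.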
In this tutorial, we'll craft a workflow signature that the Prefect UI will render as a self-documenting form.
Let's say you have a workflow that triggers a marketing email blast which looks like:
```python
from prefect import flow


@flow
def send_marketing_email(
    mailing_lists: list[str],
    subject: str,
    body: str,
    test_mode: bool = False,
    attachments: list[str] | None = None,
):
    """
    Send a marketing email blast to the given lists.

    Args:
        mailing_lists: A list of lists to email.
        subject: The subject of the email.
        body: The body of the email.
        test_mode: Whether to send a test email.
        attachments: A list of attachments to include in the email.
    """
    ...
```
When you deploy this flow, Prefect automatically inspects your function signature and generates a form for you.
This is good enough for many cases, but consider these additional constraints that could arise from business needs or tech stack restrictions:
- there are only a few valid values for `mailing_lists`
- the `subject` must not exceed 30 characters
- no more than 5 `attachments` are allowed

You can simply check these constraints in the body of your flow function:
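```python
VALID_LISTS = {"newsletter", "customers", "beta-testers"}


@flow
def send_marketing_email(
    mailing_lists: list[str],
    subject: str,
    body: str,
    test_mode: bool = False,
    attachments: list[str] | None = None,
):
    if len(subject) > 30:
        raise ValueError("Subject must not exceed 30 characters")
    # mailing_lists is itself a list, so check each entry
    if any(name not in VALID_LISTS for name in mailing_lists):
        raise ValueError("Invalid list to email")
    # attachments defaults to None, so guard before calling len()
    if attachments is not None and len(attachments) > 5:
        raise ValueError("Too many attachments")
    # etc...
```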
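However, this approach has downsides: the checks run only after the flow run has already started, the form gives users no hint of the constraints, and the same checks must be repeated in every flow that shares them.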
To improve on this, we will use pydantic to build a convenient, self-documenting, and reusable flow signature that the Prefect UI can build a better form from.
Let's address the constraints on mailing_lists, subject, and attachments.
### Using `Literal` to restrict valid values

> there are only a few valid values for `mailing_lists`

Say our valid mailing lists are: `["newsletter", "customers", "beta-testers"]`

We can define a `Literal` to specify the valid values for the `mailing_lists` parameter.
```python
from typing import Literal

MailingList = Literal["newsletter", "customers", "beta-testers"]
```

An `Enum` subclass can express the same restriction:

```python
from enum import Enum


class MailingList(Enum):
    NEWSLETTER = "newsletter"
    CUSTOMERS = "customers"
    BETA_TESTERS = "beta-testers"
```
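To check the restriction outside of Prefect, the following sketch validates a `list[MailingList]` with Pydantic's `TypeAdapter`. It assumes Pydantic v2; the value `"spam"` is just an arbitrary invalid entry chosen for illustration.

```python
from typing import Literal

from pydantic import TypeAdapter, ValidationError

MailingList = Literal["newsletter", "customers", "beta-testers"]

# TypeAdapter validates arbitrary types, not only BaseModel subclasses.
adapter = TypeAdapter(list[MailingList])

accepted = adapter.validate_python(["newsletter", "customers"])

try:
    adapter.validate_python(["newsletter", "spam"])
    rejected = False
except ValidationError:
    # "spam" is not one of the Literal values
    rejected = True

# The allowed values appear as an `enum` in the generated JSON schema.
schema = adapter.json_schema()
print(accepted, rejected, schema)
```

The `enum` entry in the schema is the kind of information a form needs in order to offer a fixed set of choices.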
### Using a `BaseModel` subclass to group and constrain parameters

Both the `subject` and `attachments` parameters have constraints that we want to enforce.

> the `subject` must not exceed 30 characters

> the `attachments` must not exceed 5 items
Additionally, the subject, body, and attachments parameters are all related to the same thing: the content of the email.
We can define a BaseModel subclass to group these parameters together and apply these constraints.
```python
from pydantic import BaseModel, Field


class EmailContent(BaseModel):
    subject: str = Field(max_length=30)
    body: str = Field(default=...)
    attachments: list[str] = Field(default_factory=list, max_length=5)
```
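As a quick check of these constraints, the sketch below builds one valid `EmailContent` and one that breaks both limits. It assumes Pydantic v2; `error_fields` is a hypothetical helper that reports which fields failed validation.

```python
from pydantic import BaseModel, Field, ValidationError


class EmailContent(BaseModel):
    subject: str = Field(max_length=30)
    body: str = Field(default=...)
    attachments: list[str] = Field(default_factory=list, max_length=5)


def error_fields(**kwargs) -> list[str]:
    """Hypothetical helper: names of the fields that fail validation."""
    try:
        EmailContent(**kwargs)
    except ValidationError as exc:
        return [str(err["loc"][0]) for err in exc.errors()]
    return []


# Within limits: attachments falls back to an empty list.
ok = EmailContent(subject="Spring sale", body="20% off everything")

# 31-character subject and 6 attachments both exceed their limits.
problems = error_fields(
    subject="x" * 31,
    body="hi",
    attachments=[f"file{i}.pdf" for i in range(6)],
)
print(ok, problems)
```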
Similarly, you can:

- pass `title` to `Field` to override the field name in the form
- add a docstring to `EmailContent` to add a description to this group of parameters in the form

Now that we have defined the `MailingList` and `EmailContent` types, we can use them in our flow signature:
```python
@flow
def send_marketing_email(
    mailing_lists: list[MailingList],
    content: EmailContent,
    test_mode: bool = False,
):
    ...
```
In the resulting form, the `mailing_lists` parameter renders as a multi-select dropdown that only allows the `Literal` values from our `MailingList` type, and any constraints you've defined on the `EmailContent` fields are enforced before the run is submitted.
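To see the same idea without deploying anything, here is a sketch that uses Pydantic's `validate_call` decorator as a stand-in for the validation Prefect performs. This is an assumption made for illustration, not Prefect's own mechanism: the point is only that invalid arguments are rejected before the function body runs.

```python
from typing import Literal

from pydantic import BaseModel, Field, ValidationError, validate_call

MailingList = Literal["newsletter", "customers", "beta-testers"]


class EmailContent(BaseModel):
    subject: str = Field(max_length=30)
    body: str = Field(default=...)
    attachments: list[str] = Field(default_factory=list, max_length=5)


@validate_call  # stand-in for @flow so the sketch runs without Prefect
def send_marketing_email(
    mailing_lists: list[MailingList],
    content: EmailContent,
    test_mode: bool = False,
) -> str:
    return f"sending {content.subject!r} to {', '.join(mailing_lists)}"


# A plain dict is validated and converted into an EmailContent instance.
result = send_marketing_email(
    ["newsletter"], {"subject": "Spring sale", "body": "20% off"}
)

try:
    send_marketing_email(["everyone"], {"subject": "Spring sale", "body": "20% off"})
    body_ran = True
except ValidationError:
    # "everyone" is not a valid MailingList, so the body never executes
    body_ran = False

print(result, body_ran)
```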
<Accordion title="Full example">

```python
from typing import Literal

from prefect import flow
from pydantic import BaseModel, Field

MailingList = Literal["newsletter", "customers", "beta-testers"]


class EmailContent(BaseModel):
    subject: str = Field(max_length=30)
    body: str = Field(default=...)
    attachments: list[str] = Field(default_factory=list, max_length=5)


@flow
def send_marketing_email(
    mailing_lists: list[MailingList],
    content: EmailContent,
    test_mode: bool = False,
):
    pass


if __name__ == "__main__":
    send_marketing_email.serve()
```

</Accordion>
### Using `json_schema_extra` to order fields in the form
By default, your flow parameters are rendered in the order defined by your `@flow` function signature.
Within a given `BaseModel` subclass, parameters are rendered in the following order:
- parameters with a `default` value are rendered first, alphabetically
- parameters without a `default` value are rendered next, alphabetically
You can control the order of the parameters within a `BaseModel` subclass by passing `json_schema_extra` to the `Field` constructor with a `position` key.
Taking our `EmailContent` model from the previous example, let's enforce that `subject` should be displayed first, then `body`, then `attachments`.
```python
class EmailContent(BaseModel):
    subject: str = Field(
        max_length=30,
        description="The subject of the email",
        json_schema_extra=dict(position=0),
    )
    body: str = Field(default=..., json_schema_extra=dict(position=1))
    attachments: list[str] = Field(
        default_factory=list,
        max_length=5,
        json_schema_extra=dict(position=2),
    )
```
In the resulting form, `subject` is displayed first, followed by `body` and then `attachments`.
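One way to confirm that the `position` hints reach the generated schema is to read them back from `model_json_schema()`. The sketch below assumes Pydantic v2. The final sort is a hypothetical stand-in for a form renderer, not Prefect's actual UI logic.

```python
from pydantic import BaseModel, Field


class EmailContent(BaseModel):
    subject: str = Field(max_length=30, json_schema_extra=dict(position=0))
    body: str = Field(default=..., json_schema_extra=dict(position=1))
    attachments: list[str] = Field(
        default_factory=list,
        max_length=5,
        json_schema_extra=dict(position=2),
    )


properties = EmailContent.model_json_schema()["properties"]

# json_schema_extra keys are merged into each field's schema entry.
positions = {name: spec.get("position") for name, spec in properties.items()}

# Hypothetical renderer step: order field names by their position hint.
display_order = sorted(positions, key=lambda name: positions[name])
print(positions, display_order)
```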
If your parameter model includes `Callable` or `Type` fields, Prefect can't serialize them to JSON. The UI shows a placeholder like `<MyParams>` instead of actual values, and automation templates can't access individual fields.

Pydantic's `ImportString` type solves this. It accepts a dotted import path as a string (e.g. `"mymodule.my_func"`), resolves it to the real Python object at validation time, and serializes back to a string for JSON.
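A minimal sketch of that round trip, assuming Pydantic v2 and using the standard library's `math.cos` as the imported object so it runs without any project modules. The `Transform` model and its `fn` field are hypothetical names for illustration.

```python
import json
import math
from typing import Any, Callable

from pydantic import BaseModel, ImportString


class Transform(BaseModel):  # hypothetical model, for illustration only
    fn: ImportString[Callable[[Any], Any]]


# The input is a plain string, which is easy to type into a form.
config = Transform(fn="math.cos")

# After validation the attribute is the real function object...
resolved_is_real_function = config.fn is math.cos

# ...and dumping to JSON turns it back into the dotted path.
as_json = json.loads(config.model_dump_json())
print(resolved_is_real_function, as_json)
```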
For example, an order ingestion flow that needs a different normalizer per vendor:
```python
# vendors.py (the module name matches the "vendors.*" import paths used below)
from datetime import datetime
from typing import Any

from pydantic import BaseModel


class Order(BaseModel):
    order_id: str
    customer_email: str
    total_cents: int
    currency: str
    placed_at: datetime


class StripeCharge(BaseModel):
    id: str
    receipt_email: str
    amount: int
    currency: str
    created: int


class ShopifyOrder(BaseModel):
    name: str
    email: str
    total_price: str
    currency: str
    created_at: str


def normalize_stripe(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    return [
        {
            "order_id": r["id"],
            "customer_email": r["receipt_email"],
            "total_cents": r["amount"],
            "currency": r["currency"],
            "placed_at": datetime.fromtimestamp(r["created"]).isoformat(),
        }
        for r in records
    ]


def normalize_shopify(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    return [
        {
            "order_id": r["name"],
            "customer_email": r["email"],
            # round() rather than int(): float("19.99") * 100 is slightly
            # below 1999, and int() would truncate it to 1998
            "total_cents": round(float(r["total_price"]) * 100),
            "currency": r["currency"],
            "placed_at": r["created_at"],
        }
        for r in records
    ]
```
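Converting a decimal price string such as Shopify's `total_price` to integer cents through `float` is sensitive to binary rounding. One alternative, sketched here with the standard library's `decimal` module, avoids floats entirely. `price_to_cents` is a hypothetical helper, not part of the example above.

```python
from decimal import ROUND_HALF_UP, Decimal


def price_to_cents(price: str) -> int:
    """Hypothetical helper: convert a decimal price string to integer cents."""
    cents = (Decimal(price) * 100).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
    return int(cents)


# Truncating the float product loses a cent for this input.
float_based = int(float("19.99") * 100)

# The Decimal-based conversion keeps the exact value.
decimal_based = price_to_cents("19.99")
print(float_based, decimal_based)
```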
Use ImportString in the parameter model so the normalizer and raw schema are editable
strings in the UI, but resolve to real Python objects at runtime:
```python
from typing import Any, Callable, Type

from pydantic import BaseModel, ImportString, TypeAdapter

from prefect import flow, task


class IngestConfig(BaseModel):
    vendor: str
    normalizer: ImportString[Callable[[list[dict[str, Any]]], list[dict[str, Any]]]]
    raw_schema: ImportString[Type[BaseModel]]


@task
def fetch_raw_records(vendor: str) -> list[dict[str, Any]]:
    ...


@task
def validate_raw(
    records: list[dict[str, Any]], schema: type[BaseModel]
) -> list[BaseModel]:
    adapter = TypeAdapter(list[schema])
    return adapter.validate_python(records)


@task
def normalize(
    records: list[dict[str, Any]],
    normalizer: Callable[[list[dict[str, Any]]], list[dict[str, Any]]],
) -> list[dict[str, Any]]:
    return normalizer(records)


@flow(flow_run_name="ingest-{config.vendor}", log_prints=True)
def ingest_orders(config: IngestConfig):
    raw = fetch_raw_records(config.vendor)
    validated_raw = validate_raw(raw, config.raw_schema)
    orders = normalize(raw, config.normalizer)
    print(f"vendor: {config.vendor}, validated: {len(validated_raw)}, produced: {len(orders)}")


if __name__ == "__main__":
    ingest_orders.serve(
        name="order-ingestion",
        parameters={
            "config": {
                "vendor": "stripe",
                "normalizer": "vendors.normalize_stripe",
                "raw_schema": "vendors.StripeCharge",
            }
        },
    )
```
Anyone can override the vendor config when triggering a run:
```bash
prefect deployment run 'ingest-orders/order-ingestion' \
  -p 'config={
    "vendor": "shopify",
    "normalizer": "vendors.normalize_shopify",
    "raw_schema": "vendors.ShopifyOrder"
  }'
```
The server stores clean JSON that the UI and automations can read:
```json
{
  "config": {
    "vendor": "shopify",
    "normalizer": "vendors.normalize_shopify",
    "raw_schema": "vendors.ShopifyOrder"
  }
}
```
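We have now embedded the constraints on our parameters in the types that describe our flow signature, which means the constraints are checked before a run is submitted, the form documents them for whoever triggers the run, and the same types can be reused in other flows.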
As you craft a schema for your flow signature, you may want to inspect the raw JSON schema that Pydantic generates (it is OpenAPI-compatible), as it is what the Prefect UI uses to build the form.
Call model_json_schema() on your BaseModel subclass to inspect the raw schema.
```python
from rich import print as pprint
from pydantic import BaseModel, Field


class EmailContent(BaseModel):
    subject: str = Field(max_length=30)
    body: str = Field(default=...)
    attachments: list[str] = Field(default_factory=list, max_length=5)


pprint(EmailContent.model_json_schema())
```

```python
{
    'properties': {
        'subject': {'maxLength': 30, 'title': 'Subject', 'type': 'string'},
        'body': {'title': 'Body', 'type': 'string'},
        'attachments': {'items': {'type': 'string'}, 'maxItems': 5, 'title': 'Attachments', 'type': 'array'}
    },
    'required': ['subject', 'body'],
    'title': 'EmailContent',
    'type': 'object'
}
```
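The same inspection shows where form metadata ends up. In this sketch (assuming Pydantic v2), a `title` and `description` passed to `Field`, plus a class docstring, all surface in the schema. The docstring and label text are made up for illustration.

```python
from pydantic import BaseModel, Field


class EmailContent(BaseModel):
    """The content of the marketing email."""  # illustrative docstring

    subject: str = Field(
        max_length=30,
        title="Subject line",  # illustrative label
        description="Shown in the recipient's inbox",  # illustrative text
    )
    body: str


schema = EmailContent.model_json_schema()

# The class docstring becomes the model-level description.
model_description = schema["description"]

# Field-level title and description sit next to the constraint.
subject_schema = schema["properties"]["subject"]
print(model_description, subject_schema)
```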
For more on constrained types and validation features available in pydantic, see their documentation on models and types.