Make a Python function a workflow by adding the @flow decorator to it:
from prefect import flow
@flow
def my_workflow() -> str:
    return "Hello, world!"
Run the workflow by calling it like a normal Python function:
my_workflow()
You can create a workflow from a method on a class:
from prefect import flow
class MyClass:
    @flow
    def my_instance_method(self):
        return "Hello, from an instance method!"
    @flow
    @classmethod
    def my_class_method(cls):
        return "Hello, from a class method!"
    @flow
    @staticmethod
    def my_static_method():
        return "Hello, from a static method!"
MyClass().my_instance_method()
MyClass.my_class_method()
MyClass.my_static_method()
You can also create a workflow from a generator function:
from prefect import flow
@flow
def generator():
    for i in range(10):
        yield i
for val in generator():
    print(val)
You can view all workflow runs in the Prefect UI.
You can create tasks and child flows to organize your workflow logic.
import random
from prefect import flow, task
@task
def generate_a_number():
    return random.randint(0, 100)
@flow
def is_number_even(number: int):
    return number % 2 == 0
@flow
def even_or_odd():
    number = generate_a_number()
    if is_number_even(number):
        print("The number is even")
    else:
        print("The number is odd")
Each task and child flow is a separate unit of work and is displayed in the Prefect UI.
To apply a timeout to a flow or task to prevent it from running for too long, use the timeout_seconds keyword argument.
from prefect import flow
import time
@flow(timeout_seconds=1, log_prints=True)
def show_timeouts():
    print("I will execute")
    time.sleep(5)
    print("I will not execute")
Task timeouts work differently depending on how the task is executed:
Async tasks: Timeouts use cooperative cancellation at await points. When the timeout is reached, the task is cancelled at the next await statement. This provides reliable timeout behavior for async code.
Sync tasks called directly: When a sync task is called directly (not via .submit()), it typically runs on the main thread where timeouts can use OS signals to interrupt execution, including blocking operations.
Sync tasks via .submit() with ThreadPoolTaskRunner: When a sync task is submitted to a ThreadPoolTaskRunner (the default), it runs in a worker thread. In this context, timeouts cannot interrupt blocking operations like time.sleep(), network requests, or file I/O. The timeout only takes effect after the blocking operation completes naturally.
from prefect import task, flow
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
import time
# This timeout may not interrupt the sleep when submitted with ThreadPoolTaskRunner
@task(timeout_seconds=1)
def sync_task_with_blocking():
    time.sleep(10)  # In a worker thread, the sleep finishes before the timeout takes effect
# This timeout works reliably with async
@task(timeout_seconds=1)
async def async_task_with_sleep():
    import anyio
    await anyio.sleep(10)  # Will be interrupted at timeout
@flow(task_runner=ThreadPoolTaskRunner())
def flow_with_threads():
    sync_task_with_blocking.submit()  # Timeout may not interrupt blocking operations
@flow(task_runner=ProcessPoolTaskRunner())
def flow_with_processes():
    # Timeout can interrupt because the task runs on the main thread of the new process
    sync_task_with_blocking.submit()
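The difference between cooperative cancellation and a blocked worker thread can be reproduced with the standard library alone. The sketch below is an analogy under that assumption, not Prefect's implementation: the function names are hypothetical, and it uses asyncio.wait_for and concurrent.futures in place of Prefect's task engine.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


async def async_timeout_demo() -> bool:
    """Return True if the coroutine was cancelled when the timeout expired."""
    try:
        # asyncio.sleep is an await point, so cancellation is delivered there.
        await asyncio.wait_for(asyncio.sleep(5), timeout=0.1)
    except asyncio.TimeoutError:
        return True
    return False


def blocking_work(finished: list) -> None:
    # A blocking call: nothing can pull a worker thread out of it early.
    time.sleep(0.5)
    finished.append(True)


def thread_timeout_demo() -> tuple:
    """Return (caller_timed_out, worker_still_finished)."""
    finished = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(blocking_work, finished)
        try:
            future.result(timeout=0.1)
            timed_out = False
        except FutureTimeoutError:
            # Only the wait was abandoned; the worker thread keeps running.
            timed_out = True
    # Leaving the with block waits for the worker, which ran to completion.
    return timed_out, bool(finished)


async_cancelled = asyncio.run(async_timeout_demo())
thread_timed_out, thread_still_finished = thread_timeout_demo()
print(async_cancelled, thread_timed_out, thread_still_finished)
```

In the async case the sleep is cut short at the timeout. In the thread case the caller stops waiting, but the blocking call still runs to the end, which mirrors why a timeout on a submitted sync task only takes effect once the blocking operation returns.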
All flows can be configured by passing arguments to the decorator. Flows accept the following optional settings:
| Argument | Description |
|---|---|
| description | An optional string description for the flow. If not provided, the description is pulled from the docstring for the decorated function. |
| name | An optional name for the flow. If not provided, the name is inferred from the function. |
| retries | An optional number of times to retry on flow run failure. |
| retry_delay_seconds | An optional number of seconds to wait before retrying the flow after failure. This is only applicable if retries is nonzero. |
| flow_run_name | An optional name to distinguish runs of this flow; this name can be provided as a string template with the flow's parameters as variables; you can also provide this name as a function that returns a string. |
| task_runner | An optional task runner to use for task execution within the flow when you .submit() tasks. If not provided and you .submit() tasks, the ThreadPoolTaskRunner is used. |
| timeout_seconds | An optional number of seconds indicating a maximum runtime for the flow. If the flow exceeds this runtime, it is marked as failed. Flow execution may continue until the next task is called. |
| validate_parameters | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is True. |
| version | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. |
Tasks allow for customization through optional arguments that can be provided to the task decorator.
| Argument | Description |
|---|---|
| name | An optional name for the task. If not provided, the name is inferred from the function name. |
| description | An optional string description for the task. If not provided, the description is pulled from the docstring for the decorated function. |
| tags | An optional set of tags associated with runs of this task. These tags are combined with any tags defined by a prefect.tags context at task runtime. |
| timeout_seconds | An optional number of seconds indicating a maximum runtime for the task. If the task exceeds this runtime, it will be marked as failed. Note: For sync tasks using ThreadPoolTaskRunner (the default), the timeout cannot interrupt blocking operations. See Task timeout behavior for details. |
| cache_key_fn | An optional callable that, given the task run context and call parameters, generates a string key. If the key matches a previous completed state, that state result is restored instead of running the task again. |
| cache_policy | An optional policy that determines what information is used to generate cache keys. Available policies include INPUTS, TASK_SOURCE, RUN_ID, FLOW_PARAMETERS, and NO_CACHE. Can be combined using the + operator. |
| cache_expiration | An optional amount of time indicating how long cached states for this task are restorable; if not provided, cached states will never expire. |
| retries | An optional number of times to retry on task run failure. |
| retry_delay_seconds | An optional number of seconds to wait before retrying the task after failure. This is only applicable if retries is nonzero. |
| log_prints | An optional boolean indicating whether to log print statements. |
See all possible options in the Python SDK docs.
For example, provide optional name and description arguments to a task:
from prefect import task
@task(name="hello-task", description="This task says hello.")
def my_task():
    print("Hello, I'm a task")
Distinguish runs of this task by providing a task_run_name.
Python's standard string formatting syntax applies:
import datetime
from prefect import flow, task
@task(name="My Example Task",
      description="An example task for a tutorial.",
      task_run_name="hello-{name}-on-{date:%A}")
def my_task(name, date):
    pass
@flow
def my_flow():
    # creates a run with a name like "hello-marvin-on-Thursday"
    my_task(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))
if __name__ == "__main__":
    my_flow()
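Because the template follows Python's standard string formatting syntax, it can be previewed with str.format before it is attached to a task. This is plain Python rather than Prefect's internal code, and the fixed date is an assumption made so that the result is reproducible.

```python
import datetime

# Same template string as the task_run_name in the example above.
template = "hello-{name}-on-{date:%A}"

# A fixed date keeps the output stable; 4 January 2024 was a Thursday.
fixed_date = datetime.datetime(2024, 1, 4, tzinfo=datetime.timezone.utc)

# {name} is substituted directly; {date:%A} applies the strftime-style format spec.
preview = template.format(name="marvin", date=fixed_date)
print(preview)  # hello-marvin-on-Thursday
```

The placeholder names must match the task's parameter names, which is why the task above takes name and date arguments.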
Additionally, this setting accepts a function that returns a string for the task run name:
import datetime
from prefect import flow, task
def generate_task_name():
    date = datetime.datetime.now(datetime.timezone.utc)
    return f"{date:%A}-is-a-lovely-day"
@task(name="My Example Task",
      description="An example task for the docs.",
      task_run_name=generate_task_name)
def my_task(name):
    pass
@flow
def my_flow():
    # creates a run with a name like "Thursday-is-a-lovely-day"
    my_task(name="marvin")
if __name__ == "__main__":
    my_flow()