docs/v3/how-to-guides/workflows/state-change-hooks.mdx
| Type | Flow | Task | Description |
|---|---|---|---|
on_completion | ✓ | ✓ | Executes when a flow or task run enters a Completed state. |
on_failure | ✓ | ✓ | Executes when a flow or task run enters a Failed state. |
<span class="no-wrap">on_cancellation</span> | ✓ | - | Executes when a flow run enters a Cancelling state. |
on_crashed | ✓ | - | Executes when a flow run enters a Crashed state. |
on_running | ✓ | ✓ | Executes when a flow or task run enters a Running state. |
To send a notification when a flow or task run fails, you can specify a on_failure hook.
from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import get_current_settings
@flow(retries=1)
def failing_flow():
raise ValueError("oops!")
@failing_flow.on_failure
def notify_slack(flow, flow_run, state):
slack_webhook_block = Block.load(
"slack-webhook/my-slack-webhook"
)
PREFECT_API_URL = get_current_settings().api.url
slack_webhook_block.notify(
(
f"Your job {flow_run.name} entered {state.name} "
f"with message:\n\n"
f"See <https://{PREFECT_API_URL}/flow-runs/"
f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
f"Tags: {flow_run.tags}\n\n"
f"Scheduled start: {flow_run.expected_start_time}"
)
)
if __name__ == "__main__":
failing_flow()
Retries are configured in this example, so the on_failure hook will not run until all retries have completed and the flow run enters a Failed state.
State change hooks run in the same process as your workflow and execution cannot be guaranteed. For more robust execution of logic in response to state changes, use an Automation. </Warning>
The on_running hook executes when a task enters a Running state, before the task body executes. This is useful for logging, metrics, or setting up runtime state:
from prefect import flow, task
import time
def record_start_time(task, task_run, state):
print(f"Task {task.name} started at {time.time()}")
@task(on_running=[record_start_time])
def my_task():
return "hello"
# Or use the decorator pattern
@task
def another_task():
return "world"
@another_task.on_running
def log_start(task, task_run, state):
print(f"Starting {task.name}!")
@flow
def my_flow():
my_task()
another_task()
Note that on_running hooks execute synchronously before the task body runs. If your hook takes 10 seconds, the task waits 10 seconds before starting.
When retries are configured, on_running hooks fire on each retry attempt, including the initial run. For example, a task configured with retries=2 will trigger its on_running hooks up to three times: once on the initial run and once for each retry attempt.
State change hooks run outside the active flow or task run context. This means
get_run_logger() will raise a MissingContextError if called inside a hook.
To emit logs from a hook that appear in the Prefect UI, use flow_run_logger (or
task_run_logger for task hooks) from prefect.logging.loggers. These functions
create a logger tied to the run using the parameters your hook already receives:
from prefect import flow
from prefect.logging.loggers import flow_run_logger
def my_hook(flow, flow_run, state):
logger = flow_run_logger(flow_run, flow)
logger.info(f"Flow {flow_run.name} entered state {state.name}")
@flow(on_failure=[my_hook])
def failing_flow():
raise ValueError("oops!")
kwargs to state change hooksYou can compose the with_options method to effectively pass arbitrary **kwargs to your hooks:
from functools import partial
from prefect import flow, task
data = {}
def my_hook(task, task_run, state, **kwargs):
data.update(state=state, **kwargs)
@task
def bad_task():
raise ValueError("meh")
@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
bad_task_with_a_hook = bad_task.with_options(
on_failure=[partial(my_hook, **dict(x=x, y=y))]
)
# return a tuple of "bar" and the task run state
# to avoid raising the task's exception
return "bar", bad_task_with_a_hook(return_state=True)
_, task_run_state = ok_with_failure_flow()
assert data == {"x": "foo", "y": 42, "state": task_run_state}