docs/v3/api-ref/python/prefect-utilities-engine.mdx
prefect.utilities.engine
is_prefect_sigterm_handler_installed <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L74" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>is_prefect_sigterm_handler_installed() -> bool
Return whether Prefect's SIGTERM bridge is currently installed.
can_ack_control_intent <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L84" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>can_ack_control_intent() -> bool
Return whether the child can safely acknowledge a queued control intent.
The check is protected by the same lock used when capture_sigterm()
installs and restores Prefect's SIGTERM bridge. On POSIX, the runner's
subsequent real SIGTERM is the actual cancellation trigger, so the
child only needs to verify that Prefect still owns the live bridge before
advertising readiness with b"a".
commit_control_intent_and_ack <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L100" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>commit_control_intent_and_ack(commit_intent: Callable[[], None], clear_intent: Callable[[], None], send_ack: Callable[[], None], trigger_cancel: Callable[[], None] | None = None) -> bool
Atomically commit control intent and acknowledge it to the runner.
The SIGTERM bridge check, intent commit, and ack write must share the same
lock used by capture_sigterm() to install and restore Prefect's SIGTERM
handler. Otherwise, teardown can restore the original handler after the
child decides it is safe to ack but before the runner observes b"a".
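The locking requirement above can be illustrated with a minimal sketch. It is not Prefect's implementation: `_bridge_lock`, `_bridge_installed`, `set_bridge_installed`, and `commit_and_ack` are hypothetical names, and rolling back with `clear_intent` when the ack write fails is an assumption suggested by the parameter list.

```python
import threading
from typing import Callable

# Hypothetical stand-ins: one lock shared by handler install/restore and the ack path.
_bridge_lock = threading.Lock()
_bridge_installed = False


def set_bridge_installed(installed: bool) -> None:
    """Flip the ownership flag under the shared lock (stand-in for install/restore)."""
    global _bridge_installed
    with _bridge_lock:
        _bridge_installed = installed


def commit_and_ack(
    commit_intent: Callable[[], None],
    clear_intent: Callable[[], None],
    send_ack: Callable[[], None],
) -> bool:
    """Check, commit, and ack inside one critical section."""
    with _bridge_lock:
        if not _bridge_installed:
            # The bridge is gone, so acknowledging would be unsafe.
            return False
        commit_intent()
        try:
            send_ack()
        except OSError:
            # Assumption: a failed ack write rolls the committed intent back.
            clear_intent()
            return False
        return True
```

Because teardown must take the same lock before restoring the original handler, it cannot run between the ownership check and the ack write.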
collect_task_run_inputs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L139" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>collect_task_run_inputs(expr: Any, max_depth: int = -1) -> set[Union[TaskRunResult, FlowRunResult]]
Recurse through an expression and return the set of all discernible task run inputs found in the data structure.
Examples:
```python
task_inputs = {
    k: await collect_task_run_inputs(v) for k, v in parameters.items()
}
```
collect_task_run_inputs_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L190" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>collect_task_run_inputs_sync(expr: Any, future_cls: Any = PrefectFuture, max_depth: int = -1) -> set[Union[TaskRunResult, FlowRunResult]]
Recurse through an expression and return the set of all discernible task run inputs found in the data structure.
Examples:
```python
task_inputs = {
    k: collect_task_run_inputs_sync(v) for k, v in parameters.items()
}
```
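The recursion described above can be pictured with a small, self-contained sketch. It does not use Prefect's types: `FakeRunInput` and `collect_inputs` are hypothetical stand-ins, and the reading of `max_depth` (with `-1` meaning no limit) is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class FakeRunInput:
    """Hypothetical stand-in for a task or flow run input reference."""

    run_id: str


def collect_inputs(expr: Any, max_depth: int = -1) -> set[FakeRunInput]:
    """Walk nested containers and gather every FakeRunInput found."""
    found: set[FakeRunInput] = set()

    def visit(obj: Any, depth: int) -> None:
        if isinstance(obj, FakeRunInput):
            found.add(obj)
            return
        # Assumption: a non-negative max_depth stops descent at that depth.
        if max_depth >= 0 and depth >= max_depth:
            return
        if isinstance(obj, dict):
            children = list(obj.values())
        elif isinstance(obj, (list, tuple, set, frozenset)):
            children = list(obj)
        else:
            return
        for child in children:
            visit(child, depth + 1)

    visit(expr, 0)
    return found
```

Calling `collect_inputs` once per parameter value, as in the example above, would give a mapping from parameter name to the inputs found under it.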
capture_sigterm <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L245" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>capture_sigterm() -> Generator[None, Any, None]
Install a SIGTERM handler that raises TerminationSignal.
Only the outermost Prefect flow engine in a process installs the handler
by default. Nested subflow engines reuse that existing Prefect-owned
handler when it is still active; if user or library code temporarily
replaced SIGTERM, the nested scope reinstalls Prefect's bridge for the
duration of that scope. This guard is based on explicit local ownership
state plus the currently installed handler, not on FlowRunContext: a
fresh subprocess may hydrate a parent flow context before its own engine
starts, and still needs to install a SIGTERM bridge for the child process.
The handler does not need to interpret intent. The engine's
except TerminationSignal block consults
prefect._internal.control_listener.get_intent() directly when
dispatching (today: handle_cancellation vs handle_crash; in a
future PR: plus handle_suspension).
The runner control listener only connects while this context is active. Cancels that land before the bridge is armed fall back to the runner's existing crash-style termination path; once this context is active, the child can acknowledge control intent and treat the later SIGTERM as an intentional cancellation.
resolve_inputs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L356" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>resolve_inputs(parameters: dict[str, Any], return_data: bool = True, max_depth: int = -1) -> dict[str, Any]
Resolve any Quote, PrefectFuture, or State types nested in parameters into
data.
Returns:
Raises:
UpstreamTaskError: If any of the upstream states are not COMPLETED
propose_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L480" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>propose_state(client: 'PrefectClient', state: State[Any], flow_run_id: UUID, force: bool = False) -> State[Any]
Propose a new state for a flow run, invoking Prefect orchestration logic.
If the proposed state is accepted, the provided state will be augmented with
details and returned.
If the proposed state is rejected, a new state returned by the Prefect API will be returned.
If the proposed state results in a WAIT instruction from the Prefect API, the function will sleep and attempt to propose the state again.
If the proposed state results in an ABORT instruction from the Prefect API, an error will be raised.
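The four outcomes above amount to a small retry loop. The following sketch shows that control flow with hypothetical stand-ins (`Response`, `AbortSketch`, `propose`, and a `set_state` callable). It does not call the Prefect client, and the status names are assumptions chosen to mirror the text.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


class AbortSketch(Exception):
    """Hypothetical stand-in for prefect.exceptions.Abort."""


@dataclass
class Response:
    """Hypothetical orchestration response."""

    status: str  # "ACCEPT", "REJECT", "WAIT", or "ABORT"
    state: Optional[str] = None  # replacement state on REJECT
    delay_seconds: float = 0.0  # how long to sleep on WAIT
    reason: str = ""


def propose(
    set_state: Callable[[str], Response],
    state: str,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Propose `state` repeatedly until the API gives a final answer."""
    while True:
        response = set_state(state)
        if response.status == "ACCEPT":
            return state
        if response.status == "REJECT":
            # The API substitutes its own state for the proposed one.
            return response.state
        if response.status == "WAIT":
            sleep(response.delay_seconds)
            continue
        if response.status == "ABORT":
            raise AbortSketch(response.reason)
        raise ValueError(f"Unexpected status: {response.status!r}")
```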
Args:
state: a new state for a flow run
flow_run_id: an optional flow run id, used when proposing flow run states
Returns:
Raises:
prefect.exceptions.Abort: if an ABORT instruction is received from the Prefect API
propose_state_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L578" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>propose_state_sync(client: 'SyncPrefectClient', state: State[Any], flow_run_id: UUID, force: bool = False) -> State[Any]
Propose a new state for a flow run, invoking Prefect orchestration logic.
If the proposed state is accepted, the provided state will be augmented with
details and returned.
If the proposed state is rejected, a new state returned by the Prefect API will be returned.
If the proposed state results in a WAIT instruction from the Prefect API, the function will sleep and attempt to propose the state again.
If the proposed state results in an ABORT instruction from the Prefect API, an error will be raised.
Args:
state: a new state for the flow run
flow_run_id: an optional flow run id, used when proposing flow run states
Returns:
Raises:
ValueError: if flow_run_id is not provided
prefect.exceptions.Abort: if an ABORT instruction is received from the Prefect API
get_state_for_result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L672" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_state_for_result(obj: Any) -> Optional[tuple[State, RunType]]
Get the state related to a result object.
link_state_to_result must have been called first.
For objects that support __weakref__, the entry stored by
link_state_to_result carries a weak reference back to the original
object. We verify here that the entry's weak reference still points
to the same object that registered the entry — not just to some
object that happens to share its id(). This prevents stale hits
caused by CPython recycling a freed memory address. Stale entries
are evicted on detection.
For objects that do not support __weakref__ (plain dict, list,
set, str, int, tuple, ...), the entry has no weak reference
and we fall back to the legacy id()-only lookup. This preserves
today's behavior for those types — including the latent stale-id
bug — and isolates the limitation to a single named code path.
link_state_to_flow_run_result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L715" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>link_state_to_flow_run_result(state: State, result: Any) -> None
Creates a link between a state and a flow run result.
link_state_to_task_run_result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L720" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>link_state_to_task_run_result(state: State, result: Any) -> None
Creates a link between a state and a task run result.
link_state_to_result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L725" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>link_state_to_result(state: State, result: Any, run_type: RunType) -> None
Caches a link between a state and a result and its components using
the id of the components to map to the state. The cache is persisted to the
current flow run context since task relationships are limited to within a flow run.
This allows dependency tracking to occur when results are passed around.
Note: Because id is used, we cannot cache links between singleton objects.
We only cache the relationship between components 1-layer deep. Example: Given the result [1, ["a","b"], ("c",)], the following elements will be mapped to the state:
- [1, ["a","b"], ("c",)]
- ["a","b"]
- ("c",)
Note: the int `1` will not be mapped to the state because it is a singleton.
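The one-layer-deep rule from the example can be sketched as follows. The names `links` and `link` are hypothetical, the cache is a module-level dict rather than the flow run context, and skipping every `int`, `bool`, and `None` is a simplification of whatever singleton check the real function applies.

```python
from typing import Any

# Hypothetical cache: id(obj) -> state name.
links: dict[int, str] = {}


def link(state: str, result: Any) -> None:
    """Map the result and its direct components to `state` by id()."""

    def link_if_trackable(obj: Any) -> None:
        # Simplification: treat None, bools, and ints as untrackable singletons.
        if obj is None or isinstance(obj, (bool, int)):
            return
        links[id(obj)] = state

    link_if_trackable(result)
    # Only direct children are linked; nothing deeper is visited.
    if isinstance(result, (list, tuple, set)):
        for item in result:
            link_if_trackable(item)
    elif isinstance(result, dict):
        for value in result.values():
            link_if_trackable(value)
```

With the result from the example, the outer list, the inner list, and the tuple are linked, while `1` and the strings nested one level further down are not.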
Other Notes: We do not hash the result because:
We do not set an attribute, e.g. __prefect_state__, on the result because:
should_log_prints <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L815" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>should_log_prints(flow_or_task: Union['Flow[..., Any]', 'Task[..., Any]']) -> bool
check_api_reachable <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L827" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>check_api_reachable(client: 'PrefectClient', fail_message: str) -> None
emit_task_run_state_change_event <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L845" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>emit_task_run_state_change_event(task_run: TaskRun, initial_state: Optional[State[Any]], validated_state: State[Any], follows: Optional[Event] = None) -> Optional[Event]
resolve_to_final_result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L943" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>resolve_to_final_result(expr: Any, context: dict[str, Any]) -> Any
Resolve any PrefectFuture or State types nested in parameters into
data. Designed to be used with visit_collection.
resolve_inputs_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine/__init__.py#L1021" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>resolve_inputs_sync(parameters: dict[str, Any], return_data: bool = True, max_depth: int = -1) -> dict[str, Any]
Resolve any Quote, PrefectFuture, or State types nested in parameters into
data.
Returns:
Raises:
UpstreamTaskError: If any of the upstream states are not COMPLETED
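To make the idea of resolving nested values concrete, here is a minimal sketch that uses `concurrent.futures.Future` as a stand-in for `PrefectFuture`. `resolve` and `UpstreamErrorSketch` are hypothetical names, and treating a failed future as the "not COMPLETED" case is an assumption made for illustration.

```python
from concurrent.futures import Future
from typing import Any


class UpstreamErrorSketch(Exception):
    """Hypothetical stand-in for UpstreamTaskError."""


def resolve(expr: Any) -> Any:
    """Return a copy of `expr` with every Future replaced by its result."""
    if isinstance(expr, Future):
        # exception() blocks until the future finishes, then reports failure.
        if expr.exception() is not None:
            raise UpstreamErrorSketch("upstream did not complete successfully")
        return expr.result()
    if isinstance(expr, dict):
        return {key: resolve(value) for key, value in expr.items()}
    if isinstance(expr, (list, tuple, set)):
        # Rebuild the same container type from the resolved items.
        return type(expr)(resolve(item) for item in expr)
    return expr
```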