docs/v3/api-ref/python/prefect-flow_engine.mdx
prefect.flow_engine
load_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L217" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>load_flow_run(flow_run_id: UUID) -> FlowRun
load_flow <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L223" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>load_flow(flow_run: FlowRun) -> Flow[..., Any]
load_flow_and_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L239" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>load_flow_and_flow_run(flow_run_id: UUID) -> tuple[FlowRun, Flow[..., Any]]
send_heartbeats_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L326" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>send_heartbeats_sync(engine: 'FlowRunEngine[Any, Any]') -> Generator[None, None, None]
send_heartbeats_async <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L338" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>send_heartbeats_async(engine: 'AsyncFlowRunEngine[Any, Any]') -> AsyncGenerator[None, None]
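The `Generator[None, None, None]` return type suggests that `send_heartbeats_sync` is used as a context manager around the flow body. The sketch below illustrates that pattern with the standard library only. It is not Prefect's implementation: `send_heartbeats_sketch`, `emit`, and `interval_seconds` are hypothetical names, and treating a `None` interval as "heartbeats disabled" is an assumption based on the `Optional[int]` return type of `heartbeat_seconds`.

```python
import threading
from contextlib import contextmanager
from typing import Callable, Iterator, Optional


@contextmanager
def send_heartbeats_sketch(
    emit: Callable[[], None], interval_seconds: Optional[float]
) -> Iterator[None]:
    """Call `emit` repeatedly in a background thread while the body runs.

    Hypothetical stand-in; a None interval is assumed to disable heartbeats.
    """
    if interval_seconds is None:
        yield
        return

    stop = threading.Event()

    def loop() -> None:
        while True:
            emit()
            # wait() returns True as soon as `stop` is set, ending the loop.
            if stop.wait(interval_seconds):
                break

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    try:
        yield
    finally:
        # Stop the background thread when the body exits, even on error.
        stop.set()
        thread.join()
```

In this sketch the first heartbeat is emitted as soon as the thread starts, and the thread is always joined on exit, so no heartbeat is sent after the `with` block ends.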
run_flow_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1811" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_flow_sync(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Union[R, State, None]
run_flow_async <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1835" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_flow_async(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Union[R, State, None]
run_generator_flow_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1859" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_generator_flow_sync(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Generator[R, None, None]
run_generator_flow_async <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1900" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_generator_flow_async(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[R]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> AsyncGenerator[R, None]
run_flow <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1943" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_flow(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[R]]] = None, return_type: Literal['state', 'result'] = 'result', error_logger: Optional[logging.Logger] = None, context: Optional[dict[str, Any]] = None) -> R | State | None | Coroutine[Any, Any, R | State | None] | Generator[R, None, None] | AsyncGenerator[R, None]
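The union return type of `run_flow`, together with the four `run_*` helpers listed above it, suggests a dispatch on the kind of flow function (plain, coroutine, generator, async generator). The following sketch shows one way such a classification could be done with `inspect`. `classify_flow_fn` is a hypothetical helper, not part of `prefect.flow_engine`, and the mapping to helper names is an assumption drawn from the signatures rather than from the source code.

```python
import inspect
from typing import Any, Callable


def classify_flow_fn(fn: Callable[..., Any]) -> str:
    """Return the name of the runner that would plausibly handle `fn`.

    Hypothetical helper for illustration only.
    """
    # An async generator function is not a coroutine function, so each
    # kind needs its own check.
    if inspect.isasyncgenfunction(fn):
        return "run_generator_flow_async"
    if inspect.isgeneratorfunction(fn):
        return "run_generator_flow_sync"
    if inspect.iscoroutinefunction(fn):
        return "run_flow_async"
    return "run_flow_sync"


def plain() -> int:
    return 1


async def coro() -> int:
    return 1


def gen():
    yield 1


async def agen():
    yield 1
```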
run_flow_in_subprocess <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L2032" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_flow_in_subprocess(flow: 'Flow[..., Any]', flow_run: 'FlowRun | None' = None, parameters: dict[str, Any] | None = None, wait_for: Iterable[PrefectFuture[Any]] | None = None, context: dict[str, Any] | None = None, env: dict[str, str | None] | None = None) -> multiprocessing.context.SpawnProcess
Run a flow in a subprocess.
Note: the result of the flow is only accessible if the flow is configured to persist its result.
Args:
flow: The flow to run.
flow_run: The flow run object containing run metadata.
parameters: The parameters to use when invoking the flow.
wait_for: The futures to wait for before starting the flow.
context: A serialized context to hydrate before running the flow. If not provided, the current context will be used. A serialized context should be provided if this function is called in a separate memory space from the parent run (e.g. in a subprocess or on another machine).
env: Additional environment variables to set in the subprocess.
Returns:
The multiprocessing.context.SpawnProcess in which the flow runs.
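The note about persisted results reflects a general property of child processes: the parent gets a process handle, not the child's return value. The sketch below illustrates this with the standard library only. It uses `subprocess` rather than `multiprocessing`, does not call Prefect, and the file-based "persistence" and all names in it (`CHILD_CODE`, `run_and_read_persisted_result`) are assumptions made for illustration.

```python
import json
import subprocess
import sys
import tempfile
from pathlib import Path

# Code the child process runs: compute a value and persist it to a file.
CHILD_CODE = """
import json, sys
result = sum(range(10))
with open(sys.argv[1], "w") as f:
    json.dump({"result": result}, f)
"""


def run_and_read_persisted_result() -> int:
    """Run CHILD_CODE in a separate process and read back what it persisted."""
    with tempfile.TemporaryDirectory() as tmp:
        result_path = Path(tmp) / "result.json"
        # The process itself only reports an exit code to the parent.
        completed = subprocess.run(
            [sys.executable, "-c", CHILD_CODE, str(result_path)],
            check=True,
        )
        assert completed.returncode == 0
        # The computed value is available only because the child wrote it out.
        return json.loads(result_path.read_text())["result"]
```

If the child did not write its result anywhere, the parent in this sketch would have no way to recover it after the process exits.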
FlowRunTimeoutError <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L213" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Raised when a flow run exceeds its defined timeout.
BaseFlowRunEngine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L346" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Methods:
cancel_all_tasks <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L393" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>cancel_all_tasks(self) -> None
heartbeat_seconds <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L386" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>heartbeat_seconds(self) -> Optional[int]
Get the heartbeat interval from settings.
is_pending <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L380" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>is_pending(self) -> bool
is_running <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L375" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>is_running(self) -> bool
state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L372" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>state(self) -> State
FlowRunEngine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L523" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Methods:
begin_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L573" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>begin_run(self) -> State
call_flow_fn <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1144" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>call_flow_fn(self) -> Union[R, Coroutine[Any, Any, R]]
Convenience method to call the flow function. Returns a coroutine if the flow is async.
call_hooks <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L856" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>call_hooks(self, state: Optional[State] = None) -> None
client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L529" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>client(self) -> SyncPrefectClient
create_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L821" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>create_flow_run(self, client: SyncPrefectClient) -> FlowRun
handle_cancellation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L751" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>handle_cancellation(self, exc: BaseException) -> None
Force this run through Cancelling -> Cancelled.
Used when capture_sigterm has determined (via the cancellation
listener) that the SIGTERM was the runner asking for cancellation,
not an unrelated termination. Transitioning through Cancelling first
The same ownership rule also governs on_cancellation / on_crashed
hooks: if the engine owns cancellation/crash handling, it must
drive the Cancelling -> Cancelled transitions locally; if an
external runner owns them, the child should avoid duplicating
state history.
handle_crash <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L742" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>handle_crash(self, exc: BaseException) -> None
handle_exception <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L684" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>handle_exception(self, exc: Exception, msg: Optional[str] = None, result_store: Optional[ResultStore] = None) -> State
handle_success <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L656" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>handle_success(self, result: R) -> R
handle_timeout <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L717" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>handle_timeout(self, exc: TimeoutError) -> None
initialize_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1005" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>initialize_run(self)
Enters a client context and creates a flow run if needed.
load_subflow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L772" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>load_subflow_run(self, parent_task_run: TaskRun, client: SyncPrefectClient, context: FlowRunContext) -> Union[FlowRun, None]
This method attempts to load an existing flow run for a subflow task run, if appropriate.
If the parent task run is in a final but not COMPLETED state, and not being rerun, then we attempt to load an existing flow run instead of creating a new one. This will prevent the engine from running the subflow again.
If no existing flow run is found, or if the subflow should be rerun, then no flow run is returned.
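The rule described above can be written as a small predicate. This is a sketch of the documented condition only, not the engine's code: `StateType` here is an illustrative enum defined locally, and `should_reuse_subflow_run` and `rerunning` are hypothetical names.

```python
from enum import Enum


class StateType(Enum):
    """Illustrative subset of run state types, defined locally for the sketch."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CRASHED = "CRASHED"
    CANCELLED = "CANCELLED"


# States from which a run does not progress further (assumed for the sketch).
FINAL_STATES = {
    StateType.COMPLETED,
    StateType.FAILED,
    StateType.CRASHED,
    StateType.CANCELLED,
}


def should_reuse_subflow_run(parent_state: StateType, rerunning: bool) -> bool:
    """True when an existing subflow run should be loaded instead of created."""
    is_final = parent_state in FINAL_STATES
    is_completed = parent_state is StateType.COMPLETED
    # Final but not COMPLETED, and not being rerun.
    return is_final and not is_completed and not rerunning
```

When the predicate is false, the documented behavior is that no flow run is returned and the caller proceeds as if none existed.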
result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L630" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'
run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1124" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_context(self)
set_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L612" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>set_state(self, state: State, force: bool = False) -> State
setup_run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L910" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>setup_run_context(self, client: Optional[SyncPrefectClient] = None)
start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1112" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>start(self) -> Generator[None, None, None]
AsyncFlowRunEngine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1162" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Async version of the flow run engine.
NOTE: This engine has not been fully asyncified yet, which may lead to async flows not running fully asynchronously.
Methods:
begin_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1219" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>begin_run(self) -> State
call_flow_fn <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1799" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>call_flow_fn(self) -> Coroutine[Any, Any, R]
Convenience method to call the flow function. Returns a coroutine if the flow is async.
call_hooks <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1495" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>call_hooks(self, state: Optional[State] = None) -> None
client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1175" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>client(self) -> PrefectClient
create_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1462" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>create_flow_run(self, client: PrefectClient) -> FlowRun
handle_cancellation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1396" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>handle_cancellation(self, exc: BaseException) -> None
Force this run through Cancelling -> Cancelled.
Async counterpart to FlowRunEngine.handle_cancellation. Shielded
from asyncio cancellation so engine-owned transitions always reach
the server.
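How the shielding is implemented is not shown in this reference. One common way to protect a coroutine from cancellation of its caller is `asyncio.shield`, sketched below under that assumption. `report_transitions`, `handler`, and the in-memory `log` are hypothetical stand-ins for the calls that would report state transitions to the server.

```python
import asyncio


async def report_transitions(log: list[str]) -> None:
    """Stand-in for reporting Cancelling and then Cancelled to a server."""
    log.append("Cancelling")
    await asyncio.sleep(0.01)  # simulated network round trip
    log.append("Cancelled")


async def handler(log: list[str]) -> None:
    inner = asyncio.ensure_future(report_transitions(log))
    try:
        await asyncio.shield(inner)
    except asyncio.CancelledError:
        # The outer await was cancelled, but `inner` keeps running.
        # Wait for it to finish before propagating the cancellation.
        await inner
        raise


async def main() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(handler(log))
    await asyncio.sleep(0)  # let the handler start
    task.cancel()  # cancel while the transitions are in flight
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log
```

Running `asyncio.run(main())` in this sketch shows both transitions recorded even though the handler task was cancelled midway.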
handle_crash <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1383" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>handle_crash(self, exc: BaseException) -> None
handle_exception <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1326" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>handle_exception(self, exc: Exception, msg: Optional[str] = None, result_store: Optional[ResultStore] = None) -> State
handle_success <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1301" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>handle_success(self, result: R) -> R
handle_timeout <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1357" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>handle_timeout(self, exc: TimeoutError) -> None
initialize_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1642" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>initialize_run(self)
Enters a client context and creates a flow run if needed.
load_subflow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1413" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>load_subflow_run(self, parent_task_run: TaskRun, client: PrefectClient, context: FlowRunContext) -> Union[FlowRun, None]
This method attempts to load an existing flow run for a subflow task run, if appropriate.
If the parent task run is in a final but not COMPLETED state, and not being rerun, then we attempt to load an existing flow run instead of creating a new one. This will prevent the engine from running the subflow again.
If no existing flow run is found, or if the subflow should be rerun, then no flow run is returned.
result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1276" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'
run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1779" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_context(self)
set_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1258" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>set_state(self, state: State, force: bool = False) -> State
setup_run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1549" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>setup_run_context(self, client: Optional[PrefectClient] = None)
start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1767" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>start(self) -> AsyncGenerator[None, None]