flow_engine

docs/v3/api-ref/python/prefect-flow_engine.mdx

3.6.30.dev318.4 KB
prefect.flow_engine

Functions

load_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L217" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
load_flow_run(flow_run_id: UUID) -> FlowRun

load_flow <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L223" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
load_flow(flow_run: FlowRun) -> Flow[..., Any]

load_flow_and_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L239" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
load_flow_and_flow_run(flow_run_id: UUID) -> tuple[FlowRun, Flow[..., Any]]

send_heartbeats_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L326" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
send_heartbeats_sync(engine: 'FlowRunEngine[Any, Any]') -> Generator[None, None, None]

send_heartbeats_async <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L338" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
send_heartbeats_async(engine: 'AsyncFlowRunEngine[Any, Any]') -> AsyncGenerator[None, None]

run_flow_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1811" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_flow_sync(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Union[R, State, None]

run_flow_async <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1835" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_flow_async(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Union[R, State, None]

run_generator_flow_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1859" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_generator_flow_sync(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Generator[R, None, None]

run_generator_flow_async <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1900" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_generator_flow_async(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[R]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> AsyncGenerator[R, None]

run_flow <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1943" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_flow(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[R]]] = None, return_type: Literal['state', 'result'] = 'result', error_logger: Optional[logging.Logger] = None, context: Optional[dict[str, Any]] = None) -> R | State | None | Coroutine[Any, Any, R | State | None] | Generator[R, None, None] | AsyncGenerator[R, None]

run_flow_in_subprocess <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L2032" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_flow_in_subprocess(flow: 'Flow[..., Any]', flow_run: 'FlowRun | None' = None, parameters: dict[str, Any] | None = None, wait_for: Iterable[PrefectFuture[Any]] | None = None, context: dict[str, Any] | None = None, env: dict[str, str | None] | None = None) -> multiprocessing.context.SpawnProcess

Run a flow in a subprocess.

Note: the result of the flow is only accessible if the flow is configured to persist its result.

Args:

  • flow: The flow to run.
  • flow_run: The flow run object containing run metadata.
  • parameters: The parameters to use when invoking the flow.
  • wait_for: The futures to wait for before starting the flow.
  • context: A serialized context to hydrate before running the flow. If not provided, the current context will be used. A serialized context should be provided if this function is called in a separate memory space from the parent run (e.g. in a subprocess or on another machine).
  • env: Additional environment variables to set in the subprocess.

Returns:

  • A multiprocessing.context.SpawnProcess representing the process that is running the flow.
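The persistence note above follows from process isolation: a spawned child has its own memory, so the parent can only read what the child writes to shared storage. The following is a minimal stdlib-only sketch of that idea, not Prefect's implementation. The names `persisting_target`, `build_spawn_process`, and `read_persisted_result` are hypothetical, and a JSON file stands in for a configured result store.

```python
import json
import multiprocessing
from pathlib import Path
from typing import Optional


def persisting_target(result_path: str, x: int) -> None:
    """Hypothetical child entry point: compute a value and persist it to disk."""
    Path(result_path).write_text(json.dumps({"result": x * 2}))


def build_spawn_process(
    result_path: str, x: int
) -> multiprocessing.context.SpawnProcess:
    """Create (but do not start) a process that uses the "spawn" start method."""
    ctx = multiprocessing.get_context("spawn")
    return ctx.Process(target=persisting_target, args=(result_path, x))


def read_persisted_result(result_path: str) -> Optional[int]:
    """Return the persisted result, or None if nothing was ever written."""
    path = Path(result_path)
    if not path.exists():
        return None
    return json.loads(path.read_text())["result"]
```

In a script, calling `start()` and then `join()` on the returned process would run the child. With the spawn start method, that launching code should sit behind an `if __name__ == "__main__":` guard, because the child re-imports the main module.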

Classes

FlowRunTimeoutError <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L213" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a flow run exceeds its defined timeout.
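As a rough illustration of the pattern, the sketch below raises a dedicated timeout error when a callable exceeds its time budget. It is an assumption-laden stand-in, not the engine's code: `RunTimeoutError` and `run_with_timeout` are hypothetical names, and a worker thread is used only to keep the example self-contained.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Any, Callable


class RunTimeoutError(TimeoutError):
    """Hypothetical stand-in for an error raised when a run exceeds its timeout."""


def run_with_timeout(fn: Callable[[], Any], timeout_seconds: float) -> Any:
    """Run fn in a worker thread and raise RunTimeoutError if it takes too long."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        return future.result(timeout=timeout_seconds)
    except FutureTimeoutError as exc:
        raise RunTimeoutError(
            f"run exceeded timeout of {timeout_seconds} second(s)"
        ) from exc
    finally:
        # Do not block on the worker; a timed-out thread keeps running
        # in the background until fn returns.
        pool.shutdown(wait=False)
```

Subclassing the built-in `TimeoutError` lets callers catch either the specific error or timeouts in general.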

BaseFlowRunEngine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L346" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

cancel_all_tasks <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L393" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
cancel_all_tasks(self) -> None

heartbeat_seconds <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L386" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
heartbeat_seconds(self) -> Optional[int]

Get the heartbeat interval from settings.
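A heartbeat interval is typically consumed by a context manager that emits heartbeats in the background while the run is active. The sketch below shows that general shape with the standard library only. `send_heartbeats` and `emit` are hypothetical names, and treating a `None` interval as "heartbeats disabled" is an assumption suggested by the `Optional[int]` return type.

```python
import threading
from contextlib import contextmanager
from typing import Callable, Generator, Optional


@contextmanager
def send_heartbeats(
    emit: Callable[[], None], interval_seconds: Optional[float]
) -> Generator[None, None, None]:
    """Emit heartbeats on a background thread for the duration of the block."""
    if interval_seconds is None:
        # Assumption: no configured interval means no heartbeats are sent.
        yield
        return

    stop = threading.Event()

    def loop() -> None:
        while True:
            emit()
            # wait() returns True as soon as stop is set, ending the loop.
            if stop.wait(interval_seconds):
                break

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    try:
        yield
    finally:
        stop.set()
        thread.join()
```

Using `Event.wait` instead of `time.sleep` lets the loop exit promptly when the block ends, rather than sleeping through a full interval.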

is_pending <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L380" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
is_pending(self) -> bool

is_running <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L375" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
is_running(self) -> bool

state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L372" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
state(self) -> State

FlowRunEngine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L523" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

begin_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L573" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
begin_run(self) -> State

call_flow_fn <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1144" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
call_flow_fn(self) -> Union[R, Coroutine[Any, Any, R]]

Convenience method to call the flow function. Returns a coroutine if the flow is async.
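The return type reflects ordinary Python behavior: calling an async function does not run it, it returns a coroutine object that the caller must await. A minimal sketch, with `call_fn`, `add`, and `add_async` as hypothetical names:

```python
import asyncio
import inspect
from typing import Any, Callable


def call_fn(fn: Callable[..., Any], **parameters: Any) -> Any:
    """Call fn with parameters; the result is a coroutine when fn is async."""
    return fn(**parameters)


def add(x: int, y: int) -> int:
    return x + y


async def add_async(x: int, y: int) -> int:
    return x + y


sync_result = call_fn(add, x=1, y=2)         # plain value
maybe_coro = call_fn(add_async, x=1, y=2)    # coroutine object, not yet run
is_coro = inspect.iscoroutine(maybe_coro)
async_result = asyncio.run(maybe_coro)       # running it yields the value
```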

call_hooks <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L856" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
call_hooks(self, state: Optional[State] = None) -> None

client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L529" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
client(self) -> SyncPrefectClient

create_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L821" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
create_flow_run(self, client: SyncPrefectClient) -> FlowRun

handle_cancellation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L751" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_cancellation(self, exc: BaseException) -> None

Force this run through Cancelling -> Cancelled.

Used when capture_sigterm has determined (via the cancellation listener) that the SIGTERM was the runner asking for cancellation, not an unrelated termination. The run transitions through Cancelling first and then to Cancelled.

The same ownership rule also governs on_cancellation / on_crashed hooks: if the engine owns cancellation/crash handling, it must drive the Cancelling -> Cancelled transitions locally; if an external runner owns them, the child should avoid duplicating state history.
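The ownership rule can be sketched as a small state recorder. This is an illustration under stated assumptions, not the engine's logic: `SketchRun` and the `engine_owns_cancellation` flag are hypothetical, and states are plain strings.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchRun:
    """Hypothetical run record that keeps its state history."""

    state: str = "Running"
    history: List[str] = field(default_factory=list)

    def set_state(self, new_state: str) -> None:
        self.state = new_state
        self.history.append(new_state)


def handle_cancellation(run: SketchRun, engine_owns_cancellation: bool) -> None:
    """Drive Cancelling -> Cancelled locally only when the engine owns cancellation."""
    if not engine_owns_cancellation:
        # An external runner records the transitions; writing them here
        # as well would duplicate state history.
        return
    run.set_state("Cancelling")
    run.set_state("Cancelled")


owned = SketchRun()
handle_cancellation(owned, engine_owns_cancellation=True)

external = SketchRun()
handle_cancellation(external, engine_owns_cancellation=False)
```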

handle_crash <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L742" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_crash(self, exc: BaseException) -> None

handle_exception <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L684" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_exception(self, exc: Exception, msg: Optional[str] = None, result_store: Optional[ResultStore] = None) -> State

handle_success <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L656" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_success(self, result: R) -> R

handle_timeout <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L717" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_timeout(self, exc: TimeoutError) -> None

initialize_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1005" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
initialize_run(self)

Enters a client context and creates a flow run if needed.
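The "enter a client context, create a run if needed" shape can be sketched with a generator-based context manager. `SketchClient` and `SketchEngine` are hypothetical stand-ins and do not mirror the real client or engine classes.

```python
import uuid
from contextlib import contextmanager
from typing import Iterator, List, Optional


class SketchClient:
    """Hypothetical client that tracks whether its context is open."""

    def __init__(self) -> None:
        self.is_open = False
        self.created: List[str] = []

    def __enter__(self) -> "SketchClient":
        self.is_open = True
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.is_open = False

    def create_run(self) -> str:
        run_id = str(uuid.uuid4())
        self.created.append(run_id)
        return run_id


class SketchEngine:
    def __init__(self, client: SketchClient, run_id: Optional[str] = None) -> None:
        self.client = client
        self.run_id = run_id

    @contextmanager
    def initialize_run(self) -> Iterator["SketchEngine"]:
        """Enter the client context and create a run only if none was supplied."""
        with self.client:
            if self.run_id is None:
                self.run_id = self.client.create_run()
            yield self
```

Inside `with engine.initialize_run():` the client is open and `run_id` is set; on exit the client context closes again.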

load_subflow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L772" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
load_subflow_run(self, parent_task_run: TaskRun, client: SyncPrefectClient, context: FlowRunContext) -> Union[FlowRun, None]

This method attempts to load an existing flow run for a subflow task run, if appropriate.

If the parent task run is in a final but not COMPLETED state, and not being rerun, then we attempt to load an existing flow run instead of creating a new one. This will prevent the engine from running the subflow again.

If no existing flow run is found, or if the subflow should be rerun, then no flow run is returned.
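The decision described above reduces to a single predicate. The sketch below encodes it with plain strings and a dictionary lookup; `load_existing_subflow_run`, `FINAL_STATES`, and `runs_by_parent` are hypothetical, and the set of final state names is an assumption made for this example.

```python
from typing import Dict, Optional

# Assumed set of final states for this sketch.
FINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def load_existing_subflow_run(
    parent_state: str,
    rerunning: bool,
    parent_task_run_id: str,
    runs_by_parent: Dict[str, str],
) -> Optional[str]:
    """Return an existing subflow run id when reuse is appropriate, else None."""
    should_reuse = (
        parent_state in FINAL_STATES
        and parent_state != "COMPLETED"
        and not rerunning
    )
    if not should_reuse:
        return None
    # None here means no existing run was found for this parent task run.
    return runs_by_parent.get(parent_task_run_id)


runs = {"task-1": "flow-run-1"}
reused = load_existing_subflow_run("FAILED", False, "task-1", runs)
completed = load_existing_subflow_run("COMPLETED", False, "task-1", runs)
rerun = load_existing_subflow_run("FAILED", True, "task-1", runs)
missing = load_existing_subflow_run("FAILED", False, "task-2", runs)
```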

result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L630" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'

run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1124" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_context(self)

set_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L612" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
set_state(self, state: State, force: bool = False) -> State

setup_run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L910" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
setup_run_context(self, client: Optional[SyncPrefectClient] = None)

start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1112" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
start(self) -> Generator[None, None, None]

AsyncFlowRunEngine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1162" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Async version of the flow run engine.

NOTE: This engine has not been fully asyncified yet, which may lead to async flows not running fully asynchronously.

Methods:

begin_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1219" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
begin_run(self) -> State

call_flow_fn <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1799" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
call_flow_fn(self) -> Coroutine[Any, Any, R]

Convenience method to call the flow function. Returns a coroutine if the flow is async.

call_hooks <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1495" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
call_hooks(self, state: Optional[State] = None) -> None

client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1175" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
client(self) -> PrefectClient

create_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1462" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
create_flow_run(self, client: PrefectClient) -> FlowRun

handle_cancellation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1396" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_cancellation(self, exc: BaseException) -> None

Force this run through Cancelling -> Cancelled.

Async counterpart to FlowRunEngine.handle_cancellation. Shielded from asyncio cancellation so engine-owned transitions always reach the server.
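One common way to protect work from asyncio cancellation is `asyncio.shield`. The sketch below shows the general technique and is not necessarily how the engine does it: `record_transitions` and `handle_cancellation` are hypothetical, and `asyncio.sleep` stands in for a call to the server.

```python
import asyncio
from typing import List


async def record_transitions(history: List[str]) -> None:
    """Hypothetical stand-in for reporting state transitions to a server."""
    for state in ("Cancelling", "Cancelled"):
        await asyncio.sleep(0.01)  # simulated network call
        history.append(state)


async def handle_cancellation(history: List[str]) -> None:
    transitions = asyncio.ensure_future(record_transitions(history))
    try:
        # shield() keeps the inner task alive if this coroutine is cancelled.
        await asyncio.shield(transitions)
    except asyncio.CancelledError:
        # Let the shielded transitions finish, then propagate the cancellation.
        await transitions
        raise


async def main() -> List[str]:
    history: List[str] = []
    task = asyncio.create_task(handle_cancellation(history))
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()           # cancel while the transitions are in flight
    try:
        await task
    except asyncio.CancelledError:
        pass
    return history


history = asyncio.run(main())
```

Even though the outer task is cancelled almost immediately, both transitions are recorded before the cancellation propagates.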

handle_crash <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1383" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_crash(self, exc: BaseException) -> None

handle_exception <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1326" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_exception(self, exc: Exception, msg: Optional[str] = None, result_store: Optional[ResultStore] = None) -> State

handle_success <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1301" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_success(self, result: R) -> R

handle_timeout <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1357" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_timeout(self, exc: TimeoutError) -> None

initialize_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1642" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
initialize_run(self)

Enters a client context and creates a flow run if needed.

load_subflow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1413" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
load_subflow_run(self, parent_task_run: TaskRun, client: PrefectClient, context: FlowRunContext) -> Union[FlowRun, None]

This method attempts to load an existing flow run for a subflow task run, if appropriate.

If the parent task run is in a final but not COMPLETED state, and not being rerun, then we attempt to load an existing flow run instead of creating a new one. This will prevent the engine from running the subflow again.

If no existing flow run is found, or if the subflow should be rerun, then no flow run is returned.

result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1276" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'

run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1779" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_context(self)

set_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1258" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
set_state(self, state: State, force: bool = False) -> State

setup_run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1549" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
setup_run_context(self, client: Optional[PrefectClient] = None)

start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1767" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
start(self) -> AsyncGenerator[None, None]