Back to Prefect

flow_runs

docs/v3/api-ref/python/prefect-server-models-flow_runs.mdx

3.6.30.dev310.3 KB
Original Source

prefect.server.models.flow_runs

Functions for interacting with flow run ORM objects. Intended for internal use by the Prefect REST API.

Functions

create_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L63" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
create_flow_run(db: PrefectDBInterface, session: AsyncSession, flow_run: schemas.core.FlowRun, orchestration_parameters: Optional[dict[str, Any]] = None) -> orm_models.FlowRun

Creates a new flow run.

If the provided flow run has a state attached, it will also be created.

Args:

  • session: a database session
  • flow_run: a flow run model

Returns:

  • orm_models.FlowRun: the newly-created flow run

update_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L149" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
update_flow_run(db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, flow_run: schemas.actions.FlowRunUpdate) -> bool

Updates a flow run.

Args:

  • session: a database session
  • flow_run_id: the flow run id to update
  • flow_run: a flow run model

Returns:

  • whether or not matching rows were found to update

read_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L178" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_flow_run(db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, for_update: bool = False) -> Optional[orm_models.FlowRun]

Reads a flow run by id.

Args:

  • session: A database session
  • flow_run_id: a flow run id

Returns:

  • orm_models.FlowRun: the flow run

read_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L282" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_flow_runs(db: PrefectDBInterface, session: AsyncSession, columns: Optional[list[str]] = None, flow_filter: Optional[schemas.filters.FlowFilter] = None, flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, deployment_filter: Optional[schemas.filters.DeploymentFilter] = None, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None, offset: Optional[int] = None, limit: Optional[int] = None, sort: schemas.sorting.FlowRunSort = schemas.sorting.FlowRunSort.ID_DESC) -> Sequence[orm_models.FlowRun]

Read flow runs.

Args:

  • session: a database session
  • columns: a list of the flow run ORM columns to load, for performance
  • flow_filter: only select flow runs whose flows match these filters
  • flow_run_filter: only select flow runs match these filters
  • task_run_filter: only select flow runs whose task runs match these filters
  • deployment_filter: only select flow runs whose deployments match these filters
  • offset: Query offset
  • limit: Query limit
  • sort: Query sort

Returns:

  • List[orm_models.FlowRun]: flow runs

cleanup_flow_run_concurrency_slots <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L345" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
cleanup_flow_run_concurrency_slots(session: AsyncSession, flow_run: orm_models.FlowRun) -> None

Cleanup flow run related resources, such as releasing concurrency slots. All operations should be idempotent and safe to call multiple times. IMPORTANT: This run may no longer exist in the database when this operation occurs.

read_task_run_dependencies <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L387" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_task_run_dependencies(session: AsyncSession, flow_run_id: UUID) -> List[DependencyResult]

Get a task run dependency map for a given flow run.

count_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L435" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
count_flow_runs(db: PrefectDBInterface, session: AsyncSession, flow_filter: Optional[schemas.filters.FlowFilter] = None, flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, deployment_filter: Optional[schemas.filters.DeploymentFilter] = None, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None) -> int

Count flow runs.

Args:

  • session: a database session
  • flow_filter: only count flow runs whose flows match these filters
  • flow_run_filter: only count flow runs that match these filters
  • task_run_filter: only count flow runs whose task runs match these filters
  • deployment_filter: only count flow runs whose deployments match these filters
  • work_pool_filter: only count flow runs whose work pool matches these filters
  • work_queue_filter: only count flow runs whose work queue matches these filters

Returns:

  • count of flow runs

delete_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L517" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
delete_flow_run(db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID) -> bool

Delete a flow run by flow_run_id, handling concurrency limits if applicable.

Args:

  • session: A database session
  • flow_run_id: a flow run id

Returns:

  • whether or not the flow run was deleted

delete_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L548" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
delete_flow_runs(db: PrefectDBInterface, session: AsyncSession, flow_run_ids: List[UUID]) -> List[UUID]

Delete multiple flow runs by their IDs, handling concurrency limits.

Args:

  • session: A database session
  • flow_run_ids: a list of flow run ids to delete

Returns:

  • List[UUID]: the IDs of the flow runs that were deleted

set_flow_run_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L589" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
set_flow_run_state(session: AsyncSession, flow_run_id: UUID, state: schemas.states.State, force: bool = False, flow_policy: Optional[Type[FlowRunOrchestrationPolicy]] = None, orchestration_parameters: Optional[Dict[str, Any]] = None, client_version: Optional[str] = None) -> OrchestrationResult

Creates a new orchestrated flow run state.

Setting a new state on a run is the one of the principal actions that is governed by Prefect's orchestration logic. Setting a new run state will not guarantee creation, but instead trigger orchestration rules to govern the proposed state input. If the state is considered valid, it will be written to the database. Otherwise, a it's possible a different state, or no state, will be created. A force flag is supplied to bypass a subset of orchestration logic.

Args:

  • session: a database session
  • flow_run_id: the flow run id
  • state: a flow run state model
  • force: if False, orchestration rules will be applied that may alter or prevent the state transition. If True, orchestration rules are not applied.

Returns:

  • OrchestrationResult object

read_flow_run_graph <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L679" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_flow_run_graph(db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, since: datetime.datetime = earliest_possible_datetime()) -> Graph

Given a flow run, return the graph of it's task and subflow runs. If a since datetime is provided, only return items that may have changed since that time.

with_system_labels_for_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L699" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
with_system_labels_for_flow_run(session: AsyncSession, flow_run: Union[schemas.core.FlowRun, schemas.actions.FlowRunCreate]) -> schemas.core.KeyValueLabels

Augment user supplied labels with system default labels for a flow run.

update_flow_run_labels <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L736" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
update_flow_run_labels(db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID, labels: KeyValueLabels) -> bool

Update flow run labels by patching existing labels with new values. Args: session: A database session flow_run_id: the flow run id to update labels: the new labels to patch into existing labels Returns: bool: whether the update was successful

Classes

DependencyResult <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/flow_runs.py#L374" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

model_validate_list <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/_internal/schemas/bases.py#L56" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
model_validate_list(cls, obj: Any) -> list[Self]

reset_fields <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/_internal/schemas/bases.py#L85" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
reset_fields(self: Self) -> Self

Reset the fields of the model that are in the _reset_fields set.

Returns:

  • A new instance of the model with the reset fields.