Back to Prefect

flow_runs

docs/v3/api-ref/python/prefect-server-api-flow_runs.mdx

3.6.30.dev315.4 KB
Original Source

prefect.server.api.flow_runs

Routes for interacting with flow run objects.

Functions

create_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L69" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
create_flow_run(flow_run: schemas.actions.FlowRunCreate, db: PrefectDBInterface = Depends(provide_database_interface), response: Response = None, created_by: Optional[schemas.core.CreatedBy] = Depends(dependencies.get_created_by), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), worker_lookups: WorkerLookups = Depends(WorkerLookups)) -> schemas.responses.FlowRunResponse

Create a flow run. If a flow run with the same flow_id and idempotency key already exists, the existing flow run will be returned.

If no state is provided, the flow run will be created in a PENDING state.

For more information, see https://docs.prefect.io/v3/concepts/flows.

update_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L135" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
update_flow_run(flow_run: schemas.actions.FlowRunUpdate, flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None

Updates a flow run.

count_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L189" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
count_flow_runs(flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> int

Query for flow runs.

average_flow_run_lateness <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L214" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
average_flow_run_lateness(flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> Optional[float]

Query for average flow-run lateness in seconds.

flow_run_history <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L275" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
flow_run_history(history_start: DateTime = Body(..., description="The history's start time."), history_end: DateTime = Body(..., description="The history's end time."), history_interval_seconds: float = Body(..., description='The size of each history interval, in seconds. Must be at least 1 second.', json_schema_extra={'format': 'time-delta'}), flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.responses.HistoryResponse]

Query for flow run history data across a given range and interval.

read_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L323" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_flow_run(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> schemas.responses.FlowRunResponse

Get a flow run by id.

read_flow_run_graph_v1 <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L342" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_flow_run_graph_v1(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[DependencyResult]

Get a task run dependency map for a given flow run.

read_flow_run_graph_v2 <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L356" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_flow_run_graph_v2(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), since: datetime.datetime = Query(default=jsonable_encoder(earliest_possible_datetime()), description='Only include runs that start or end after this time.'), db: PrefectDBInterface = Depends(provide_database_interface)) -> Graph

Get a graph of the tasks and subflow runs for the given flow run

resume_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L382" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
resume_flow_run(response: Response, flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface), run_input: Optional[dict[str, Any]] = Body(default=None, embed=True), flow_policy: type[FlowRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_flow_policy), task_policy: type[TaskRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_task_policy), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), client_version: Optional[str] = Depends(dependencies.get_prefect_client_version)) -> OrchestrationResult

Resume a paused flow run.

read_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L535" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_flow_runs(sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort.ID_DESC), limit: int = dependencies.LimitBody(), offset: int = Body(0, ge=0), flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.responses.FlowRunResponse]

Query for flow runs.

delete_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L581" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
delete_flow_run(docket: dependencies.Docket, flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None

Delete a flow run by id.

delete_flow_run_logs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L603" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
delete_flow_run_logs() -> None

bulk_delete_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L622" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
bulk_delete_flow_runs(docket: dependencies.Docket, flow_runs: Optional[schemas.filters.FlowRunFilter] = Body(None, description='Filter criteria for flow runs to delete'), limit: int = Body(BULK_OPERATION_LIMIT, ge=1, le=BULK_OPERATION_LIMIT, description=f'Maximum number of flow runs to delete. Defaults to {BULK_OPERATION_LIMIT}.'), db: PrefectDBInterface = Depends(provide_database_interface)) -> FlowRunBulkDeleteResponse

Bulk delete flow runs matching the specified filter criteria.

Returns the IDs of flow runs that were deleted.

bulk_set_flow_run_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L670" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
bulk_set_flow_run_state(flow_runs: Optional[schemas.filters.FlowRunFilter] = Body(None, description='Filter criteria for flow runs to update'), state: schemas.actions.StateCreate = Body(..., description='The state to set'), force: bool = Body(False, description='If false, orchestration rules will be applied that may alter or prevent the state transition. If True, orchestration rules are not applied.'), limit: int = Body(BULK_OPERATION_LIMIT, ge=1, le=BULK_OPERATION_LIMIT, description=f'Maximum number of flow runs to update. Defaults to {BULK_OPERATION_LIMIT}.'), db: PrefectDBInterface = Depends(provide_database_interface), flow_policy: type[FlowRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_flow_policy), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), client_version: Optional[str] = Depends(dependencies.get_prefect_client_version)) -> FlowRunBulkSetStateResponse

Bulk set state for flow runs matching the specified filter criteria.

Returns the orchestration results for each flow run.

set_flow_run_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L755" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
set_flow_run_state(response: Response, flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), state: schemas.actions.StateCreate = Body(..., description='The intended state.'), force: bool = Body(False, description='If false, orchestration rules will be applied that may alter or prevent the state transition. If True, orchestration rules are not applied.'), db: PrefectDBInterface = Depends(provide_database_interface), flow_policy: type[FlowRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_flow_policy), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), client_version: Optional[str] = Depends(dependencies.get_prefect_client_version)) -> OrchestrationResult

Set a flow run state, invoking any orchestration rules.

create_flow_run_input <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L808" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
create_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), key: str = Body(..., description='The input key'), value: bytes = Body(..., description='The value of the input'), sender: Optional[str] = Body(None, description='The sender of the input'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None

Create a key/value input for a flow run.

filter_flow_run_input <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L844" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
filter_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), prefix: str = Body(..., description='The input key prefix', embed=True), limit: int = Body(1, description='The maximum number of results to return', embed=True), exclude_keys: List[str] = Body([], description='Exclude inputs with these keys', embed=True), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.core.FlowRunInput]

Filter flow run inputs by key prefix

read_flow_run_input <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L869" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), key: str = Path(..., description='The input key', alias='key'), db: PrefectDBInterface = Depends(provide_database_interface)) -> PlainTextResponse

Create a value from a flow run input

delete_flow_run_input <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L892" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
delete_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), key: str = Path(..., description='The input key', alias='key'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None

Delete a flow run input

paginate_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L914" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
paginate_flow_runs(sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort.ID_DESC), limit: int = dependencies.LimitBody(), page: int = Body(1, ge=1), flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> FlowRunPaginationResponse

Pagination query for flow runs.

download_logs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L987" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
download_logs(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> StreamingResponse

Download all flow run logs as a CSV file, collecting all logs until there are no more logs to retrieve.

update_flow_run_labels <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L1053" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
update_flow_run_labels(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), labels: Dict[str, Any] = Body(..., description='The labels to update'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None

Update the labels of a flow run.