docs/v3/api-ref/python/prefect-server-api-flow_runs.mdx
prefect.server.api.flow_runsRoutes for interacting with flow run objects.
create_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L69" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>create_flow_run(flow_run: schemas.actions.FlowRunCreate, db: PrefectDBInterface = Depends(provide_database_interface), response: Response = None, created_by: Optional[schemas.core.CreatedBy] = Depends(dependencies.get_created_by), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), worker_lookups: WorkerLookups = Depends(WorkerLookups)) -> schemas.responses.FlowRunResponse
Create a flow run. If a flow run with the same flow_id and idempotency key already exists, the existing flow run will be returned.
If no state is provided, the flow run will be created in a PENDING state.
For more information, see https://docs.prefect.io/v3/concepts/flows.
update_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L135" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>update_flow_run(flow_run: schemas.actions.FlowRunUpdate, flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Updates a flow run.
count_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L189" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>count_flow_runs(flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> int
Query for flow runs.
average_flow_run_lateness <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L214" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>average_flow_run_lateness(flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> Optional[float]
Query for average flow-run lateness in seconds.
flow_run_history <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L275" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>flow_run_history(history_start: DateTime = Body(..., description="The history's start time."), history_end: DateTime = Body(..., description="The history's end time."), history_interval_seconds: float = Body(..., description='The size of each history interval, in seconds. Must be at least 1 second.', json_schema_extra={'format': 'time-delta'}), flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.responses.HistoryResponse]
Query for flow run history data across a given range and interval.
read_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L323" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_flow_run(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> schemas.responses.FlowRunResponse
Get a flow run by id.
read_flow_run_graph_v1 <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L342" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_flow_run_graph_v1(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[DependencyResult]
Get a task run dependency map for a given flow run.
read_flow_run_graph_v2 <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L356" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_flow_run_graph_v2(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), since: datetime.datetime = Query(default=jsonable_encoder(earliest_possible_datetime()), description='Only include runs that start or end after this time.'), db: PrefectDBInterface = Depends(provide_database_interface)) -> Graph
Get a graph of the tasks and subflow runs for the given flow run
resume_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L382" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>resume_flow_run(response: Response, flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface), run_input: Optional[dict[str, Any]] = Body(default=None, embed=True), flow_policy: type[FlowRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_flow_policy), task_policy: type[TaskRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_task_policy), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), client_version: Optional[str] = Depends(dependencies.get_prefect_client_version)) -> OrchestrationResult
Resume a paused flow run.
read_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L535" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_flow_runs(sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort.ID_DESC), limit: int = dependencies.LimitBody(), offset: int = Body(0, ge=0), flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.responses.FlowRunResponse]
Query for flow runs.
delete_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L581" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>delete_flow_run(docket: dependencies.Docket, flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Delete a flow run by id.
delete_flow_run_logs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L603" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>delete_flow_run_logs() -> None
bulk_delete_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L622" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>bulk_delete_flow_runs(docket: dependencies.Docket, flow_runs: Optional[schemas.filters.FlowRunFilter] = Body(None, description='Filter criteria for flow runs to delete'), limit: int = Body(BULK_OPERATION_LIMIT, ge=1, le=BULK_OPERATION_LIMIT, description=f'Maximum number of flow runs to delete. Defaults to {BULK_OPERATION_LIMIT}.'), db: PrefectDBInterface = Depends(provide_database_interface)) -> FlowRunBulkDeleteResponse
Bulk delete flow runs matching the specified filter criteria.
Returns the IDs of flow runs that were deleted.
bulk_set_flow_run_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L670" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>bulk_set_flow_run_state(flow_runs: Optional[schemas.filters.FlowRunFilter] = Body(None, description='Filter criteria for flow runs to update'), state: schemas.actions.StateCreate = Body(..., description='The state to set'), force: bool = Body(False, description='If false, orchestration rules will be applied that may alter or prevent the state transition. If True, orchestration rules are not applied.'), limit: int = Body(BULK_OPERATION_LIMIT, ge=1, le=BULK_OPERATION_LIMIT, description=f'Maximum number of flow runs to update. Defaults to {BULK_OPERATION_LIMIT}.'), db: PrefectDBInterface = Depends(provide_database_interface), flow_policy: type[FlowRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_flow_policy), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), client_version: Optional[str] = Depends(dependencies.get_prefect_client_version)) -> FlowRunBulkSetStateResponse
Bulk set state for flow runs matching the specified filter criteria.
Returns the orchestration results for each flow run.
set_flow_run_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L755" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>set_flow_run_state(response: Response, flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), state: schemas.actions.StateCreate = Body(..., description='The intended state.'), force: bool = Body(False, description='If false, orchestration rules will be applied that may alter or prevent the state transition. If True, orchestration rules are not applied.'), db: PrefectDBInterface = Depends(provide_database_interface), flow_policy: type[FlowRunOrchestrationPolicy] = Depends(orchestration_dependencies.provide_flow_policy), orchestration_parameters: Dict[str, Any] = Depends(orchestration_dependencies.provide_flow_orchestration_parameters), api_version: str = Depends(dependencies.provide_request_api_version), client_version: Optional[str] = Depends(dependencies.get_prefect_client_version)) -> OrchestrationResult
Set a flow run state, invoking any orchestration rules.
create_flow_run_input <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L808" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>create_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), key: str = Body(..., description='The input key'), value: bytes = Body(..., description='The value of the input'), sender: Optional[str] = Body(None, description='The sender of the input'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Create a key/value input for a flow run.
filter_flow_run_input <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L844" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>filter_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), prefix: str = Body(..., description='The input key prefix', embed=True), limit: int = Body(1, description='The maximum number of results to return', embed=True), exclude_keys: List[str] = Body([], description='Exclude inputs with these keys', embed=True), db: PrefectDBInterface = Depends(provide_database_interface)) -> List[schemas.core.FlowRunInput]
Filter flow run inputs by key prefix
read_flow_run_input <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L869" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), key: str = Path(..., description='The input key', alias='key'), db: PrefectDBInterface = Depends(provide_database_interface)) -> PlainTextResponse
Create a value from a flow run input
delete_flow_run_input <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L892" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>delete_flow_run_input(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), key: str = Path(..., description='The input key', alias='key'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Delete a flow run input
paginate_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L914" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>paginate_flow_runs(sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort.ID_DESC), limit: int = dependencies.LimitBody(), page: int = Body(1, ge=1), flows: Optional[schemas.filters.FlowFilter] = None, flow_runs: Optional[schemas.filters.FlowRunFilter] = None, task_runs: Optional[schemas.filters.TaskRunFilter] = None, deployments: Optional[schemas.filters.DeploymentFilter] = None, work_pools: Optional[schemas.filters.WorkPoolFilter] = None, work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface)) -> FlowRunPaginationResponse
Pagination query for flow runs.
download_logs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L987" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>download_logs(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), db: PrefectDBInterface = Depends(provide_database_interface)) -> StreamingResponse
Download all flow run logs as a CSV file, collecting all logs until there are no more logs to retrieve.
update_flow_run_labels <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/api/flow_runs.py#L1053" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>update_flow_run_labels(flow_run_id: UUID = Path(..., description='The flow run id', alias='id'), labels: Dict[str, Any] = Body(..., description='The labels to update'), db: PrefectDBInterface = Depends(provide_database_interface)) -> None
Update the labels of a flow run.