docs/v3/api-ref/python/prefect-server-models-work_queues.mdx
prefect.server.models.work_queues

Functions for interacting with work queue ORM objects. Intended for internal use by the Prefect REST API.
create_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L54" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

create_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue: Union[schemas.core.WorkQueue, schemas.actions.WorkQueueCreate]) -> orm_models.WorkQueue

Inserts a WorkQueue.
If a WorkQueue with the same name exists, an error will be thrown.
Args:
- session: a database session
- work_queue: a WorkQueue model

Returns:
- orm_models.WorkQueue: the created WorkQueue

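The description of create_work_queue says only that an error is thrown when the name already exists; it does not say which error or how uniqueness is enforced. The sketch below illustrates that behavior under one assumption: a database-level unique constraint on the name. The table, the `insert_work_queue` helper, and the use of `sqlite3` are all hypothetical stand-ins, not Prefect's schema or API.

```python
import sqlite3

# Hypothetical, simplified table. Assumption: uniqueness is enforced by a
# UNIQUE constraint on the name column.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE work_queue (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE)"
)


def insert_work_queue(conn: sqlite3.Connection, name: str) -> int:
    """Insert a queue row and return its id.

    Raises sqlite3.IntegrityError if a row with the same name already exists.
    """
    cursor = conn.execute("INSERT INTO work_queue (name) VALUES (?)", (name,))
    return cursor.lastrowid


first_id = insert_work_queue(conn, "default")

try:
    insert_work_queue(conn, "default")  # same name a second time
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True
```

Callers that may race to create the same queue would typically catch the error and fall back to reading the existing row.
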
read_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L140" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

read_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: Union[UUID, PrefectUUID]) -> Optional[orm_models.WorkQueue]

Reads a WorkQueue by id.
Args:
- session: A database session
- work_queue_id: a WorkQueue id

Returns:
- Optional[orm_models.WorkQueue]: the WorkQueue, or None if no match is found

count_work_queue_active_slots <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L197" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

count_work_queue_active_slots(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> int

Count flow runs occupying concurrency slots for a given work queue.
For standard queues (including pool-backed and default-agent queues), counts Pending/Running/Cancelling flow runs by work_queue_id FK.
For legacy tag-based queues, counts Pending/Running flow runs matching the queue's tag/deployment filter (matching _legacy_get_runs_in_work_queue).
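
The standard-queue rule above can be sketched in plain Python. This is an illustration only: `FlowRunRow`, `count_active_slots`, and the upper-case state strings are hypothetical, and the legacy tag-based branch is left out because its filter logic is not described here.

```python
from dataclasses import dataclass
from typing import Iterable
from uuid import UUID, uuid4

# States that occupy a concurrency slot for standard queues, per the text above.
# The exact string spelling is an assumption.
SLOT_STATES = frozenset({"PENDING", "RUNNING", "CANCELLING"})


@dataclass(frozen=True)
class FlowRunRow:
    """Hypothetical stand-in for a flow run record."""

    work_queue_id: UUID
    state_type: str


def count_active_slots(runs: Iterable[FlowRunRow], work_queue_id: UUID) -> int:
    """Count runs that belong to the queue and are in a slot-occupying state."""
    return sum(
        1
        for run in runs
        if run.work_queue_id == work_queue_id and run.state_type in SLOT_STATES
    )


queue_a, queue_b = uuid4(), uuid4()
runs = [
    FlowRunRow(queue_a, "RUNNING"),
    FlowRunRow(queue_a, "PENDING"),
    FlowRunRow(queue_a, "COMPLETED"),  # finished runs do not hold a slot
    FlowRunRow(queue_b, "CANCELLING"),
]
active_in_a = count_active_slots(runs, queue_a)
```
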
count_work_queue_active_slots_bulk <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L234" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

count_work_queue_active_slots_bulk(db: PrefectDBInterface, session: AsyncSession, work_queue_ids: Sequence[UUID]) -> dict[UUID, int]

Count active slots for multiple work queues. Standard queues are counted in a single bulk GROUP BY query; legacy tag-based queues fall back to per-queue counting since each has its own filter criteria.
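
To show what a single bulk GROUP BY query for standard queues might look like, here is a minimal sketch using `sqlite3`. The table layout, the string queue ids, and the choice to report queues with no active runs as 0 are assumptions made for the example; the state filter reuses the Pending/Running/Cancelling rule given for count_work_queue_active_slots.

```python
import sqlite3
from typing import Dict, Sequence

# Hypothetical, simplified flow_run table with a few sample rows.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE flow_run (id INTEGER PRIMARY KEY, work_queue_id TEXT, state_type TEXT)"
)
conn.executemany(
    "INSERT INTO flow_run (work_queue_id, state_type) VALUES (?, ?)",
    [
        ("q1", "RUNNING"),
        ("q1", "PENDING"),
        ("q1", "COMPLETED"),
        ("q2", "CANCELLING"),
        ("q3", "SCHEDULED"),
    ],
)


def count_active_slots_bulk(
    conn: sqlite3.Connection, queue_ids: Sequence[str]
) -> Dict[str, int]:
    """Count slot-occupying runs for many queues with one GROUP BY query."""
    if not queue_ids:
        return {}
    placeholders = ", ".join("?" for _ in queue_ids)
    rows = conn.execute(
        "SELECT work_queue_id, COUNT(*) FROM flow_run "
        f"WHERE work_queue_id IN ({placeholders}) "
        "AND state_type IN ('PENDING', 'RUNNING', 'CANCELLING') "
        "GROUP BY work_queue_id",
        list(queue_ids),
    ).fetchall()
    # Assumption: queues with no matching runs are reported with a count of 0.
    counts = {queue_id: 0 for queue_id in queue_ids}
    counts.update(dict(rows))
    return counts


bulk_counts = count_active_slots_bulk(conn, ["q1", "q2", "q3"])
```

Grouping in the database avoids one round trip per queue, which is the stated reason standard queues are counted together while legacy queues are handled one at a time.
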
read_work_queue_by_name <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L288" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

read_work_queue_by_name(db: PrefectDBInterface, session: AsyncSession, name: str) -> Optional[orm_models.WorkQueue]

Reads a WorkQueue by name.
Args:
- session: A database session
- name: a WorkQueue name

Returns:
- Optional[orm_models.WorkQueue]: the WorkQueue, or None if no match is found

read_work_queues <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L316" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

read_work_queues(db: PrefectDBInterface, session: AsyncSession, offset: Optional[int] = None, limit: Optional[int] = None, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None) -> Sequence[orm_models.WorkQueue]

Read WorkQueues.
Args:
- session: A database session
- offset: Query offset
- limit: Query limit
- work_queue_filter: only select work queues matching these filters

Returns:
- Sequence[orm_models.WorkQueue]: WorkQueues

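The way the optional offset, limit, and filter arguments combine can be sketched over an in-memory list. The `paginate` helper and its `predicate` parameter are hypothetical; the sketch assumes the filter is applied before offset and limit, and it does not model how the query orders its results.

```python
from typing import Callable, List, Optional, Sequence, TypeVar

T = TypeVar("T")


def paginate(
    items: Sequence[T],
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    predicate: Optional[Callable[[T], bool]] = None,
) -> List[T]:
    """Filter first, then skip `offset` items and keep at most `limit` items."""
    selected = [item for item in items if predicate is None or predicate(item)]
    start = offset or 0
    end = None if limit is None else start + limit
    return selected[start:end]


names = ["alpha", "beta", "gamma", "delta"]
page = paginate(names, offset=1, limit=2)
filtered = paginate(names, predicate=lambda name: name.endswith("a"))
```
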
is_last_polled_recent <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L348" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

is_last_polled_recent(last_polled: Optional[DateTime]) -> bool

update_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L355" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

update_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, work_queue: schemas.actions.WorkQueueUpdate, emit_status_change: Optional[Callable[[orm_models.WorkQueue], Awaitable[None]]] = None) -> bool

Update a WorkQueue by id.
Args:
- session: A database session
- work_queue: the work queue data
- work_queue_id: a WorkQueue id

Returns:
- bool: whether the WorkQueue was updated

delete_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L466" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

delete_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> bool

Delete a WorkQueue by id.
Args:
- session: A database session
- work_queue_id: a WorkQueue id

Returns:
- bool: whether the WorkQueue was deleted

get_runs_in_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L487" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

get_runs_in_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, limit: Optional[int] = None, scheduled_before: Optional[datetime.datetime] = None) -> Tuple[orm_models.WorkQueue, Sequence[orm_models.FlowRun]]

Get runs from a work queue.
Args:
- session: A database session.
- work_queue_id: The work queue id.
- scheduled_before: Only return runs scheduled to start before this time.
- limit: An optional limit for the number of runs to return from the queue. This limit applies to the request only. It does not affect the work queue's concurrency limit. If limit exceeds the work queue's concurrency limit, it will be ignored.

ensure_work_queue_exists <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L602" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

ensure_work_queue_exists(session: AsyncSession, name: str) -> orm_models.WorkQueue

Checks if a work queue exists and creates it if it does not.
Useful when working with deployments, agents, and flow runs that automatically create work queues.
Will also create a work pool queue in the default agent pool to facilitate migration to work pools.
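
The check-then-create behavior described above follows the common get-or-create pattern. The sketch below shows that pattern with an in-memory dictionary; `ensure_exists` and the record layout are hypothetical, and the additional work pool queue created in the default agent pool is deliberately not modeled.

```python
from typing import Dict


def ensure_exists(store: Dict[str, dict], name: str) -> dict:
    """Return the record stored under `name`, creating it first if it is missing."""
    record = store.get(name)
    if record is None:
        record = {"name": name}  # hypothetical minimal record
        store[name] = record
    return record


queues: Dict[str, dict] = {}
first = ensure_exists(queues, "default")   # creates the record
second = ensure_exists(queues, "default")  # finds the existing record
```

Because the second call returns the existing record, callers such as deployments or flow runs can request a queue by name without checking for it first.
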
read_work_queue_status <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L642" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

read_work_queue_status(session: AsyncSession, work_queue_id: UUID) -> schemas.core.WorkQueueStatusDetail

Get work queue status by id.
Args:
- session: A database session
- work_queue_id: a WorkQueue id

Returns:
- schemas.core.WorkQueueStatusDetail: the status of the WorkQueue

record_work_queue_polls <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L689" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

record_work_queue_polls(db: PrefectDBInterface, session: AsyncSession, polled_work_queue_ids: Sequence[UUID], ready_work_queue_ids: Sequence[UUID]) -> None

Record that the given work queues were polled, and also update the given ready_work_queue_ids to READY.
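
A rough model of the two updates this function performs is shown below. Everything in it is an assumption made for illustration: the `QueueRow` class, the `last_polled` field (borrowed from the is_last_polled_recent parameter name), the "NOT_READY" default, and the in-memory dictionary that replaces the database.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional, Sequence


@dataclass
class QueueRow:
    """Hypothetical stand-in for a work queue row."""

    last_polled: Optional[datetime] = None
    status: str = "NOT_READY"  # assumed default status name


def record_polls(
    rows: Dict[str, QueueRow],
    polled_ids: Sequence[str],
    ready_ids: Sequence[str],
    now: Optional[datetime] = None,
) -> None:
    """Stamp every polled queue with a poll time and mark the ready ones READY."""
    now = now or datetime.now(timezone.utc)
    for queue_id in polled_ids:
        rows[queue_id].last_polled = now
    for queue_id in ready_ids:
        rows[queue_id].status = "READY"


rows = {"a": QueueRow(), "b": QueueRow()}
poll_time = datetime(2024, 1, 1, tzinfo=timezone.utc)
record_polls(rows, polled_ids=["a", "b"], ready_ids=["a"], now=poll_time)
```
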
mark_work_queues_ready <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L714" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

mark_work_queues_ready() -> None

mark_work_queues_not_ready <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L754" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

mark_work_queues_not_ready(db: PrefectDBInterface, work_queue_ids: Iterable[UUID]) -> None

emit_work_queue_status_event <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L792" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

emit_work_queue_status_event(db: PrefectDBInterface, work_queue: orm_models.WorkQueue) -> None

Emit an event when work queue fields are updated.
emit_work_queue_updated_event <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L807" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

emit_work_queue_updated_event(session: AsyncSession, work_queue: orm_models.WorkQueue, changed_fields: Dict[str, Dict[str, Any]]) -> None