work_queues

prefect.server.models.work_queues

Functions for interacting with work queue ORM objects. Intended for internal use by the Prefect REST API.

Functions

create_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L54" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
create_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue: Union[schemas.core.WorkQueue, schemas.actions.WorkQueueCreate]) -> orm_models.WorkQueue

Inserts a WorkQueue.

If a WorkQueue with the same name exists, an error will be thrown.

Args:

  • session: a database session
  • work_queue: a WorkQueue model

Returns:

  • orm_models.WorkQueue: the newly created WorkQueue

read_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L140" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: Union[UUID, PrefectUUID]) -> Optional[orm_models.WorkQueue]

Reads a WorkQueue by id.

Args:

  • session: A database session
  • work_queue_id: a WorkQueue id

Returns:

  • Optional[orm_models.WorkQueue]: the WorkQueue, or None if no queue has this id

count_work_queue_active_slots <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L197" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
count_work_queue_active_slots(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> int

Count flow runs occupying concurrency slots for a given work queue.

For standard queues (including pool-backed and default-agent queues), counts Pending/Running/Cancelling flow runs by work_queue_id FK.

For legacy tag-based queues, counts Pending/Running flow runs matching the queue's tag/deployment filter (matching _legacy_get_runs_in_work_queue).
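The two counting rules can be illustrated with a small in-memory sketch. This is not Prefect's implementation, which queries the database; `Run`, `count_active_slots`, and the `legacy_tags` parameter are hypothetical names, and the legacy filter is simplified here to a tag-overlap check.

```python
from dataclasses import dataclass, field
from typing import Optional
from uuid import UUID, uuid4

# States that occupy a slot, as listed in the description above.
STANDARD_ACTIVE = {"PENDING", "RUNNING", "CANCELLING"}
LEGACY_ACTIVE = {"PENDING", "RUNNING"}


@dataclass
class Run:
    """Hypothetical stand-in for a flow run row."""
    state: str
    work_queue_id: Optional[UUID] = None
    tags: set = field(default_factory=set)


def count_active_slots(runs, work_queue_id, legacy_tags=None):
    """Count runs occupying slots for one queue.

    If legacy_tags is given, the queue is treated as a legacy tag-based
    queue and runs are matched by tag overlap (a simplification of the
    real tag/deployment filter). Otherwise runs are matched by queue id.
    """
    if legacy_tags is not None:
        wanted = set(legacy_tags)
        return sum(
            1 for r in runs
            if r.state in LEGACY_ACTIVE and r.tags & wanted
        )
    return sum(
        1 for r in runs
        if r.state in STANDARD_ACTIVE and r.work_queue_id == work_queue_id
    )


queue_id = uuid4()
runs = [
    Run("RUNNING", queue_id),
    Run("CANCELLING", queue_id),
    Run("COMPLETED", queue_id),       # finished runs hold no slot
    Run("PENDING", tags={"etl"}),
    Run("CANCELLING", tags={"etl"}),  # not counted for legacy queues
]
standard_count = count_active_slots(runs, queue_id)
legacy_count = count_active_slots(runs, queue_id, legacy_tags=["etl"])
```

Note that a Cancelling run counts toward a standard queue but not toward a legacy one, which is the only difference between the two state sets.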

count_work_queue_active_slots_bulk <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L234" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
count_work_queue_active_slots_bulk(db: PrefectDBInterface, session: AsyncSession, work_queue_ids: Sequence[UUID]) -> dict[UUID, int]

Count active slots for multiple work queues. Standard queues are counted in a single bulk GROUP BY query; legacy tag-based queues fall back to per-queue counting since each has its own filter criteria.
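As a rough illustration of the bulk strategy, the sketch below groups standard queues in one pass (analogous to a single GROUP BY) and calls a per-queue counter for legacy queues. All names are hypothetical: `legacy_counters` maps a legacy queue id to a zero-argument callable, which stands in for the per-queue filter query.

```python
from collections import Counter
from uuid import uuid4

ACTIVE = {"PENDING", "RUNNING", "CANCELLING"}


def count_active_slots_bulk(runs, work_queue_ids, legacy_counters=None):
    """Return {queue_id: active_count} for every requested queue.

    runs: iterable of (work_queue_id, state) pairs.
    legacy_counters: hypothetical {queue_id: callable} for legacy queues.
    """
    legacy_counters = legacy_counters or {}
    standard_ids = {q for q in work_queue_ids if q not in legacy_counters}

    # One pass over all runs, analogous to a single GROUP BY query.
    grouped = Counter(
        qid for qid, state in runs
        if qid in standard_ids and state in ACTIVE
    )
    # Queues with no active runs still get an explicit zero.
    counts = {qid: grouped.get(qid, 0) for qid in standard_ids}

    # Legacy queues fall back to one count per queue.
    for qid in work_queue_ids:
        if qid in legacy_counters:
            counts[qid] = legacy_counters[qid]()
    return counts


a, b, legacy = uuid4(), uuid4(), uuid4()
runs = [(a, "RUNNING"), (a, "PENDING"), (b, "COMPLETED")]
result = count_active_slots_bulk(runs, [a, b, legacy], {legacy: lambda: 3})
```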

read_work_queue_by_name <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L288" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_work_queue_by_name(db: PrefectDBInterface, session: AsyncSession, name: str) -> Optional[orm_models.WorkQueue]

Reads a WorkQueue by name.

Args:

  • session: A database session
  • name: a WorkQueue name

Returns:

  • Optional[orm_models.WorkQueue]: the WorkQueue, or None if no queue has this name

read_work_queues <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L316" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_work_queues(db: PrefectDBInterface, session: AsyncSession, offset: Optional[int] = None, limit: Optional[int] = None, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None) -> Sequence[orm_models.WorkQueue]

Read WorkQueues.

Args:

  • session: A database session
  • offset: Query offset
  • limit: Query limit
  • work_queue_filter: only select work queues matching these filters

Returns:

  • Sequence[orm_models.WorkQueue]: WorkQueues

is_last_polled_recent <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L348" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
is_last_polled_recent(last_polled: Optional[DateTime]) -> bool

update_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L355" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
update_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, work_queue: schemas.actions.WorkQueueUpdate, emit_status_change: Optional[Callable[[orm_models.WorkQueue], Awaitable[None]]] = None) -> bool

Update a WorkQueue by id.

Args:

  • session: A database session
  • work_queue: the work queue data
  • work_queue_id: a WorkQueue id

Returns:

  • whether or not the WorkQueue was updated
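The `emit_status_change` parameter is an optional async callable that receives a work queue. The sketch below shows one way such a callback could be wired in. It assumes the callback is awaited only when the status actually changes, which this page does not state; `update_queue` and the dict-based store are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

Queue = Dict[str, Any]


async def update_queue(
    queues: Dict[str, Queue],
    queue_id: str,
    changes: Dict[str, Any],
    emit_status_change: Optional[Callable[[Queue], Awaitable[None]]] = None,
) -> bool:
    """Apply changes to a queue; return whether a queue was updated."""
    queue = queues.get(queue_id)
    if queue is None:
        return False
    old_status = queue.get("status")
    queue.update(changes)
    # Assumption: notify only when the status field actually changed.
    if emit_status_change is not None and queue.get("status") != old_status:
        await emit_status_change(queue)
    return True


emitted = []


async def record_event(queue: Queue) -> None:
    emitted.append(queue["status"])


queues = {"q1": {"status": "NOT_READY", "description": ""}}
updated = asyncio.run(
    update_queue(queues, "q1", {"status": "PAUSED"}, record_event)
)
missing = asyncio.run(
    update_queue(queues, "unknown", {"status": "PAUSED"}, record_event)
)
```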

delete_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L466" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
delete_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> bool

Delete a WorkQueue by id.

Args:

  • session: A database session
  • work_queue_id: a WorkQueue id

Returns:

  • whether or not the WorkQueue was deleted

get_runs_in_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L487" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_runs_in_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, limit: Optional[int] = None, scheduled_before: Optional[datetime.datetime] = None) -> Tuple[orm_models.WorkQueue, Sequence[orm_models.FlowRun]]

Get runs from a work queue.

Args:

  • session: A database session.
  • work_queue_id: The work queue id.
  • scheduled_before: Only return runs scheduled to start before this time.
  • limit: An optional limit for the number of runs to return from the queue. This limit applies to the request only. It does not affect the work queue's concurrency limit. If limit exceeds the work queue's concurrency limit, it will be ignored.
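The interaction between the request `limit` and the queue's concurrency limit can be sketched as below. It assumes the number of open slots is the concurrency limit minus the currently active runs, and that the smaller of the two bounds wins; `effective_run_limit` is a hypothetical helper, not part of this module.

```python
from typing import Optional


def effective_run_limit(
    request_limit: Optional[int],
    concurrency_limit: Optional[int],
    active_slots: int,
) -> Optional[int]:
    """Return how many runs a request may receive; None means unbounded."""
    if concurrency_limit is None:
        # No concurrency limit on the queue: only the request limit applies.
        return request_limit
    open_slots = max(0, concurrency_limit - active_slots)
    if request_limit is None:
        return open_slots
    # A request limit larger than the open slots has no effect.
    return min(request_limit, open_slots)
```

For example, under these assumptions a request with `limit=10` against a queue with a concurrency limit of 3 and one active run would receive at most 2 runs.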

ensure_work_queue_exists <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L602" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
ensure_work_queue_exists(session: AsyncSession, name: str) -> orm_models.WorkQueue

Checks if a work queue exists and creates it if it does not.

Useful when working with deployments, agents, and flow runs that automatically create work queues.

Will also create a work pool queue in the default agent pool to facilitate migration to work pools.
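The check-then-create behavior follows the familiar get-or-create pattern. The sketch below shows it against an in-memory dict; `Queue` and `ensure_queue_exists` are hypothetical names, and the extra work pool queue created in the default agent pool is left out.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Queue:
    """Hypothetical stand-in for a work queue record."""
    name: str


def ensure_queue_exists(store: Dict[str, Queue], name: str) -> Queue:
    """Return the queue named `name`, creating it first if needed."""
    existing = store.get(name)
    if existing is not None:
        return existing
    created = Queue(name=name)
    store[name] = created
    return created


store: Dict[str, Queue] = {}
first = ensure_queue_exists(store, "default")
second = ensure_queue_exists(store, "default")  # returns the same object
```

Calling it twice with the same name is safe: the second call finds the existing queue and creates nothing.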

read_work_queue_status <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L642" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_work_queue_status(session: AsyncSession, work_queue_id: UUID) -> schemas.core.WorkQueueStatusDetail

Get work queue status by id.

Args:

  • session: A database session
  • work_queue_id: a WorkQueue id

Returns:

  • Information about the status of the work queue.

record_work_queue_polls <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L689" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
record_work_queue_polls(db: PrefectDBInterface, session: AsyncSession, polled_work_queue_ids: Sequence[UUID], ready_work_queue_ids: Sequence[UUID]) -> None

Record that the given work queues were polled, and also update the given ready_work_queue_ids to READY.
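A minimal in-memory sketch of the two updates described above: stamping a poll time on every polled queue and setting the ready queues to READY. The dict layout, the `record_polls` name, and the `now` parameter are assumptions for illustration; the real function performs database updates.

```python
from datetime import datetime, timezone
from uuid import uuid4


def record_polls(queues, polled_ids, ready_ids, now=None):
    """Update queues in place.

    queues: {queue_id: {"last_polled": datetime | None, "status": str}}
    """
    now = now or datetime.now(timezone.utc)
    for qid in polled_ids:
        queues[qid]["last_polled"] = now
    for qid in ready_ids:
        queues[qid]["status"] = "READY"


a, b = uuid4(), uuid4()
queues = {
    a: {"last_polled": None, "status": "NOT_READY"},
    b: {"last_polled": None, "status": "NOT_READY"},
}
stamp = datetime(2024, 1, 1, tzinfo=timezone.utc)
record_polls(queues, polled_ids=[a, b], ready_ids=[a], now=stamp)
```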

mark_work_queues_ready <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L714" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
mark_work_queues_ready() -> None

mark_work_queues_not_ready <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L754" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
mark_work_queues_not_ready(db: PrefectDBInterface, work_queue_ids: Iterable[UUID]) -> None

emit_work_queue_status_event <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L792" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
emit_work_queue_status_event(db: PrefectDBInterface, work_queue: orm_models.WorkQueue) -> None

Emit an event when work queue fields are updated.

emit_work_queue_updated_event <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/work_queues.py#L807" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
emit_work_queue_updated_event(session: AsyncSession, work_queue: orm_models.WorkQueue, changed_fields: Dict[str, Dict[str, Any]]) -> None