docs/v3/api-ref/python/prefect-server-models-workers.mdx
prefect.server.models.workers
Functions for interacting with worker ORM objects. Intended for internal use by the Prefect REST API.
create_work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L48" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>create_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool: Union[schemas.core.WorkPool, schemas.actions.WorkPoolCreate]) -> orm_models.WorkPool
Creates a work pool.
If a WorkPool with the same name exists, an error will be thrown.
Args:
session: a database session
work_pool: a WorkPool model
Returns:
read_work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L93" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> Optional[orm_models.WorkPool]
Reads a WorkPool by id.
Args:
session: A database session
work_pool_id: a WorkPool id
Returns:
read_work_pool_by_name <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L112" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_work_pool_by_name(db: PrefectDBInterface, session: AsyncSession, work_pool_name: str) -> Optional[orm_models.WorkPool]
Reads a WorkPool by name.
Args:
session: A database session
work_pool_name: a WorkPool name
Returns:
read_work_pools <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L131" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_work_pools(db: PrefectDBInterface, session: AsyncSession, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, offset: Optional[int] = None, limit: Optional[int] = None) -> Sequence[orm_models.WorkPool]
Read work pools.
Args:
session: A database session
offset: Query offset
limit: Query limit
Returns:
Sequence[orm_models.WorkPool]: the matching work pools
count_work_pools <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L163" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>count_work_pools(db: PrefectDBInterface, session: AsyncSession, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None) -> int
Count work pools.
Args:
session: A database session
work_pool_filter: filter criteria to apply to the count
Returns:
int: the count of work pools matching the criteria
count_work_pool_active_slots <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L196" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>count_work_pool_active_slots(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> int
Count flow runs in slot-occupying states (Pending, Running) for a given work pool. Does not filter on queue pause status — paused queues may still have running/pending runs consuming resources. This matches the behavior of count_work_pool_slot_holders / get_work_pool_slot_holders.
count_work_pool_active_slots_bulk <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L221" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>count_work_pool_active_slots_bulk(db: PrefectDBInterface, session: AsyncSession, work_pool_ids: Sequence[UUID]) -> dict[UUID, int]
Count active slots for multiple work pools in a single query. Returns a mapping of work_pool_id -> active slot count. Does not filter on queue pause status (see count_work_pool_active_slots).
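As a rough illustration of the counting semantics described above, the following in-memory sketch counts runs in Pending or Running states per work pool and ignores whether the owning queue is paused. It is not Prefect's query: `FlowRunRow`, `SLOT_STATES`, and `count_active_slots_bulk` are hypothetical names used only here, and state types are modeled as plain strings. The sketch omits pools that have no slot-occupying runs; whether the real function does the same is an assumption.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Iterable, Sequence
from uuid import UUID, uuid4

# Assumption: state types are modeled as plain strings for illustration.
SLOT_STATES = {"PENDING", "RUNNING"}


@dataclass(frozen=True)
class FlowRunRow:
    """Hypothetical stand-in for a flow run row."""

    work_pool_id: UUID
    state_type: str
    queue_paused: bool = False  # deliberately ignored when counting


def count_active_slots_bulk(
    runs: Iterable[FlowRunRow], work_pool_ids: Sequence[UUID]
) -> dict[UUID, int]:
    """Return {work_pool_id: count} for runs in slot-occupying states."""
    wanted = set(work_pool_ids)
    counts = Counter(
        run.work_pool_id
        for run in runs
        if run.work_pool_id in wanted and run.state_type in SLOT_STATES
    )
    return dict(counts)


pool_a, pool_b = uuid4(), uuid4()
runs = [
    FlowRunRow(pool_a, "RUNNING"),
    FlowRunRow(pool_a, "PENDING", queue_paused=True),  # paused queue, still counted
    FlowRunRow(pool_a, "SCHEDULED"),  # not a slot-occupying state
    FlowRunRow(pool_b, "COMPLETED"),  # not a slot-occupying state
]
active = count_active_slots_bulk(runs, [pool_a, pool_b])
```

Grouping in a single pass mirrors the "single query" idea: one scan over the runs yields the counts for every requested pool.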
count_work_queue_active_slots <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L252" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>count_work_queue_active_slots(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> int
Count flow runs in slot-occupying states (Pending, Running) for a given work queue under a work pool. Counts by work_queue_id FK only.
count_work_queue_active_slots_bulk <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L274" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>count_work_queue_active_slots_bulk(db: PrefectDBInterface, session: AsyncSession, work_queue_ids: Sequence[UUID]) -> dict[UUID, int]
Count active slots for multiple work queues in a single query. Returns a mapping of work_queue_id -> active slot count.
update_work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L302" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>update_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_pool: schemas.actions.WorkPoolUpdate, emit_status_change: Optional[Callable[[UUID, DateTime, orm_models.WorkPool, orm_models.WorkPool], Awaitable[None]]] = None) -> bool
Update a WorkPool by id.
Args:
session: A database session
work_pool_id: a WorkPool id
work_pool: the work pool data
emit_status_change: function to call when work pool status is changed
Returns:
delete_work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L423" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>delete_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> bool
Delete a WorkPool by id.
Args:
session: A database session
work_pool_id: a work pool id
Returns:
get_scheduled_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L444" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_scheduled_flow_runs(db: PrefectDBInterface, session: AsyncSession, work_pool_ids: Optional[List[UUID]] = None, work_queue_ids: Optional[List[UUID]] = None, scheduled_before: Optional[datetime.datetime] = None, scheduled_after: Optional[datetime.datetime] = None, limit: Optional[int] = None, respect_queue_priorities: Optional[bool] = None) -> Sequence[schemas.responses.WorkerFlowRunResponse]
Get runs from queues in a specific work pool.
Args:
session: a database session
work_pool_ids: a list of work pool ids
work_queue_ids: a list of work pool queue ids
scheduled_before: a datetime to filter runs scheduled before
scheduled_after: a datetime to filter runs scheduled after
respect_queue_priorities: whether or not to respect queue priorities
limit: the maximum number of runs to return
db: a database interface
Returns:
create_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L496" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>create_work_queue(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue: schemas.actions.WorkQueueCreate) -> orm_models.WorkQueue
Creates a work pool queue.
Args:
session: a database session
work_pool_id: a work pool id
work_queue: a WorkQueue action model
Returns:
bulk_update_work_queue_priorities <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L553" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>bulk_update_work_queue_priorities(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, new_priorities: Dict[UUID, int]) -> None
This is a brute force update of all work pool queue priorities for a given work pool.
It loads all queues fully into memory, sorts them, and flushes the update to
the orm_models. The algorithm ensures that priorities are unique integers > 0, and
makes the minimum number of changes required to satisfy the provided
new_priorities. For example, if no queues currently have the provided
new_priorities, then they are assigned without affecting other queues. If
they are held by other queues, then those queues' priorities are
incremented as necessary.
Updating queue priorities is not a common operation (it happens on the same scale as queue modification, which is significantly less frequent than reading from queues), so while this implementation is slow, it may suffice and makes up for its slowness with extreme simplicity.
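The behavior described above can be sketched in plain Python. This is an illustration under stated assumptions, not Prefect's implementation: `reassign_priorities` is a hypothetical helper, queues are identified by strings instead of UUIDs, and unknown queue ids in `new_priorities` are simply ignored.

```python
def reassign_priorities(
    current: dict[str, int], new_priorities: dict[str, int]
) -> dict[str, int]:
    """Apply requested priorities, bumping other queues only on conflict."""
    if len(set(new_priorities.values())) != len(new_priorities):
        raise ValueError("Duplicate target priorities provided")
    if any(priority < 1 for priority in new_priorities.values()):
        raise ValueError("Priorities must be integers greater than 0")

    # Requested priorities are assigned as given (unknown queue ids are skipped).
    result = {q: p for q, p in new_priorities.items() if q in current}
    taken = set(result.values())

    # Walk the remaining queues in ascending priority and increment a
    # priority only while it collides with one that is already taken.
    untouched = sorted((q for q in current if q not in result), key=lambda q: current[q])
    for queue_id in untouched:
        priority = current[queue_id]
        while priority in taken:
            priority += 1
        taken.add(priority)
        result[queue_id] = priority
    return result


queues = {"default": 1, "high": 2, "low": 3}
promoted = reassign_priorities(queues, {"low": 1})
demoted = reassign_priorities(queues, {"low": 10})
```

In the first call every other queue is shifted down by one because `low` takes priority 1; in the second call nothing else changes because priority 10 was free.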
read_work_queues <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L619" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_work_queues(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None, offset: Optional[int] = None, limit: Optional[int] = None) -> Sequence[orm_models.WorkQueue]
Read all work pool queues for a work pool. Results are ordered by ascending priority.
Args:
session: a database session
work_pool_id: a work pool id
work_queue_filter: Filter criteria for work pool queues
offset: Query offset
limit: Query limit
Returns:
count_work_queues <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L660" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>count_work_queues(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None) -> int
Count work pool queues for a work pool.
read_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L679" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: Union[UUID, PrefectUUID]) -> Optional[orm_models.WorkQueue]
Read a specific work pool queue.
Args:
session: a database session
work_queue_id: a work pool queue id
Returns:
read_work_queue_by_name <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L699" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_work_queue_by_name(db: PrefectDBInterface, session: AsyncSession, work_pool_name: str, work_queue_name: str) -> Optional[orm_models.WorkQueue]
Reads a WorkQueue by name.
Args:
session: A database session
work_pool_name: a WorkPool name
work_queue_name: a WorkQueue name
Returns:
update_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L733" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>update_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, work_queue: schemas.actions.WorkQueueUpdate, emit_status_change: Optional[Callable[[orm_models.WorkQueue], Awaitable[None]]] = None, default_status: WorkQueueStatus = WorkQueueStatus.NOT_READY) -> bool
Update a work pool queue.
Args:
session: a database session
work_queue_id: a work pool queue ID
work_queue: a WorkQueue model
emit_status_change: function to call when work queue status is changed
Returns:
delete_work_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L857" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>delete_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> bool
Delete a work pool queue.
Args:
session: a database session
work_queue_id: a work pool queue ID
Returns:
read_workers <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L905" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_workers(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_filter: Optional[schemas.filters.WorkerFilter] = None, limit: Optional[int] = None, offset: Optional[int] = None) -> Sequence[orm_models.Worker]
worker_heartbeat <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L934" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>worker_heartbeat(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_name: str, heartbeat_interval_seconds: Optional[int] = None) -> bool
Record a worker process heartbeat.
Args:
session: a database session
work_pool_id: a work pool ID
worker_name: a worker name
Returns:
delete_worker <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L984" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>delete_worker(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_name: str) -> bool
Delete a work pool's worker.
Args:
session: a database session
work_pool_id: a work pool ID
worker_name: a worker name
Returns:
count_work_pool_slot_holders <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1089" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>count_work_pool_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> int
Counts flow runs in slot-occupying states for a work pool.
get_work_pool_slot_holders <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1106" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_work_pool_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue_ids: Optional[List[UUID]] = None, flow_run_limit: Optional[int] = None) -> Sequence[tuple[orm_models.FlowRun, Optional[DateTime]]]
Returns flow runs in slot-occupying states for a work pool.
Each result is a tuple of (FlowRun, slot_acquired_at) where slot_acquired_at is when the current slot-occupying sequence began.
Args:
work_pool_id: The work pool to query.
work_queue_ids: If provided, only return runs for these queues.
flow_run_limit: If provided, cap results per work_queue_id.
count_work_pool_slot_holders_by_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1159" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>count_work_pool_slot_holders_by_queue(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> dict[UUID, int]
Returns {work_queue_id: count} for slot-holding runs in a pool.
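To make the per-queue shapes concrete, here is a small in-memory sketch of a `{work_queue_id: count}` mapping and of a per-queue cap like the `flow_run_limit` argument of `get_work_pool_slot_holders`. The names `Holder`, `cap_per_queue`, and `count_by_queue` are hypothetical, queue ids are strings rather than UUIDs, and the sketch keeps the first rows in input order; the ordering the real query applies is not stated here.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Optional

# Hypothetical row shape: (flow_run_name, work_queue_id, slot_acquired_at)
Holder = tuple[str, str, Optional[datetime]]


def cap_per_queue(
    holders: list[Holder], flow_run_limit: Optional[int] = None
) -> list[Holder]:
    """Keep at most flow_run_limit holders for each work queue."""
    if flow_run_limit is None:
        return list(holders)
    kept: list[Holder] = []
    seen: dict[str, int] = defaultdict(int)
    for holder in holders:
        queue_id = holder[1]
        if seen[queue_id] < flow_run_limit:
            seen[queue_id] += 1
            kept.append(holder)
    return kept


def count_by_queue(holders: list[Holder]) -> dict[str, int]:
    """Return {work_queue_id: count} for the given slot holders."""
    counts: dict[str, int] = defaultdict(int)
    for _, queue_id, _ in holders:
        counts[queue_id] += 1
    return dict(counts)


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
holders = [
    ("run-1", "q1", now),
    ("run-2", "q1", now),
    ("run-3", "q1", None),  # slot_acquired_at may be missing
    ("run-4", "q2", now),
]
capped = cap_per_queue(holders, flow_run_limit=2)
per_queue = count_by_queue(holders)
```

The cap applies to each queue independently, so a busy queue cannot crowd out the rows of a quieter one.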
count_work_queue_slot_holders <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1192" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>count_work_queue_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> int
Counts flow runs in slot-occupying states for a single work queue.
get_work_queue_slot_holders <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1213" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_work_queue_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, offset: Optional[int] = None, limit: Optional[int] = None) -> Sequence[tuple[orm_models.FlowRun, Optional[DateTime]]]
Returns flow runs in slot-occupying states for a single work queue.
Each result is a tuple of (FlowRun, slot_acquired_at) where slot_acquired_at is when the current slot-occupying sequence began.
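One way to read "when the current slot-occupying sequence began" is as the timestamp of the earliest state in the latest unbroken run of slot-occupying states. The sketch below implements that reading over a hypothetical state history; `slot_acquired_at` as a helper, the string state types, and the assumption that Pending and Running are the slot-occupying states are all illustrative and not taken from Prefect's query.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Sequence

# Assumption: Pending and Running are the slot-occupying states.
SLOT_STATES = {"PENDING", "RUNNING"}


def slot_acquired_at(
    history: Sequence[tuple[str, datetime]],
) -> Optional[datetime]:
    """Return the start of the trailing slot-occupying sequence.

    `history` holds (state_type, timestamp) pairs, oldest first.
    Returns None when the latest state does not occupy a slot.
    """
    acquired = None
    for state_type, timestamp in reversed(history):
        if state_type not in SLOT_STATES:
            break
        acquired = timestamp
    return acquired


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
minute = timedelta(minutes=1)

first_attempt = [("SCHEDULED", t0), ("PENDING", t0 + minute), ("RUNNING", t0 + 2 * minute)]
after_retry = first_attempt + [
    ("SCHEDULED", t0 + 3 * minute),  # the slot is released here
    ("PENDING", t0 + 4 * minute),  # and a new sequence starts here
    ("RUNNING", t0 + 5 * minute),
]
finished = [("RUNNING", t0), ("COMPLETED", t0 + minute)]
```

Under this reading a retried run reports the start of its latest attempt, not the first time it ever held a slot.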
emit_work_pool_updated_event <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1247" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>emit_work_pool_updated_event(session: AsyncSession, work_pool: orm_models.WorkPool, changed_fields: Dict[str, Dict[str, Any]]) -> None
Emit an event when work pool fields are updated.
emit_work_pool_status_event <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/workers.py#L1267" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>emit_work_pool_status_event(event_id: UUID, occurred: DateTime, pre_update_work_pool: Optional[orm_models.WorkPool], work_pool: orm_models.WorkPool) -> None