| title | workers |
|---|---|
| sidebarTitle | workers |
Functions for interacting with worker ORM objects. Intended for internal use by the Prefect REST API.
create_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool: Union[schemas.core.WorkPool, schemas.actions.WorkPoolCreate]) -> orm_models.WorkPool
Creates a work pool.
If a WorkPool with the same name exists, an error will be thrown.
Args:
- session: a database session
- work_pool: a WorkPool model
Returns:
- orm_models.WorkPool: the newly-created WorkPool
read_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> Optional[orm_models.WorkPool]
Reads a WorkPool by id.
Args:
- session: a database session
- work_pool_id: a WorkPool id
Returns:
- Optional[orm_models.WorkPool]: the WorkPool, or None if no WorkPool has that id
read_work_pool_by_name(db: PrefectDBInterface, session: AsyncSession, work_pool_name: str) -> Optional[orm_models.WorkPool]
Reads a WorkPool by name.
Args:
- session: a database session
- work_pool_name: a WorkPool name
Returns:
- Optional[orm_models.WorkPool]: the WorkPool, or None if no WorkPool has that name
read_work_pools(db: PrefectDBInterface, session: AsyncSession, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None, offset: Optional[int] = None, limit: Optional[int] = None) -> Sequence[orm_models.WorkPool]
Reads work pools.
Args:
- session: a database session
- offset: query offset
- limit: query limit
Returns:
- List[orm_models.WorkPool]: the work pools
count_work_pools(db: PrefectDBInterface, session: AsyncSession, work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None) -> int
Counts work pools.
Args:
- session: a database session
- work_pool_filter: filter criteria to apply to the count
Returns:
- int: the count of work pools matching the criteria
count_work_pool_active_slots(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> int
Count flow runs in slot-occupying states (Pending, Running) for a given work pool. Does not filter on queue pause status: paused queues may still have running/pending runs consuming resources. This matches the behavior of count_work_pool_slot_holders / get_work_pool_slot_holders.
count_work_pool_active_slots_bulk(db: PrefectDBInterface, session: AsyncSession, work_pool_ids: Sequence[UUID]) -> dict[UUID, int]
Count active slots for multiple work pools in a single query. Returns a mapping of work_pool_id -> active slot count. Does not filter on queue pause status (see count_work_pool_active_slots).
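The shape of the bulk result (a work_pool_id -> count mapping) can be sketched in plain Python. This is an illustration of the grouping only, not the query Prefect issues: the `Run` record and the state strings are hypothetical stand-ins, and reporting 0 for pools with no matching runs is an assumption.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Dict, Iterable, Sequence
from uuid import UUID, uuid4

# Slot-occupying states named in the text above (string form is an assumption).
SLOT_STATES = {"PENDING", "RUNNING"}


@dataclass(frozen=True)
class Run:
    """Hypothetical stand-in for a flow run row."""

    work_pool_id: UUID
    state: str


def count_active_slots_bulk(
    runs: Iterable[Run], work_pool_ids: Sequence[UUID]
) -> Dict[UUID, int]:
    """Count slot-occupying runs per requested pool in a single pass."""
    wanted = set(work_pool_ids)
    counts = Counter(
        run.work_pool_id
        for run in runs
        if run.work_pool_id in wanted and run.state in SLOT_STATES
    )
    # Assumption: pools without matching runs are reported with a count of 0.
    return {pool_id: counts.get(pool_id, 0) for pool_id in work_pool_ids}
```

A single pass over the rows mirrors the point of the bulk function: one query for many pools instead of one query per pool.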
count_work_queue_active_slots(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> int
Count flow runs in slot-occupying states (Pending, Running) for a given work queue under a work pool. Counts by work_queue_id FK only.
count_work_queue_active_slots_bulk(db: PrefectDBInterface, session: AsyncSession, work_queue_ids: Sequence[UUID]) -> dict[UUID, int]
Count active slots for multiple work queues in a single query. Returns a mapping of work_queue_id -> active slot count.
update_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_pool: schemas.actions.WorkPoolUpdate, emit_status_change: Optional[Callable[[UUID, DateTime, orm_models.WorkPool, orm_models.WorkPool], Awaitable[None]]] = None) -> bool
Update a WorkPool by id.
Args:
- session: a database session
- work_pool_id: a WorkPool id
- work_pool: the work pool data
- emit_status_change: function to call when work pool status is changed
Returns:
- whether or not the WorkPool was updated
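The emit_status_change parameter is an async callable taking a UUID, a DateTime, and two WorkPool objects. A minimal sketch of a callback with that shape follows. `PoolStub` is a hypothetical stand-in for orm_models.WorkPool, the argument names are borrowed from emit_work_pool_status_event, and the reading of the two pool arguments as "before" and "after" is an assumption.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional, Tuple
from uuid import UUID, uuid4


@dataclass
class PoolStub:
    """Hypothetical stand-in for orm_models.WorkPool."""

    name: str
    status: str


# Collected (pool name, previous status, new status) records.
changes: List[Tuple[str, Optional[str], str]] = []


async def record_status_change(
    event_id: UUID,
    occurred: datetime,
    pre_update_work_pool: Optional[PoolStub],
    work_pool: PoolStub,
) -> None:
    """Record a transition only when the status actually differs."""
    previous = pre_update_work_pool.status if pre_update_work_pool else None
    if previous != work_pool.status:
        changes.append((work_pool.name, previous, work_pool.status))
```

A callback like this would be passed as emit_status_change; the stub above only appends to a list so that it can be exercised without a database.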
delete_work_pool(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> bool
Delete a WorkPool by id.
Args:
- session: a database session
- work_pool_id: a work pool id
Returns:
- whether or not the WorkPool was deleted
get_scheduled_flow_runs(db: PrefectDBInterface, session: AsyncSession, work_pool_ids: Optional[List[UUID]] = None, work_queue_ids: Optional[List[UUID]] = None, scheduled_before: Optional[datetime.datetime] = None, scheduled_after: Optional[datetime.datetime] = None, limit: Optional[int] = None, respect_queue_priorities: Optional[bool] = None) -> Sequence[schemas.responses.WorkerFlowRunResponse]
Get runs from queues in a specific work pool.
Args:
- session: a database session
- work_pool_ids: a list of work pool ids
- work_queue_ids: a list of work pool queue ids
- scheduled_before: a datetime to filter runs scheduled before
- scheduled_after: a datetime to filter runs scheduled after
- respect_queue_priorities: whether or not to respect queue priorities
- limit: the maximum number of runs to return
- db: a database interface
Returns:
- List[WorkerFlowRunResponse]: the runs, as well as related work pool details
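The effect of scheduled_before, scheduled_after, and limit can be sketched in plain Python. This is not Prefect's query: `ScheduledRun` is a hypothetical stand-in, and treating both bounds as inclusive and ordering by scheduled time are assumptions. Queue priorities are left out of the sketch.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Sequence


@dataclass(frozen=True)
class ScheduledRun:
    """Hypothetical stand-in for a scheduled flow run."""

    name: str
    scheduled_time: datetime


def select_runs(
    runs: Sequence[ScheduledRun],
    scheduled_before: Optional[datetime] = None,
    scheduled_after: Optional[datetime] = None,
    limit: Optional[int] = None,
) -> List[ScheduledRun]:
    """Filter runs to a time window, earliest first, capped at `limit`."""
    selected = [
        run
        for run in runs
        # Assumption: both bounds are inclusive.
        if (scheduled_before is None or run.scheduled_time <= scheduled_before)
        and (scheduled_after is None or run.scheduled_time >= scheduled_after)
    ]
    # Assumption: results are ordered by scheduled time.
    selected.sort(key=lambda run: run.scheduled_time)
    return selected if limit is None else selected[:limit]
```

Leaving a bound as None disables that side of the window, which matches the Optional defaults in the signature.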
create_work_queue(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue: schemas.actions.WorkQueueCreate) -> orm_models.WorkQueue
Creates a work pool queue.
Args:
- session: a database session
- work_pool_id: a work pool id
- work_queue: a WorkQueue action model
Returns:
- orm_models.WorkQueue: the newly-created WorkQueue
bulk_update_work_queue_priorities(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, new_priorities: Dict[UUID, int]) -> None
This is a brute force update of all work pool queue priorities for a given work pool.
It loads all queues fully into memory, sorts them, and flushes the update to
the orm_models. The algorithm ensures that priorities are unique integers > 0, and
makes the minimum number of changes required to satisfy the provided
new_priorities. For example, if no queues currently have the provided
new_priorities, then they are assigned without affecting other queues. If
they are held by other queues, then those queues' priorities are
incremented as necessary.
Updating queue priorities is not a common operation (happens on the same scale as queue modification, which is significantly less than reading from queues), so while this implementation is slow, it may suffice and make up for that with extreme simplicity.
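The behavior described above (unique priorities greater than 0, with displaced queues incremented only as far as necessary) can be sketched as a pure function. This is one way to satisfy that description under stated assumptions, not Prefect's implementation: the function name is hypothetical, queues are keyed by strings rather than UUIDs, and invalid input is rejected with exceptions chosen for the example.

```python
from typing import Dict


def reassign_priorities(
    current: Dict[str, int], new_priorities: Dict[str, int]
) -> Dict[str, int]:
    """Return a queue -> priority mapping with unique positive priorities."""
    unknown = set(new_priorities) - set(current)
    if unknown:
        raise KeyError(f"unknown queues: {sorted(unknown)}")
    if any(priority < 1 for priority in new_priorities.values()):
        raise ValueError("priorities must be integers > 0")
    if len(set(new_priorities.values())) != len(new_priorities):
        raise ValueError("requested priorities must be unique")

    # Requested priorities are applied exactly as given.
    result = dict(new_priorities)
    taken = set(result.values())

    # Visit the remaining queues in ascending priority order. A queue keeps
    # its priority if it is free; otherwise it moves up to the next free one.
    remaining = sorted(
        (item for item in current.items() if item[0] not in new_priorities),
        key=lambda item: item[1],
    )
    for queue, priority in remaining:
        while priority in taken:
            priority += 1
        result[queue] = priority
        taken.add(priority)
    return result
```

For example, moving queue "c" to priority 1 in {"a": 1, "b": 2, "c": 3} pushes "a" to 2 and "b" to 3, while moving "c" to an unused priority such as 7 leaves "a" and "b" untouched.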
read_work_queues(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None, offset: Optional[int] = None, limit: Optional[int] = None) -> Sequence[orm_models.WorkQueue]
Read all work pool queues for a work pool. Results are ordered by ascending priority.
Args:
- session: a database session
- work_pool_id: a work pool id
- work_queue_filter: filter criteria for work pool queues
- offset: query offset
- limit: query limit
Returns:
- List[orm_models.WorkQueue]: the WorkQueues
count_work_queues(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None) -> int
Count work pool queues for a work pool.
read_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: Union[UUID, PrefectUUID]) -> Optional[orm_models.WorkQueue]
Read a specific work pool queue.
Args:
- session: a database session
- work_queue_id: a work pool queue id
Returns:
- Optional[orm_models.WorkQueue]: the WorkQueue, or None if no WorkQueue has that id
read_work_queue_by_name(db: PrefectDBInterface, session: AsyncSession, work_pool_name: str, work_queue_name: str) -> Optional[orm_models.WorkQueue]
Reads a WorkQueue by name.
Args:
- session: a database session
- work_pool_name: a WorkPool name
- work_queue_name: a WorkQueue name
Returns:
- Optional[orm_models.WorkQueue]: the WorkQueue, or None if no WorkQueue matches both names
update_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, work_queue: schemas.actions.WorkQueueUpdate, emit_status_change: Optional[Callable[[orm_models.WorkQueue], Awaitable[None]]] = None, default_status: WorkQueueStatus = WorkQueueStatus.NOT_READY) -> bool
Update a work pool queue.
Args:
- session: a database session
- work_queue_id: a work pool queue ID
- work_queue: a WorkQueue model
- emit_status_change: function to call when work queue status is changed
Returns:
- whether or not the WorkQueue was updated
delete_work_queue(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> bool
Delete a work pool queue.
Args:
- session: a database session
- work_queue_id: a work pool queue ID
Returns:
- whether or not the WorkQueue was deleted
read_workers(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_filter: Optional[schemas.filters.WorkerFilter] = None, limit: Optional[int] = None, offset: Optional[int] = None) -> Sequence[orm_models.Worker]
worker_heartbeat(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_name: str, heartbeat_interval_seconds: Optional[int] = None) -> bool
Record a worker process heartbeat.
Args:
- session: a database session
- work_pool_id: a work pool ID
- worker_name: a worker name
Returns:
- whether or not the worker was updated
delete_worker(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, worker_name: str) -> bool
Delete a work pool's worker.
Args:
- session: a database session
- work_pool_id: a work pool ID
- worker_name: a worker name
Returns:
- whether or not the Worker was deleted
count_work_pool_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> int
Counts flow runs in slot-occupying states for a work pool.
get_work_pool_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID, work_queue_ids: Optional[List[UUID]] = None, flow_run_limit: Optional[int] = None) -> Sequence[tuple[orm_models.FlowRun, Optional[DateTime]]]
Returns flow runs in slot-occupying states for a work pool.
Each result is a tuple of (FlowRun, slot_acquired_at) where slot_acquired_at is when the current slot-occupying sequence began.
Args:
- work_pool_id: the work pool to query
- work_queue_ids: if provided, only return runs for these queues
- flow_run_limit: if provided, cap results per work_queue_id
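The per-queue cap described for flow_run_limit can be sketched as follows. This illustrates the idea of limiting results per work_queue_id rather than overall; the function name is hypothetical, rows are simplified to (work_queue_id, flow_run_name) string pairs, and keeping the first rows in input order is an assumption.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple


def cap_per_queue(
    rows: Iterable[Tuple[str, str]], flow_run_limit: Optional[int]
) -> List[Tuple[str, str]]:
    """Keep at most `flow_run_limit` rows for each work queue id."""
    if flow_run_limit is None:
        # No cap requested: return everything unchanged.
        return list(rows)
    seen: Dict[str, int] = defaultdict(int)
    kept: List[Tuple[str, str]] = []
    for queue_id, run_name in rows:
        if seen[queue_id] < flow_run_limit:
            seen[queue_id] += 1
            kept.append((queue_id, run_name))
    return kept
```

With a cap of 2, a queue holding three runs contributes two rows while a queue holding one run still contributes its single row.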
count_work_pool_slot_holders_by_queue(db: PrefectDBInterface, session: AsyncSession, work_pool_id: UUID) -> dict[UUID, int]
Returns {work_queue_id: count} for slot-holding runs in a pool.
count_work_queue_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID) -> int
Counts flow runs in slot-occupying states for a single work queue.
get_work_queue_slot_holders(db: PrefectDBInterface, session: AsyncSession, work_queue_id: UUID, offset: Optional[int] = None, limit: Optional[int] = None) -> Sequence[tuple[orm_models.FlowRun, Optional[DateTime]]]
Returns flow runs in slot-occupying states for a single work queue.
Each result is a tuple of (FlowRun, slot_acquired_at) where slot_acquired_at is when the current slot-occupying sequence began.
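Because slot_acquired_at is Optional, code that consumes these tuples needs to handle a missing timestamp. A minimal sketch, assuming flow runs are represented by name strings instead of orm_models.FlowRun and that the caller supplies the current time; the function name is hypothetical:

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Sequence, Tuple


def slot_hold_durations(
    holders: Sequence[Tuple[str, Optional[datetime]]], now: datetime
) -> List[Tuple[str, Optional[timedelta]]]:
    """Pair each run with how long it has held its slot, if that is known."""
    return [
        # A missing slot_acquired_at yields None instead of a duration.
        (name, None if acquired is None else now - acquired)
        for name, acquired in holders
    ]
```

Passing `now` explicitly keeps the function deterministic and avoids mixing naive and timezone-aware datetimes inside it.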
emit_work_pool_updated_event(session: AsyncSession, work_pool: orm_models.WorkPool, changed_fields: Dict[str, Dict[str, Any]]) -> None
Emit an event when work pool fields are updated.
emit_work_pool_status_event(event_id: UUID, occurred: DateTime, pre_update_work_pool: Optional[orm_models.WorkPool], work_pool: orm_models.WorkPool) -> None