Back to Prefect

base

docs/v3/api-ref/python/prefect-workers-base.mdx

3.6.30.dev312.0 KB
Original Source

prefect.workers.base

Classes

BaseJobConfiguration <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L116" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

from_template_and_values <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L179" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
from_template_and_values(cls, base_job_template: dict[str, Any], values: dict[str, Any], client: 'PrefectClient | None' = None)

Creates a valid worker configuration object from the provided base configuration and overrides.

Important: this method expects that the base_job_template was already validated server-side.

is_using_a_runner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L149" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
is_using_a_runner(self) -> bool

json_template <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L248" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
json_template(cls) -> dict[str, Any]

Returns a dict with job configuration as keys and the corresponding templates as values

Defaults to using the job configuration parameter name as the template variable name.

e.g.

python
{
    key1: '{{ key1 }}',     # default variable template
    key2: '{{ template2 }}', # `template2` specifically provide as template
}

prepare_for_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L272" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
prepare_for_flow_run(self, flow_run: 'FlowRun', deployment: 'DeploymentResponse | None' = None, flow: 'APIFlow | None' = None, work_pool: 'WorkPool | None' = None, worker_name: str | None = None, worker_id: 'UUID | None' = None) -> None

Prepare the job configuration for a flow run.

This method is called by the worker before starting a flow run. It should be used to set any configuration values that are dependent on the flow run.

Args:

  • flow_run: The flow run to be executed.
  • deployment: The deployment that the flow run is associated with.
  • flow: The flow that the flow run is associated with.
  • work_pool: The work pool that the flow run is running in.
  • worker_name: The name of the worker that is submitting the flow run.
  • worker_id: The backend ID of the worker that is submitting the flow run.

BaseVariables <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L473" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

model_json_schema <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L497" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
model_json_schema(cls, by_alias: bool = True, ref_template: str = '#/definitions/{model}', schema_generator: Type[GenerateJsonSchema] = GenerateJsonSchema, mode: Literal['validation', 'serialization'] = 'validation') -> dict[str, Any]

TODO: stop overriding this method - use GenerateSchema in ConfigDict instead?

BaseWorkerResult <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L526" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

BaseWorker <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L541" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L622" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
client(self) -> PrefectClient

get_all_available_worker_types <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L689" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_all_available_worker_types() -> list[str]

Returns all worker types available in the local registry.

get_and_submit_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1185" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_and_submit_flow_runs(self) -> list['FlowRun']

get_default_base_job_template <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L658" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_default_base_job_template(cls) -> dict[str, Any]

get_description <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L654" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_description(cls) -> str

get_documentation_url <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L646" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_documentation_url(cls) -> str

get_flow_run_logger <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L702" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_flow_run_logger(self, flow_run: 'FlowRun') -> PrefectLogAdapter

get_logo_url <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L650" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_logo_url(cls) -> str

get_name_slug <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L699" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_name_slug(self) -> str

get_status <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1573" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_status(self) -> dict[str, Any]

Retrieves the status of the current worker including its name, current worker pool, the work pool queues it is polling, and its local settings.

get_worker_class_from_type <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L676" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_worker_class_from_type(type: str) -> Optional[Type['BaseWorker[Any, Any, Any]']]

Returns the worker class for a given worker type. If the worker type is not recognized, returns None.

is_worker_still_polling <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1156" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
is_worker_still_polling(self, query_interval_seconds: float) -> bool

This method is invoked by a webserver healthcheck handler and returns a boolean indicating if the worker has recorded a scheduled flow run poll within a variable amount of time.

The query_interval_seconds is the same value that is used by the loop services - we will evaluate if the _last_polled_time was within that interval x 30 (so 10s -> 5m)

The instance property self._last_polled_time is currently set/updated in get_and_submit_flow_runs()

kill_infrastructure <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1825" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
kill_infrastructure(self, infrastructure_pid: str, configuration: C, grace_seconds: int = 30) -> None

Kill infrastructure for a flow run.

Override this method in subclasses to implement infrastructure-specific termination logic.

Args:

  • infrastructure_pid: The infrastructure identifier from the flow run.
  • configuration: The job configuration for connecting to infrastructure.
  • grace_seconds: Time to allow for graceful shutdown before force killing.

Raises:

  • NotImplementedError: If the worker doesn't support killing infrastructure.
  • InfrastructureNotFound: If the infrastructure doesn't exist.
  • InfrastructureNotAvailable: If the infrastructure can't be killed by this worker.

limiter <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L638" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
limiter(self) -> anyio.CapacityLimiter

run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L811" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run(self, flow_run: 'FlowRun', configuration: C, task_status: Optional[anyio.abc.TaskStatus[int]] = None) -> R

Runs a given flow run on the current worker.

setup <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1082" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
setup(self) -> None

Prepares the worker to run.

start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L718" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
start(self, run_once: bool = False, with_healthcheck: bool = False, printer: Callable[..., None] = print) -> None

Starts the worker and runs the main worker loops.

By default, the worker will run loops to poll for scheduled/cancelled flow runs and sync with the Prefect API server.

If run_once is set, the worker will only run each loop once and then return.

If with_healthcheck is set, the worker will start a healthcheck server which can be used to determine if the worker is still polling for flow runs and restart the worker if necessary.

Args:

  • run_once: If set, the worker will only run each loop once then return.
  • with_healthcheck: If set, the worker will start a healthcheck server.
  • printer: A print-like function where logs will be reported.

submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L843" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
submit(self, flow: 'Flow[..., FR]', parameters: dict[str, Any] | None = None, job_variables: dict[str, Any] | None = None, flow_run: 'FlowRun | None' = None) -> 'PrefectFlowRunFuture[FR]'

EXPERIMENTAL: The interface for this method is subject to change.

Submits a flow to run via the worker.

Args:

  • flow: The flow to submit
  • parameters: The parameters to pass to the flow
  • job_variables: Job variables for infrastructure configuration
  • flow_run: Optional existing flow run to retry (reuses ID instead of creating new)

Returns:

  • A flow run future

sync_with_backend <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1307" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
sync_with_backend(self) -> None

Updates the worker's local information about it's current work pool and queues. Sends a worker heartbeat to the API.

teardown <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1130" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
teardown(self, *exc_info: Any) -> None

Cleans up resources after the worker is stopped.

work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L630" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
work_pool(self) -> WorkPool