docs/v3/api-ref/python/prefect-workers-base.mdx

prefect.workers.base

Classes

BaseJobConfiguration <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L115" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

from_template_and_values <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L178" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
from_template_and_values(cls, base_job_template: dict[str, Any], values: dict[str, Any], client: 'PrefectClient | None' = None)

Creates a valid worker configuration object from the provided base configuration and overrides.

Important: this method expects that the base_job_template was already validated server-side.

is_using_a_runner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L148" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
is_using_a_runner(self) -> bool

json_template <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L247" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
json_template(cls) -> dict[str, Any]

Returns a dict with the job configuration parameter names as keys and the corresponding templates as values.

Defaults to using the job configuration parameter name as the template variable name.

For example:

python
{
    "key1": "{{ key1 }}",       # default variable template
    "key2": "{{ template2 }}",  # `template2` specifically provided as the template
}
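The default mapping described above can be sketched without Prefect installed. This is a minimal illustration, not Prefect's implementation: `build_json_template` and its `overrides` argument are hypothetical names chosen for this example.

```python
from typing import Any, Optional


def build_json_template(
    field_names: list[str],
    overrides: Optional[dict[str, str]] = None,
) -> dict[str, Any]:
    """Map each configuration field to a '{{ variable }}' placeholder.

    Hypothetical helper: a field uses its own name as the template variable
    unless `overrides` supplies a different template string for it.
    """
    overrides = overrides or {}
    return {
        name: overrides.get(name, "{{ " + name + " }}")
        for name in field_names
    }


# key1 falls back to its own name; key2 is given an explicit template.
template = build_json_template(
    ["key1", "key2"],
    overrides={"key2": "{{ template2 }}"},
)
print(template)
```

The result has the same shape as the example dict above: one entry per configuration parameter, each holding a template string.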

prepare_for_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L271" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
prepare_for_flow_run(self, flow_run: 'FlowRun', deployment: 'DeploymentResponse | None' = None, flow: 'APIFlow | None' = None, work_pool: 'WorkPool | None' = None, worker_name: str | None = None, worker_id: 'UUID | None' = None) -> None

Prepare the job configuration for a flow run.

This method is called by the worker before starting a flow run. It should be used to set any configuration values that are dependent on the flow run.

Args:

  • flow_run: The flow run to be executed.
  • deployment: The deployment that the flow run is associated with.
  • flow: The flow that the flow run is associated with.
  • work_pool: The work pool that the flow run is running in.
  • worker_name: The name of the worker that is submitting the flow run.
  • worker_id: The backend ID of the worker that is submitting the flow run.
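As a rough sketch of the pattern, the stand-in classes below show a configuration object filling in values that are only known once a flow run has been picked up. `ExampleFlowRun`, `ExampleJobConfiguration`, the `EXAMPLE_FLOW_RUN_ID` variable, and the `example/worker-name` label are all hypothetical and are not part of Prefect; a real subclass would extend `BaseJobConfiguration` and accept the full argument list shown in the signature.

```python
from dataclasses import dataclass, field
from typing import Optional
from uuid import UUID, uuid4


@dataclass
class ExampleFlowRun:
    """Hypothetical stand-in for a flow run, with only the fields used here."""

    id: UUID
    name: str


@dataclass
class ExampleJobConfiguration:
    """Hypothetical stand-in for a job configuration."""

    env: dict[str, str] = field(default_factory=dict)
    labels: dict[str, str] = field(default_factory=dict)
    name: Optional[str] = None

    def prepare_for_flow_run(
        self, flow_run: ExampleFlowRun, worker_name: Optional[str] = None
    ) -> None:
        # Keep an explicitly configured name; otherwise use the flow run's.
        self.name = self.name or flow_run.name
        # Add flow-run-specific values without dropping existing ones.
        self.env = {**self.env, "EXAMPLE_FLOW_RUN_ID": str(flow_run.id)}
        if worker_name is not None:
            self.labels = {**self.labels, "example/worker-name": worker_name}


config = ExampleJobConfiguration(env={"LOG_LEVEL": "INFO"})
run = ExampleFlowRun(id=uuid4(), name="nightly-etl")
config.prepare_for_flow_run(run, worker_name="worker-1")
print(config.name, sorted(config.env))
```

The method returns `None` and mutates the configuration in place, matching the `-> None` return type in the signature above.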

BaseVariables <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L472" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

model_json_schema <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L496" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
model_json_schema(cls, by_alias: bool = True, ref_template: str = '#/definitions/{model}', schema_generator: Type[GenerateJsonSchema] = GenerateJsonSchema, mode: Literal['validation', 'serialization'] = 'validation') -> dict[str, Any]

TODO: stop overriding this method - use GenerateSchema in ConfigDict instead?

BaseWorkerResult <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L525" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

BaseWorker <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L540" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L621" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
client(self) -> PrefectClient

get_all_available_worker_types <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L688" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_all_available_worker_types() -> list[str]

Returns all worker types available in the local registry.

get_and_submit_flow_runs <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1199" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_and_submit_flow_runs(self) -> list['FlowRun']

get_default_base_job_template <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L657" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_default_base_job_template(cls) -> dict[str, Any]

get_description <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L653" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_description(cls) -> str

get_documentation_url <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L645" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_documentation_url(cls) -> str

get_flow_run_logger <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L701" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_flow_run_logger(self, flow_run: 'FlowRun') -> PrefectLogAdapter

get_logo_url <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L649" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_logo_url(cls) -> str

get_name_slug <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L698" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_name_slug(self) -> str

get_status <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1587" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_status(self) -> dict[str, Any]

Retrieves the status of the current worker, including its name, its current work pool, the work queues it is polling, and its local settings.

get_worker_class_from_type <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L675" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_worker_class_from_type(type: str) -> Optional[Type['BaseWorker[Any, Any, Any]']]

Returns the worker class for a given worker type. If the worker type is not recognized, returns None.
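The lookup behaviour (a class for a known type, `None` otherwise) can be illustrated with a small registry. This sketch assumes subclasses register themselves through `__init_subclass__`, which is a choice made for this example and not necessarily how Prefect populates its registry; `ExampleBaseWorker`, `ExampleProcessWorker`, and `worker_type` are hypothetical names.

```python
from typing import Optional, Type

# Hypothetical local registry mapping a worker type string to its class.
_WORKER_REGISTRY: dict[str, type] = {}


class ExampleBaseWorker:
    worker_type: str = ""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Register every subclass that declares a non-empty type.
        if cls.worker_type:
            _WORKER_REGISTRY[cls.worker_type] = cls

    @staticmethod
    def get_all_available_worker_types() -> list[str]:
        # Sorted only to make the output of this sketch deterministic.
        return sorted(_WORKER_REGISTRY)

    @staticmethod
    def get_worker_class_from_type(
        worker_type: str,
    ) -> Optional[Type["ExampleBaseWorker"]]:
        # dict.get returns None for an unrecognized type instead of raising.
        return _WORKER_REGISTRY.get(worker_type)


class ExampleProcessWorker(ExampleBaseWorker):
    worker_type = "example-process"


print(ExampleBaseWorker.get_all_available_worker_types())
print(ExampleBaseWorker.get_worker_class_from_type("unknown"))
```

Because an unknown type yields `None` and not an exception, callers need to check the result before instantiating it.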

is_worker_still_polling <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1170" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
is_worker_still_polling(self, query_interval_seconds: float) -> bool

This method is invoked by a webserver healthcheck handler. It returns a boolean indicating whether the worker has recorded a scheduled flow run poll within a threshold derived from query_interval_seconds.

The query_interval_seconds is the same value that is used by the loop services. The worker counts as still polling if _last_polled_time falls within that interval multiplied by 30 (so a 10 s interval gives a 5 min threshold).

The instance property self._last_polled_time is currently set and updated in get_and_submit_flow_runs().
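The threshold arithmetic can be sketched as a standalone function. `is_still_polling` and its `now` parameter are hypothetical names for this example, and treating a poll exactly at the threshold as still healthy is an assumption; the linked source is authoritative for the boundary case.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_still_polling(
    last_polled_time: datetime,
    query_interval_seconds: float,
    now: Optional[datetime] = None,
) -> bool:
    """Return True if the last poll happened within 30 query intervals."""
    threshold_seconds = query_interval_seconds * 30  # 10 s -> 300 s (5 min)
    now = now or datetime.now(timezone.utc)
    seconds_since_last_poll = (now - last_polled_time).total_seconds()
    return seconds_since_last_poll <= threshold_seconds


# A fixed reference time keeps the example deterministic.
reference = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
recent = is_still_polling(reference - timedelta(minutes=4), 10, now=reference)
stale = is_still_polling(reference - timedelta(minutes=6), 10, now=reference)
print(recent, stale)
```

With a 10 second query interval, a poll four minutes ago is inside the five minute threshold and a poll six minutes ago is outside it.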

kill_infrastructure <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1839" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
kill_infrastructure(self, infrastructure_pid: str, configuration: C, grace_seconds: int = 30) -> None

Kill infrastructure for a flow run.

Override this method in subclasses to implement infrastructure-specific termination logic.

Args:

  • infrastructure_pid: The infrastructure identifier from the flow run.
  • configuration: The job configuration for connecting to infrastructure.
  • grace_seconds: Time to allow for graceful shutdown before force killing.

Raises:

  • NotImplementedError: If the worker doesn't support killing infrastructure.
  • InfrastructureNotFound: If the infrastructure doesn't exist.
  • InfrastructureNotAvailable: If the infrastructure can't be killed by this worker.
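The override pattern might look like the sketch below, which uses local stand-ins so it runs without Prefect. `ExampleBaseWorker`, `InMemoryWorker`, and the `jobs` dict are hypothetical, and the `InfrastructureNotFound` class defined here is a local stand-in for the exception named above. The methods are written as plain synchronous functions for brevity, which is an assumption about the calling convention.

```python
class InfrastructureNotFound(Exception):
    """Local stand-in for the exception named in the Raises list."""


class ExampleBaseWorker:
    """Hypothetical base class that does not support killing infrastructure."""

    def kill_infrastructure(self, infrastructure_pid, configuration, grace_seconds=30):
        raise NotImplementedError("This worker cannot kill infrastructure.")


class InMemoryWorker(ExampleBaseWorker):
    """Hypothetical subclass that tracks fake jobs in a dict."""

    def __init__(self):
        self.jobs = {"job-1": "running"}

    def kill_infrastructure(self, infrastructure_pid, configuration, grace_seconds=30):
        if infrastructure_pid not in self.jobs:
            raise InfrastructureNotFound(f"No job with id {infrastructure_pid!r}")
        # A real worker would request a graceful stop here, wait up to
        # grace_seconds, and only then force-kill. This sketch just records it.
        self.jobs[infrastructure_pid] = f"terminated (grace={grace_seconds}s)"


worker = InMemoryWorker()
worker.kill_infrastructure("job-1", configuration=None, grace_seconds=10)
print(worker.jobs["job-1"])
```

Raising a specific exception for a missing identifier lets the caller tell "already gone" apart from "this worker cannot kill it".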

limiter <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L637" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
limiter(self) -> anyio.CapacityLimiter

run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L810" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run(self, flow_run: 'FlowRun', configuration: C, task_status: Optional[anyio.abc.TaskStatus[int]] = None) -> R

Runs a given flow run on the current worker.

setup <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1096" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
setup(self) -> None

Prepares the worker to run.

start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L717" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
start(self, run_once: bool = False, with_healthcheck: bool = False, printer: Callable[..., None] = print) -> None

Starts the worker and runs the main worker loops.

By default, the worker will run loops to poll for scheduled/cancelled flow runs and sync with the Prefect API server.

If run_once is set, the worker will only run each loop once and then return.

If with_healthcheck is set, the worker will start a healthcheck server which can be used to determine if the worker is still polling for flow runs and restart the worker if necessary.

Args:

  • run_once: If set, the worker will only run each loop once then return.
  • with_healthcheck: If set, the worker will start a healthcheck server.
  • printer: A print-like function where logs will be reported.

submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L842" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
submit(self, flow: 'Flow[..., FR]', parameters: dict[str, Any] | None = None, job_variables: dict[str, Any] | None = None, flow_run: 'FlowRun | None' = None) -> 'PrefectFlowRunFuture[FR]'

EXPERIMENTAL: The interface for this method is subject to change.

Submits a flow to run via the worker.

Args:

  • flow: The flow to submit.
  • parameters: The parameters to pass to the flow.
  • job_variables: Job variables for infrastructure configuration.
  • flow_run: Optional existing flow run to retry (reuses its ID instead of creating a new flow run).

Returns:

  • A flow run future

sync_with_backend <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1321" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
sync_with_backend(self) -> None

Updates the worker's local information about its current work pool and queues. Sends a worker heartbeat to the API.

teardown <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L1144" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
teardown(self, *exc_info: Any) -> None

Cleans up resources after the worker is stopped.

work_pool <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/base.py#L629" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
work_pool(self) -> WorkPool