prefect-kubernetes

prefect-kubernetes contains Prefect tasks, flows, and blocks enabling orchestration, observation and management of Kubernetes resources.

This library is most commonly installed for use with a Kubernetes worker. See the Prefect docs on deploying with Kubernetes to learn how to create and run deployments in Kubernetes.

Prefect provides a Helm chart for deploying a worker, a self-hosted Prefect server instance, and other resources to a Kubernetes cluster. See the Prefect Helm chart for more information.

Kubernetes Worker

The Kubernetes worker executes flow runs as Kubernetes Jobs. When you create a Kubernetes work pool, you can customize the base job template to control how jobs are created.

<Warning> **Important**: When customizing a work pool's base job template, variables defined in the `variables` section must be explicitly referenced in `job_configuration` using `{{ variable_name }}` syntax to take effect. If you add or modify a variable in `variables` but don't reference it in `job_configuration`, its value (including defaults) will not be passed to the worker.

For example, if you set a default for `cluster_config` in `variables`, ensure your `job_configuration` includes `"cluster_config": "{{ cluster_config }}"`. </Warning>
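
To make the rule in the warning concrete, here is a minimal sketch of a check that flags variables a template defines but never references. The helper name `unreferenced_variables` and the heavily trimmed template are hypothetical and not part of prefect-kubernetes. The sketch also assumes that variables are listed under a `properties` key inside `variables`; only the `variables` / `job_configuration` split and the `{{ variable_name }}` syntax come from the text above.

```python
import json
import re

# Hypothetical, heavily trimmed base job template for illustration only.
# Assumption: variables are declared under variables["properties"].
base_job_template = {
    "variables": {
        "type": "object",
        "properties": {
            "namespace": {"type": "string", "default": "default"},
            "cluster_config": {"type": "object", "default": None},
        },
    },
    "job_configuration": {
        "namespace": "{{ namespace }}",
        # "cluster_config": "{{ cluster_config }}" is missing on purpose
    },
}

# Matches placeholders such as "{{ namespace }}" and captures the name.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def unreferenced_variables(template: dict) -> set[str]:
    """Return variables that are defined but never used in job_configuration."""
    defined = set(template["variables"].get("properties", {}))
    # Serialize job_configuration so nested values are searched as well.
    referenced = set(PLACEHOLDER.findall(json.dumps(template["job_configuration"])))
    return defined - referenced


missing = unreferenced_variables(base_job_template)
print(missing)  # {'cluster_config'}

# Referencing the variable in job_configuration resolves the problem.
base_job_template["job_configuration"]["cluster_config"] = "{{ cluster_config }}"
fixed = unreferenced_variables(base_job_template)
```

A check like this could be run against a template before updating a work pool, so that a default set in `variables` is not silently dropped.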

See the Kubernetes deployment guide for complete setup instructions.

Getting started

Prerequisites

Install prefect-kubernetes

The following command will install a version of prefect-kubernetes compatible with your installed version of prefect. If you don't already have prefect installed, it will install the newest version of prefect as well.

```bash
pip install "prefect[kubernetes]"
```

Upgrade to the latest versions of prefect and prefect-kubernetes:

```bash
pip install -U "prefect[kubernetes]"
```

Register newly installed block types

Register the block types in the prefect-kubernetes module to make them available for use.

```bash
prefect block register -m prefect_kubernetes
```

Examples

Use with_options to customize options on an existing task or flow

```python
from prefect_kubernetes.flows import run_namespaced_job

customized_run_namespaced_job = run_namespaced_job.with_options(
    name="My flow running a Kubernetes Job",
    retries=2,
    retry_delay_seconds=10,
)  # this is now a new flow object that can be called
```

Specify and run a Kubernetes Job from a YAML file

```python
from prefect import flow, get_run_logger
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.flows import run_namespaced_job  # this is a flow
from prefect_kubernetes.jobs import KubernetesJob

k8s_creds = KubernetesCredentials.load("k8s-creds")

job = KubernetesJob.from_yaml_file(  # or create in the UI with a dict manifest
    credentials=k8s_creds,
    manifest_path="path/to/job.yaml",
)

job.save("my-k8s-job", overwrite=True)


@flow
def kubernetes_orchestrator():
    # run the flow and send logs to the parent flow run's logger
    logger = get_run_logger()
    run_namespaced_job(job, print_func=logger.info)


if __name__ == "__main__":
    kubernetes_orchestrator()
```
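
The example above leaves the contents of `path/to/job.yaml` unspecified. As a rough sketch, a minimal Job manifest written as a Python dict could look like the following. The job name, namespace, image, and command are hypothetical placeholders, and the fields follow the standard Kubernetes `batch/v1` Job schema rather than anything specific to prefect-kubernetes.

```python
import json

# Hypothetical minimal Kubernetes Job manifest; names and image are placeholders.
job_manifest = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "echo-job", "namespace": "default"},
    "spec": {
        "backoffLimit": 0,  # do not retry failed pods
        "template": {
            "spec": {
                "containers": [
                    {
                        "name": "echo",
                        "image": "busybox",
                        "command": ["echo", "hello from Kubernetes"],
                    }
                ],
                # Job pod templates must use "Never" or "OnFailure"
                "restartPolicy": "Never",
            }
        },
    },
}

# Print the manifest for inspection.
print(json.dumps(job_manifest, indent=2))
```

The same structure, serialized as YAML, is the kind of content a manifest file would typically hold.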

As with all Prefect flows and tasks, you can call the underlying function directly if you don't need Prefect features:

```python
run_namespaced_job.fn(job, print_func=print)
```

Generate a resource-specific client from KubernetesClusterConfig

```python
# with minikube / docker desktop & a valid ~/.kube/config this should ~just work~
from prefect_kubernetes.credentials import KubernetesCredentials, KubernetesClusterConfig

k8s_config = KubernetesClusterConfig.from_file('~/.kube/config')

k8s_credentials = KubernetesCredentials(cluster_config=k8s_config)

with k8s_credentials.get_client("core") as v1_core_client:
    for namespace in v1_core_client.list_namespace().items:
        print(namespace.metadata.name)
```

List jobs in a namespace

```python
from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.jobs import list_namespaced_job


@flow
def kubernetes_orchestrator():
    v1_job_list = list_namespaced_job(
        kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
        namespace="my-namespace",
    )
```

For assistance using Kubernetes, consult the Kubernetes documentation.

Refer to the prefect-kubernetes SDK documentation to explore all the capabilities of the prefect-kubernetes library.