prefect_gcp.workers.cloud_run

Module containing the Cloud Run worker used for executing flow runs as Cloud Run jobs.

Get started by creating a Cloud Run work pool:

```bash
prefect work-pool create 'my-cloud-run-pool' --type cloud-run
```

Then start a Cloud Run worker with the following command:

```bash
prefect worker start --pool 'my-cloud-run-pool'
```

Configuration

Read more about configuring work pools here.

Advanced Configuration

!!! example "Using a custom Cloud Run job template"

Below is the default job body template used by the Cloud Run Worker:

```json
{
    "apiVersion": "run.googleapis.com/v1",
    "kind": "Job",
    "metadata": {
        "name": "{{ name }}",
        "annotations": {
            "run.googleapis.com/launch-stage": "BETA"
        }
    },
    "spec": {
        "template": {
            "spec": {
                "template": {
                    "spec": {
                        "containers": [
                            {
                                "image": "{{ image }}",
                                "args": "{{ args }}",
                                "resources": {
                                    "limits": {
                                        "cpu": "{{ cpu }}",
                                        "memory": "{{ memory }}"
                                    },
                                    "requests": {
                                        "cpu": "{{ cpu }}",
                                        "memory": "{{ memory }}"
                                    }
                                }
                            }
                        ],
                        "timeoutSeconds": "{{ timeout }}",
                        "serviceAccountName": "{{ service_account_name }}"
                    }
                }
            },
            "metadata": {
                "annotations": {
                    "run.googleapis.com/vpc-access-connector": "{{ vpc_connector_name }}"
                }
            }
        }
    },
    "keep_job": "{{ keep_job }}"
}
```

Each value enclosed in `{{ }}` is a placeholder that is replaced with a value at runtime on a per-deployment basis. The values that can be used as placeholders are defined by the `variables` schema in the base job template.
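To make the placeholder mechanism concrete, here is a minimal sketch of how `{{ }}` substitution over a nested job body could work. It is not Prefect's actual implementation: the `render` helper and the sample values are hypothetical, and in this sketch a variable with no supplied value simply resolves to `None`.

```python
import re
from typing import Any

# Matches "{{ name }}" with optional whitespace inside the braces
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: Any, values: dict[str, Any]) -> Any:
    """Recursively replace {{ name }} placeholders in a nested template (illustrative only)."""
    if isinstance(template, dict):
        return {key: render(val, values) for key, val in template.items()}
    if isinstance(template, list):
        return [render(item, values) for item in template]
    if isinstance(template, str):
        whole = PLACEHOLDER.fullmatch(template)
        if whole:
            # A string that is only a placeholder keeps the value's own type
            return values.get(whole.group(1))
        # Placeholders embedded in a longer string are interpolated as text
        return PLACEHOLDER.sub(lambda m: str(values.get(m.group(1), "")), template)
    return template


job_body = {
    "metadata": {"name": "{{ name }}"},
    "spec": {"timeoutSeconds": "{{ timeout }}"},
}
rendered = render(job_body, {"name": "my-flow-run", "timeout": 600})
print(rendered)
```

Keeping the value's type for whole-string placeholders is a design choice in this sketch, so that a numeric `timeout` stays a number rather than becoming a string.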

The default job body template and the available variables can be customized on a
per-work-pool basis. By editing the default job body template, you can:

- Add additional placeholders to the default job template
- Remove placeholders from the default job template
- Pass values to Cloud Run that are not defined in the `variables` schema

### Adding additional placeholders
For example, to allow for extra customization of a new annotation not described in
the default job template, you can add the following:
```json
{
    "apiVersion": "run.googleapis.com/v1",
    "kind": "Job",
    "metadata":
    {
        "name": "{{ name }}",
        "annotations":
        {
            "run.googleapis.com/my-custom-annotation": "{{ my_custom_annotation }}",
            "run.googleapis.com/launch-stage": "BETA"
        },
      ...
    },
  ...
}
```
`my_custom_annotation` can now be used as a placeholder in the job template and set
on a per-deployment basis.

```yaml
# prefect.yaml
deployments:
  # ...
  - name: my-deployment
    # ...
    work_pool:
      name: my-cloud-run-pool
      job_variables:
        my_custom_annotation: my-custom-value
```
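If you prefer to build the customized template programmatically rather than editing JSON by hand, the following sketch adds a templated annotation to a job body held as a Python dictionary. The `add_annotation_placeholder` helper is hypothetical and not part of `prefect-gcp`; the trimmed `default_body` only mirrors the top of the template shown earlier.

```python
import copy
from typing import Any


def add_annotation_placeholder(
    job_body: dict[str, Any], annotation: str, variable: str
) -> dict[str, Any]:
    """Return a copy of job_body whose metadata gains a templated annotation."""
    updated = copy.deepcopy(job_body)  # leave the caller's template untouched
    annotations = updated.setdefault("metadata", {}).setdefault("annotations", {})
    annotations[annotation] = "{{ " + variable + " }}"
    return updated


# Trimmed copy of the default job body, for illustration only
default_body = {
    "apiVersion": "run.googleapis.com/v1",
    "kind": "Job",
    "metadata": {
        "name": "{{ name }}",
        "annotations": {"run.googleapis.com/launch-stage": "BETA"},
    },
}

custom_body = add_annotation_placeholder(
    default_body,
    "run.googleapis.com/my-custom-annotation",
    "my_custom_annotation",
)
print(custom_body["metadata"]["annotations"])
```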

You can also hard-code a field so that it cannot be configured at the deployment
level. For example, to fix the `vpc_connector_name` field, remove its placeholder
and replace it with an actual value. All deployments that point to this work pool
will then use the same `vpc_connector_name` value.

```json
{
    "apiVersion": "run.googleapis.com/v1",
    "kind": "Job",
    "spec":
    {
        "template":
        {
            "metadata":
            {
                "annotations":
                {
                    "run.googleapis.com/vpc-access-connector": "my-vpc-connector"
                }
            },
            ...
        },
        ...
    }
}
```
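One way to check which fields remain configurable after such an edit is to list the placeholders still present in the template. The sketch below does this for nested dictionaries; `find_placeholders` is a hypothetical helper, not a Prefect API, and the two sample templates are trimmed for illustration.

```python
import re
from typing import Any

# Matches "{{ name }}" with optional whitespace inside the braces
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def find_placeholders(template: Any) -> set[str]:
    """Collect every placeholder name referenced anywhere in a nested template."""
    if isinstance(template, dict):
        return set().union(*(find_placeholders(v) for v in template.values()))
    if isinstance(template, list):
        return set().union(*(find_placeholders(item) for item in template))
    if isinstance(template, str):
        return set(PLACEHOLDER.findall(template))
    return set()


def body_with_connector(connector: str) -> dict[str, Any]:
    """Build a trimmed job body with the given VPC connector annotation value."""
    annotation = {"run.googleapis.com/vpc-access-connector": connector}
    return {
        "metadata": {"name": "{{ name }}"},
        "spec": {"template": {"metadata": {"annotations": annotation}}},
    }


templated = body_with_connector("{{ vpc_connector_name }}")
pinned = body_with_connector("my-vpc-connector")

print(sorted(find_placeholders(templated)))  # ['name', 'vpc_connector_name']
print(sorted(find_placeholders(pinned)))     # ['name']
```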

Classes

CloudRunWorkerJobConfiguration <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-gcp/prefect_gcp/workers/cloud_run.py#L266" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Configuration class used by the Cloud Run Worker to create a Cloud Run Job.

An instance of this class is passed to the Cloud Run worker's run method for each flow run. It contains all information necessary to execute the flow run as a Cloud Run Job.

Attributes:

  • region: The region where the Cloud Run Job resides.
  • credentials: The GCP Credentials used to connect to Cloud Run.
  • job_body: The job body used to create the Cloud Run Job.
  • timeout: The maximum duration the job may be active before Cloud Run tries to mark it failed and kill the associated containers (at most 3600 seconds, i.e. 1 hour).
  • keep_job: Whether to keep the Cloud Run Job after it completes. If false, the job is deleted.
  • prefect_api_key_secret: A GCP secret containing a Prefect API Key.
  • prefect_api_auth_string_secret: A GCP secret containing a Prefect API authorization string.

Methods:

job_name <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-gcp/prefect_gcp/workers/cloud_run.py#L340" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
job_name(self) -> str
```

Property for accessing the name from the job metadata.
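As a rough illustration of what such a property does, the sketch below reads the name out of a job body's metadata. `JobConfigSketch` is a hypothetical stand-in written for this example; it is not the real `CloudRunWorkerJobConfiguration` and omits everything except `job_body`.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class JobConfigSketch:
    """Hypothetical stand-in for the configuration class; not the real API."""

    job_body: dict[str, Any] = field(default_factory=dict)

    @property
    def job_name(self) -> str:
        # Read the name from the job body's metadata, as the description states
        return self.job_body["metadata"]["name"]


config = JobConfigSketch(job_body={"metadata": {"name": "my-flow-run-job"}})
print(config.job_name)  # accessed as an attribute, not called
```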

prepare_for_flow_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-gcp/prefect_gcp/workers/cloud_run.py#L361" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
prepare_for_flow_run(self, flow_run: 'FlowRun', deployment: Optional['DeploymentResponse'] = None, flow: Optional['Flow'] = None, work_pool: Optional['WorkPool'] = None, worker_name: Optional[str] = None, worker_id: Optional['UUID'] = None)
```

Prepares the job configuration for a flow run.

Ensures that necessary values are present in the job body and that the job body is valid.

Args:

  • flow_run: The flow run to prepare the job configuration for.
  • deployment: The deployment associated with the flow run used for preparation.
  • flow: The flow associated with the flow run used for preparation.

project <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-gcp/prefect_gcp/workers/cloud_run.py#L335" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
project(self) -> str
```

Property for accessing the project from the credentials.

CloudRunWorkerVariables <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-gcp/prefect_gcp/workers/cloud_run.py#L569" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Default variables for the Cloud Run worker.

The schema for this class is used to populate the variables section of the default base job template.

CloudRunWorkerResult <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-gcp/prefect_gcp/workers/cloud_run.py#L670" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Contains information about the final state of a completed process.

CloudRunWorker <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-gcp/prefect_gcp/workers/cloud_run.py#L674" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Prefect worker that executes flow runs within Cloud Run Jobs.

Methods:

kill_infrastructure <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-gcp/prefect_gcp/workers/cloud_run.py#L973" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
kill_infrastructure(self, infrastructure_pid: str, configuration: CloudRunWorkerJobConfiguration, grace_seconds: int = 30) -> None
```

Kill a Cloud Run Job by deleting it.

Args:

  • infrastructure_pid: The job name.
  • configuration: The job configuration used to connect to GCP.
  • grace_seconds: Not used for Cloud Run (GCP handles graceful shutdown).

Raises:

  • InfrastructureNotFound: If the job doesn't exist.

run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-gcp/prefect_gcp/workers/cloud_run.py#L725" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
run(self, flow_run: 'FlowRun', configuration: CloudRunWorkerJobConfiguration, task_status: Optional[anyio.abc.TaskStatus] = None) -> CloudRunWorkerResult
```

Executes a flow run within a Cloud Run Job and waits for the flow run to complete.

Args:

  • flow_run: The flow run to execute.
  • configuration: The configuration to use when executing the flow run.
  • task_status: The task status object for the current flow run. If provided, the task will be marked as started.

Returns:

  • A result object containing information about the final state of the flow run.