Back to Prefect

utils

docs/integrations/prefect-dbt/api-ref/prefect_dbt-cloud-utils.mdx

3.6.30.dev34.3 KB
Original Source

prefect_dbt.cloud.utils

Utilities for common interactions with the dbt Cloud API

Functions

extract_user_message <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/utils.py#L12" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
extract_user_message(ex: HTTPStatusError) -> Optional[str]

Extracts user message from a error response from the dbt Cloud administrative API.

Args:

  • ex: An HTTPStatusError raised by httpx

Returns:

  • user_message from dbt Cloud administrative API response or None if a
  • user_message cannot be extracted

extract_developer_message <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/utils.py#L28" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
extract_developer_message(ex: HTTPStatusError) -> Optional[str]

Extracts developer message from a error response from the dbt Cloud administrative API.

Args:

  • ex: An HTTPStatusError raised by httpx

Returns:

  • developer_message from dbt Cloud administrative API response or None if a
  • developer_message cannot be extracted

call_dbt_cloud_administrative_api_endpoint <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/utils.py#L55" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
call_dbt_cloud_administrative_api_endpoint(dbt_cloud_credentials: DbtCloudCredentials, path: str, http_method: str, params: Optional[Dict[str, Any]] = None, json: Optional[Dict[str, Any]] = None) -> Any

Task that calls a specified endpoint in the dbt Cloud administrative API. Use this task if a prebuilt one is not yet available.

Args:

  • dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
  • path: The partial path for the request (e.g. /projects/). Will be appended onto the base URL as determined by the client configuration.
  • http_method: HTTP method to call on the endpoint.
  • params: Query parameters to include in the request.
  • json: JSON serializable body to send in the request.

Returns:

  • The body of the response. If the body is JSON serializable, then the result of json.loads with the body as the input will be returned. Otherwise, the body will be returned directly.

Examples:

List projects for an account:

python
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint

@flow
def get_projects_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    result = call_dbt_cloud_administrative_api_endpoint(
        dbt_cloud_credentials=credentials,
        path="/projects/",
        http_method="GET",
    )
    return result["data"]

get_projects_flow()

Create a new job:

python
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint


@flow
def create_job_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    result = call_dbt_cloud_administrative_api_endpoint(
        dbt_cloud_credentials=credentials,
        path="/jobs/",
        http_method="POST",
        json={
            "id": None,
            "account_id": 123456789,
            "project_id": 100,
            "environment_id": 10,
            "name": "Nightly run",
            "dbt_version": None,
            "triggers": {"github_webhook": True, "schedule": True},
            "execute_steps": ["dbt run", "dbt test", "dbt source snapshot-freshness"],
            "settings": {"threads": 4, "target_name": "prod"},
            "state": 1,
            "schedule": {
                "date": {"type": "every_day"},
                "time": {"type": "every_hour", "interval": 1},
            },
        },
    )
    return result["data"]

create_job_flow()

Classes

DbtCloudAdministrativeApiCallFailed <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/utils.py#L45" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a call to dbt Cloud administrative API fails.