Back to Prefect

jobs

docs/integrations/prefect-dbt/api-ref/prefect_dbt-cloud-jobs.mdx

3.6.30.dev317.0 KB
Original Source

prefect_dbt.cloud.jobs

Module containing tasks and flows for interacting with dbt Cloud jobs

Functions

get_dbt_cloud_job_info <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L170" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_dbt_cloud_job_info(dbt_cloud_credentials: DbtCloudCredentials, job_id: int, order_by: Optional[str] = None) -> Dict

A task to retrieve information about a dbt Cloud job.

Args:

  • dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
  • job_id: The ID of the job to get.

Returns:

  • The job data returned by the dbt Cloud administrative API.

create_dbt_cloud_job <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L222" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
create_dbt_cloud_job(dbt_cloud_credentials: DbtCloudCredentials, project_id: int, environment_id: int, name: str, execute_steps: Optional[List[str]] = None, **kwargs: Any) -> Dict

A task to create a new dbt Cloud job.

Args:

  • dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
  • project_id: The ID of the project to create the job in.
  • environment_id: The ID of the environment for the job.
  • name: The name of the job.
  • execute_steps: List of dbt commands to execute (e.g. ["dbt run", "dbt test"]). Defaults to ["dbt build"].
  • **kwargs: Additional job configuration options.

Returns:

  • The job data returned by the dbt Cloud administrative API.

delete_dbt_cloud_job <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L295" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
delete_dbt_cloud_job(dbt_cloud_credentials: DbtCloudCredentials, job_id: int) -> None

A task to delete a dbt Cloud job.

Args:

  • dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
  • job_id: The ID of the job to delete.

trigger_dbt_cloud_job_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L346" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
trigger_dbt_cloud_job_run(dbt_cloud_credentials: DbtCloudCredentials, job_id: int, options: Optional[TriggerJobRunOptions] = None) -> Dict

A task to trigger a dbt Cloud job run.

Args:

  • dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
  • job_id: The ID of the job to trigger.
  • options: An optional TriggerJobRunOptions instance to specify overrides for the triggered job run.

Returns:

  • The run data returned from the dbt Cloud administrative API.

Examples:

Trigger a dbt Cloud job run:

python
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run


@flow
def trigger_dbt_cloud_job_run_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    trigger_dbt_cloud_job_run(dbt_cloud_credentials=credentials, job_id=1)


trigger_dbt_cloud_job_run_flow()

Trigger a dbt Cloud job run with overrides:

python
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run
from prefect_dbt.cloud.models import TriggerJobRunOptions


@flow
def trigger_dbt_cloud_job_run_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    trigger_dbt_cloud_job_run(
        dbt_cloud_credentials=credentials,
        job_id=1,
        options=TriggerJobRunOptions(
            git_branch="staging",
            schema_override="dbt_cloud_pr_123",
            dbt_version_override="0.18.0",
            target_name_override="staging",
            timeout_seconds_override=3000,
            generate_docs_override=True,
            threads_override=8,
            steps_override=[
                "dbt seed",
                "dbt run --fail-fast",
                "dbt test --fail-fast",
            ],
        ),
    )


trigger_dbt_cloud_job_run()

get_run_id <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L446" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_run_id(obj: Dict)

Task that extracts the run ID from a trigger job run API response,

This task is mainly used to maintain dependency tracking between the trigger_dbt_cloud_job_run task and downstream tasks/flows that use the run ID.

Args:

  • obj: The JSON body from the trigger job run response.

trigger_dbt_cloud_job_run_and_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L492" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
trigger_dbt_cloud_job_run_and_wait_for_completion(dbt_cloud_credentials: DbtCloudCredentials, job_id: int, trigger_job_run_options: Optional[TriggerJobRunOptions] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10, retry_filtered_models_attempts: int = 3) -> Dict

Flow that triggers a job run and waits for the triggered run to complete.

Args:

  • dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
  • job_id: The ID of the job to trigger.
  • trigger_job_run_options: An optional TriggerJobRunOptions instance to specify overrides for the triggered job run.
  • max_wait_seconds: Maximum number of seconds to wait for job to complete
  • poll_frequency_seconds: Number of seconds to wait in between checks for run completion.
  • retry_filtered_models_attempts: Number of times to retry models selected by retry_status_filters.

Raises:

  • DbtCloudJobRunCancelled: The triggered dbt Cloud job run was cancelled.
  • DbtCloudJobRunFailed: The triggered dbt Cloud job run failed.
  • RuntimeError: The triggered dbt Cloud job run ended in an unexpected state.

Returns:

  • The run data returned by the dbt Cloud administrative API.

Examples:

Trigger a dbt Cloud job and wait for completion as a stand alone flow:

python
import asyncio
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion

asyncio.run(
    trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials(
            api_key="my_api_key",
            account_id=123456789
        ),
        job_id=1
    )
)

Trigger a dbt Cloud job and wait for completion as a sub-flow:

python
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion

@flow
def my_flow():
    ...
    run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials(
            api_key="my_api_key",
            account_id=123456789
        ),
        job_id=1
    )
    ...

my_flow()

Trigger a dbt Cloud job with overrides:

python
import asyncio
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
from prefect_dbt.cloud.models import TriggerJobRunOptions

asyncio.run(
    trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials(
            api_key="my_api_key",
            account_id=123456789
        ),
        job_id=1,
        trigger_job_run_options=TriggerJobRunOptions(
            git_branch="staging",
            schema_override="dbt_cloud_pr_123",
            dbt_version_override="0.18.0",
            target_name_override="staging",
            timeout_seconds_override=3000,
            generate_docs_override=True,
            threads_override=8,
            steps_override=[
                "dbt seed",
                "dbt run --fail-fast",
                "dbt test --fail fast",
            ],
        ),
    )
)

retry_dbt_cloud_job_run_subset_and_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L798" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
retry_dbt_cloud_job_run_subset_and_wait_for_completion(dbt_cloud_credentials: DbtCloudCredentials, run_id: int, trigger_job_run_options: Optional[TriggerJobRunOptions] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10) -> Dict

Flow that retrys a subset of dbt Cloud job run, filtered by select statuses, and waits for the triggered retry to complete.

Args:

  • dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
  • trigger_job_run_options: An optional TriggerJobRunOptions instance to specify overrides for the triggered job run.
  • max_wait_seconds: Maximum number of seconds to wait for job to complete
  • poll_frequency_seconds: Number of seconds to wait in between checks for run completion.
  • run_id: The ID of the job run to retry.

Raises:

  • ValueError: If trigger_job_run_options.steps_override is set by the user.

Returns:

  • The run data returned by the dbt Cloud administrative API.

Examples:

Retry a subset of models in a dbt Cloud job run and wait for completion:

python
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import retry_dbt_cloud_job_run_subset_and_wait_for_completion

@flow
def retry_dbt_cloud_job_run_subset_and_wait_for_completion_flow():
    credentials = DbtCloudCredentials.load("MY_BLOCK_NAME")
    retry_dbt_cloud_job_run_subset_and_wait_for_completion(
        dbt_cloud_credentials=credentials,
        run_id=88640123,
    )

retry_dbt_cloud_job_run_subset_and_wait_for_completion_flow()

run_dbt_cloud_job <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1346" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_dbt_cloud_job(dbt_cloud_job: DbtCloudJob, targeted_retries: int = 3, create_assets: bool = False) -> Dict[str, Any]

Flow that triggers and waits for a dbt Cloud job run, retrying a subset of failed nodes if necessary.

Args:

  • dbt_cloud_job: Block that holds the information and methods to interact with a dbt Cloud job.
  • targeted_retries: The number of times to retry failed steps.
  • create_assets: Whether to create Prefect asset materializations for successfully executed dbt models, seeds, and snapshots.

Examples:

python
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job

@flow
def run_dbt_cloud_job_flow():
    dbt_cloud_credentials = DbtCloudCredentials.load("dbt-token")
    dbt_cloud_job = DbtCloudJob(
        dbt_cloud_credentials=dbt_cloud_credentials, job_id=154217
    )
    return run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job)

run_dbt_cloud_job_flow()

Classes

DbtCloudJobRun <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L888" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Class that holds the information and methods to interact with the resulting run of a dbt Cloud job.

Methods:

dbt_cloud_credentials <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L904" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
dbt_cloud_credentials(self) -> DbtCloudCredentials

fetch_result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L994" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
fetch_result(self, step: Optional[int] = None) -> Dict[str, Any]

Gets the results from the job run. Since the results may not be ready, use wait_for_completion before calling this method.

Args:

  • step: The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this method will return the artifacts compiled for the last step in the run.

get_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L952" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_run(self) -> Dict[str, Any]

Makes a request to the dbt Cloud API to get the run data.

Returns:

  • The run data.

get_run_artifacts <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1031" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_run_artifacts(self, path: Literal['manifest.json', 'catalog.json', 'run_results.json'], step: Optional[int] = None) -> Union[Dict[str, Any], str]

Get an artifact generated for a completed run.

Args:

  • path: The relative path to the run artifact.
  • step: The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this method will return the artifacts compiled for the last step in the run.

Returns:

  • The contents of the requested manifest. Returns a Dict if the requested artifact is a JSON file and a str otherwise.

get_status_code <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L969" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_status_code(self) -> int

Makes a request to the dbt Cloud API to get the run status.

Returns:

  • The run status code.

retry_failed_steps <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1197" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
retry_failed_steps(self) -> 'DbtCloudJobRun'

Retries steps that did not complete successfully in a run.

Returns:

  • A representation of the dbt Cloud job run.

wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L981" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
wait_for_completion(self) -> None

Waits for the job run to reach a terminal state.

DbtCloudJob <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1222" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Block that holds the information and methods to interact with a dbt Cloud job.

Attributes:

  • dbt_cloud_credentials: The credentials to use to authenticate with dbt Cloud.
  • job_id: The id of the dbt Cloud job.
  • timeout_seconds: The number of seconds to wait for the job to complete.
  • interval_seconds: The number of seconds to wait between polling for job completion.
  • trigger_job_run_options: The options to use when triggering a job run.

Examples:

Load a configured dbt Cloud job block.

python
from prefect_dbt.cloud import DbtCloudJob

dbt_cloud_job = DbtCloudJob.load("BLOCK_NAME")

Triggers a dbt Cloud job, waits for completion, and fetches the results.

python
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob

@flow
def dbt_cloud_job_flow():
    dbt_cloud_credentials = DbtCloudCredentials.load("dbt-token")
    dbt_cloud_job = DbtCloudJob.load(
        dbt_cloud_credentials=dbt_cloud_credentials,
        job_id=154217
    )
    dbt_cloud_job_run = dbt_cloud_job.trigger()
    dbt_cloud_job_run.wait_for_completion()
    dbt_cloud_job_run.fetch_result()
    return dbt_cloud_job_run

dbt_cloud_job_flow()

Methods:

get_job <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1288" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_job(self, order_by: Optional[str] = None) -> Dict[str, Any]

Retrieve information about a dbt Cloud job.

Args:

  • order_by: The field to order the results by.

Returns:

  • The job data.

trigger <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1309" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
trigger(self, trigger_job_run_options: Optional[TriggerJobRunOptions] = None) -> DbtCloudJobRun

Triggers a dbt Cloud job.

Returns:

  • A representation of the dbt Cloud job run.