docs/integrations/prefect-dbt/api-ref/prefect_dbt-cloud-jobs.mdx
prefect_dbt.cloud.jobsModule containing tasks and flows for interacting with dbt Cloud jobs
get_dbt_cloud_job_info <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L170" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_dbt_cloud_job_info(dbt_cloud_credentials: DbtCloudCredentials, job_id: int, order_by: Optional[str] = None) -> Dict
A task to retrieve information about a dbt Cloud job.
Args:
dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.job_id: The ID of the job to get.Returns:
create_dbt_cloud_job <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L222" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>create_dbt_cloud_job(dbt_cloud_credentials: DbtCloudCredentials, project_id: int, environment_id: int, name: str, execute_steps: Optional[List[str]] = None, **kwargs: Any) -> Dict
A task to create a new dbt Cloud job.
Args:
dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.project_id: The ID of the project to create the job in.environment_id: The ID of the environment for the job.name: The name of the job.execute_steps: List of dbt commands to execute (e.g. ["dbt run", "dbt test"]).
Defaults to ["dbt build"].**kwargs: Additional job configuration options.Returns:
delete_dbt_cloud_job <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L295" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>delete_dbt_cloud_job(dbt_cloud_credentials: DbtCloudCredentials, job_id: int) -> None
A task to delete a dbt Cloud job.
Args:
dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.job_id: The ID of the job to delete.trigger_dbt_cloud_job_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L346" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>trigger_dbt_cloud_job_run(dbt_cloud_credentials: DbtCloudCredentials, job_id: int, options: Optional[TriggerJobRunOptions] = None) -> Dict
A task to trigger a dbt Cloud job run.
Args:
dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.job_id: The ID of the job to trigger.options: An optional TriggerJobRunOptions instance to specify overrides
for the triggered job run.Returns:
Examples:
Trigger a dbt Cloud job run:
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run
@flow
def trigger_dbt_cloud_job_run_flow():
credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)
trigger_dbt_cloud_job_run(dbt_cloud_credentials=credentials, job_id=1)
trigger_dbt_cloud_job_run_flow()
Trigger a dbt Cloud job run with overrides:
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run
from prefect_dbt.cloud.models import TriggerJobRunOptions
@flow
def trigger_dbt_cloud_job_run_flow():
credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)
trigger_dbt_cloud_job_run(
dbt_cloud_credentials=credentials,
job_id=1,
options=TriggerJobRunOptions(
git_branch="staging",
schema_override="dbt_cloud_pr_123",
dbt_version_override="0.18.0",
target_name_override="staging",
timeout_seconds_override=3000,
generate_docs_override=True,
threads_override=8,
steps_override=[
"dbt seed",
"dbt run --fail-fast",
"dbt test --fail-fast",
],
),
)
trigger_dbt_cloud_job_run()
get_run_id <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L446" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_run_id(obj: Dict)
Task that extracts the run ID from a trigger job run API response,
This task is mainly used to maintain dependency tracking between the
trigger_dbt_cloud_job_run task and downstream tasks/flows that use the run ID.
Args:
obj: The JSON body from the trigger job run response.trigger_dbt_cloud_job_run_and_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L492" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>trigger_dbt_cloud_job_run_and_wait_for_completion(dbt_cloud_credentials: DbtCloudCredentials, job_id: int, trigger_job_run_options: Optional[TriggerJobRunOptions] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10, retry_filtered_models_attempts: int = 3) -> Dict
Flow that triggers a job run and waits for the triggered run to complete.
Args:
dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.job_id: The ID of the job to trigger.trigger_job_run_options: An optional TriggerJobRunOptions instance to
specify overrides for the triggered job run.max_wait_seconds: Maximum number of seconds to wait for job to completepoll_frequency_seconds: Number of seconds to wait in between checks for
run completion.retry_filtered_models_attempts: Number of times to retry models selected by retry_status_filters.Raises:
DbtCloudJobRunCancelled: The triggered dbt Cloud job run was cancelled.DbtCloudJobRunFailed: The triggered dbt Cloud job run failed.RuntimeError: The triggered dbt Cloud job run ended in an unexpected state.Returns:
Examples:
Trigger a dbt Cloud job and wait for completion as a stand alone flow:
import asyncio
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
asyncio.run(
trigger_dbt_cloud_job_run_and_wait_for_completion(
dbt_cloud_credentials=DbtCloudCredentials(
api_key="my_api_key",
account_id=123456789
),
job_id=1
)
)
Trigger a dbt Cloud job and wait for completion as a sub-flow:
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
@flow
def my_flow():
...
run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
dbt_cloud_credentials=DbtCloudCredentials(
api_key="my_api_key",
account_id=123456789
),
job_id=1
)
...
my_flow()
Trigger a dbt Cloud job with overrides:
import asyncio
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
from prefect_dbt.cloud.models import TriggerJobRunOptions
asyncio.run(
trigger_dbt_cloud_job_run_and_wait_for_completion(
dbt_cloud_credentials=DbtCloudCredentials(
api_key="my_api_key",
account_id=123456789
),
job_id=1,
trigger_job_run_options=TriggerJobRunOptions(
git_branch="staging",
schema_override="dbt_cloud_pr_123",
dbt_version_override="0.18.0",
target_name_override="staging",
timeout_seconds_override=3000,
generate_docs_override=True,
threads_override=8,
steps_override=[
"dbt seed",
"dbt run --fail-fast",
"dbt test --fail fast",
],
),
)
)
retry_dbt_cloud_job_run_subset_and_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L798" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>retry_dbt_cloud_job_run_subset_and_wait_for_completion(dbt_cloud_credentials: DbtCloudCredentials, run_id: int, trigger_job_run_options: Optional[TriggerJobRunOptions] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10) -> Dict
Flow that retrys a subset of dbt Cloud job run, filtered by select statuses, and waits for the triggered retry to complete.
Args:
dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.trigger_job_run_options: An optional TriggerJobRunOptions instance to
specify overrides for the triggered job run.max_wait_seconds: Maximum number of seconds to wait for job to completepoll_frequency_seconds: Number of seconds to wait in between checks for
run completion.run_id: The ID of the job run to retry.Raises:
ValueError: If trigger_job_run_options.steps_override is set by the user.Returns:
Examples:
Retry a subset of models in a dbt Cloud job run and wait for completion:
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import retry_dbt_cloud_job_run_subset_and_wait_for_completion
@flow
def retry_dbt_cloud_job_run_subset_and_wait_for_completion_flow():
credentials = DbtCloudCredentials.load("MY_BLOCK_NAME")
retry_dbt_cloud_job_run_subset_and_wait_for_completion(
dbt_cloud_credentials=credentials,
run_id=88640123,
)
retry_dbt_cloud_job_run_subset_and_wait_for_completion_flow()
run_dbt_cloud_job <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1346" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_dbt_cloud_job(dbt_cloud_job: DbtCloudJob, targeted_retries: int = 3, create_assets: bool = False) -> Dict[str, Any]
Flow that triggers and waits for a dbt Cloud job run, retrying a subset of failed nodes if necessary.
Args:
dbt_cloud_job: Block that holds the information and
methods to interact with a dbt Cloud job.targeted_retries: The number of times to retry failed steps.create_assets: Whether to create Prefect asset materializations
for successfully executed dbt models, seeds, and snapshots.Examples:
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job
@flow
def run_dbt_cloud_job_flow():
dbt_cloud_credentials = DbtCloudCredentials.load("dbt-token")
dbt_cloud_job = DbtCloudJob(
dbt_cloud_credentials=dbt_cloud_credentials, job_id=154217
)
return run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job)
run_dbt_cloud_job_flow()
DbtCloudJobRun <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L888" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Class that holds the information and methods to interact with the resulting run of a dbt Cloud job.
Methods:
dbt_cloud_credentials <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L904" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>dbt_cloud_credentials(self) -> DbtCloudCredentials
fetch_result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L994" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>fetch_result(self, step: Optional[int] = None) -> Dict[str, Any]
Gets the results from the job run. Since the results may not be ready, use wait_for_completion before calling this method.
Args:
step: The index of the step in the run to query for artifacts. The
first step in the run has the index 1. If the step parameter is
omitted, then this method will return the artifacts compiled
for the last step in the run.get_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L952" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_run(self) -> Dict[str, Any]
Makes a request to the dbt Cloud API to get the run data.
Returns:
get_run_artifacts <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1031" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_run_artifacts(self, path: Literal['manifest.json', 'catalog.json', 'run_results.json'], step: Optional[int] = None) -> Union[Dict[str, Any], str]
Get an artifact generated for a completed run.
Args:
path: The relative path to the run artifact.step: The index of the step in the run to query for artifacts. The
first step in the run has the index 1. If the step parameter is
omitted, then this method will return the artifacts compiled
for the last step in the run.Returns:
Dict if the
requested artifact is a JSON file and a str otherwise.get_status_code <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L969" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_status_code(self) -> int
Makes a request to the dbt Cloud API to get the run status.
Returns:
retry_failed_steps <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1197" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>retry_failed_steps(self) -> 'DbtCloudJobRun'
Retries steps that did not complete successfully in a run.
Returns:
wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L981" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>wait_for_completion(self) -> None
Waits for the job run to reach a terminal state.
DbtCloudJob <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1222" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Block that holds the information and methods to interact with a dbt Cloud job.
Attributes:
dbt_cloud_credentials: The credentials to use to authenticate with dbt Cloud.job_id: The id of the dbt Cloud job.timeout_seconds: The number of seconds to wait for the job to complete.interval_seconds:
The number of seconds to wait between polling for job completion.trigger_job_run_options: The options to use when triggering a job run.Examples:
Load a configured dbt Cloud job block.
from prefect_dbt.cloud import DbtCloudJob
dbt_cloud_job = DbtCloudJob.load("BLOCK_NAME")
Triggers a dbt Cloud job, waits for completion, and fetches the results.
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob
@flow
def dbt_cloud_job_flow():
dbt_cloud_credentials = DbtCloudCredentials.load("dbt-token")
dbt_cloud_job = DbtCloudJob.load(
dbt_cloud_credentials=dbt_cloud_credentials,
job_id=154217
)
dbt_cloud_job_run = dbt_cloud_job.trigger()
dbt_cloud_job_run.wait_for_completion()
dbt_cloud_job_run.fetch_result()
return dbt_cloud_job_run
dbt_cloud_job_flow()
Methods:
get_job <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1288" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_job(self, order_by: Optional[str] = None) -> Dict[str, Any]
Retrieve information about a dbt Cloud job.
Args:
order_by: The field to order the results by.Returns:
trigger <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cloud/jobs.py#L1309" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>trigger(self, trigger_job_run_options: Optional[TriggerJobRunOptions] = None) -> DbtCloudJobRun
Triggers a dbt Cloud job.
Returns: