docs/integrations/prefect-dbt/dbt-cloud.mdx
If you have an existing dbt Cloud job, use the pre-built flow run_dbt_cloud_job to trigger a job run and wait until the job run is finished. If some nodes fail, run_dbt_cloud_job can efficiently retry the unsuccessful nodes. Prior to running this flow, save your dbt Cloud credentials to a DbtCloudCredentials block and create a DbtCloudJob block.
prefect block register -m prefect_dbt
Blocks can be created through code or through the UI.
To create a dbt Cloud Credentials block:
https://cloud.getdbt.com/settings/accounts/<ACCOUNT_ID>.from prefect_dbt.cloud import DbtCloudCredentials
DbtCloudCredentials(
api_key="API-KEY-PLACEHOLDER",
account_id="ACCOUNT-ID-PLACEHOLDER"
).save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
https://cloud.getdbt.com/deploy/<ACCOUNT_ID>/projects/<PROJECT_ID>/jobs/<JOB_ID>from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob
dbt_cloud_credentials = DbtCloudCredentials.load("CREDENTIALS-BLOCK-PLACEHOLDER")
dbt_cloud_job = DbtCloudJob(
dbt_cloud_credentials=dbt_cloud_credentials,
job_id="JOB-ID-PLACEHOLDER",
)
dbt_cloud_job.save("JOB-BLOCK-NAME-PLACEHOLDER")
import asyncio
from prefect import flow
from prefect_dbt.cloud import DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job
@flow
async def run_dbt_job_flow():
dbt_cloud_job = await DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER")
return await run_dbt_cloud_job(
dbt_cloud_job=dbt_cloud_job,
targeted_retries=0,
)
if __name__ == "__main__":
asyncio.run(run_dbt_job_flow())
Set create_assets=True to emit Prefect asset materializations for the dbt models, seeds, and snapshots that completed successfully in the dbt Cloud run.
Assets are created from the run's dbt artifacts, so the dbt Cloud job must make manifest.json and run_results.json available.
import asyncio
from prefect import flow
from prefect_dbt.cloud import DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job
@flow
async def run_dbt_job_flow():
dbt_cloud_job = await DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER")
return await run_dbt_cloud_job(
dbt_cloud_job=dbt_cloud_job,
targeted_retries=0,
create_assets=True,
)
if __name__ == "__main__":
asyncio.run(run_dbt_job_flow())
prefect_dbt.cloud API.