
dbt Cloud


If you have an existing dbt Cloud job, use the pre-built flow `run_dbt_cloud_job` to trigger a job run and wait until the run finishes. If some nodes fail, `run_dbt_cloud_job` can retry only the unsuccessful nodes instead of rerunning the whole job; the `targeted_retries` parameter controls how many such retries are attempted. Before running this flow, save your dbt Cloud credentials to a `DbtCloudCredentials` block and create a `DbtCloudJob` block.
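The sketch below illustrates what "retry only the unsuccessful nodes" means. It reads a dbt `run_results.json` payload and lists the nodes that did not succeed. This is a simplified, hypothetical helper (`nodes_to_retry` is not part of prefect-dbt), and prefect-dbt's actual retry logic may choose nodes differently.

```python
import json

# Statuses dbt can write to run_results.json for nodes that did not succeed.
# "skipped" nodes were not run because something upstream failed.
UNSUCCESSFUL_STATUSES = {"error", "fail", "skipped", "runtime error"}


def nodes_to_retry(run_results_json: str) -> list[str]:
    """Hypothetical helper: return unique_ids of unsuccessful nodes, sorted."""
    run_results = json.loads(run_results_json)
    return sorted(
        result["unique_id"]
        for result in run_results.get("results", [])
        if result.get("status") in UNSUCCESSFUL_STATUSES
    )


if __name__ == "__main__":
    sample = json.dumps({
        "results": [
            {"unique_id": "model.shop.stg_orders", "status": "success"},
            {"unique_id": "model.shop.orders", "status": "error"},
            {"unique_id": "model.shop.revenue", "status": "skipped"},
            {"unique_id": "test.shop.not_null_orders_id", "status": "pass"},
        ]
    })
    print(nodes_to_retry(sample))  # ['model.shop.orders', 'model.shop.revenue']
```

A targeted retry reruns this subset rather than the whole job. In this sample, `stg_orders` and the passing test are left alone.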

<Info>
Before creating blocks for the first time, register `prefect-dbt`'s block types:

```bash
prefect block register -m prefect_dbt
```
</Info>

Save dbt Cloud credentials to a block

Blocks can be created through code or through the UI.

To create a dbt Cloud Credentials block:

  1. Log in to your dbt Cloud account.
  2. Click API Tokens in the sidebar.
  3. Copy a Service Token.
  4. Copy the account ID from the URL: `https://cloud.getdbt.com/settings/accounts/<ACCOUNT_ID>`.
  5. Create and run the following script, replacing the placeholders:
```python
from prefect_dbt.cloud import DbtCloudCredentials


DbtCloudCredentials(
    api_key="API-KEY-PLACEHOLDER",
    account_id="ACCOUNT-ID-PLACEHOLDER"
).save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
```
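If you would rather keep the service token out of the script itself, one option is to read the values from environment variables. This is a minimal sketch under stated assumptions. The variable names `DBT_CLOUD_API_KEY` and `DBT_CLOUD_ACCOUNT_ID` and the helper `credentials_kwargs_from_env` are hypothetical choices made for this example. The sketch also assumes the account ID is an integer, as dbt Cloud account IDs are.

```python
import os
from typing import Mapping, Optional

# Hypothetical variable names chosen for this sketch.
REQUIRED_VARS = ("DBT_CLOUD_API_KEY", "DBT_CLOUD_ACCOUNT_ID")


def credentials_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Hypothetical helper: build DbtCloudCredentials keyword arguments from env vars."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return {
        "api_key": env["DBT_CLOUD_API_KEY"],
        # Convert early so a non-numeric account ID fails here, not at save time.
        "account_id": int(env["DBT_CLOUD_ACCOUNT_ID"]),
    }


# Usage (requires prefect-dbt):
# from prefect_dbt.cloud import DbtCloudCredentials
# DbtCloudCredentials(**credentials_kwargs_from_env()).save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
```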

Create a dbt Cloud job block

  1. In dbt Cloud, click Deploy > Jobs.
  2. Select a job.
  3. Copy the job ID from the URL: `https://cloud.getdbt.com/deploy/<ACCOUNT_ID>/projects/<PROJECT_ID>/jobs/<JOB_ID>`.
  4. Create and run the following script, replacing the placeholders:
```python
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob


# Load the credentials block saved in the previous step
dbt_cloud_credentials = DbtCloudCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
dbt_cloud_job = DbtCloudJob(
    dbt_cloud_credentials=dbt_cloud_credentials,
    job_id="JOB-ID-PLACEHOLDER",
)
dbt_cloud_job.save("JOB-BLOCK-NAME-PLACEHOLDER")
```
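Step 3 can also be done in code. The sketch below pulls the numeric IDs out of a job URL with the shape shown above, which is handy when job URLs are kept in a config file. `parse_job_url` and `JobUrlIds` are hypothetical names used only in this example, and the parser assumes the URL path matches that shape exactly.

```python
from typing import NamedTuple
from urllib.parse import urlparse


class JobUrlIds(NamedTuple):
    account_id: int
    project_id: int
    job_id: int


def parse_job_url(url: str) -> JobUrlIds:
    """Hypothetical helper: parse /deploy/<ACCOUNT_ID>/projects/<PROJECT_ID>/jobs/<JOB_ID>."""
    # Drop empty segments so leading or trailing slashes do not matter.
    parts = [part for part in urlparse(url).path.split("/") if part]
    if len(parts) < 6 or parts[0] != "deploy" or parts[2] != "projects" or parts[4] != "jobs":
        raise ValueError(f"not a dbt Cloud job URL: {url}")
    # int() also raises ValueError if an ID segment is not numeric.
    return JobUrlIds(int(parts[1]), int(parts[3]), int(parts[5]))


if __name__ == "__main__":
    ids = parse_job_url("https://cloud.getdbt.com/deploy/1/projects/22/jobs/333")
    print(ids.job_id)  # 333; pass this as job_id when creating the DbtCloudJob block
```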

Run a dbt Cloud job and wait for completion

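```python
import asyncio

from prefect import flow
from prefect_dbt.cloud import DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job


@flow
async def run_dbt_job_flow():
    # Load the DbtCloudJob block saved in the previous step
    dbt_cloud_job = await DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER")
    # Trigger the job run and wait for it to finish. targeted_retries=0 means
    # failed nodes are not retried; raise it to enable targeted retries.
    return await run_dbt_cloud_job(
        dbt_cloud_job=dbt_cloud_job,
        targeted_retries=0,
    )


if __name__ == "__main__":
    asyncio.run(run_dbt_job_flow())
```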
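`run_dbt_cloud_job` does the waiting for you. The sketch below shows the general shape of "trigger, then poll until finished" so the behavior is easier to reason about; it is not prefect-dbt's implementation. It assumes the run status codes documented for the dbt Cloud v2 Administrative API: 1 Queued, 2 Starting, 3 Running, 10 Success, 20 Error, 30 Cancelled. `wait_for_run` and its `get_status` callable are hypothetical. In practice, `get_status` would wrap an API call that fetches the run.

```python
import time
from typing import Callable

# Terminal run status codes, assuming the dbt Cloud v2 API values.
TERMINAL_STATUSES = {10: "Success", 20: "Error", 30: "Cancelled"}


def wait_for_run(
    get_status: Callable[[], int],
    poll_interval: float = 10.0,
    timeout: float = 900.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Hypothetical helper: poll get_status() until the run reaches a terminal status."""
    waited = 0.0  # total time spent sleeping between polls
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return TERMINAL_STATUSES[status]
        if waited >= timeout:
            raise TimeoutError(f"run still in status {status} after {timeout} seconds")
        sleep(poll_interval)
        waited += poll_interval


if __name__ == "__main__":
    # Simulate a run that moves Queued -> Starting -> Running -> Success.
    statuses = iter([1, 2, 3, 10])
    print(wait_for_run(lambda: next(statuses), sleep=lambda seconds: None))  # Success
```

Injecting `sleep` keeps the loop testable without real delays. Counting time by accumulated sleep intervals is a simplification that ignores how long each status request takes.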

Create Prefect assets from a dbt Cloud job

Set `create_assets=True` to emit Prefect asset materializations for the dbt models, seeds, and snapshots that completed successfully in the dbt Cloud run.

Assets are created from the run's dbt artifacts, so the dbt Cloud job must make `manifest.json` and `run_results.json` available.

```python
import asyncio

from prefect import flow
from prefect_dbt.cloud import DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job


@flow
async def run_dbt_job_flow():
    dbt_cloud_job = await DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER")
    return await run_dbt_cloud_job(
        dbt_cloud_job=dbt_cloud_job,
        targeted_retries=0,
        create_assets=True,
    )


if __name__ == "__main__":
    asyncio.run(run_dbt_job_flow())
```
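This sketch shows why both artifacts are needed. `run_results.json` says which nodes succeeded, and `manifest.json` says what kind of resource each node is and which relation it builds. The helper `materialized_nodes` is hypothetical and only shows the filtering described above (successful models, seeds, and snapshots). It is not how prefect-dbt itself builds asset keys.

```python
# Resource types that, per the description above, can become Prefect assets.
ASSET_RESOURCE_TYPES = {"model", "seed", "snapshot"}


def materialized_nodes(manifest: dict, run_results: dict) -> list[dict]:
    """Hypothetical helper: manifest info for successful models, seeds, and snapshots."""
    succeeded = {
        result["unique_id"]
        for result in run_results.get("results", [])
        if result.get("status") == "success"
    }
    nodes = manifest.get("nodes", {})
    selected = []
    for unique_id in sorted(succeeded):
        node = nodes.get(unique_id)
        # Tests also appear in run results but are not materialized as tables.
        if node and node.get("resource_type") in ASSET_RESOURCE_TYPES:
            selected.append({"unique_id": unique_id, "relation_name": node.get("relation_name")})
    return selected


if __name__ == "__main__":
    manifest = {"nodes": {
        "model.shop.orders": {"resource_type": "model", "relation_name": "analytics.orders"},
        "seed.shop.countries": {"resource_type": "seed", "relation_name": "analytics.countries"},
        "test.shop.not_null_orders_id": {"resource_type": "test"},
    }}
    run_results = {"results": [
        {"unique_id": "model.shop.orders", "status": "success"},
        {"unique_id": "seed.shop.countries", "status": "error"},
        {"unique_id": "test.shop.not_null_orders_id", "status": "pass"},
    ]}
    print(materialized_nodes(manifest, run_results))
```

If either artifact is missing, there is no reliable way to tell which successful nodes correspond to which database relations. That is why the job must produce both.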

See also