
dbt Model Orchestration

docs/v3/examples/run-dbt-with-prefect.mdx


{/* This page is automatically generated via the generate_example_pages.py script. Any changes to this page will be overwritten. */} <a href="https://github.com/PrefectHQ/prefect/blob/main/examples/run_dbt_with_prefect.py" target="_blank">View on GitHub</a>

Transform unreliable dbt scripts into production-grade data pipelines with enterprise observability, automatic failure recovery, and zero-downtime deployments.

When you combine Prefect with dbt, you get the perfect marriage of best-in-class analytics tools:

  • Python gives you the flexibility to integrate with any data source, API, or system your analytics stack needs.
  • dbt Core handles the heavy lifting of SQL transformations, testing, and documentation.
  • Prefect wraps the entire workflow in battle-tested orchestration: automatic retries, scheduling, and observability.

The result? Your analytics team gets reliable, observable data pipelines that leverage the strengths of both platforms. Point this combo at any warehouse and it will transform your data while providing enterprise-grade workflow management.

Note: This example uses dbt Core (the open-source CLI). For dbt Cloud integration, see the dbt Cloud guide in the Prefect documentation.

This example demonstrates these Prefect features:

  • @task – wrap dbt commands in retries & observability.
  • log_prints – surface dbt output automatically in Prefect logs.
  • Automatic retries with exponential back-off for flaky network connections.
  • prefect-dbt integration – native dbt execution with enhanced logging and failure handling.
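Retry delays need not be fixed: `retry_delay_seconds` also accepts a list of delays, and Prefect ships an `exponential_backoff` helper that generates one. As a standalone sketch (no Prefect required), this computes the kind of doubling schedule that helper produces, assuming a 10-second backoff factor:

```python
# Standalone sketch of an exponential back-off schedule: each retry waits
# twice as long as the previous one. Prefect's `exponential_backoff`
# helper produces a list like this for a task's `retry_delay_seconds`.
def backoff_schedule(retries: int, backoff_factor: float = 10) -> list[float]:
    return [backoff_factor * 2**attempt for attempt in range(retries)]

print(backoff_schedule(3))  # → [10, 20, 40]
```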

The Scenario: Reliable Analytics Workflows

Your analytics team uses dbt to model data in DuckDB for rapid local development and testing, but deploys to Snowflake in production. You need a workflow that:

  • Anyone can run locally without complex setup (DuckDB)
  • Automatically retries on network failures or temporary dbt errors
  • Provides clear logs and observability for debugging
  • Can be easily scheduled and deployed to production

Our Solution

Write three focused Python functions (download project, run dbt commands, orchestrate workflow), add Prefect decorators, and let Prefect handle retries, logging, and scheduling. The entire example is self-contained – no git client or global dbt configuration required.

For more on integrating Prefect with dbt, see the prefect-dbt guide.

Running the example locally

bash
python examples/run_dbt_with_prefect.py

Watch as Prefect orchestrates the complete dbt lifecycle: downloading the project, running models, executing tests, and materializing results. The flow creates a local DuckDB file you can explore with any SQL tool.

Code walkthrough

  1. Project Setup – Download and cache a demo dbt project from GitHub
  2. dbt CLI Wrapper – Execute dbt commands with automatic retries and logging using prefect-dbt
  3. Orchestration Flow – Run the complete dbt lifecycle in sequence
  4. Execution – Self-contained example that works out of the box

python
import io
import shutil
import urllib.request
import zipfile
from datetime import timedelta
from pathlib import Path
from typing import Any

from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings

from prefect import flow, task

DEFAULT_REPO_ZIP = (
    "https://github.com/PrefectHQ/examples/archive/refs/heads/examples-markdown.zip"
)

PROJECT_DIR = Path(__file__).parent / "prefect_dbt_project"


Project Setup – download and cache dbt project

To keep this example fully self-contained, we download a demo dbt project directly from GitHub as a ZIP file, so users don't need git installed. Learn more about tasks in the Prefect documentation.

python
def _project_cache_key(_context: Any, parameters: dict[str, Any]) -> str:
    """Cache key that invalidates when the extracted project is missing.

    The task returns a local filesystem path, so a URL-only cache key
    would happily replay a stale `Path` pointing at a directory that
    has since been deleted (CI ephemeral storage, a manual `rm -rf`,
    or a workspace cleanup). Fold the directory's existence into the
    key so a missing project forces a fresh download.
    """

    repo_zip_url = parameters.get("repo_zip_url", DEFAULT_REPO_ZIP)
    return f"{repo_zip_url}:exists={PROJECT_DIR.exists()}"


@task(
    retries=2,
    retry_delay_seconds=5,
    log_prints=True,
    cache_key_fn=_project_cache_key,
    cache_expiration=timedelta(hours=1),
)
def build_dbt_project(repo_zip_url: str = DEFAULT_REPO_ZIP) -> Path:
    """Download and extract the demo dbt project, returning its local path.

    To keep the example fully self-contained we grab the GitHub archive as a ZIP
    so users do **not** need `git` installed. The project is extracted from the
    PrefectHQ/examples repository into a sibling directory next to this script
    (`prefect_dbt_project`).

    The task is cached for one hour via Prefect's task cache keyed on the
    archive URL *and* whether the local project directory still exists —
    see `_project_cache_key` above. If the directory is deleted between
    runs the key changes, so the task re-downloads instead of replaying
    a stale `Path`. A local short-circuit inside the task also skips the
    network call when the extracted directory is already present.
    """

    project_dir = PROJECT_DIR
    if project_dir.exists():
        print(f"Using cached dbt project at {project_dir}\n")
        return project_dir

    tmp_extract_base = project_dir.parent / "_tmp_dbt_extract"
    if tmp_extract_base.exists():
        shutil.rmtree(tmp_extract_base)

    print(f"Downloading dbt project archive → {repo_zip_url}\n")
    with urllib.request.urlopen(repo_zip_url) as resp:
        data = resp.read()

    with zipfile.ZipFile(io.BytesIO(data)) as zf:
        zf.extractall(tmp_extract_base)

    # Find the folder containing dbt_project.yml (in resources/prefect_dbt_project)
    candidates = list(
        tmp_extract_base.rglob("**/resources/prefect_dbt_project/dbt_project.yml")
    )
    if not candidates:
        raise ValueError(
            "dbt_project.yml not found in resources/prefect_dbt_project – structure unexpected"
        )

    project_root = candidates[0].parent
    shutil.move(str(project_root), str(project_dir))
    shutil.rmtree(tmp_extract_base)

    print(f"Extracted dbt project to {project_dir}\n")
    return project_dir



Create profiles.yml for DuckDB – needed for dbt to work

This task creates a simple profiles.yml file for DuckDB so dbt can connect to the database. This keeps the example self-contained.

python
@task(retries=2, retry_delay_seconds=5, log_prints=True)
def create_dbt_profiles(project_dir: Path) -> None:
    """Create a profiles.yml file for DuckDB connection.

    This creates a simple DuckDB profile so dbt can run without external
    database configuration. The profile points to a local DuckDB file.
    This will overwrite any existing profiles.yml to ensure correct formatting.
    """

    profiles_content = f"""demo:
  outputs:
    dev:
      type: duckdb
      path: {project_dir}/demo.duckdb
      threads: 1
  target: dev"""

    profiles_path = project_dir / "profiles.yml"
    with open(profiles_path, "w") as f:
        f.write(profiles_content)

    print(f"Created/updated profiles.yml at {profiles_path}")



dbt CLI Wrapper – execute commands with retries and logging using prefect-dbt

This task uses the modern PrefectDbtRunner from the prefect-dbt integration, which provides native dbt execution with enhanced logging, failure handling, and automatic event emission. Learn more about retries in the Prefect documentation.

python
@task(retries=2, retry_delay_seconds=5, log_prints=True)
def run_dbt_commands(commands: list[str], project_dir: Path) -> None:
    """Run dbt commands using the modern prefect-dbt integration.

    Uses PrefectDbtRunner which provides enhanced logging, failure handling,
    and automatic Prefect event emission for dbt node status changes.
    This is much more robust than subprocess calls and integrates natively
    with Prefect's observability features.
    """

    print(f"Running dbt commands: {commands}\n")

    # Configure dbt settings to point to our project directory
    settings = PrefectDbtSettings(
        project_dir=str(project_dir),
        profiles_dir=str(project_dir),  # Use project dir for profiles too
    )

    # Create runner and execute commands. `raise_on_failure=True` (the
    # default) turns any failed dbt node into a Python exception, which
    # triggers the enclosing Prefect task's retries and marks the task
    # — and ultimately the flow — as Failed if the retries are exhausted.
    runner = PrefectDbtRunner(settings=settings)

    for command in commands:
        print(f"Executing: dbt {command}")
        runner.invoke(command.split())
        print(f"Completed: dbt {command}\n")



Orchestration Flow – run the complete dbt lifecycle

This flow orchestrates the standard dbt workflow: deps → seed → run → test. Each step is a separate task run in Prefect, providing granular observability and automatic retry handling for any step that fails, with the prefect-dbt integration handling dbt execution. Learn more about flows in the Prefect documentation.

python
@flow(name="dbt_flow", log_prints=True)
def dbt_flow(repo_zip_url: str = DEFAULT_REPO_ZIP) -> None:
    """Run the demo dbt project with Prefect using prefect-dbt integration.

    Steps executed:
    1. Download and setup the dbt project
    2. Create profiles.yml for DuckDB connection
    3. `dbt deps`   – download any package dependencies (none for this tiny demo).
    4. `dbt seed`   – load seed CSVs if they exist (safe to run even when empty).
    5. `dbt run`    – build the model(s) defined under `models/`.
    6. `dbt test`   – execute any tests declared in the project.

    Each step runs as a separate Prefect task with automatic retries and logging.
    Uses the modern prefect-dbt integration for enhanced observability and
    native dbt execution.
    """

    project_dir = build_dbt_project(repo_zip_url)
    create_dbt_profiles(project_dir)

    # dbt commands – executed sequentially using prefect-dbt integration
    run_dbt_commands(["deps"], project_dir)
    run_dbt_commands(["seed"], project_dir)
    run_dbt_commands(["run"], project_dir)
    run_dbt_commands(["test"], project_dir)

    # Let users know where the DuckDB file was written for exploration
    duckdb_path = project_dir / "demo.duckdb"
    print(f"\nDone! DuckDB file located at: {duckdb_path.resolve()}")


What Just Happened?

Here's the sequence of events when you run this flow:

  1. Project Download – Prefect registered a task run to download and extract the dbt project from GitHub. The task uses Prefect's task cache with a custom cache_key_fn that folds the archive URL together with the local directory's existence, so repeat runs inside an hour reuse the prior download but a deleted prefect_dbt_project/ directory invalidates the cache and forces a fresh fetch.
  2. Profiles Setup – A second task run wrote a local profiles.yml pointing dbt at the on-disk DuckDB file.
  3. dbt Lifecycle – Four further task runs executed the standard dbt workflow: deps, seed, run, and test.
  4. Native dbt Integration – Each dbt command was executed through prefect-dbt for enhanced logging, failure handling, and automatic event emission.
  5. Automatic Retries – Each dbt command would automatically retry on failure (network issues, temporary dbt errors, etc.).
  6. Centralized Logging – All dbt output streamed directly to Prefect logs with proper log level mapping.
  7. Event Emission – Prefect automatically emitted events for each dbt node execution, enabling advanced monitoring and alerting.
  8. Local Results – A DuckDB file appeared at prefect_dbt_project/demo.duckdb ready for analysis.
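The cache-key idea from step 1 can be sketched in isolation. This simplified stand-in for `_project_cache_key` (hypothetical names; Prefect's task parameters and cache machinery omitted) shows why a deleted project directory forces a fresh download:

```python
# Simplified stand-in for `_project_cache_key`: folding the directory's
# existence into the key means a deleted project yields a different key,
# so Prefect treats the cached result as stale and re-runs the task.
def cache_key(repo_zip_url: str, project_exists: bool) -> str:
    return f"{repo_zip_url}:exists={project_exists}"

url = "https://example.com/project.zip"  # illustrative URL
assert cache_key(url, True) != cache_key(url, False)
```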

Prefect + prefect-dbt transformed a series of shell commands into a resilient, observable workflow — no hand-authored YAML, no cron jobs, just Python with enterprise-grade dbt integration. (dbt's own profiles.yml still lives next to the project; this flow writes it programmatically so you never have to edit it by hand.)

Why This Matters

Traditional dbt orchestration often involves brittle shell scripts, complex YAML configurations, or heavyweight workflow tools. Prefect with the prefect-dbt integration gives you enterprise-grade orchestration with zero operational overhead:

  • Reliability: Automatic retries with exponential backoff handle transient failures
  • Native Integration: PrefectDbtRunner provides enhanced logging, failure handling, and event emission
  • Observability: Every dbt command and node is logged, timed, and searchable in the Prefect UI with proper log level mapping
  • Event-Driven: Automatic Prefect events for dbt node status changes enable advanced monitoring and alerting
  • Portability: The same Python file runs locally, in CI/CD, and in production
  • Composability: Easily extend this flow with data quality checks, Slack alerts, or downstream dependencies
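As a sketch of that composability, a hypothetical downstream data-quality check might look like the function below; in a real flow you would decorate it with `@task` and feed it a row count queried from the DuckDB file:

```python
# Hypothetical data-quality gate: raising turns the failure into a normal
# Python exception, which Prefect surfaces as a failed task run (and can
# retry or alert on).
def check_row_count(table: str, count: int, minimum: int = 1) -> None:
    if count < minimum:
        raise ValueError(
            f"Data quality check failed: {table} has {count} rows "
            f"(expected at least {minimum})"
        )
    print(f"{table}: {count} rows OK")
```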

This pattern scales from prototype analytics to production data platforms. Whether you're running dbt against DuckDB for rapid local iteration or Snowflake for enterprise analytics, Prefect ensures your workflows are reliable, observable, and maintainable.
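For the Snowflake production path mentioned above, the same profiles.yml pattern extends with a second target. A hypothetical sketch, where the account, user, database, and warehouse values are placeholders rather than values from this example:

```yaml
demo:
  outputs:
    dev:
      type: duckdb
      path: prefect_dbt_project/demo.duckdb
      threads: 1
    prod:
      type: snowflake
      account: your_account        # placeholder
      user: your_user              # placeholder
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      database: analytics          # placeholder
      warehouse: transforming      # placeholder
      schema: public
      threads: 4
  target: dev
```

Switching environments then becomes `dbt run --target prod`, with no changes to the models themselves.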

To learn more about orchestrating analytics workflows with Prefect, see the prefect-dbt guide in the Prefect documentation.

Finally, the script's entry point runs the flow when executed directly:

python
if __name__ == "__main__":
    dbt_flow()