docs/v3/examples/run-dbt-with-prefect.mdx
{/*
This page is automatically generated via the generate_example_pages.py script. Any changes to this page will be overwritten.
*/}
<a href="https://github.com/PrefectHQ/prefect/blob/main/examples/run_dbt_with_prefect.py" target="_blank">View on GitHub</a>
Transform unreliable dbt scripts into production-grade data pipelines with enterprise observability, automatic failure recovery, and zero-downtime deployments.
When you combine Prefect with dbt, you pair best-in-class analytics tooling with production-grade orchestration. The result? Your analytics team gets reliable, observable data pipelines that leverage the strengths of both platforms. Point this combo at any warehouse and it will transform your data while providing enterprise-grade workflow management.
Note: This example uses dbt Core (the open-source CLI). For dbt Cloud integration, see the dbt Cloud guide in the Prefect documentation.
This example demonstrates these Prefect features:
- `@task` – wrap dbt commands in retries & observability.
- `log_prints` – surface dbt output automatically in Prefect logs.

Your analytics team uses dbt to model data in DuckDB for rapid local development and testing, but deploys to Snowflake in production. You need a workflow that works reliably in both environments.
Write three focused Python functions (download project, run dbt commands, orchestrate workflow), add Prefect decorators, and let Prefect handle retries, logging, and scheduling. The entire example is self-contained – no git client or global dbt configuration required.
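What the decorators buy you can be sketched in plain Python: a minimal, hypothetical stand-in for retry behavior (this is not Prefect's actual implementation, just the semantics of `retries` and `retry_delay_seconds`):

```python
import time
from functools import wraps


def with_retries(retries: int = 2, retry_delay_seconds: float = 5):
    """Illustrative stand-in for `@task(retries=..., retry_delay_seconds=...)`."""

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # retries exhausted: surface the failure
                    time.sleep(retry_delay_seconds)

        return wrapper

    return decorator


attempts = []


@with_retries(retries=2, retry_delay_seconds=0)
def flaky_dbt_step() -> str:
    # Simulate a transient failure that succeeds on the third attempt.
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient dbt failure")
    return "success"


print(flaky_dbt_step())  # succeeds on the third attempt
```

With Prefect, the same resilience comes from a single decorator argument; you never hand-roll this loop.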
For more on integrating Prefect with dbt, see the prefect-dbt guide.
```bash
python examples/run_dbt_with_prefect.py
```
Watch as Prefect orchestrates the complete dbt lifecycle: downloading the project, running models, executing tests, and materializing results. The flow creates a local DuckDB file you can explore with any SQL tool.
```python
import io
import shutil
import urllib.request
import zipfile
from datetime import timedelta
from pathlib import Path
from typing import Any

from prefect import flow, task
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings

DEFAULT_REPO_ZIP = (
    "https://github.com/PrefectHQ/examples/archive/refs/heads/examples-markdown.zip"
)
PROJECT_DIR = Path(__file__).parent / "prefect_dbt_project"
```
To keep this example fully self-contained, we download a demo dbt project directly from GitHub as a ZIP file. This means users don't need git installed. Learn more about tasks in the Prefect documentation.
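The download-and-extract approach relies only on the standard library. Here is a minimal sketch using an in-memory archive (no network call; the archive layout mirrors the real one, but the file contents are illustrative):

```python
import io
import tempfile
import zipfile
from pathlib import Path

# Build a tiny ZIP in memory, standing in for the downloaded GitHub archive.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr(
        "repo-main/resources/prefect_dbt_project/dbt_project.yml", "name: demo\n"
    )

extract_base = Path(tempfile.mkdtemp())
with zipfile.ZipFile(io.BytesIO(buf.getvalue())) as zf:
    zf.extractall(extract_base)

# Locate the project the same way the task does: search for dbt_project.yml.
candidates = list(
    extract_base.rglob("**/resources/prefect_dbt_project/dbt_project.yml")
)
project_root = candidates[0].parent
print(project_root.name)  # prefect_dbt_project
```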
```python
def _project_cache_key(_context: Any, parameters: dict[str, Any]) -> str:
    """Cache key that invalidates when the extracted project is missing.

    The task returns a local filesystem path, so a URL-only cache key
    would happily replay a stale `Path` pointing at a directory that
    has since been deleted (CI ephemeral storage, a manual `rm -rf`,
    or a workspace cleanup). Fold the directory's existence into the
    key so a missing project forces a fresh download.
    """
    repo_zip_url = parameters.get("repo_zip_url", DEFAULT_REPO_ZIP)
    return f"{repo_zip_url}:exists={PROJECT_DIR.exists()}"


@task(
    retries=2,
    retry_delay_seconds=5,
    log_prints=True,
    cache_key_fn=_project_cache_key,
    cache_expiration=timedelta(hours=1),
)
def build_dbt_project(repo_zip_url: str = DEFAULT_REPO_ZIP) -> Path:
    """Download and extract the demo dbt project, returning its local path.

    To keep the example fully self-contained we grab the GitHub archive as a ZIP
    so users do **not** need `git` installed. The project is extracted from the
    PrefectHQ/examples repository into a sibling directory next to this script
    (`prefect_dbt_project`).

    The task is cached for one hour via Prefect's task cache keyed on the
    archive URL *and* whether the local project directory still exists —
    see `_project_cache_key` above. If the directory is deleted between
    runs the key changes, so the task re-downloads instead of replaying
    a stale `Path`. A local short-circuit inside the task also skips the
    network call when the extracted directory is already present.
    """
    project_dir = PROJECT_DIR
    if project_dir.exists():
        print(f"Using cached dbt project at {project_dir}\n")
        return project_dir

    tmp_extract_base = project_dir.parent / "_tmp_dbt_extract"
    if tmp_extract_base.exists():
        shutil.rmtree(tmp_extract_base)

    print(f"Downloading dbt project archive → {repo_zip_url}\n")
    with urllib.request.urlopen(repo_zip_url) as resp:
        data = resp.read()
    with zipfile.ZipFile(io.BytesIO(data)) as zf:
        zf.extractall(tmp_extract_base)

    # Find the folder containing dbt_project.yml (in resources/prefect_dbt_project)
    candidates = list(
        tmp_extract_base.rglob("**/resources/prefect_dbt_project/dbt_project.yml")
    )
    if not candidates:
        raise ValueError(
            "dbt_project.yml not found in resources/prefect_dbt_project – structure unexpected"
        )
    project_root = candidates[0].parent
    shutil.move(str(project_root), str(project_dir))
    shutil.rmtree(tmp_extract_base)

    print(f"Extracted dbt project to {project_dir}\n")
    return project_dir
```
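The invalidation logic in `_project_cache_key` can be exercised in isolation. A minimal, Prefect-free sketch, with a hypothetical temporary directory standing in for `PROJECT_DIR`:

```python
import shutil
import tempfile
from pathlib import Path


def project_cache_key(repo_zip_url: str, project_dir: Path) -> str:
    # Same idea as `_project_cache_key`: fold directory existence into the key.
    return f"{repo_zip_url}:exists={project_dir.exists()}"


url = "https://example.com/archive.zip"  # hypothetical archive URL
project_dir = Path(tempfile.mkdtemp()) / "prefect_dbt_project"
project_dir.mkdir()

key_present = project_cache_key(url, project_dir)

shutil.rmtree(project_dir)  # simulate CI cleanup / a manual `rm -rf`
key_missing = project_cache_key(url, project_dir)

# The key changes, so a cached result stored under `key_present` is not replayed.
print(key_present != key_missing)  # True
```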
This task creates a simple profiles.yml file for DuckDB so dbt can connect to the database. This keeps the example self-contained.
```python
@task(retries=2, retry_delay_seconds=5, log_prints=True)
def create_dbt_profiles(project_dir: Path) -> None:
    """Create a profiles.yml file for DuckDB connection.

    This creates a simple DuckDB profile so dbt can run without external
    database configuration. The profile points to a local DuckDB file.
    This will overwrite any existing profiles.yml to ensure correct formatting.
    """
    profiles_content = f"""demo:
  outputs:
    dev:
      type: duckdb
      path: {project_dir}/demo.duckdb
      threads: 1
  target: dev"""

    profiles_path = project_dir / "profiles.yml"
    with open(profiles_path, "w") as f:
        f.write(profiles_content)
    print(f"Created/updated profiles.yml at {profiles_path}")
```
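For a project extracted to `/tmp/prefect_dbt_project` (a hypothetical path), the rendered file would look like:

```yaml
demo:
  outputs:
    dev:
      type: duckdb
      path: /tmp/prefect_dbt_project/demo.duckdb
      threads: 1
  target: dev
```

The profile name (`demo` here) must match the `profile:` entry in the project's `dbt_project.yml`, and `target: dev` selects which output dbt connects to.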
This task uses the modern PrefectDbtRunner from the prefect-dbt integration, which provides native dbt execution with enhanced logging, failure handling, and automatic event emission. Learn more about retries in the Prefect documentation.
```python
@task(retries=2, retry_delay_seconds=5, log_prints=True)
def run_dbt_commands(commands: list[str], project_dir: Path) -> None:
    """Run dbt commands using the modern prefect-dbt integration.

    Uses PrefectDbtRunner which provides enhanced logging, failure handling,
    and automatic Prefect event emission for dbt node status changes.
    This is much more robust than subprocess calls and integrates natively
    with Prefect's observability features.
    """
    print(f"Running dbt commands: {commands}\n")

    # Configure dbt settings to point to our project directory
    settings = PrefectDbtSettings(
        project_dir=str(project_dir),
        profiles_dir=str(project_dir),  # Use project dir for profiles too
    )

    # Create runner and execute commands. `raise_on_failure=True` (the
    # default) turns any failed dbt node into a Python exception, which
    # triggers the enclosing Prefect task's retries and marks the task
    # — and ultimately the flow — as Failed if the retries are exhausted.
    runner = PrefectDbtRunner(settings=settings)
    for command in commands:
        print(f"Executing: dbt {command}")
        runner.invoke(command.split())
        print(f"Completed: dbt {command}\n")
```
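The failure-propagation behavior described in the comment above can be illustrated with a simplified stand-in. No real dbt or Prefect is involved here; `FakeRunner` and `DbtNodeFailure` are hypothetical:

```python
class DbtNodeFailure(Exception):
    """Stand-in for the exception raised when a dbt node fails and raise_on_failure is on."""


class FakeRunner:
    # Hypothetical stand-in for PrefectDbtRunner: "fails" on the `test` command.
    def invoke(self, args: list[str]) -> None:
        if args[0] == "test":
            raise DbtNodeFailure("model test failed")


def run_commands(runner, commands: list[str]) -> list[str]:
    completed = []
    for command in commands:
        runner.invoke(command.split())  # raises on the first failing command
        completed.append(command)
    return completed


runner = FakeRunner()
try:
    run_commands(runner, ["seed", "run", "test"])
except DbtNodeFailure:
    # In the real task, this exception is what triggers Prefect's retries
    # and, once they are exhausted, marks the task (and flow) as Failed.
    print("failure surfaced to the orchestrator")
```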
This flow orchestrates the standard dbt workflow: deps → seed → run → test. Each step is a separate task run in Prefect, providing granular observability and automatic retry handling for any step that fails. The flow uses the flexible prefect-dbt integration for enhanced dbt execution. Learn more about flows in the Prefect documentation.
```python
@flow(name="dbt_flow", log_prints=True)
def dbt_flow(repo_zip_url: str = DEFAULT_REPO_ZIP) -> None:
    """Run the demo dbt project with Prefect using prefect-dbt integration.

    Steps executed:

    1. Download and setup the dbt project
    2. Create profiles.yml for DuckDB connection
    3. `dbt deps` – download any package dependencies (none for this tiny demo).
    4. `dbt seed` – load seed CSVs if they exist (safe to run even when empty).
    5. `dbt run` – build the model(s) defined under `models/`.
    6. `dbt test` – execute any tests declared in the project.

    Each step runs as a separate Prefect task with automatic retries and logging.
    Uses the modern prefect-dbt integration for enhanced observability and
    native dbt execution.
    """
    project_dir = build_dbt_project(repo_zip_url)
    create_dbt_profiles(project_dir)

    # dbt commands – executed sequentially using prefect-dbt integration
    run_dbt_commands(["deps"], project_dir)
    run_dbt_commands(["seed"], project_dir)
    run_dbt_commands(["run"], project_dir)
    run_dbt_commands(["test"], project_dir)

    # Let users know where the DuckDB file was written for exploration
    duckdb_path = project_dir / "demo.duckdb"
    print(f"\nDone! DuckDB file located at: {duckdb_path.resolve()}")
```
Here's the sequence of events when you run this flow:
1. `build_dbt_project` downloads the GitHub archive and extracts the demo project, using a `cache_key_fn` that folds the archive URL together with the local directory's existence, so repeat runs inside an hour reuse the prior download but a deleted `prefect_dbt_project/` directory invalidates the cache and forces a fresh fetch.
2. `create_dbt_profiles` writes a `profiles.yml` pointing dbt at the on-disk DuckDB file.
3. `run_dbt_commands` executes `deps`, `seed`, `run`, and `test` in sequence.
4. Each command runs through `prefect-dbt` for enhanced logging, failure handling, and automatic event emission.
5. The result is `prefect_dbt_project/demo.duckdb` ready for analysis.

Prefect + prefect-dbt transformed a series of shell commands into a resilient, observable workflow — no hand-authored YAML, no cron jobs, just Python with enterprise-grade dbt integration. (dbt's own profiles.yml still lives next to the project; this flow writes it programmatically so you never have to edit it by hand.)
Traditional dbt orchestration often involves brittle shell scripts, complex YAML configurations, or heavyweight workflow tools. Prefect with the prefect-dbt integration gives you enterprise-grade orchestration with zero operational overhead:
- `PrefectDbtRunner` provides enhanced logging, failure handling, and event emission

This pattern scales from prototype analytics to production data platforms. Whether you're running dbt against DuckDB for rapid local iteration or Snowflake for enterprise analytics, Prefect ensures your workflows are reliable, observable, and maintainable.
To learn more about orchestrating analytics workflows with Prefect, check out the prefect-dbt guide in the Prefect documentation.
```python
if __name__ == "__main__":
    dbt_flow()
```