docs/integrations/prefect-dbt/runner.mdx
Versions 0.7.0 and later of prefect-dbt include the PrefectDbtRunner class, which provides an improved interface for running dbt Core commands with better logging, failure handling, and automatic asset lineage.
```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner().invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```
When calling .invoke() in a flow or task, each node in dbt's execution graph is reflected as a task in Prefect's execution graph.
Logs from each node will belong to the corresponding task, and each task's state is determined by the state of that node's execution.
```
15:54:59.119 | INFO | Flow run 'imposing-partridge' - Found 8 models, 3 seeds, 18 data tests, 543 macros
15:54:59.134 | INFO | Flow run 'imposing-partridge' -
15:54:59.148 | INFO | Flow run 'imposing-partridge' - Concurrency: 1 threads (target='dev')
15:54:59.164 | INFO | Flow run 'imposing-partridge' -
15:54:59.665 | INFO | Task run 'model my_first_dbt_model' - 1 of 29 OK created sql table model main.my_first_dbt_model ..................... [OK in 0.18s]
15:54:59.671 | INFO | Task run 'model my_first_dbt_model' - Finished in state Completed()
...
15:55:02.373 | ERROR | Task run 'model product_metrics' - Runtime Error in model product_metrics (models/marts/product/product_metrics.sql)
  Binder Error: Values list "o" does not have a column named "product_id"
  LINE 47: on p.product_id = o.product_id
15:55:02.857 | ERROR | Task run 'model product_metrics' - Finished in state Failed('Task run encountered an exception Exception: Node model.demo.product_metrics finished with status error')
```
Use dbt's native retry functionality in combination with runtime data from Prefect to retry failed nodes.
```python
from prefect import flow
from prefect.runtime.flow_run import get_run_count
from prefect_dbt import PrefectDbtRunner


@flow(retries=2)
def run_dbt():
    runner = PrefectDbtRunner()

    if get_run_count() == 1:
        runner.invoke(["build"])
    else:
        runner.invoke(["retry"])


if __name__ == "__main__":
    run_dbt()
```
Prefect Cloud maintains a graph of assets, which are objects produced by your workflows.
Any dbt seed, source, or model will appear on your asset graph in Prefect Cloud once it has been executed using the PrefectDbtRunner.
The upstream dependencies of an asset materialized by prefect-dbt are derived from the depends_on field in dbt's manifest.json.
The asset's key will be its corresponding dbt resource's relation_name.
The name asset property is derived from the dbt resource's relation_name with adapter-specific quoting characters removed (for example, "dev"."main_marts"."product_metrics" becomes dev.main_marts.product_metrics). The description property is populated from the dbt resource's description.
The owners asset property is populated if there is data assigned to the owner key under a resource's meta config.
```yaml
models:
  - name: product_metrics
    description: "Product metrics and categorization"
    config:
      meta:
        owner: "kevin-g"
```
Asset metadata is collected from the result of the node's execution.
```json
{
  "node_path": "marts/product/product_metrics.sql",
  "node_name": "product_metrics",
  "unique_id": "model.demo.product_metrics",
  "resource_type": "model",
  "materialized": "table",
  "node_status": "error",
  "node_started_at": "2025-06-26T20:55:05.661126",
  "node_finished_at": "2025-06-26T20:55:05.733257",
  "meta": {
    "owner": "kevin-g"
  },
  "node_relation": {
    "database": "dev",
    "schema": "main_marts",
    "alias": "product_metrics",
    "relation_name": "\"dev\".\"main_marts\".\"product_metrics\""
  }
}
```
Optionally, the compiled code of a dbt model can be appended to the asset description.
```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner(include_compiled_code=True).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```
The PrefectDbtSettings class, based on Pydantic's BaseSettings class, automatically detects DBT_-prefixed environment variables that have a direct effect on the PrefectDbtRunner class.
If no environment variables are set, dbt's defaults are used.
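For example, here is a minimal sketch of the environment-variable path, assuming DBT_PROFILES_DIR (one of dbt's standard variables) is the only setting you need:

```python
import os

from prefect_dbt import PrefectDbtSettings

# Sketch: a DBT_-prefixed environment variable is detected automatically
# by PrefectDbtSettings, so no explicit profiles_dir argument is needed.
os.environ["DBT_PROFILES_DIR"] = "examples/run_dbt"

settings = PrefectDbtSettings()  # profiles_dir resolves from DBT_PROFILES_DIR
```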
Provide a PrefectDbtSettings instance to PrefectDbtRunner to customize dbt settings or override environment variables.
```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    PrefectDbtRunner(
        settings=PrefectDbtSettings(
            project_dir="test",
            profiles_dir="examples/run_dbt"
        )
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```
The PrefectDbtRunner class maps all dbt log levels to standard Python logging levels, so filtering for log levels like WARNING or ERROR in the Prefect UI applies to dbt's logs.
By default, the logging level used by dbt is Prefect's logging level, which can be configured using the PREFECT_LOGGING_LEVEL Prefect setting.
The dbt logging level can be set independently from Prefect's by using the DBT_LOG_LEVEL environment variable, setting log_level in PrefectDbtSettings, or passing the --log-level flag or log_level kwarg to .invoke().
Only logging levels of higher severity (more restrictive) than Prefect's logging level will have an effect.
```python
from dbt_common.events.base_types import EventLevel
from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    PrefectDbtRunner(
        settings=PrefectDbtSettings(
            project_dir="test",
            profiles_dir="examples/run_dbt",
            log_level=EventLevel.ERROR,  # explicitly choose a higher log level for dbt
        )
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```
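The same override can also be applied per invocation by passing dbt's --log-level flag to .invoke(), as noted above:

```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    # Per-invocation override using dbt's own --log-level flag.
    PrefectDbtRunner().invoke(["build", "--log-level", "error"])


if __name__ == "__main__":
    run_dbt()
```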
### profiles.yml templating

The PrefectDbtRunner class supports templating in your profiles.yml file, allowing you to reference Prefect blocks and variables that will be resolved at runtime.
This enables you to store sensitive credentials securely using Prefect blocks, and configure different targets based on the Prefect workspace.
For example, a Prefect variable called target can have a different value in development (dev) and production (prod) workspaces.
This allows you to use the same profiles.yml file to automatically reference a local DuckDB instance in development and a Snowflake instance in production.
```yaml
example:
  outputs:
    dev:
      type: duckdb
      path: dev.duckdb
      threads: 1
    prod:
      type: snowflake
      account: "{{ prefect.blocks.snowflake-credentials.warehouse-access.account }}"
      user: "{{ prefect.blocks.snowflake-credentials.warehouse-access.user }}"
      password: "{{ prefect.blocks.snowflake-credentials.warehouse-access.password }}"
      database: "{{ prefect.blocks.snowflake-connector.prod-connector.database }}"
      schema: "{{ prefect.blocks.snowflake-connector.prod-connector.schema }}"
      warehouse: "{{ prefect.blocks.snowflake-connector.prod-connector.warehouse }}"
      threads: 4
  target: "{{ prefect.variables.target }}"
```
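One way to seed that variable in each workspace is with Prefect's Variable API; a minimal sketch (the value "dev" is illustrative):

```python
from prefect.variables import Variable

# Run once per workspace: set "dev" in development and "prod" in production.
Variable.set("target", "dev", overwrite=True)
```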
By default, any dbt node execution failure causes the entire dbt run to raise an exception whose message contains detailed information about the failure.

```
Failures detected during invocation of dbt command 'build':
Test not_null_my_first_dbt_model_id failed with message: "Got 1 result, configured to fail if != 0"
```
The PrefectDbtRunner's raise_on_failure option can be set to False to prevent failures in dbt from causing the failure of the flow or task in which .invoke() is called.
```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner(
        raise_on_failure=False  # Failed tests will not fail the flow run
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```
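With raise_on_failure=False you can still inspect the outcome yourself. A sketch, assuming .invoke() returns dbt's dbtRunnerResult (the return type of dbt's programmatic invocations), which exposes a success attribute:

```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    # Assumption: .invoke() returns dbt's dbtRunnerResult.
    result = PrefectDbtRunner(raise_on_failure=False).invoke(["build"])
    if not result.success:
        print("dbt build reported failures; the flow run continues")


if __name__ == "__main__":
    run_dbt()
```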
You can disable automatic asset lineage detection for all resources in your dbt project config, or for specific resources in their own config:
```yaml
prefect:
  enable_assets: False
```
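For a single resource, a per-resource override might look like the following sketch in a schema file, assuming the same prefect.enable_assets key is honored under a resource's config:

```yaml
models:
  - name: product_metrics
    config:
      # Assumption: the same prefect.enable_assets key applies per resource.
      prefect:
        enable_assets: False
```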
PrefectDbtRunner supports decorator-based lifecycle hooks that let you react to events during a dbt invocation.
Hooks are registered on a runner instance and receive a DbtHookContext with information about the event.
```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner, DbtHookContext

runner = PrefectDbtRunner()


@runner.on_run_start
def before_build(ctx: DbtHookContext):
    print(f"Starting dbt command: {ctx.command} with args: {ctx.args}")


@runner.post_model(select="tag:critical")
def on_critical_done(ctx: DbtHookContext):
    if ctx.status == "error":
        print(f"Critical model {ctx.node_id} failed: {ctx.error}")


@runner.on_run_end(select="tag:marts")
def after_marts(ctx: DbtHookContext):
    print(f"Run finished with status: {ctx.status}")
    if ctx.run_results:
        print(f"Matched nodes: {len(ctx.run_results)}")


@flow
def run_dbt():
    runner.invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```
| Hook | Level | Accepts select= | When it runs |
|---|---|---|---|
| <nobr>@runner.on_run_start</nobr> | Run | No | Before dbt begins executing any nodes |
| <nobr>@runner.post_model</nobr> | Node | Yes (optional) | After each model node finishes |
| <nobr>@runner.on_run_end</nobr> | Run | Yes (optional) | After all nodes have finished and dbt returns a result |
When select= is provided, the hook only fires for nodes matching the given dbt selector.
For on_run_end, a selector filters by the set of node IDs that participated in the run.
### DbtHookContext fields

Each hook receives a DbtHookContext dataclass. The fields available depend on the hook event:
| Field | Type | on_run_start | post_model | on_run_end |
|---|---|---|---|---|
| event | str | "run_start" | "post_model" | "run_end" |
| command | str | The dbt command (e.g. "build") | Same | Same |
| owner | PrefectDbtRunner | The runner instance | Same | Same |
| args | tuple[str, ...] | CLI args passed to invoke | Same | Same |
| node | ManifestNode \| None | None | The dbt manifest node | None |
| node_id | str \| None | None | The node's unique_id | None |
| status | str \| None | None | "success", "error", or "skipped" | "success" or "error" (overall run) |
| result | dict \| None | None | Event data and message from dbt | `{"success": bool}` |
| run_results | dict \| None | None | None | Per-node result artifacts keyed by unique_id |
| error | Any | None | Error message if the node failed | Exception if the run failed |
| node_ids | tuple[str, ...] | () | Contains the single node ID | All node IDs from run results |
Hooks are best-effort: if a hook raises an exception, Prefect logs a warning and dbt execution continues normally.
Do not use hooks as failure-control mechanisms for dbt — use dbt's own on-run-end hooks
or Prefect automations for critical failure responses.
The examples below use placeholder functions (emit_metric, notify, etc.) for external systems — wire them up to your own metrics or notification client.
Combine post_model(select=...) with ctx.status to react to failures of important models without filtering inside the hook body. The selector uses dbt's node selection syntax, so any tag, FQN, or graph operator your project uses will work.
```python
from prefect_dbt import DbtHookContext, PrefectDbtRunner


def emit_metric(name: str, tags: dict[str, str]) -> None:
    """Replace with your metrics client (Datadog, statsd, OpenTelemetry, ...)."""
    ...


runner = PrefectDbtRunner()


@runner.post_model(select="tag:critical")
def alert_on_critical_failure(ctx: DbtHookContext) -> None:
    if ctx.status != "error":
        return
    emit_metric(
        "dbt.critical_model.failed",
        tags={
            "node_id": ctx.node_id or "unknown",
            "command": ctx.command,
        },
    )


runner.invoke(["build"])
```
Use on_run_end to log or post a summary derived from ctx.run_results and ctx.status. The hook fires for both successful and failed runs.
```python
from collections import Counter

from prefect_dbt import DbtHookContext, PrefectDbtRunner


def notify(channel: str, message: str) -> None:
    """Replace with Slack, email, PagerDuty, etc."""
    ...


runner = PrefectDbtRunner()


@runner.on_run_end
def summarize_run(ctx: DbtHookContext) -> None:
    results = ctx.run_results or {}
    statuses = Counter(node.get("status") for node in results.values())
    notify(
        "#data-platform",
        f"dbt {ctx.command} finished with status={ctx.status}; "
        f"per-node: {dict(statuses)}",
    )


runner.invoke(["build"])
```
### Read run_results safely

ctx.run_results is None when dbt could not produce results (for example, a parse failure during compile), and individual entries may be missing fields depending on the dbt version. Use .get() with defaults rather than indexing.
```python
from prefect_dbt import DbtHookContext, PrefectDbtRunner

runner = PrefectDbtRunner()


@runner.on_run_end
def collect_long_running_models(ctx: DbtHookContext) -> None:
    if ctx.status == "error" and ctx.error is not None:
        # Compile-time errors leave run_results unset.
        print(f"dbt run did not produce results: {ctx.error}")
        return

    slow_models: list[tuple[str, float]] = []
    for unique_id, result in (ctx.run_results or {}).items():
        if not unique_id.startswith("model."):
            continue
        execution_time = float(result.get("execution_time") or 0.0)
        if execution_time >= 30.0:
            slow_models.append((unique_id, execution_time))

    for unique_id, execution_time in sorted(
        slow_models, key=lambda item: item[1], reverse=True
    ):
        print(f"slow model {unique_id}: {execution_time:.1f}s")


runner.invoke(["build"])
```
### select= with post_model and on_run_end

Both post_model and on_run_end accept dbt selectors. on_run_start does not; it always sees the full set of nodes scheduled for the run, and passing select= raises TypeError.
```python
from prefect_dbt import DbtHookContext, PrefectDbtRunner

runner = PrefectDbtRunner()


@runner.on_run_start
def before_run(ctx: DbtHookContext) -> None:
    # Filter inside the hook if you only care about certain nodes.
    mart_nodes = [nid for nid in ctx.node_ids if nid.startswith("model.marts.")]
    print(f"about to run {len(mart_nodes)} mart models")


@runner.post_model(select="tag:pii")
def audit_pii_models(ctx: DbtHookContext) -> None:
    print(f"finished PII model {ctx.node_id} with status={ctx.status}")


@runner.on_run_end(select="tag:marts")
def after_marts(ctx: DbtHookContext) -> None:
    # Fires only when at least one mart node participated in the run.
    print(f"marts touched in command {ctx.command!r}; status={ctx.status}")


runner.invoke(["build"])
```
See the PrefectDbtRunner API reference for the complete interface.