PrefectDbtRunner

Versions 0.7.0 and later of prefect-dbt include the PrefectDbtRunner class, which provides an improved interface for running dbt Core commands with better logging, failure handling, and automatic asset lineage.

<Tip> The `PrefectDbtRunner` is inspired by the `DbtRunner` from dbt Core, and its `invoke` method accepts the same arguments. Refer to the [`DbtRunner` documentation](https://docs.getdbt.com/reference/programmatic-invocations) for more information on how to call `invoke`. </Tip>

Basic usage

python
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner().invoke(["build"])


if __name__ == "__main__":
    run_dbt()

When calling .invoke() in a flow or task, each node in dbt's execution graph is reflected as a task in Prefect's execution graph. Logs from each node will belong to the corresponding task, and each task's state is determined by the state of that node's execution.

bash
15:54:59.119 | INFO    | Flow run 'imposing-partridge' - Found 8 models, 3 seeds, 18 data tests, 543 macros
15:54:59.134 | INFO    | Flow run 'imposing-partridge' -
15:54:59.148 | INFO    | Flow run 'imposing-partridge' - Concurrency: 1 threads (target='dev')
15:54:59.164 | INFO    | Flow run 'imposing-partridge' -
15:54:59.665 | INFO    | Task run 'model my_first_dbt_model' - 1 of 29 OK created sql table model main.my_first_dbt_model ..................... [OK in 0.18s]
15:54:59.671 | INFO    | Task run 'model my_first_dbt_model' - Finished in state Completed()
...
15:55:02.373 | ERROR   | Task run 'model product_metrics' -   Runtime Error in model product_metrics (models/marts/product/product_metrics.sql)
  Binder Error: Values list "o" does not have a column named "product_id"

  LINE 47:         on p.product_id = o.product_id
15:55:02.857 | ERROR   | Task run 'model product_metrics' - Finished in state Failed('Task run encountered an exception Exception: Node model.demo.product_metrics finished with status error')
<Warning> The task runs created by calling `.invoke()` run separately from dbt Core, and do not affect dbt's execution behavior. These tasks do not persist results and cannot be cached.

Use dbt's native retry functionality in combination with runtime data from Prefect to retry failed nodes.

python
from prefect import flow
from prefect.runtime.flow_run import get_run_count
from prefect_dbt import PrefectDbtRunner


@flow(retries=2)
def run_dbt():
    runner = PrefectDbtRunner()

    if get_run_count() == 1:
        runner.invoke(["build"])
    else:
        runner.invoke(["retry"])


if __name__ == "__main__":
    run_dbt()
</Warning>

Assets

Prefect Cloud maintains a graph of assets: objects produced by your workflows.

Any dbt seed, source, or model will appear on your asset graph in Prefect Cloud once it has been executed using the PrefectDbtRunner. The upstream dependencies of an asset materialized by prefect-dbt are derived from the depends_on field in dbt's manifest.json.
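The lineage derivation can be pictured as a walk over `depends_on.nodes` entries in the manifest. The sketch below uses an illustrative, hand-written manifest excerpt (not a real project's `manifest.json`) and is a simplification of what prefect-dbt does:

```python
# Illustrative manifest.json excerpt: each node lists its upstream
# dependencies under depends_on.nodes, as dbt does.
manifest = {
    "nodes": {
        "model.demo.product_metrics": {
            "depends_on": {"nodes": ["model.demo.stg_products", "model.demo.stg_orders"]}
        },
        "model.demo.stg_products": {
            "depends_on": {"nodes": ["seed.demo.raw_products"]}
        },
    }
}


def upstream_of(manifest: dict, unique_id: str) -> list[str]:
    """Return the direct upstream dependencies of a node."""
    return manifest["nodes"].get(unique_id, {}).get("depends_on", {}).get("nodes", [])


print(upstream_of(manifest, "model.demo.product_metrics"))
# ['model.demo.stg_products', 'model.demo.stg_orders']
```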

The asset's key will be its corresponding dbt resource's relation_name.

The name asset property is derived from the dbt resource's relation_name with adapter-specific quoting characters removed (for example, "dev"."main_marts"."product_metrics" becomes dev.main_marts.product_metrics). The description property is populated from the dbt resource's description.
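Stripping adapter quoting from a `relation_name` can be sketched as follows. This is a simplified illustration of the idea, not the exact prefect-dbt implementation, and the set of quote characters varies by adapter:

```python
def asset_name_from_relation(relation_name: str) -> str:
    # Remove common adapter quoting characters (double quotes, backticks,
    # brackets) to produce a dotted asset name.
    for ch in '"`[]':
        relation_name = relation_name.replace(ch, "")
    return relation_name


print(asset_name_from_relation('"dev"."main_marts"."product_metrics"'))
# dev.main_marts.product_metrics
```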

The owners asset property is populated if there is data assigned to the owner key under a resource's meta config.

yaml
models:
  - name: product_metrics
    description: "Product metrics and categorization"
    config:
      meta:
        owner: "kevin-g"

Asset metadata is collected from the result of the node's execution.

json
{
  "node_path": "marts/product/product_metrics.sql",
  "node_name": "product_metrics",
  "unique_id": "model.demo.product_metrics",
  "resource_type": "model",
  "materialized": "table",
  "node_status": "error",
  "node_started_at": "2025-06-26T20:55:05.661126",
  "node_finished_at": "2025-06-26T20:55:05.733257",
  "meta": {
    "owner": "kevin-g"
  },
  "node_relation": {
    "database": "dev",
    "schema": "main_marts",
    "alias": "product_metrics",
    "relation_name": "\"dev\".\"main_marts\".\"product_metrics\""
  }
}

Optionally, the compiled code of a dbt model can be appended to the asset description.

python
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner(include_compiled_code=True).invoke(["build"])


if __name__ == "__main__":
    run_dbt()

dbt settings

The PrefectDbtSettings class, based on Pydantic's BaseSettings class, automatically detects DBT_-prefixed environment variables that have a direct effect on the PrefectDbtRunner class. If no environment variables are set, dbt's defaults are used.
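The detection follows the usual `BaseSettings` pattern: environment variables with a known prefix override defaults. A minimal sketch of the idea (the function below is illustrative, not the actual `PrefectDbtSettings` internals):

```python
import os


def collect_dbt_settings(defaults: dict[str, str]) -> dict[str, str]:
    """Overlay DBT_-prefixed environment variables onto dbt's defaults."""
    settings = dict(defaults)
    for key, value in os.environ.items():
        if key.startswith("DBT_"):
            settings[key.removeprefix("DBT_").lower()] = value
    return settings


os.environ["DBT_PROFILES_DIR"] = "/tmp/profiles"
settings = collect_dbt_settings({"profiles_dir": "~/.dbt", "log_level": "info"})
print(settings["profiles_dir"])  # /tmp/profiles: the environment variable wins
print(settings["log_level"])     # info: dbt's default is kept
```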

Provide a PrefectDbtSettings instance to PrefectDbtRunner to customize dbt settings or override environment variables.

python
from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    PrefectDbtRunner(
        settings=PrefectDbtSettings(
            project_dir="test",
            profiles_dir="examples/run_dbt"
        )
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()

Logging

The PrefectDbtRunner class maps all dbt log levels to standard Python logging levels, so filtering for log levels like WARNING or ERROR in the Prefect UI applies to dbt's logs.

By default, the logging level used by dbt is Prefect's logging level, which can be configured using the PREFECT_LOGGING_LEVEL Prefect setting.

The dbt logging level can be set independently from Prefect's by using the DBT_LOG_LEVEL environment variable, setting log_level in PrefectDbtSettings, or passing the --log-level flag or log_level kwarg to .invoke(). Only logging levels of higher severity (more restrictive) than Prefect's logging level will have an effect.

python
from dbt_common.events.base_types import EventLevel
from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    PrefectDbtRunner(
        settings=PrefectDbtSettings(
            project_dir="test",
            profiles_dir="examples/run_dbt",
            log_level=EventLevel.ERROR, # explicitly choose a higher log level for dbt
        )
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
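The precedence rule above can be sketched with the standard `logging` module: the effective dbt level is the stricter of the two. This is an illustration of the behavior, not the actual prefect-dbt implementation:

```python
import logging


def effective_dbt_level(prefect_level: int, requested_dbt_level: int) -> int:
    # Only a dbt level stricter (higher severity) than Prefect's takes
    # effect; a looser dbt level is clamped up to Prefect's level.
    return max(prefect_level, requested_dbt_level)


# With PREFECT_LOGGING_LEVEL=WARNING, requesting DEBUG for dbt has no effect...
assert effective_dbt_level(logging.WARNING, logging.DEBUG) == logging.WARNING
# ...but requesting ERROR does.
assert effective_dbt_level(logging.WARNING, logging.ERROR) == logging.ERROR
```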

profiles.yml templating

The PrefectDbtRunner class supports templating in your profiles.yml file, allowing you to reference Prefect blocks and variables that will be resolved at runtime. This enables you to store sensitive credentials securely using Prefect blocks, and configure different targets based on the Prefect workspace.

For example, a Prefect variable called target can have a different value in development (dev) and production (prod) workspaces. This allows you to use the same profiles.yml file to automatically reference a local DuckDB instance in development and a Snowflake instance in production.

yaml
example:
  outputs:
    dev:
      type: duckdb
      path: dev.duckdb
      threads: 1

    prod:
      type: snowflake
      account: "{{ prefect.blocks.snowflake-credentials.warehouse-access.account }}"
      user: "{{ prefect.blocks.snowflake-credentials.warehouse-access.user }}"
      password: "{{ prefect.blocks.snowflake-credentials.warehouse-access.password }}"
      database: "{{ prefect.blocks.snowflake-connector.prod-connector.database }}"
      schema: "{{ prefect.blocks.snowflake-connector.prod-connector.schema }}"
      warehouse: "{{ prefect.blocks.snowflake-connector.prod-connector.warehouse }}"
      threads: 4

  target: "{{ prefect.variables.target }}"
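Conceptually, the runner resolves these placeholders before handing the profile to dbt. The regex sketch below illustrates the substitution with an inline lookup table standing in for real block and variable resolution; it is not the actual implementation:

```python
import re


def resolve_placeholders(text: str, values: dict[str, str]) -> str:
    """Replace {{ prefect.* }} placeholders using a lookup table."""
    pattern = re.compile(r"\{\{\s*(prefect\.[\w.-]+)\s*\}\}")
    return pattern.sub(lambda m: values[m.group(1)], text)


profile = 'target: "{{ prefect.variables.target }}"'
print(resolve_placeholders(profile, {"prefect.variables.target": "dev"}))
# target: "dev"
```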

Failure handling

By default, any dbt node execution failures cause the entire dbt run to raise an exception with a message containing detailed information about the failure.

Failures detected during invocation of dbt command 'build':
Test not_null_my_first_dbt_model_id failed with message: "Got 1 result, configured to fail if != 0"

The PrefectDbtRunner's raise_on_failure option can be set to False to prevent failures in dbt from causing the failure of the flow or task in which .invoke() is called.

python
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner(
        raise_on_failure=False  # Failed tests will not fail the flow run
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()

Native dbt configuration

You can disable automatic asset lineage detection for all resources in your dbt project config, or for specific resources in their own config:

yaml
prefect:
  enable_assets: False

Lifecycle hooks

<Note> Lifecycle hooks are available in `prefect-dbt` 0.7.24 and later. </Note>

PrefectDbtRunner supports decorator-based lifecycle hooks that let you react to events during a dbt invocation. Hooks are registered on a runner instance and receive a DbtHookContext with information about the event.

python
from prefect import flow
from prefect_dbt import PrefectDbtRunner, DbtHookContext


runner = PrefectDbtRunner()


@runner.on_run_start
def before_build(ctx: DbtHookContext):
    print(f"Starting dbt command: {ctx.command} with args: {ctx.args}")


@runner.post_model(select="tag:critical")
def on_critical_done(ctx: DbtHookContext):
    if ctx.status == "error":
        print(f"Critical model {ctx.node_id} failed: {ctx.error}")


@runner.on_run_end(select="tag:marts")
def after_marts(ctx: DbtHookContext):
    print(f"Run finished with status: {ctx.status}")
    if ctx.run_results:
        print(f"Matched nodes: {len(ctx.run_results)}")


@flow
def run_dbt():
    runner.invoke(["build"])


if __name__ == "__main__":
    run_dbt()

Available hooks

| Hook | Level | Accepts select= | When it runs |
| --- | --- | --- | --- |
| <nobr>@runner.on_run_start</nobr> | Run | No | Before dbt begins executing any nodes |
| <nobr>@runner.post_model</nobr> | Node | Yes (optional) | After each model node finishes |
| <nobr>@runner.on_run_end</nobr> | Run | Yes (optional) | After all nodes have finished and dbt returns a result |

When select= is provided, the hook only fires for nodes matching the given dbt selector. For on_run_end, a selector filters by the set of node IDs that participated in the run.
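That participation check can be pictured as a set intersection between the selector's resolved node IDs and the node IDs that actually ran. A simplified sketch with made-up node IDs:

```python
def run_end_hook_fires(selected_ids: set[str], participated_ids: set[str]) -> bool:
    # The on_run_end hook fires only if at least one node matched by the
    # selector actually participated in the run.
    return bool(selected_ids & participated_ids)


participated = {"model.demo.product_metrics", "model.demo.stg_orders"}
assert run_end_hook_fires({"model.demo.product_metrics"}, participated)
assert not run_end_hook_fires({"model.demo.other_mart"}, participated)
```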

DbtHookContext fields

Each hook receives a DbtHookContext dataclass. The fields available depend on the hook event:

| Field | Type | on_run_start | post_model | on_run_end |
| --- | --- | --- | --- | --- |
| event | str | "run_start" | "post_model" | "run_end" |
| command | str | The dbt command (e.g. "build") | Same | Same |
| owner | PrefectDbtRunner | The runner instance | Same | Same |
| args | tuple[str, ...] | CLI args passed to invoke | Same | Same |
| node | ManifestNode \| None | None | The dbt manifest node | None |
| node_id | str \| None | None | The node's unique_id | None |
| status | str \| None | None | "success", "error", or "skipped" | "success" or "error" (overall run) |
| result | dict \| None | None | Event data and message from dbt | {"success": bool} |
| run_results | dict \| None | None | None | Per-node result artifacts keyed by unique_id |
| error | Any | None | Error message if the node failed | Exception if the run failed |
| node_ids | tuple[str, ...] | () | Contains the single node ID | All node IDs from run results |

Error handling

Hooks are best-effort: if a hook raises an exception, Prefect logs a warning and dbt execution continues normally. Do not use hooks as failure-control mechanisms for dbt — use dbt's own on-run-end hooks or Prefect automations for critical failure responses.

<Warning> Hooks run synchronously in the runner's process. Long-running hook logic delays dbt node processing and the `on_run_end` hook. Keep hooks lightweight — emit metrics, log information, or enqueue work rather than performing expensive operations inline. </Warning>
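One way to keep hooks lightweight is to hand work off to a background thread through a queue, so the hook body only enqueues and returns. A sketch of the pattern using only the standard library (the worker's print stands in for a real metrics or notification call):

```python
import queue
import threading

work: queue.Queue[tuple[str, str]] = queue.Queue()


def worker() -> None:
    # Drain the queue in the background so hook bodies return immediately.
    while True:
        node_id, status = work.get()
        print(f"recorded {node_id}: {status}")  # replace with real work
        work.task_done()


threading.Thread(target=worker, daemon=True).start()


def on_model_done(node_id: str, status: str) -> None:
    """A hook body would only enqueue, never block on the slow work."""
    work.put((node_id, status))


on_model_done("model.demo.product_metrics", "success")
work.join()  # for the demo only; a real hook would not wait
```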

Recipes

The examples below use placeholder functions (emit_metric, notify, etc.) for external systems — wire them up to your own metrics or notification client.

Emit a metric when a critical model fails

Combine post_model(select=...) with ctx.status to react to failures of important models without filtering inside the hook body. The selector uses dbt's node selection syntax, so any tag, FQN, or graph operator your project uses will work.

python
from prefect_dbt import DbtHookContext, PrefectDbtRunner


def emit_metric(name: str, tags: dict[str, str]) -> None:
    """Replace with your metrics client (Datadog, statsd, OpenTelemetry, ...)."""
    ...


runner = PrefectDbtRunner()


@runner.post_model(select="tag:critical")
def alert_on_critical_failure(ctx: DbtHookContext) -> None:
    if ctx.status != "error":
        return
    emit_metric(
        "dbt.critical_model.failed",
        tags={
            "node_id": ctx.node_id or "unknown",
            "command": ctx.command,
        },
    )


runner.invoke(["build"])

Send a run summary after dbt completes

Use on_run_end to log or post a summary derived from ctx.run_results and ctx.status. The hook fires for both successful and failed runs.

python
from collections import Counter

from prefect_dbt import DbtHookContext, PrefectDbtRunner


def notify(channel: str, message: str) -> None:
    """Replace with Slack, email, PagerDuty, etc."""
    ...


runner = PrefectDbtRunner()


@runner.on_run_end
def summarize_run(ctx: DbtHookContext) -> None:
    results = ctx.run_results or {}
    statuses = Counter(node.get("status") for node in results.values())
    notify(
        "#data-platform",
        f"dbt {ctx.command} finished with status={ctx.status}; "
        f"per-node: {dict(statuses)}",
    )


runner.invoke(["build"])

Inspect run_results safely

ctx.run_results is None when dbt could not produce results (for example, a parse failure during compile), and individual entries may be missing fields depending on the dbt version. Use .get() with defaults rather than indexing.

python
from prefect_dbt import DbtHookContext, PrefectDbtRunner

runner = PrefectDbtRunner()


@runner.on_run_end
def collect_long_running_models(ctx: DbtHookContext) -> None:
    if ctx.status == "error" and ctx.error is not None:
        # Compile-time errors leave run_results unset.
        print(f"dbt run did not produce results: {ctx.error}")
        return

    slow_models: list[tuple[str, float]] = []
    for unique_id, result in (ctx.run_results or {}).items():
        if not unique_id.startswith("model."):
            continue
        execution_time = float(result.get("execution_time") or 0.0)
        if execution_time >= 30.0:
            slow_models.append((unique_id, execution_time))

    for unique_id, execution_time in sorted(
        slow_models, key=lambda item: item[1], reverse=True
    ):
        print(f"slow model {unique_id}: {execution_time:.1f}s")


runner.invoke(["build"])

Use select= with post_model and on_run_end

Both post_model and on_run_end accept dbt selectors. on_run_start does not — it always sees the full set of nodes scheduled for the run, and passing select= raises TypeError.

python
from prefect_dbt import DbtHookContext, PrefectDbtRunner

runner = PrefectDbtRunner()


@runner.on_run_start
def before_run(ctx: DbtHookContext) -> None:
    # Filter inside the hook if you only care about certain nodes.
    mart_nodes = [nid for nid in ctx.node_ids if nid.startswith("model.marts.")]
    print(f"about to run {len(mart_nodes)} mart models")


@runner.post_model(select="tag:pii")
def audit_pii_models(ctx: DbtHookContext) -> None:
    print(f"finished PII model {ctx.node_id} with status={ctx.status}")


@runner.on_run_end(select="tag:marts")
def after_marts(ctx: DbtHookContext) -> None:
    # Fires only when at least one mart node participated in the run.
    print(f"marts touched in command {ctx.command!r}; status={ctx.status}")


runner.invoke(["build"])

<Note> The selector is resolved against the project manifest once per invocation. If a selector does not match any node in the resolved manifest (or fails to resolve), the hook silently fires for nothing — check your selector with `dbt ls --select <selector>` if a hook isn't running. </Note>

See also