docs/v3/how-to-guides/workflows/test-workflows.mdx
Test Prefect flows and tasks by running them against an isolated, temporary backend or by calling the underlying function directly.
Use `prefect_test_harness` as a context manager to run flows and tasks against a temporary local SQLite database:

```python
from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@flow
def my_favorite_flow():
    return 42

def test_my_favorite_flow():
    with prefect_test_harness():
        # run the flow against a temporary testing database
        assert my_favorite_flow() == 42
```
For more extensive testing, use `prefect_test_harness` as a fixture in your unit testing framework. For example, when using pytest:

```python
import pytest

from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield

@flow
def my_favorite_flow():
    return 42

def test_my_favorite_flow():
    assert my_favorite_flow() == 42
```
<Note>
In this example, the fixture is scoped to run once for the entire test session. In most cases you do not need a clean database for each test; isolating your test runs to a shared test database is enough. Creating a new test database per test adds significant overhead, so scope the fixture to the session. If some tests must be fully isolated, place them in a separate module with a function-scoped harness fixture (see Stale state between tests).
</Note>
When you enter the `prefect_test_harness` context manager, the following happens:

1. A temporary directory is created and `PREFECT_API_DATABASE_CONNECTION_URL` is overridden to point at a new SQLite file inside it. The database interface is reset through `temporary_database_interface()` so the server uses this temporary database instead of any previously configured one.
2. A `SubprocessASGIServer` starts in a child process on a random available port (in the 8000-9000 range). This is a real Prefect API server backed by the temporary database. `PREFECT_API_URL` is then overridden to point at this server so all client calls route to it.
3. Your flows and tasks execute the same code paths they would in production, including state transitions, result persistence, and API interactions.
4. When you exit the context manager, the API log worker and events worker are drained to flush pending data and prevent stale events from leaking into subsequent test harness invocations. The subprocess server is stopped, and the temporary directory is cleaned up at process exit.
To test the function decorated with `@task` or `@flow` without running it through the Prefect engine, use `.fn()` to call the wrapped function directly:

```python
from prefect import flow, task

@task
def my_favorite_task():
    return 42

@flow
def my_favorite_flow():
    val = my_favorite_task()
    return val

def test_my_favorite_task():
    assert my_favorite_task.fn() == 42
```
This bypasses state tracking, retries, and logging. It is useful for fast unit tests that only need to verify business logic.
If your flow or task calls `get_run_logger()`, calling `.fn()` outside of a run context raises a `MissingContextError`. Disable the run logger to avoid this:

```python
from prefect.logging import disable_run_logger

def test_my_favorite_task():
    with disable_run_logger():
        assert my_favorite_task.fn() == 42
```
To test log output from flows and tasks, use pytest's `caplog` fixture to capture log messages:

```python
import logging
from typing import Any

import pytest

from prefect import flow, get_run_logger, task
from prefect.testing.utilities import prefect_test_harness

@task
def log_message() -> None:
    logger = get_run_logger()
    logger.info("Logging from task")

@flow
def parent_flow() -> None:
    logger = get_run_logger()
    logger.info("Logging from flow")
    log_message()

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield

def test_flow_log_message(caplog: Any) -> None:
    caplog.set_level(logging.INFO)
    parent_flow()
    assert "Logging from flow" in caplog.messages

def test_task_log_message(caplog: Any) -> None:
    caplog.set_level(logging.INFO)
    parent_flow()
    assert "Logging from task" in caplog.messages
```
<Note>
The `caplog` fixture only captures logs when the run logger is active. If you disable the run logger with `disable_run_logger()`, `caplog` does not capture any log output from flows or tasks.
</Note>
If your flows or tasks are async, use pytest-asyncio together with the test harness. The harness itself is a synchronous context manager, so set it up in a synchronous session-scoped fixture and write your test functions as async:

```python
import pytest

from prefect import flow
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield

@flow
async def my_async_flow():
    return "async result"

@pytest.mark.asyncio
async def test_my_async_flow():
    result = await my_async_flow()
    assert result == "async result"
```
You can also use `.fn()` for async functions:

```python
import pytest

from prefect import task

@task
async def fetch_data():
    return {"key": "value"}

@pytest.mark.asyncio
async def test_fetch_data():
    result = await fetch_data.fn()
    assert result == {"key": "value"}
```
The test harness starts a subprocess server and waits for it to become healthy. By default it waits 30 seconds. If your environment is slow (CI runners, constrained resources), increase the timeout:

```python
with prefect_test_harness(server_startup_timeout=60):
    ...
```
If the server fails to start, check that SQLite is available in your environment (the temporary database requires the standard library `sqlite3` module).

The session-scoped fixture reuses a single database across all tests. If a test creates flows or deployments that interfere with other tests, do not enter `prefect_test_harness` inside a test that already runs under a session-scoped harness, because the inner harness exit stops the shared subprocess server and leaves later tests pointing at a dead API URL. Instead, put those tests in a separate file or module with their own function-scoped fixture:

```python
import pytest

from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="function")
def clean_harness():
    with prefect_test_harness():
        yield

def test_needs_clean_database(clean_harness):
    # fresh temporary database for this test only
    ...
```
Calling `get_run_logger()` outside of a flow or task run raises a `MissingContextError`. This commonly occurs when calling `.fn()` on a decorated function that uses the run logger. Wrap the call with `disable_run_logger()`:

```python
from prefect.logging import disable_run_logger

def test_task_with_logging():
    with disable_run_logger():
        result = my_task.fn()
```

Alternatively, run the flow or task through the test harness so that a proper run context is available.
If you use pytest-xdist or another parallel test runner, each worker process should
create its own prefect_test_harness instance. Because each harness starts its own
server on a random port with its own SQLite database, parallel workers do not conflict
with each other. Make sure the fixture is session-scoped per worker (the default
behavior with pytest-xdist).
PREFECT_API_URL and other configuration