docs/v3/advanced/background-tasks.mdx
This example demonstrates how to use background tasks in the context of a web application using Prefect for task submission, execution, monitoring, and result storage. We'll build out an application using FastAPI to offer API endpoints to our clients, and task workers to execute the background tasks these endpoints defer.
<Tip> Refer to the [examples repository](https://github.com/PrefectHQ/examples/tree/main/apps/background-tasks) for the complete example's source code. </Tip>

This pattern is useful when you need to perform operations that are too long for a standard web request-response cycle, such as data processing, sending emails, or interacting with external APIs that might be slow.
This example will build out:
- `@prefect.task` definitions representing the work you want to run in the background
- A `fastapi` application providing API endpoints to:
  - Accept a `POST` request and submit the task to Prefect with `.delay()`
  - Check a task run's status via a `GET` request using its `task_run_id`
- A `Dockerfile` to build a multi-stage image for the web app, Prefect server and task worker(s)
- A `compose.yaml` to manage lifecycles of the web app, Prefect server and task worker(s)

├── Dockerfile
├── README.md
├── compose.yaml
├── pyproject.toml
├── src
│   └── foo
│       ├── __init__.py
│       ├── _internal/*.py
│       ├── api.py
│       └── task.py
You can follow along by cloning the examples repository, or instead use uv to bootstrap your own new project:
uv init --lib foo
uv add prefect marvin
This example application is structured as a library with a src/foo directory for portability and organization.
Prefect can persist the return value of your task definitions to your result storage (e.g. a local directory, S3, GCS, etc.), enabling caching and idempotency.

The core of the background processing is a Python function decorated with `@prefect.task`. This marks the function as a unit of work that Prefect can manage (e.g. observe, cache, retry).
from typing import Any, TypeVar

import marvin

from prefect import task, Task
from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.states import State
from prefect.task_worker import serve
from prefect.client.schemas.objects import TaskRun

T = TypeVar("T")


def _print_output(task: Task, task_run: TaskRun, state: State[T]):
    result = state.result()
    print(f"result type: {type(result)}")
    print(f"result: {result!r}")


@task(cache_policy=INPUTS + TASK_SOURCE, on_completion=[_print_output])
async def create_structured_output(data: Any, target: type[T], instructions: str) -> T:
    return await marvin.cast_async(
        data,
        target=target,
        instructions=instructions,
    )


def main():
    serve(create_structured_output)


if __name__ == "__main__":
    main()
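To make the `cache_policy=INPUTS + TASK_SOURCE` argument above more concrete, here is a minimal, standard-library-only sketch of the general idea of keying a cached result on both a function's inputs and its code. It is an illustration under stated assumptions, not Prefect's implementation: `source_fingerprint`, `inputs_fingerprint`, and `cache_key` are hypothetical helper names, and hashing the compiled bytecode merely stands in for hashing the task's source.

```python
import hashlib
import json
from typing import Any, Callable


def source_fingerprint(fn: Callable[..., Any]) -> str:
    """Hash the function's compiled body (a stand-in for hashing its source)."""
    code = fn.__code__
    payload = code.co_code + repr(code.co_consts).encode()
    return hashlib.sha256(payload).hexdigest()


def inputs_fingerprint(kwargs: dict[str, Any]) -> str:
    """Hash the call's keyword arguments in a stable, order-independent way."""
    encoded = json.dumps(kwargs, sort_keys=True, default=repr).encode()
    return hashlib.sha256(encoded).hexdigest()


def cache_key(fn: Callable[..., Any], **kwargs: Any) -> str:
    """Combine both fingerprints: the key changes if the code OR the inputs change."""
    combined = source_fingerprint(fn) + inputs_fingerprint(kwargs)
    return hashlib.sha256(combined.encode()).hexdigest()


# Hypothetical example functions, used only to show when the key changes.
def add(a: int, b: int) -> int:
    return a + b


def sub(a: int, b: int) -> int:
    return a - b


same_call_twice = cache_key(add, a=1, b=2) == cache_key(add, a=1, b=2)
different_inputs = cache_key(add, a=1, b=2) != cache_key(add, a=1, b=3)
different_code = cache_key(add, a=1, b=2) != cache_key(sub, a=1, b=2)
```

Under this scheme, repeating a call with identical inputs maps to the same key, while editing the function body or changing an argument produces a new key and therefore a fresh computation.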
Key details:
- `@task`: Decorator to define the task we want to run in the background.
- `cache_policy`: Caching based on `INPUTS` and `TASK_SOURCE`.
- `serve(create_structured_output)`: This function starts a task worker subscribed to newly `delay()`ed task runs.

The FastAPI application provides API endpoints to trigger the background task and check its status.
import logging
from uuid import UUID

from fastapi import Depends, FastAPI, Response
from fastapi.responses import JSONResponse

from foo._internal import get_form_data, get_task_result, StructuredOutputRequest
from foo.task import create_structured_output

logger = logging.getLogger(__name__)

app = FastAPI()


@app.post("/tasks", status_code=202)
async def submit_task(
    form_data: StructuredOutputRequest = Depends(get_form_data),
) -> dict[str, str]:
    """Submit a task to Prefect for background execution."""
    future = create_structured_output.delay(
        form_data.payload,
        target=form_data.target_type,
        instructions=form_data.instructions,
    )
    logger.info(f"Submitted task run: {future.task_run_id}")
    return {"task_run_id": str(future.task_run_id)}


@app.get("/tasks/{task_run_id}/status")
async def get_task_status_api(task_run_id: UUID) -> Response:
    """Checks the status of a submitted task run."""
    status, data = await get_task_result(task_run_id)

    response_data = {"task_run_id": str(task_run_id), "status": status}
    http_status_code = 200

    if status == "completed":
        response_data["result"] = data
    elif status == "error":
        response_data["message"] = data
        # Optionally set a different HTTP status for errors

    return JSONResponse(response_data, status_code=http_status_code)
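From the client's side, the status endpoint above is meant to be polled until the task run finishes. The following is a rough sketch of such a polling loop using only the standard library. The `BASE_URL` value, the function names, and the interval and timeout defaults are assumptions made for illustration; the response keys (`status`, `result`, `message`) follow the JSON body built by the status endpoint.

```python
import json
import time
import urllib.request
from typing import Any, Callable

# Assumption: the API is reachable on the port published for the api service.
BASE_URL = "http://localhost:8000"


def fetch_status(task_run_id: str, base_url: str = BASE_URL) -> dict[str, Any]:
    """GET /tasks/{task_run_id}/status and decode the JSON body."""
    url = f"{base_url}/tasks/{task_run_id}/status"
    with urllib.request.urlopen(url, timeout=10) as response:
        return json.load(response)


def wait_for_result(
    task_run_id: str,
    fetch: Callable[[str], dict[str, Any]] = fetch_status,
    interval: float = 1.0,
    timeout: float = 60.0,
) -> Any:
    """Poll until the task run is completed, errors out, or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        body = fetch(task_run_id)
        if body["status"] == "completed":
            return body.get("result")
        if body["status"] == "error":
            raise RuntimeError(body.get("message", "task failed"))
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task run {task_run_id} still pending after {timeout}s")
        time.sleep(interval)
```

Passing the fetch function as a parameter keeps the loop easy to exercise without a running server, for example by substituting a stub that returns canned responses.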
The get_task_result helper function (in src/foo/_internal/_prefect.py) uses the Prefect Python client to interact with the Prefect API:
from typing import Any, Literal, cast
from uuid import UUID

from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import TaskRun
from prefect.logging import get_logger

logger = get_logger(__name__)

Status = Literal["completed", "pending", "error"]


def _any_task_run_result(task_run: TaskRun) -> Any:
    try:
        return cast(Any, task_run.state.result(_sync=True))  # type: ignore
    except Exception as e:
        logger.warning(f"Could not retrieve result for task run {task_run.id}: {e}")
        return None


async def get_task_result(task_run_id: UUID) -> tuple[Status, Any]:
    """Get task result or status.

    Returns:
        tuple: (status, data)
            status: "completed", "pending", or "error"
            data: the result if completed, error message if error, None if pending
    """
    try:
        async with get_client() as client:
            task_run = await client.read_task_run(task_run_id)
            if not task_run.state:
                return "pending", None

            if task_run.state.is_completed():
                try:
                    result = _any_task_run_result(task_run)
                    return "completed", result
                except Exception as e:
                    logger.warning(
                        f"Could not retrieve result for completed task run {task_run_id}: {e}"
                    )
                    return "completed", "<Could not retrieve result>"

            elif task_run.state.is_failed():
                try:
                    error_result = _any_task_run_result(task_run)
                    error_message = (
                        str(error_result)
                        if error_result
                        else "Task failed without specific error message."
                    )
                    return "error", error_message
                except Exception as e:
                    logger.warning(
                        f"Could not retrieve error result for failed task run {task_run_id}: {e}"
                    )
                    return "error", "<Could not retrieve error message>"

            else:
                return "pending", None

    except Exception as e:
        logger.error(f"Error checking task status for {task_run_id}: {e}")
        return "error", f"Failed to check task status: {str(e)}"
This function fetches the TaskRun object from the API and checks its state to determine if it's Completed, Failed, or still Pending/Running. If completed, it attempts to retrieve the result using task_run.state.result(). If failed, it tries to get the error message.
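The status endpoint always responds with HTTP 200 and only notes in a comment that a different HTTP status could be set for errors. One possible way to do that is sketched below as a small mapping from the three status strings returned by `get_task_result` to HTTP status codes. The specific codes (202 for pending, 500 for error) and the names `HTTP_STATUS_BY_TASK_STATUS` and `http_status_for` are design assumptions for illustration, not part of the example application.

```python
from typing import Literal

Status = Literal["completed", "pending", "error"]

# Assumed mapping: one reasonable choice, not the only one.
HTTP_STATUS_BY_TASK_STATUS: dict[str, int] = {
    "completed": 200,  # result is available
    "pending": 202,    # accepted, still being processed
    "error": 500,      # the task run failed or its status could not be read
}


def http_status_for(status: Status) -> int:
    """Translate a task status into an HTTP status code, defaulting to 500."""
    return HTTP_STATUS_BY_TASK_STATUS.get(status, 500)
```

In the endpoint, `http_status_code = http_status_for(status)` could then replace the hard-coded `200`, letting clients distinguish outcomes from the status line alone.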
A multi-stage Dockerfile is used to create optimized images for each service (Prefect server, task worker, and web API). This approach helps keep image sizes small and separates build dependencies from runtime dependencies.
# Stage 1: Base image with Python and uv
FROM --platform=linux/amd64 ghcr.io/astral-sh/uv:python3.12-bookworm-slim as base
WORKDIR /app
ENV UV_SYSTEM_PYTHON=1
ENV PATH="/root/.local/bin:$PATH"
COPY pyproject.toml uv.lock* ./
# Note: We install all dependencies needed for all stages here.
# A more optimized approach might separate dependencies per stage.
RUN --mount=type=cache,target=/root/.cache/uv \
    uv pip install --system -r pyproject.toml
COPY src/ /app/src
FROM base as server
CMD ["prefect", "server", "start"]
# --- Task Worker Stage --- #
FROM base as task
# Command to start the task worker by running the task script
# This script should call `prefect.task_worker.serve(...)`
CMD ["python", "src/foo/task.py"]
# --- API Stage --- #
FROM base as api
# Command to start the FastAPI server using uvicorn
CMD ["uvicorn", "src.foo.api:app", "--host", "0.0.0.0", "--port", "8000"]
- `base`: Sets up Python and uv, installs all dependencies from `pyproject.toml` into a base layer to make use of Docker caching, and copies the source code.
- `server`: Builds upon the `base` stage. Sets the default command (`CMD`) to start the Prefect server.
- `task`: Builds upon the `base` stage. Sets the `CMD` to run the `src/foo/task.py` script, which is expected to contain the `serve()` call for the task(s).
- `api`: Builds upon the `base` stage. Sets the `CMD` to start the FastAPI application using `uvicorn`.

The `compose.yaml` file then uses the `target` build option to specify which of these final stages (`server`, `task`, `api`) to use for each service container.
We use compose.yaml to define and run the multi-container application, managing the lifecycles of the FastAPI web server, the Prefect API server, database and task worker(s).
services:
  prefect-server:
    build:
      context: .
      target: server
    ports:
      - "4200:4200"
    volumes:
      - prefect-data:/root/.prefect # Persist Prefect DB
    environment:
      # Allow connections from other containers
      PREFECT_SERVER_API_HOST: 0.0.0.0

  # Task Worker
  task:
    build:
      context: .
      target: task
    deploy:
      replicas: 1 # task workers are safely horizontally scalable (think redis stream consumer groups)
    volumes:
      # Mount storage for results
      - ./task-storage:/task-storage
    depends_on:
      prefect-server:
        condition: service_started
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
      PREFECT_LOCAL_STORAGE_PATH: /task-storage
      PREFECT_LOGGING_LOG_PRINTS: "true"
      PREFECT_RESULTS_PERSIST_BY_DEFAULT: "true"
      MARVIN_ENABLE_DEFAULT_PRINT_HANDLER: "false"
      OPENAI_API_KEY: ${OPENAI_API_KEY}
    develop:
      # Optionally watch for code changes for development
      watch:
        - action: sync
          path: .
          target: /app
          ignore:
            - .venv/
            - task-storage/
        - action: rebuild
          path: uv.lock

  api:
    build:
      context: .
      target: api
    volumes:
      # Mount storage for results
      - ./task-storage:/task-storage
    ports:
      - "8000:8000"
    depends_on:
      task:
        condition: service_started
      prefect-server:
        condition: service_started
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
      PREFECT_LOCAL_STORAGE_PATH: /task-storage
    develop:
      # Optionally watch for code changes for development
      watch:
        - action: sync
          path: .
          target: /app
          ignore:
            - .venv/
            - task-storage/
        - action: rebuild
          path: uv.lock

volumes:
  # Named volumes for data persistence
  prefect-data: {}
  task-storage: {}
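The `replicas` comment in the `task` service compares task workers to consumer groups: several workers can run side by side, and each submitted task run should be handled by only one of them. The sketch below imitates that property with an in-memory queue and threads. It is only an analogy under that assumption and says nothing about how Prefect actually delivers task runs to workers; `run_workers` is a hypothetical name.

```python
import queue
import threading
from collections import Counter


def run_workers(task_ids: list[str], replicas: int) -> Counter:
    """Drain a shared queue with `replicas` workers and count how often each id is handled."""
    pending = queue.Queue()
    for task_id in task_ids:
        pending.put(task_id)

    processed: Counter = Counter()
    lock = threading.Lock()

    def worker() -> None:
        while True:
            try:
                # Each item is handed to exactly one consumer.
                task_id = pending.get_nowait()
            except queue.Empty:
                return
            with lock:
                processed[task_id] += 1

    threads = [threading.Thread(target=worker) for _ in range(replicas)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return processed


counts = run_workers([f"run-{i}" for i in range(50)], replicas=4)
every_run_handled_once = len(counts) == 50 and all(n == 1 for n in counts.values())
```

Raising `replicas` adds consumers to the same stream of work; it does not cause any single run to be executed more than once in this model.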
In a production use-case, you'd likely want to:

- Use a separate `Dockerfile` for each service
- Add a `postgres` service and configure it as the Prefect database.
- Remove the `develop` section

`prefect-server`: Runs the Prefect API server and UI.

- `build`: Uses the multi-stage `Dockerfile`, targeting the `server` stage.
- `ports`: Exposes the Prefect API/UI on port `4200`.
- `volumes`: Uses a named volume `prefect-data` to persist the Prefect SQLite database (`/root/.prefect/prefect.db`) across container restarts.
- `PREFECT_SERVER_API_HOST=0.0.0.0`: Makes the API server listen on all interfaces within the Docker network, allowing the `task` and `api` services to connect.

`task`: Runs the Prefect task worker process (executing `python src/foo/task.py`, which calls `serve`).

- `build`: Uses the `task` stage from the `Dockerfile`.
- `depends_on`: Ensures the `prefect-server` service is started before this service attempts to connect.
- `PREFECT_API_URL`: Crucial setting that tells the worker where to find the Prefect API to receive submitted task runs.
- `PREFECT_LOCAL_STORAGE_PATH=/task-storage`: Configures the worker to store task run results in the `/task-storage` directory inside the container. This path is bind-mounted from the host's `./task-storage` directory via `volumes: - ./task-storage:/task-storage` (or use `task-storage:/task-storage` to mount the `task-storage` named volume instead of a host path).
- `PREFECT_RESULTS_PERSIST_BY_DEFAULT=true`: Tells Prefect tasks to automatically save their results using the configured storage (defined by `PREFECT_LOCAL_STORAGE_PATH` in this case).
- `PREFECT_LOGGING_LOG_PRINTS=true`: Configures the Prefect logger to capture output from `print()` statements within tasks.
- `OPENAI_API_KEY=${OPENAI_API_KEY}`: Passes secrets needed by the task code from the host environment (for example, via a `.env` file loaded by Docker Compose) into the container's environment.

`api`: Runs the FastAPI web application.

- `build`: Uses the `api` stage from the `Dockerfile`.
- `depends_on`: Waits for the `prefect-server` (required for submitting tasks and checking status) and optionally the `task` worker.
- `PREFECT_API_URL`: Tells the FastAPI application where to send `.delay()` calls and status check requests.
- `PREFECT_LOCAL_STORAGE_PATH`: May be needed if the API itself needs to directly read result files (though typically fetching results via `task_run.state.result()` is preferred).

`volumes`: Defines named volumes (`prefect-data`, `task-storage`) to persist data generated by the containers.
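Because the services rely on the environment variables described above, a missing value tends to surface late, for instance as a connection error or a failed model call. As a hedged illustration, a small preflight check like the following could be run before starting a worker. The variable names come from the `compose.yaml` above; the helper name `missing_settings` and the choice of which settings to treat as required are assumptions.

```python
import os
from typing import Mapping, Sequence

# Assumption: these are the settings the task worker cannot run without.
REQUIRED_WORKER_SETTINGS = (
    "PREFECT_API_URL",
    "PREFECT_LOCAL_STORAGE_PATH",
    "OPENAI_API_KEY",
)


def missing_settings(
    env: Mapping[str, str],
    required: Sequence[str] = REQUIRED_WORKER_SETTINGS,
) -> list[str]:
    """Return the required setting names that are unset or blank in `env`."""
    return [name for name in required if not env.get(name, "").strip()]


# Inspect the current process environment without failing if values are absent.
currently_missing = missing_settings(os.environ)
```

A worker entrypoint could log or exit when `currently_missing` is non-empty, which gives a clearer message than a failure deep inside a task run.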
Assuming you have obtained the code (either by cloning the repository or using uv init as described previously) and are in the project directory:
Prerequisites: Ensure Docker Desktop (or equivalent) with docker compose support is running.
Build and Run Services: This example's task uses marvin, which (by default) requires an OpenAI API key. Provide it as an environment variable when starting the services:
OPENAI_API_KEY=<your-openai-api-key> docker compose up --build --watch
This command will:
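- `--build`: Build the container images if they don't exist or if the Dockerfile/context has changed.
- `--watch`: Watch for changes in the project source code and automatically sync/rebuild services (useful for development).

Add `--detach` or `-d` to run the containers in the background.

Access Services: the Prefect API/UI is exposed on port 4200 and the FastAPI application on port 8000, per the `ports` mappings in `compose.yaml`.

To stop the services: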
docker compose down
# also remove the named volumes
docker compose down -v
This example provides a repeatable pattern for integrating Prefect-managed background tasks with any Python web application. You can:

- Edit `src/**/*.py` to define and submit your specific web app and background tasks.
- Adjust the configuration (in `compose.yaml`) further, for example, using different result storage or logging levels.