docs/v3/examples/ai-data-analyst-with-pydantic-ai.mdx
{/*
This page is automatically generated via the generate_example_pages.py script. Any changes to this page will be overwritten.
*/}
<a href="https://github.com/PrefectHQ/prefect/blob/main/examples/ai_data_analyst_with_pydantic_ai.py" target="_blank">View on GitHub</a>
This example shows how to build resilient AI workflows using Prefect and pydantic-ai.
The integration provides automatic retries for LLM calls, full observability of agent decisions,
and durable execution semantics that make workflows idempotent and rerunnable.
You need to analyze datasets programmatically, but writing custom analysis code for each dataset is time-consuming. Instead, you'll build an AI agent that:

- Explores the dataset structure on its own
- Calls analysis tools to compute statistics and detect anomalies
- Returns structured, validated findings and recommendations

All while being resilient to LLM failures, tool errors, and network issues.

This example demonstrates:

- **PrefectAgent** – Wraps pydantic-ai agents for durable execution
- **TaskConfig** – Custom retry policies and timeouts for AI operations

Install dependencies (if not already installed):
```bash
uv add "pydantic-ai[prefect]" pandas
# or with pip:
pip install "pydantic-ai[prefect]" pandas
```
```python
from __future__ import annotations

from typing import Any

import pandas as pd
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from pydantic_ai.durable_exec.prefect import PrefectAgent, TaskConfig

from prefect import flow, task
```
These functions are "tools" that the AI agent can call to analyze data. Prefect automatically wraps each tool execution as a task for observability and retries.
```python
def calculate_statistics(ctx: RunContext[pd.DataFrame], column: str) -> dict[str, Any]:
    """Calculate descriptive statistics for a column.

    The AI agent can call this tool to understand data distribution,
    and Prefect ensures it retries on failure."""
    df = ctx.deps
    if column not in df.columns:
        return {"error": f"Column '{column}' not found. Available: {list(df.columns)}"}
    stats = df[column].describe().to_dict()
    stats["missing_count"] = int(df[column].isna().sum())
    stats["unique_count"] = int(df[column].nunique())
    return {
        k: (float(v) if isinstance(v, (int, float)) else v) for k, v in stats.items()
    }


def detect_anomalies(
    ctx: RunContext[pd.DataFrame], column: str, threshold: float = 3.0
) -> list[dict[str, Any]]:
    """Detect anomalies using the standard deviation method.

    Identifies values that are more than `threshold` standard deviations from the mean.
    This tool demonstrates how complex analysis logic can be made reliable with Prefect."""
    df = ctx.deps
    if column not in df.columns:
        return [{"error": f"Column '{column}' not found"}]
    if not pd.api.types.is_numeric_dtype(df[column]):
        return [{"error": f"Column '{column}' is not numeric"}]
    mean = df[column].mean()
    std = df[column].std()
    if std == 0:
        return []
    anomalies = df[abs(df[column] - mean) > (threshold * std)]
    return [
        {
            "index": int(idx),
            "value": float(row[column]),
            "z_score": float((row[column] - mean) / std),
        }
        for idx, row in anomalies.head(10).iterrows()
    ]


def get_column_info(ctx: RunContext[pd.DataFrame]) -> dict[str, Any]:
    """Get an overview of all columns in the dataset.

    Helps the AI agent understand the dataset structure before analysis."""
    df = ctx.deps
    return {
        "columns": list(df.columns),
        "shape": {"rows": len(df), "columns": len(df.columns)},
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
    }
```
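The standard-deviation rule in `detect_anomalies` flags any value whose z-score magnitude exceeds the threshold. A minimal standalone sketch of that logic (plain pandas, no pydantic-ai required; the sample values are illustrative):

```python
import pandas as pd

# 20 typical sales values plus one obvious outlier
sales = pd.Series([100] * 20 + [1000])

mean, std = sales.mean(), sales.std()
z_scores = (sales - mean) / std

# Same rule as detect_anomalies: flag values more than 3 standard deviations out
outliers = sales[z_scores.abs() > 3.0]
print(outliers.to_dict())  # {20: 1000}
```

Note that with very small samples a single extreme value inflates the standard deviation enough to hide itself, which is why the sample here includes a reasonable number of typical rows.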
Structured output ensures the AI returns consistent, parseable results.
```python
class DataAnalysis(BaseModel):
    """Structured analysis results from the AI agent."""

    summary: str = Field(description="High-level summary of the dataset")
    key_findings: list[str] = Field(
        description="Key findings discovered from the data", min_length=3, max_length=5
    )
    recommendations: list[str] = Field(
        description="Actionable recommendations based on the findings",
        min_length=3,
        max_length=5,
    )
    columns_analyzed: list[str] = Field(
        description="List of columns that were analyzed"
    )

    def __str__(self) -> str:
        """Format the analysis results for clean display."""
        findings = "\n".join(
            f" {i}. {finding}" for i, finding in enumerate(self.key_findings, 1)
        )
        recommendations = "\n".join(
            f" {i}. {rec}" for i, rec in enumerate(self.recommendations, 1)
        )
        return f"""
{"=" * 80}
ANALYSIS RESULTS
{"=" * 80}
📋 Summary:
{self.summary}
🔑 Key Findings:
{findings}
💡 Recommendations:
{recommendations}
📊 Columns Analyzed: {", ".join(self.columns_analyzed)}
{"=" * 80}
"""
```
We configure the agent with tools and wrap it with PrefectAgent for durability.
```python
def create_data_analyst_agent() -> PrefectAgent[pd.DataFrame, DataAnalysis]:
    """Create an AI data analyst with Prefect durability.

    The PrefectAgent wrapper automatically:
    - Wraps agent.run as a Prefect flow
    - Wraps LLM calls as Prefect tasks with retries
    - Wraps tool calls as separate Prefect tasks
    """
    # Create the base pydantic-ai agent
    agent = Agent(
        "openai:gpt-4o",
        name="data-analyst-agent",
        output_type=DataAnalysis,
        deps_type=pd.DataFrame,
        # Register tools that the agent can use
        tools=[calculate_statistics, detect_anomalies, get_column_info],
        system_prompt=(
            "You are an expert data analyst. Analyze the provided dataset using "
            "the available tools. Focus on finding meaningful patterns, anomalies, "
            "and actionable insights. Always start by understanding the dataset "
            "structure with get_column_info."
        ),
    )

    # Wrap with PrefectAgent for durable execution with a custom retry policy
    return PrefectAgent(
        agent,
        model_task_config=TaskConfig(
            retries=3,  # Retry LLM calls up to 3 times
            retry_delay_seconds=[1.0, 2.0, 4.0],  # Exponential backoff
            timeout_seconds=60.0,  # 60s timeout for LLM calls
        ),
        tool_task_config=TaskConfig(
            retries=2,  # Retry tool calls up to 2 times
            retry_delay_seconds=[0.5, 1.0],
        ),
    )
```
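The `retry_delay_seconds` list spells out the backoff schedule explicitly, one delay per retry attempt. The same doubling schedule can be generated rather than hard-coded (a trivial sketch; `base` and the attempt count match the values used above):

```python
# Exponential backoff: 1s, 2s, 4s before retries 1, 2, 3
base, retries = 1.0, 3
delays = [base * 2**attempt for attempt in range(retries)]
print(delays)  # [1.0, 2.0, 4.0]
```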
Create a realistic sales dataset for demonstration.
```python
@task
def create_sample_dataset() -> pd.DataFrame:
    """Generate a sample sales dataset with some anomalies.

    In production, you'd load real data from a file, database, or API."""
    data = {
        "product": ["Widget", "Gadget", "Doohickey", "Widget", "Gadget"] * 20,
        "sales": [100, 150, 200, 110, 145] * 19
        + [100, 150, 200, 1000, 2000],  # Last 2 are anomalies
        "region": ["North", "South", "East", "West", "Central"] * 20,
        "month": [1, 2, 3, 4, 5] * 20,
    }
    return pd.DataFrame(data)
```
Orchestrate the entire AI analysis workflow with Prefect.
```python
@flow(name="ai-data-analyst", log_prints=True)
async def analyze_dataset_with_ai() -> DataAnalysis:
    """Run AI-powered data analysis with automatic retries and observability.

    This flow demonstrates how Prefect makes AI workflows production-ready:
    1. Dataset preparation is tracked as a task
    2. AI agent execution is wrapped for durability
    3. All LLM and tool calls are logged and retryable
    4. Results are structured and validated with Pydantic
    """
    # Prepare the dataset
    print("📊 Preparing dataset...")
    df = create_sample_dataset()
    print(f"Dataset shape: {df.shape}\n")

    # Create the AI agent with Prefect durability
    print("🤖 Initializing AI data analyst...")
    agent = create_data_analyst_agent()

    # Run the analysis - all LLM and tool calls are automatically retried on failure
    print("🔍 Running AI analysis...\n")
    result = await agent.run(
        "Analyze this sales dataset. Identify patterns, anomalies, and provide recommendations.",
        deps=df,
    )

    # Display results
    print(result.output)
    return result.output
```
To get full durable execution with automatic idempotency, serve the flow to create a deployment. Deployed flows enable Prefect's transactional semantics for agent operations.
```python
if __name__ == "__main__":
    import os
    import sys

    # Check if the OpenAI API key is set
    if not os.getenv("OPENAI_API_KEY"):
        print("❌ Error: OPENAI_API_KEY environment variable not set")
        print("Set it with: export OPENAI_API_KEY='your-key-here'")
        sys.exit(1)

    # Serve the flow - this creates a deployment and runs a worker process
    analyze_dataset_with_ai.serve(
        name="ai-data-analyst-deployment",
        tags=["ai", "pydantic-ai", "data-analysis"],
    )
```
Once served, trigger runs via:

**Prefect UI:** open the deployment's page and trigger a run.

**CLI:**

```bash
prefect deployment run ai-data-analyst/ai-data-analyst-deployment --watch
```

For quick local testing without deployment:

```python
import asyncio

asyncio.run(analyze_dataset_with_ai())
```
When you serve and trigger this flow, Prefect and pydantic-ai work together to create a resilient AI pipeline:

- `serve()` creates a deployment and starts a worker to execute flow runs
- The `PrefectAgent` wrapper makes all AI operations retryable: each LLM request runs as a Prefect task, and whenever the agent calls a tool (`get_column_info`, `calculate_statistics`, `detect_anomalies`), the call is run as a separate Prefect task
- Run the flow via `flow.serve()` or `flow.deploy()` to unlock automatic idempotency and transactional semantics for the `PrefectAgent`

Try it yourself:

```bash
export OPENAI_API_KEY='your-key'
prefect server start
uv run -s examples/ai_data_analyst_with_pydantic_ai.py
```

For more on AI orchestration with Prefect: