!!! warning "Beta API"
    This is the new beta graph API. It provides enhanced capabilities for parallel execution, conditional branching, and complex workflows. The original graph API is still available (and interoperable with the new beta API) and is documented in the main graph documentation.
The beta graph API in pydantic-graph provides a powerful builder pattern for constructing parallel execution graphs. It is designed for advanced workflows where you want declarative control over parallelism, routing, and data aggregation.
The beta graph API is included with pydantic-graph:

```bash
pip install pydantic-graph
```

Or as part of pydantic-ai:

```bash
pip install pydantic-ai
```
Here's a simple example to get you started:
```python
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext


@dataclass
class CounterState:
    """State for tracking a counter value."""

    value: int = 0


async def main():
    # Create a graph builder with state and output types
    g = GraphBuilder(state_type=CounterState, output_type=int)

    # Define steps using the decorator
    @g.step
    async def increment(ctx: StepContext[CounterState, None, None]) -> int:
        """Increment the counter and return its value."""
        ctx.state.value += 1
        return ctx.state.value

    @g.step
    async def double_it(ctx: StepContext[CounterState, None, int]) -> int:
        """Double the input value."""
        return ctx.inputs * 2

    # Add edges connecting the nodes
    g.add(
        g.edge_from(g.start_node).to(increment),
        g.edge_from(increment).to(double_it),
        g.edge_from(double_it).to(g.end_node),
    )

    # Build and run the graph
    graph = g.build()
    state = CounterState()
    result = await graph.run(state=state)
    print(f'Result: {result}')
    #> Result: 2
    print(f'Final state: {state.value}')
    #> Final state: 1
```
_(This example is complete, it can be run "as is" — you'll need to add `import asyncio; asyncio.run(main())` to run `main`)_
The [GraphBuilder][pydantic_graph.beta.graph_builder.GraphBuilder] is the main entry point for constructing graphs. It's generic over:
- `StateT` - The type of mutable state shared across all nodes
- `DepsT` - The type of dependencies injected into nodes
- `InputT` - The type of initial input to the graph
- `OutputT` - The type of final output from the graph

Steps are async functions decorated with [`@g.step`][pydantic_graph.beta.graph_builder.GraphBuilder.step] that define the actual work to be done in each node. They receive a [StepContext][pydantic_graph.beta.step.StepContext] with access to:
- `ctx.state` - The mutable graph state
- `ctx.deps` - Injected dependencies
- `ctx.inputs` - Input data for this step

Edges define the connections between nodes. The builder provides multiple ways to create edges:
- [`g.add()`][pydantic_graph.beta.graph_builder.GraphBuilder.add] - Add one or more edge paths
- [`g.add_edge()`][pydantic_graph.beta.graph_builder.GraphBuilder.add_edge] - Add a simple edge between two nodes
- [`g.edge_from()`][pydantic_graph.beta.graph_builder.GraphBuilder.edge_from] - Start building a complex edge path

Every graph has:
- [`g.start_node`][pydantic_graph.beta.graph_builder.GraphBuilder.start_node] - The entry point receiving initial inputs
- [`g.end_node`][pydantic_graph.beta.graph_builder.GraphBuilder.end_node] - The exit point producing final outputs

Here's an example showcasing parallel execution with a map operation:
```python
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext
from pydantic_graph.beta.join import reduce_list_append


@dataclass
class ProcessingState:
    """State for tracking processing metrics."""

    items_processed: int = 0


async def main():
    g = GraphBuilder(
        state_type=ProcessingState,
        input_type=list[int],
        output_type=list[int],
    )

    @g.step
    async def square(ctx: StepContext[ProcessingState, None, int]) -> int:
        """Square a number and track that we processed it."""
        ctx.state.items_processed += 1
        return ctx.inputs * ctx.inputs

    # Create a join to collect results
    collect_results = g.join(reduce_list_append, initial_factory=list[int])

    # Build the graph with map operation
    g.add(
        g.edge_from(g.start_node).map().to(square),
        g.edge_from(square).to(collect_results),
        g.edge_from(collect_results).to(g.end_node),
    )

    graph = g.build()
    state = ProcessingState()
    result = await graph.run(state=state, inputs=[1, 2, 3, 4, 5])
    print(f'Results: {sorted(result)}')
    #> Results: [1, 4, 9, 16, 25]
    print(f'Items processed: {state.items_processed}')
    #> Items processed: 5
```
_(This example is complete, it can be run "as is" — you'll need to add `import asyncio; asyncio.run(main())` to run `main`)_
In this example:

- The `.map()` operation fans out each item to a separate parallel execution of the `square` step
- The join node aggregates the parallel results with [reduce_list_append][pydantic_graph.beta.join.reduce_list_append]

Explore the detailed documentation for each of these features in the pages that follow.
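The fan-out and fan-in semantics can be sketched with plain `asyncio` (a conceptual illustration of what `.map()` and the join do, not how pydantic-graph implements them):

```python
import asyncio


async def square(x: int) -> int:
    """Stand-in for the `square` step above."""
    return x * x


async def run_map_join(inputs: list[int]) -> list[int]:
    # Fan out: run one concurrent task per input item (the `.map()` part)
    results = await asyncio.gather(*(square(x) for x in inputs))
    # Fan in: append each result to an initially empty list (the join part)
    collected: list[int] = []
    for result in results:
        collected.append(result)
    return collected


print(asyncio.run(run_map_join([1, 2, 3, 4, 5])))
#> [1, 4, 9, 16, 25]
```

Unlike this sketch, the real graph also threads the shared state through every parallel branch, which is why `items_processed` ends up at 5.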
Beyond the basic [graph.run()][pydantic_graph.beta.graph.Graph.run] method, the beta API provides fine-grained control over graph execution.
Use [graph.iter()][pydantic_graph.beta.graph.Graph.iter] to execute the graph one step at a time:
```python
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext


@dataclass
class CounterState:
    value: int = 0


async def main():
    g = GraphBuilder(state_type=CounterState, output_type=int)

    @g.step
    async def increment(ctx: StepContext[CounterState, None, None]) -> int:
        ctx.state.value += 1
        return ctx.state.value

    @g.step
    async def double_it(ctx: StepContext[CounterState, None, int]) -> int:
        return ctx.inputs * 2

    g.add(
        g.edge_from(g.start_node).to(increment),
        g.edge_from(increment).to(double_it),
        g.edge_from(double_it).to(g.end_node),
    )

    graph = g.build()
    state = CounterState()

    # Use iter() for step-by-step execution
    async with graph.iter(state=state) as graph_run:
        print(f'Initial state: {state.value}')
        #> Initial state: 0

        # Advance execution step by step
        async for event in graph_run:
            print(f'{state.value=} | {event=}')
            #> state.value=0 | event=[GraphTask(node_id='increment', inputs=None)]
            #> state.value=1 | event=[GraphTask(node_id='double_it', inputs=1)]
            #> state.value=1 | event=[GraphTask(node_id='__end__', inputs=2)]
            #> state.value=1 | event=EndMarker(_value=2)
            if graph_run.output is not None:
                print(f'Final output: {graph_run.output}')
                #> Final output: 2
                break
```
_(This example is complete, it can be run "as is" — you'll need to add `import asyncio; asyncio.run(main())` to run `main`)_
The [GraphRun][pydantic_graph.beta.graph.GraphRun] object provides:

- `next_task` property: Inspect upcoming tasks
- `output` property: Check if the graph has completed and get the final output
- `next()` method: Manually advance execution with optional value injection

Generate Mermaid diagrams of your graph structure using [graph.render()][pydantic_graph.beta.graph.Graph.render]:
```python
from dataclasses import dataclass

from pydantic_graph.beta import GraphBuilder, StepContext


@dataclass
class SimpleState:
    pass


g = GraphBuilder(state_type=SimpleState, output_type=str)


@g.step
async def step_a(ctx: StepContext[SimpleState, None, None]) -> int:
    return 10


@g.step
async def step_b(ctx: StepContext[SimpleState, None, int]) -> str:
    return f'Result: {ctx.inputs}'


g.add(
    g.edge_from(g.start_node).to(step_a),
    g.edge_from(step_a).to(step_b),
    g.edge_from(step_b).to(g.end_node),
)

graph = g.build()

# Generate a Mermaid diagram
mermaid_diagram = graph.render(title='My Graph', direction='LR')
print(mermaid_diagram)
"""
---
title: My Graph
---
stateDiagram-v2
  direction LR

  step_a
  step_b

  [*] --> step_a
  step_a --> step_b
  step_b --> [*]
"""
```
The rendered diagram can be displayed in documentation, notebooks, or any tool that supports Mermaid syntax.
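For example, GitHub and MkDocs (with Mermaid support enabled) render diagrams placed inside a `mermaid` code fence. A small hypothetical helper (`to_mermaid_markdown` is not part of the library) for producing such a fence from the rendered output:

```python
def to_mermaid_markdown(diagram: str) -> str:
    """Wrap a rendered diagram in a ``mermaid`` code fence for Markdown."""
    return f'```mermaid\n{diagram}\n```\n'


# `diagram` stands in for the output of graph.render()
diagram = 'stateDiagram-v2\n  [*] --> step_a\n  step_a --> [*]'
print(to_mermaid_markdown(diagram))
```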
The original graph API (documented in the main graph page) uses a class-based approach with [BaseNode][pydantic_graph.nodes.BaseNode] subclasses. The beta API uses a builder pattern with decorated functions, which offers a more declarative style of graph construction, with its own advantages and trade-offs.

Both APIs are fully supported and can even be integrated together when needed.
!!! info "No Native Persistence"
    Unlike the original graph API, the beta graph API does not include built-in state persistence. This is due to the complexity of achieving consistent snapshotting with parallel execution.

    For workflows that need to preserve progress across failures, restarts, or long-running operations, use one of the supported durable execution solutions.
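If your state is a plain dataclass, one lightweight stopgap is to snapshot it yourself between runs. A minimal sketch (the `save_state`/`load_state` helpers and the storage choice are illustrative, not part of the pydantic-graph API):

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class CounterState:
    """Mirrors the quickstart state; any JSON-serializable dataclass works."""

    value: int = 0


def save_state(state: CounterState) -> str:
    # Serialize a snapshot, e.g. to write to disk after a completed run
    return json.dumps(asdict(state))


def load_state(raw: str) -> CounterState:
    # Restore the snapshot before starting the next graph run
    return CounterState(**json.loads(raw))


snapshot = save_state(CounterState(value=3))
print(load_state(snapshot))
#> CounterState(value=3)
```

Note this only captures state between complete runs; it does not checkpoint a run that is partway through parallel execution, which is exactly the hard problem the admonition above describes.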