State management is the backbone of any sophisticated AI workflow. In CrewAI Flows, the state system allows you to maintain context, share data between steps, and build complex application logic. Mastering state management is essential for creating reliable, maintainable, and powerful AI applications.
This guide will walk you through everything you need to know about managing state in CrewAI Flows, from basic concepts to advanced techniques, with practical code examples along the way.
Effective state management enables you to:
Let's explore how to leverage these capabilities effectively.
In CrewAI Flows, the state follows a predictable lifecycle:
Understanding this lifecycle is crucial for designing effective flows.
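CrewAI offers two ways to manage state in your flows: unstructured state, which is dictionary-like, and structured state, which is defined by a Pydantic model.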
Let's examine each approach in detail.
Unstructured state uses a dictionary-like approach, offering flexibility and simplicity for straightforward applications.
With unstructured state, you access state through self.state, which behaves like a dictionary.
Here's a simple example of unstructured state management:
from crewai.flow.flow import Flow, listen, start


class UnstructuredStateFlow(Flow):
    @start()
    def initialize_data(self):
        print("Initializing flow data")
        # Add key-value pairs to state
        self.state["user_name"] = "Alex"
        self.state["preferences"] = {
            "theme": "dark",
            "language": "English"
        }
        self.state["items"] = []
        # The flow state automatically gets a unique ID
        print(f"Flow ID: {self.state['id']}")
        return "Initialized"

    @listen(initialize_data)
    def process_data(self, previous_result):
        print(f"Previous step returned: {previous_result}")
        # Access and modify state
        user = self.state["user_name"]
        print(f"Processing data for {user}")
        # Add items to a list in state
        self.state["items"].append("item1")
        self.state["items"].append("item2")
        # Add a new key-value pair
        self.state["processed"] = True
        return "Processed"

    @listen(process_data)
    def generate_summary(self, previous_result):
        # Access multiple state values
        user = self.state["user_name"]
        theme = self.state["preferences"]["theme"]
        items = self.state["items"]
        processed = self.state.get("processed", False)
        summary = f"User {user} has {len(items)} items with {theme} theme. "
        summary += "Data is processed." if processed else "Data is not processed."
        return summary


# Run the flow
flow = UnstructuredStateFlow()
result = flow.kickoff()
print(f"Final result: {result}")
print(f"Final state: {flow.state}")
Unstructured state is ideal for:
While flexible, unstructured state lacks type checking and schema validation, which can lead to errors in complex applications.
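To make that risk concrete, here is a minimal sketch that uses a plain Python dict as a stand-in for unstructured flow state. It does not import CrewAI, and the key names are illustrative. A misspelled key is accepted silently on write, and the mistake only surfaces later as a stale value.

```python
# A plain dict stands in for unstructured flow state (assumption: the same
# dictionary-style semantics as self.state in an untyped Flow).
state = {"user_name": "Alex", "processed": False}

# Typo on write: "procesed" is silently accepted as a brand-new key.
state["procesed"] = True

# The intended key was never updated, so later reads see the old value.
print(state["processed"])
print(sorted(state.keys()))

# Values of the wrong type are accepted too; nothing validates them.
state["user_name"] = 42
print(type(state["user_name"]).__name__)
```

With a Pydantic-backed state model, mistakes like these are easier to catch early because the set of fields and their types are declared up front.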
Structured state uses Pydantic models to define a schema for your flow's state, providing type safety, validation, and better developer experience.
With structured state, you access state through self.state, which behaves like a Pydantic model instance.
Here's how to implement structured state management:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel, Field
from typing import List, Dict, Optional


# Define your state model
class UserPreferences(BaseModel):
    theme: str = "light"
    language: str = "English"


class AppState(BaseModel):
    user_name: str = ""
    preferences: UserPreferences = UserPreferences()
    items: List[str] = []
    processed: bool = False
    completion_percentage: float = 0.0


# Create a flow with typed state
class StructuredStateFlow(Flow[AppState]):
    @start()
    def initialize_data(self):
        print("Initializing flow data")
        # Set state values (type-checked)
        self.state.user_name = "Taylor"
        self.state.preferences.theme = "dark"
        # The ID field is automatically available
        print(f"Flow ID: {self.state.id}")
        return "Initialized"

    @listen(initialize_data)
    def process_data(self, previous_result):
        print(f"Processing data for {self.state.user_name}")
        # Modify state (with type checking)
        self.state.items.append("item1")
        self.state.items.append("item2")
        self.state.processed = True
        self.state.completion_percentage = 50.0
        return "Processed"

    @listen(process_data)
    def generate_summary(self, previous_result):
        # Access state (with autocompletion)
        summary = f"User {self.state.user_name} has {len(self.state.items)} items "
        summary += f"with {self.state.preferences.theme} theme. "
        summary += "Data is processed." if self.state.processed else "Data is not processed."
        summary += f" Completion: {self.state.completion_percentage}%"
        return summary


# Run the flow
flow = StructuredStateFlow()
result = flow.kickoff()
print(f"Final result: {result}")
print(f"Final state: {flow.state}")
Using structured state provides several advantages:
Structured state is recommended for:
Both unstructured and structured states automatically receive a unique identifier (UUID) to help track and manage state instances.
With unstructured state, access it as self.state["id"]; with structured state, access it as self.state.id.
This UUID is particularly valuable when implementing persistence or tracking multiple flow executions.
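As a small illustration of working with these identifiers, the sketch below validates an ID with the standard library before it is used as a persistence or logging key. It assumes the ID is a standard UUID string; the helper name is_valid_state_id is hypothetical and not part of CrewAI.

```python
import uuid


def is_valid_state_id(value) -> bool:
    """Return True if value parses as a UUID string (hypothetical helper)."""
    try:
        uuid.UUID(value)
        return True
    except (ValueError, AttributeError, TypeError):
        return False


# Stand-in for a value read from self.state["id"] or self.state.id.
example_id = str(uuid.uuid4())

print(is_valid_state_id(example_id))
print(is_valid_state_id("not-a-uuid"))
```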
Regardless of whether you're using structured or unstructured state, you can update state dynamically throughout your flow's execution.
Flow methods can return values that are then passed as arguments to listening methods:
from crewai.flow.flow import Flow, listen, start


class DataPassingFlow(Flow):
    @start()
    def generate_data(self):
        # This return value will be passed to listening methods
        return "Generated data"

    @listen(generate_data)
    def process_data(self, data_from_previous_step):
        print(f"Received: {data_from_previous_step}")
        # You can modify the data and pass it along
        processed_data = f"{data_from_previous_step} - processed"
        # Also update state
        self.state["last_processed"] = processed_data
        return processed_data

    @listen(process_data)
    def finalize_data(self, processed_data):
        print(f"Received processed data: {processed_data}")
        # Access both the passed data and state
        last_processed = self.state.get("last_processed", "")
        return f"Final: {processed_data} (from state: {last_processed})"
This pattern allows you to combine direct data passing with state updates for maximum flexibility.
One of CrewAI's most powerful features is the ability to persist flow state across executions. This enables workflows that can be paused, resumed, and even recovered after failures.
The @persist() decorator automates state persistence, saving your flow's state at key points in execution.
When applied at the class level, @persist() saves state after every method execution:
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel


class CounterState(BaseModel):
    value: int = 0


@persist()  # Apply to the entire flow class
class PersistentCounterFlow(Flow[CounterState]):
    @start()
    def increment(self):
        self.state.value += 1
        print(f"Incremented to {self.state.value}")
        return self.state.value

    @listen(increment)
    def double(self, value):
        self.state.value = value * 2
        print(f"Doubled to {self.state.value}")
        return self.state.value


# First run
flow1 = PersistentCounterFlow()
result1 = flow1.kickoff()
print(f"First run result: {result1}")

# Second run - pass the first run's id so its persisted state is loaded
flow2 = PersistentCounterFlow()
result2 = flow2.kickoff(inputs={"id": flow1.state.id})
print(f"Second run result: {result2}")  # Will be higher due to persisted state
For more granular control, you can apply @persist() to specific methods:
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist


class SelectivePersistFlow(Flow):
    @start()
    def first_step(self):
        self.state["count"] = 1
        return "First step"

    @persist()  # Only persist after this method
    @listen(first_step)
    def important_step(self, prev_result):
        self.state["count"] += 1
        self.state["important_data"] = "This will be persisted"
        return "Important step completed"

    @listen(important_step)
    def final_step(self, prev_result):
        self.state["count"] += 1
        return f"Complete with count {self.state['count']}"
@persist supports two distinct hydration modes on kickoff / kickoff_async. Use resume (inputs["id"]) to continue the same lineage; use fork (restore_from_state_id) to start a new lineage seeded from a snapshot:
| Mode | state.id after kickoff | @persist writes land under |
|---|---|---|
| inputs["id"] (resume) | supplied id | supplied id (extends history) |
| restore_from_state_id (fork) | fresh id, or inputs["id"] if pinned | new id (source preserved) |
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from pydantic import BaseModel


class CounterState(BaseModel):
    id: str = ""
    counter: int = 0


@persist()  # called with parentheses, as in the other examples in this guide
class CounterFlow(Flow[CounterState]):
    @start()
    def step(self):
        self.state.counter += 1


# Run 1: fresh state, counter 0 -> 1
flow_1 = CounterFlow()
flow_1.kickoff()

# Fork: hydrate from flow_1's latest snapshot, but write under a NEW state.id
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2 starts with counter=1 (hydrated), then step() bumps it to 2.
# flow_1's flow_uuid history is unchanged.
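The difference between the two modes can also be modeled without CrewAI. The following is a toy in-memory sketch of the table above, not CrewAI's persistence backend: the store, save, latest, and kickoff names are hypothetical, and the one-step counter stands in for a real flow.

```python
import copy
import uuid

# Toy store: state id -> list of snapshots, oldest first (illustrative only).
store = {}


def save(state):
    store.setdefault(state["id"], []).append(copy.deepcopy(state))


def latest(state_id):
    snapshots = store.get(state_id)
    return copy.deepcopy(snapshots[-1]) if snapshots else None


def kickoff(inputs=None, restore_from_state_id=None):
    """Run a one-step counter under fresh, resume, or fork semantics."""
    inputs = inputs or {}
    state = {"id": str(uuid.uuid4()), "counter": 0}  # fresh default state
    if restore_from_state_id is not None:
        source = latest(restore_from_state_id)
        if source is not None:  # source not found: fall back silently
            # Fork: copy the data, but write under a new (or pinned) id.
            state = {**source, "id": inputs.get("id", state["id"])}
    elif "id" in inputs:
        # Resume: continue the same lineage. Assumption for this toy: keep
        # the supplied id even when nothing is stored under it yet.
        state = latest(inputs["id"]) or {**state, "id": inputs["id"]}
    state["counter"] += 1  # the flow's single step
    save(state)
    return state


first = kickoff()                                    # counter 0 -> 1
forked = kickoff(restore_from_state_id=first["id"])  # new id, counter 1 -> 2
resumed = kickoff(inputs={"id": first["id"]})        # same id, counter 1 -> 2
print(forked["id"] != first["id"], resumed["id"] == first["id"])
```

In this model the fork leaves the source history untouched, while the resume appends a second snapshot under the original id.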
Behavior notes:
- If restore_from_state_id is not found in persistence, the kickoff falls back silently to default behavior (mirroring the existing inputs["id"] resume not-found behavior). No exception is raised.
- Combining restore_from_state_id with from_checkpoint raises a ValueError: they target different state systems (@persist vs. Checkpointing) and cannot be combined.
- restore_from_state_id=None (the default) is byte-identical to a kickoff without the parameter.
- Pinning inputs["id"] while forking means the new run shares a persistence key with another flow; usually you want only restore_from_state_id.

Flows support conditional @start() and resumable execution for human-in-the-loop (HITL) and cyclic scenarios:
from crewai.flow.flow import Flow, start, listen, and_, or_


class ResumableFlow(Flow):
    @start()  # unconditional start
    def init(self):
        ...

    # Conditional start: run after "init" or external trigger name
    @start("init")
    def maybe_begin(self):
        ...

    @listen(and_(init, maybe_begin))
    def proceed(self):
        ...
@start() accepts a method name, a router label, or a callable condition.
You can use state to implement complex conditional logic in your flows:
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel


class PaymentState(BaseModel):
    amount: float = 0.0
    is_approved: bool = False
    retry_count: int = 0


class PaymentFlow(Flow[PaymentState]):
    @start()
    def process_payment(self):
        # Simulate payment processing
        self.state.amount = 100.0
        self.state.is_approved = self.state.amount < 1000
        return "Payment processed"

    @router(process_payment)
    def check_approval(self, previous_result):
        if self.state.is_approved:
            return "approved"
        elif self.state.retry_count < 3:
            return "retry"
        else:
            return "rejected"

    @listen("approved")
    def handle_approval(self):
        return f"Payment of ${self.state.amount} approved!"

    @listen("retry")
    def handle_retry(self):
        self.state.retry_count += 1
        print(f"Retrying payment (attempt {self.state.retry_count})...")
        # Could implement retry logic here
        return "Retry initiated"

    @listen("rejected")
    def handle_rejection(self):
        return f"Payment of ${self.state.amount} rejected after {self.state.retry_count} retries."
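Because the routing decision in check_approval depends only on state, it can be tested in isolation. The sketch below extracts the same branching into a standalone function; the name route_payment and the max_retries parameter are illustrative and not part of CrewAI.

```python
def route_payment(is_approved: bool, retry_count: int, max_retries: int = 3) -> str:
    """Mirror the branching in check_approval as a pure function."""
    if is_approved:
        return "approved"
    if retry_count < max_retries:
        return "retry"
    return "rejected"


# Each returned label corresponds to a @listen("<label>") handler in the flow.
for approved, retries in [(True, 0), (False, 1), (False, 3)]:
    print(approved, retries, "->", route_payment(approved, retries))
```

Keeping the decision in a pure function like this lets you unit-test every branch without running a flow; the router method can then delegate to it.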
For complex state transformations, you can create dedicated methods:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
from typing import List, Dict


class UserData(BaseModel):
    name: str
    active: bool = True
    login_count: int = 0


class ComplexState(BaseModel):
    users: Dict[str, UserData] = {}
    active_user_count: int = 0


class TransformationFlow(Flow[ComplexState]):
    @start()
    def initialize(self):
        # Add some users
        self.add_user("alice", "Alice")
        self.add_user("bob", "Bob")
        self.add_user("charlie", "Charlie")
        return "Initialized"

    @listen(initialize)
    def process_users(self, _):
        # Increment login counts
        for user_id in self.state.users:
            self.increment_login(user_id)
        # Deactivate one user
        self.deactivate_user("bob")
        # Update active count
        self.update_active_count()
        return f"Processed {len(self.state.users)} users"

    # Helper methods for state transformations
    def add_user(self, user_id: str, name: str):
        self.state.users[user_id] = UserData(name=name)
        self.update_active_count()

    def increment_login(self, user_id: str):
        if user_id in self.state.users:
            self.state.users[user_id].login_count += 1

    def deactivate_user(self, user_id: str):
        if user_id in self.state.users:
            self.state.users[user_id].active = False
            self.update_active_count()

    def update_active_count(self):
        self.state.active_user_count = sum(
            1 for user in self.state.users.values() if user.active
        )
This pattern of creating helper methods keeps your flow methods clean while enabling complex state manipulations.
One of the most powerful patterns in CrewAI is combining flow state management with crew execution.
You can use flow state to parameterize crews:
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Process, Task
from pydantic import BaseModel


class ResearchState(BaseModel):
    topic: str = ""
    depth: str = "medium"
    results: str = ""


class ResearchFlow(Flow[ResearchState]):
    @start()
    def get_parameters(self):
        # In a real app, this might come from user input
        self.state.topic = "Artificial Intelligence Ethics"
        self.state.depth = "deep"
        return "Parameters set"

    @listen(get_parameters)
    def execute_research(self, _):
        # Create agents
        researcher = Agent(
            role="Research Specialist",
            goal=f"Research {self.state.topic} in {self.state.depth} detail",
            backstory="You are an expert researcher with a talent for finding accurate information."
        )
        writer = Agent(
            role="Content Writer",
            goal="Transform research into clear, engaging content",
            backstory="You excel at communicating complex ideas clearly and concisely."
        )
        # Create tasks
        research_task = Task(
            description=f"Research {self.state.topic} with {self.state.depth} analysis",
            expected_output="Comprehensive research notes in markdown format",
            agent=researcher
        )
        writing_task = Task(
            description=f"Create a summary on {self.state.topic} based on the research",
            expected_output="Well-written article in markdown format",
            agent=writer,
            context=[research_task]
        )
        # Create and run crew
        research_crew = Crew(
            agents=[researcher, writer],
            tasks=[research_task, writing_task],
            process=Process.sequential,
            verbose=True
        )
        # Run crew and store result in state
        result = research_crew.kickoff()
        self.state.results = result.raw
        return "Research completed"

    @listen(execute_research)
    def summarize_results(self, _):
        # Access the stored results
        result_length = len(self.state.results)
        return f"Research on {self.state.topic} completed with {result_length} characters of results."
When a crew completes, you can process its output and store it in your flow state:
@listen(execute_crew)
def process_crew_results(self, _):
    # Parse the raw results (assuming JSON output)
    import json
    try:
        results_dict = json.loads(self.state.raw_results)
        self.state.processed_results = {
            "title": results_dict.get("title", ""),
            "main_points": results_dict.get("main_points", []),
            "conclusion": results_dict.get("conclusion", "")
        }
        return "Results processed successfully"
    except json.JSONDecodeError:
        self.state.error = "Failed to parse crew results as JSON"
        return "Error processing results"
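The parsing step can also be pulled out of the flow method so it is testable on its own. This is a minimal sketch under the same assumption as the snippet above, namely that the crew returns a JSON object with title, main_points, and conclusion keys; the function name parse_crew_output is hypothetical.

```python
import json


def parse_crew_output(raw: str) -> dict:
    """Parse raw crew output as JSON, filling defaults for missing keys."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {"error": "Failed to parse crew results as JSON"}
    if not isinstance(data, dict):
        # Valid JSON, but not the object shape this sketch expects.
        return {"error": "Expected a JSON object at the top level"}
    return {
        "title": data.get("title", ""),
        "main_points": data.get("main_points", []),
        "conclusion": data.get("conclusion", ""),
    }


print(parse_crew_output('{"title": "AI Ethics", "main_points": ["bias"]}'))
print(parse_crew_output("plain text, not JSON"))
```

A flow method could call this helper and store the returned dictionary in state, checking for the error key before continuing.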
Design your state to contain only what's necessary:
from typing import Dict, List

from pydantic import BaseModel


# Too broad
class BloatedState(BaseModel):
    user_data: Dict = {}
    system_settings: Dict = {}
    temporary_calculations: List = []
    debug_info: Dict = {}
    # ...many more fields


# Better: Focused state
class FocusedState(BaseModel):
    user_id: str
    preferences: Dict[str, str]
    completion_status: Dict[str, bool]
As your flows grow in complexity, structured state becomes increasingly valuable:
from datetime import datetime
from typing import Optional

from crewai.flow.flow import Flow, start
from pydantic import BaseModel, Field


# Simple flow can use unstructured state
class SimpleGreetingFlow(Flow):
    @start()
    def greet(self):
        self.state["name"] = "World"
        return f"Hello, {self.state['name']}!"


# Complex flow benefits from structured state
class UserRegistrationState(BaseModel):
    # Defaults allow the state to be created without arguments
    username: str = ""
    email: str = ""
    verification_status: bool = False
    registration_date: datetime = Field(default_factory=datetime.now)
    last_login: Optional[datetime] = None


class RegistrationFlow(Flow[UserRegistrationState]):
    # Methods with strongly-typed state access
    ...
For complex flows, document how state changes throughout the execution:
import uuid  # needed at module level for uuid.uuid4()

# Inside your Flow class:
@start()
def initialize_order(self):
    """
    Initialize order state with empty values.
    State before: {}
    State after: {order_id: str, items: [], status: 'new'}
    """
    self.state.order_id = str(uuid.uuid4())
    self.state.items = []
    self.state.status = "new"
    return "Order initialized"
Implement error handling for state access:
# Assumes unstructured (dictionary-style) state
@listen(previous_step)
def process_data(self, _):
    try:
        # Try to access a value that might not exist
        user_preference = self.state["preferences"]["theme"]
    except (KeyError, TypeError):
        # Handle the error gracefully and record it in state
        errors = self.state.get("errors", [])
        errors.append("Failed to access preferences")
        self.state["errors"] = errors
        user_preference = "default"
    return f"Used preference: {user_preference}"
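An alternative to try/except for nested lookups is a small helper that walks the keys and returns a default when any level is missing. The sketch below assumes the state is available as a plain dict (for example via dict(self.state) on unstructured state); get_nested is a hypothetical helper, not a CrewAI API.

```python
from typing import Any


def get_nested(state: dict, *keys: str, default: Any = None) -> Any:
    """Walk nested dicts, returning default if any key along the path is missing."""
    current: Any = state
    for key in keys:
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


state = {"preferences": {"theme": "dark"}}
print(get_nested(state, "preferences", "theme", default="default"))
print(get_nested(state, "preferences", "font", default="default"))
print(get_nested({}, "preferences", "theme", default="default"))
```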
Leverage state to track progress in long-running flows:
from crewai.flow.flow import Flow, listen, start


class ProgressTrackingFlow(Flow):
    @start()
    def initialize(self):
        self.state["total_steps"] = 3
        self.state["current_step"] = 0
        self.state["progress"] = 0.0
        self.update_progress()
        return "Initialized"

    def update_progress(self):
        """Helper method to calculate and update progress"""
        if self.state.get("total_steps", 0) > 0:
            self.state["progress"] = (self.state.get("current_step", 0) /
                                      self.state["total_steps"]) * 100
            print(f"Progress: {self.state['progress']:.1f}%")

    @listen(initialize)
    def step_one(self, _):
        # Do work...
        self.state["current_step"] = 1
        self.update_progress()
        return "Step 1 complete"

    # Additional steps...
Especially with structured state, prefer immutable operations for clarity:
# Instead of modifying lists in place:
self.state.items.append(new_item)  # Mutable operation

# Consider creating new state:
from crewai.flow.flow import Flow, start
from pydantic import BaseModel
from typing import List


class ItemState(BaseModel):
    items: List[str] = []


class ImmutableFlow(Flow[ItemState]):
    @start()
    def add_item(self):
        # Create new list with the added item
        self.state.items = [*self.state.items, "new item"]
        return "Item added"
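The reason replacement can be clearer than in-place mutation is aliasing: any other reference to the same list also sees an in-place change. The sketch below shows this with plain Python lists, independent of CrewAI; the variable names are illustrative.

```python
# In-place mutation: every reference to the list sees the change.
live_items = ["a"]
earlier_snapshot = live_items        # same list object, not a copy
live_items.append("b")
print(earlier_snapshot)              # ['a', 'b']

# Replacement: build a new list and rebind, leaving old references intact.
live_items = ["a"]
earlier_snapshot = live_items
live_items = [*live_items, "b"]
print(earlier_snapshot)              # ['a']
print(live_items)                    # ['a', 'b']
```

This matters most when you keep earlier values around for logging or comparison, since a shared list would silently change underneath them.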
When developing, add logging to track state changes:
import logging

from crewai.flow.flow import Flow, listen, start

logging.basicConfig(level=logging.INFO)


class LoggingFlow(Flow):
    def log_state(self, step_name):
        logging.info(f"State after {step_name}: {self.state}")

    @start()
    def initialize(self):
        self.state["counter"] = 0
        self.log_state("initialize")
        return "Initialized"

    @listen(initialize)
    def increment(self, _):
        self.state["counter"] += 1
        self.log_state("increment")
        return f"Incremented to {self.state['counter']}"
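When the state grows, logging the whole object after every step gets noisy. One option is to log only what changed between two snapshots. The following is a minimal sketch that operates on plain dicts; diff_state and the MISSING marker are hypothetical names, not part of CrewAI.

```python
import copy

MISSING = "<missing>"  # marker for keys that are absent on one side


def diff_state(before: dict, after: dict) -> dict:
    """Return {key: (old, new)} for keys that were added, removed, or changed."""
    changes = {}
    for key in sorted(before.keys() | after.keys()):
        old = before.get(key, MISSING)
        new = after.get(key, MISSING)
        if old != new:
            changes[key] = (old, new)
    return changes


before = {"counter": 0, "items": []}
after = copy.deepcopy(before)  # deep copy so the snapshot stays independent
after["counter"] += 1
after["done"] = True

print(diff_state(before, after))
```

Take the "before" snapshot with a deep copy before the step runs; otherwise in-place mutations would alter the snapshot as well and the diff would come back empty.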
You can add methods to visualize your state for debugging:
def visualize_state(self):
    """Create a simple visualization of the current state"""
    import json
    from rich.console import Console
    from rich.panel import Panel

    console = Console()
    if hasattr(self.state, "model_dump"):
        # Pydantic v2
        state_dict = self.state.model_dump()
    elif hasattr(self.state, "dict"):
        # Pydantic v1
        state_dict = self.state.dict()
    else:
        # Unstructured state
        state_dict = dict(self.state)
    # Remove id for cleaner output
    if "id" in state_dict:
        state_dict.pop("id")
    state_json = json.dumps(state_dict, indent=2, default=str)
    console.print(Panel(state_json, title="Current Flow State"))
Mastering state management in CrewAI Flows gives you the power to build sophisticated, robust AI applications that maintain context, make complex decisions, and deliver consistent results.
Whether you choose unstructured or structured state, implementing proper state management practices will help you create flows that are maintainable, extensible, and effective at solving real-world problems.
As you develop more complex flows, remember that good state management is about finding the right balance between flexibility and structure, making your code both powerful and easy to understand.
<Check> You've now mastered the concepts and practices of state management in CrewAI Flows! With this knowledge, you can create robust AI workflows that effectively maintain context, share data between steps, and build sophisticated application logic. </Check>