docs/en/concepts/production-architecture.mdx
When building production AI applications with CrewAI, we recommend starting with a Flow.
While it's possible to run individual Crews or Agents, wrapping them in a Flow provides the necessary structure for a robust, scalable application.
Run `crewai login` to enable free observability features.

A typical production CrewAI application looks like this:
```mermaid
graph TD
    Start((Start)) --> Flow[Flow Orchestrator]
    Flow --> State{State Management}
    State --> Step1[Step 1: Data Gathering]
    Step1 --> Crew1[Research Crew]
    Crew1 --> State
    State --> Step2{Condition Check}
    Step2 -- "Valid" --> Step3[Step 3: Execution]
    Step3 --> Crew2[Action Crew]
    Step2 -- "Invalid" --> End((End))
    Crew2 --> End
```
Your Flow class is the entry point. It defines the state schema and the methods that execute your logic.
```python
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class AppState(BaseModel):
    user_input: str = ""
    research_results: str = ""
    final_report: str = ""


class ProductionFlow(Flow[AppState]):
    @start()
    def gather_input(self):
        # ... logic to get input ...
        pass

    @listen(gather_input)
    def run_research_crew(self):
        # ... trigger a Crew ...
        pass
```
Use Pydantic models to define your state. This ensures type safety and makes it clear what data is available at each step.
Delegate complex tasks to Crews. A Crew should be focused on a specific goal (e.g., "Research a topic", "Write a blog post").
```python
@listen(gather_input)
def run_research_crew(self):
    crew = ResearchCrew()
    result = crew.kickoff(inputs={"topic": self.state.user_input})
    self.state.research_results = result.raw
```
Leverage CrewAI's control primitives to make your Crews more robust and predictable.
Use Task Guardrails to validate task outputs before they are accepted. This ensures that your agents produce high-quality results.
```python
from typing import Any, Tuple

from crewai import Task
from crewai.tasks.task_output import TaskOutput


def validate_content(result: TaskOutput) -> Tuple[bool, Any]:
    if len(result.raw) < 100:
        return (False, "Content is too short. Please expand.")
    return (True, result.raw)


task = Task(
    ...,
    guardrail=validate_content,
)
```
Always use structured outputs (`output_pydantic` or `output_json`) when passing data between tasks or to your application. This prevents parsing errors and ensures type safety.
```python
from typing import List

from crewai import Task
from pydantic import BaseModel


class ResearchResult(BaseModel):
    summary: str
    sources: List[str]


task = Task(
    ...,
    output_pydantic=ResearchResult,
)
```
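Under the hood, a structured output is just Pydantic validation of the model's JSON. A minimal sketch of that step on its own (the raw string below is a made-up example of what an LLM might return):

```python
import json
from typing import List

from pydantic import BaseModel


class ResearchResult(BaseModel):
    summary: str
    sources: List[str]


# Hypothetical raw text returned by the model.
raw = '{"summary": "CrewAI overview", "sources": ["https://docs.crewai.com"]}'

# Parsing into the schema fails loudly on malformed output instead of
# silently passing bad data downstream.
result = ResearchResult(**json.loads(raw))
print(result.summary)  # -> CrewAI overview
```

Downstream steps then read typed fields from the validated model rather than re-parsing strings.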
Use LLM Hooks to inspect or modify messages before they are sent to the LLM, or to sanitize responses.
```python
@before_llm_call
def log_request(context):
    print(f"Agent {context.agent.role} is calling the LLM...")
```
When deploying your Flow, consider the following:
The easiest way to deploy your Flow is using CrewAI Enterprise. It handles the infrastructure, authentication, and monitoring for you.
Check out the Deployment Guide to get started.
```bash
crewai deploy create
```
For long-running tasks, use `kickoff_async` to avoid blocking your API.
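The pattern looks like the sketch below. The `kickoff_async` coroutine here is a stdlib stand-in so the example runs anywhere; in a real app you would `await flow.kickoff_async(inputs={...})` on your Flow instance instead:

```python
import asyncio


async def kickoff_async(inputs: dict) -> str:
    """Stand-in for Flow.kickoff_async: any awaitable producing the result."""
    await asyncio.sleep(0)  # simulates non-blocking LLM work
    return f"report for {inputs['user_input']}"


async def handle_request(user_input: str) -> str:
    # The handler awaits the flow instead of blocking a worker thread,
    # so the server keeps serving other requests while LLM calls run.
    return await kickoff_async({"user_input": user_input})


print(asyncio.run(handle_request("quantum computing")))
# -> report for quantum computing
```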
Use the `@persist` decorator to save the state of your Flow to a database. This allows you to resume execution if the process crashes or if you need to wait for human input.
```python
from crewai.flow.persistence import persist


@persist
class ProductionFlow(Flow[AppState]):
    # ...
```
By default, `@persist` resumes a flow when `kickoff(inputs={"id": <uuid>})` is supplied, extending the same `flow_uuid` history. To fork a persisted flow into a new lineage (hydrate state from a previous run but write under a fresh `state.id`), pass `restore_from_state_id`:

```python
flow.kickoff(restore_from_state_id="<previous-run-state-id>")
```

The new run gets a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned), so its `@persist` writes don't extend the source's history. Combining with `from_checkpoint` raises a `ValueError`; pick one hydration source.