docs/edge/en/learn/streaming-flow-execution.mdx
CrewAI Flows support streaming output, allowing you to receive real-time updates as your flow executes. This feature enables you to build responsive applications that display results incrementally, provide live progress updates, and create better user experiences for long-running workflows.
When streaming is enabled on a Flow, CrewAI captures and streams output from any crews, LLM calls, tools, and lifecycle events within the flow. The stream delivers ordered StreamFrame items with printable content plus structured event data as execution progresses.
To enable streaming, set the stream attribute to True on your Flow class:
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
class ResearchFlow(Flow):
    """Two-step flow that hands a fixed topic to a single-agent research crew."""

    # Enable streaming for the entire flow
    stream = True

    @start()
    def initialize(self):
        """Entry step: supply the inputs passed on to the research step."""
        return {"topic": "AI trends"}

    @listen(initialize)
    def research_topic(self, data):
        """Build a one-agent crew and kick it off with the output of ``initialize``."""
        analyst = Agent(
            role="Research Analyst",
            goal="Research topics thoroughly",
            backstory="Expert researcher with analytical skills",
        )
        research_task = Task(
            description="Research {topic} and provide insights",
            expected_output="Detailed research findings",
            agent=analyst,
        )
        # `data` is forwarded unchanged as the crew's kickoff inputs.
        return Crew(agents=[analyst], tasks=[research_task]).kickoff(inputs=data)
When you call kickoff() on a flow with streaming enabled, it returns a stream session that yields ordered StreamFrame items:
# Start streaming execution: kickoff() hands back the stream session
flow = ResearchFlow()
streaming = flow.kickoff()

# Print each frame's text as soon as it arrives
for frame in streaming:
    print(frame.content, end="", flush=True)

# Once the loop has finished, read the final result from the session
result = streaming.result
print(f"\n\nFinal output: {result}")
Each item provides both printable content and structured event data:
streaming = flow.kickoff()

# Dump the channel, type, text content and event payload of every frame
for frame in streaming:
    print(f"Channel: {frame.channel}")
    print(f"Type: {frame.type}")
    print(f"Content: {frame.content}")
    print(f"Event payload: {frame.event}")
The stream session provides useful properties and methods:
streaming = flow.kickoff()

# Drain the stream, echoing text as it arrives
for frame in streaming:
    print(frame.content, end="", flush=True)

# With the loop finished, report the session's status, frame count and result
print(f"\nCompleted: {streaming.is_completed}")
print(f"Total frames: {len(streaming.frames)}")
print(f"Final result: {streaming.result}")
For async applications, use kickoff_async() with async iteration:
import asyncio
async def stream_flow():
    """Run ResearchFlow asynchronously and echo its stream to stdout."""
    # Awaiting kickoff_async() produces the stream session
    streaming = await ResearchFlow().kickoff_async()

    # Consume frames with async iteration
    async for frame in streaming:
        print(frame.content, end="", flush=True)

    # Read the final result off the session once iteration is done
    print(f"\n\nFinal output: {streaming.result}")


asyncio.run(stream_flow())
Streaming works seamlessly across multiple flow steps, including flows that execute multiple crews:
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
class MultiStepFlow(Flow):
    """Flow that chains a research crew into an analysis crew."""

    stream = True

    @start()
    def research_phase(self):
        """First crew: research the topic and keep the raw findings in state."""
        info_gatherer = Agent(
            role="Research Analyst",
            goal="Gather comprehensive information",
            backstory="Expert at finding relevant information",
        )
        research_task = Task(
            description="Research AI developments in healthcare",
            expected_output="Research findings on AI in healthcare",
            agent=info_gatherer,
        )
        findings = Crew(agents=[info_gatherer], tasks=[research_task]).kickoff().raw
        # Store the raw text on the flow state as well as returning it
        self.state["research"] = findings
        return findings

    @listen(research_phase)
    def analysis_phase(self, research_data):
        """Second crew: analyze the output of ``research_phase``."""
        pattern_analyst = Agent(
            role="Data Analyst",
            goal="Analyze information and extract insights",
            backstory="Expert at identifying patterns and trends",
        )
        analysis_task = Task(
            description="Analyze this research: {research}",
            expected_output="Key insights and trends",
            agent=pattern_analyst,
        )
        analysis_crew = Crew(agents=[pattern_analyst], tasks=[analysis_task])
        return analysis_crew.kickoff(inputs={"research": research_data})
# Stream across both phases
flow = MultiStepFlow()
streaming = flow.kickoff()

current_step = ""
for frame in streaming:
    # Prefer the flow method name, falling back to the task name
    payload = frame.event
    step_name = payload.get("method_name") or payload.get("task_name")

    # Print a banner only when execution moves to a different step
    if step_name and step_name != current_step:
        current_step = step_name
        print(f"\n\n=== {step_name} ===\n")

    print(frame.content, end="", flush=True)

result = streaming.result
print(f"\n\nFinal analysis: {result}")
Here's a complete example showing how to build a progress dashboard with streaming:
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
class ResearchPipeline(Flow):
    """Two-stage pipeline: gather data with one crew, then analyze it with another."""

    stream = True

    @start()
    def gather_data(self):
        """Run the data-gathering crew and keep its raw output in state."""
        gatherer = Agent(
            role="Data Gatherer",
            goal="Collect relevant information",
            backstory="Skilled at finding quality sources",
        )
        gathering_task = Task(
            description="Gather data on renewable energy trends",
            expected_output="Collection of relevant data points",
            agent=gatherer,
        )
        gathered = Crew(agents=[gatherer], tasks=[gathering_task]).kickoff().raw
        self.state["data"] = gathered
        return gathered

    @listen(gather_data)
    def analyze_data(self, data):
        """Run the analysis crew over the output of ``gather_data``."""
        insight_analyst = Agent(
            role="Data Analyst",
            goal="Extract meaningful insights",
            backstory="Expert at data analysis",
        )
        analysis_task = Task(
            description="Analyze: {data}",
            expected_output="Key insights and trends",
            agent=insight_analyst,
        )
        analysis_crew = Crew(agents=[insight_analyst], tasks=[analysis_task])
        return analysis_crew.kickoff(inputs={"data": data})
async def run_with_dashboard():
    """Run ResearchPipeline with streaming and render a console dashboard.

    Prints a header, announces each new task together with its agent role,
    echoes streamed text as it arrives, reports tool-channel frames that carry
    no text, and finishes with the frame count and the length of the result.
    """
    rule = "=" * 60  # horizontal rule shared by the header and the summary

    flow = ResearchPipeline()

    print(rule)
    print("RESEARCH PIPELINE DASHBOARD")
    print(rule)

    streaming = await flow.kickoff_async()

    current_agent = ""
    current_task = ""
    frame_count = 0

    async for item in streaming:
        frame_count += 1

        # Display phase transitions: a phase header is printed only when the
        # frame's task name differs from the one currently being shown.
        task_name = item.event.get("task_name", "")
        agent_role = item.event.get("agent_role", "")

        if task_name and task_name != current_task:
            current_task = task_name
            current_agent = agent_role
            print(f"\n\n📍 Phase: {current_task}")
            print(f"👤 Agent: {current_agent}")
            print("-" * 60)

        # Display text output
        if item.content:
            print(item.content, end="", flush=True)
        # Display tool usage (only for frames without text content)
        elif item.channel == "tools":
            print(f"\n🔧 Tool event: {item.type}")

    # Show completion summary
    result = streaming.result
    print(f"\n\n{rule}")
    print("PIPELINE COMPLETE")
    print(rule)
    print(f"Total frames: {frame_count}")
    print(f"Final output length: {len(str(result))} characters")


asyncio.run(run_with_dashboard())
Streaming works naturally with Flow state management:
from pydantic import BaseModel
class AnalysisState(BaseModel):
    # Structured flow state; every field is a string defaulting to "".
    topic: str = ""  # subject to research
    research: str = ""  # research text
    insights: str = ""  # insights text
class StatefulStreamingFlow(Flow[AnalysisState]):
    """Streaming flow whose steps read from and write to an AnalysisState."""

    stream = True

    @start()
    def research(self):
        """Research the topic held in state and store the raw findings there."""
        # State is available during streaming
        subject = self.state.topic
        print(f"Researching: {subject}")

        investigator = Agent(
            role="Researcher",
            goal="Research topics thoroughly",
            backstory="Expert researcher",
        )
        assignment = Task(
            description=f"Research {subject}",
            expected_output="Research findings",
            agent=investigator,
        )

        findings = Crew(agents=[investigator], tasks=[assignment]).kickoff().raw
        self.state.research = findings
        return findings

    @listen(research)
    def analyze(self, research):
        """Analyze the output of ``research`` and store the raw insights in state."""
        # Access updated state
        print(f"Analyzing {len(self.state.research)} chars of research")

        insight_analyst = Agent(
            role="Analyst",
            goal="Extract insights",
            backstory="Expert analyst",
        )
        assignment = Task(
            description="Analyze: {research}",
            expected_output="Key insights",
            agent=insight_analyst,
        )

        analysis_crew = Crew(agents=[insight_analyst], tasks=[assignment])
        insights = analysis_crew.kickoff(inputs={"research": research}).raw
        self.state.insights = insights
        return insights
# Run with streaming, passing the topic in through the kickoff inputs
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})

for item in streaming:
    print(item.content, end="", flush=True)

result = streaming.result

# Plain string literal: nothing is interpolated here, so no f-prefix is needed
print("\n\nFinal state:")
# Read the state fields back from the flow instance after the run
print(f"Topic: {flow.state.topic}")
print(f"Research length: {len(flow.state.research)}")
print(f"Insights length: {len(flow.state.insights)}")
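Flow streaming is particularly valuable for long-running, multi-stage workflows where users benefit from incremental results and live progress updates.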
Flow streaming yields StreamFrame items across several channels:
Standard text content from LLM responses:
for frame in streaming:
    # Skip anything that is not non-empty text on the "llm" channel
    if frame.channel != "llm" or not frame.content:
        continue
    print(frame.content, end="", flush=True)
Information about tool calls within the flow:
for frame in streaming:
    # Only frames on the "tools" channel are reported
    if frame.channel != "tools":
        continue
    print(f"\nTool event: {frame.type}")
    print(f"Payload: {frame.event}")
Handle errors gracefully during streaming:
flow = ResearchFlow()
streaming = flow.kickoff()

try:
    for frame in streaming:
        print(frame.content, end="", flush=True)

    # Reading the result stays inside the try so a failure here is reported too
    result = streaming.result
    print(f"\nSuccess! Result: {result}")
except Exception as exc:
    print(f"\nError during flow execution: {exc}")
    # Distinguish a fully drained stream from one that stopped early
    if streaming.is_completed:
        print("Streaming completed but flow encountered an error")
The stream session supports graceful cancellation so that in-flight work stops promptly when the consumer disconnects.
streaming = await flow.kickoff_async()
async with streaming:
async for item in streaming:
print(item.content, end="", flush=True)
streaming = await flow.kickoff_async()
try:
async for item in streaming:
print(item.content, end="", flush=True)
finally:
await streaming.aclose() # async
# streaming.close() # sync equivalent
After cancellation, streaming.is_cancelled and streaming.is_completed are both True. Both aclose() and close() are idempotent.
After the stream has been consumed, the final output is available through the `.result` property. You can combine streaming with flow visualization to provide a complete picture:
# Generate flow visualization
plot_name = "research_flow"  # single source for the plot name and the message below
flow = ResearchFlow()
flow.plot(plot_name)  # Creates HTML visualization

# Run with streaming
streaming = flow.kickoff()
for item in streaming:
    print(item.content, end="", flush=True)

result = streaming.result
# The file name is derived from plot_name so the message cannot drift from the plot call
print(f"\nFlow complete! View structure at: {plot_name}.html")
By leveraging flow streaming, you can build sophisticated, responsive applications that provide users with real-time visibility into complex multi-stage workflows, making your AI automations more transparent and engaging.