docs/en/learn/streaming-crew-execution.mdx
CrewAI provides the ability to stream real-time output during crew execution, allowing you to display results as they're generated rather than waiting for the entire process to complete. This feature is particularly useful for building interactive applications, providing user feedback, and monitoring long-running processes.
When streaming is enabled, CrewAI captures LLM responses and tool calls as they happen and packages them into structured chunks that identify the task and agent currently executing. You can iterate over these chunks in real time and access the final result once execution completes.
To enable streaming, set the stream parameter to True when creating your crew:
from crewai import Agent, Crew, Task

# Create your agents and tasks
researcher = Agent(
    role="Research Analyst",
    goal="Gather comprehensive information on topics",
    backstory="You are an experienced researcher with excellent analytical skills.",
)

task = Task(
    description="Research the latest developments in AI",
    expected_output="A detailed report on recent AI advancements",
    agent=researcher,
)

# Enable streaming
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True,  # Enable streaming output
)
When you call kickoff() on a crew with streaming enabled, it returns a CrewStreamingOutput object that you can iterate over to receive chunks as they arrive:
# Start streaming execution
streaming = crew.kickoff(inputs={"topic": "artificial intelligence"})

# Iterate over chunks as they arrive
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result.raw}")
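To forward chunks to a browser, one option is to wrap each chunk's text in a Server-Sent Events frame. The sketch below is a minimal illustration and not part of CrewAI: `to_sse_frame` and `sse_frames` are hypothetical helpers, the `chunk` event name is an arbitrary choice, and the code assumes only that each chunk exposes the `content` attribute shown above.

```python
import json
from types import SimpleNamespace
from typing import Iterable, Iterator


def to_sse_frame(text: str, event: str = "chunk") -> str:
    """Format one piece of text as a Server-Sent Events frame."""
    # JSON-encode the payload so embedded newlines cannot break the frame.
    payload = json.dumps({"content": text})
    return f"event: {event}\ndata: {payload}\n\n"


def sse_frames(chunks: Iterable) -> Iterator[str]:
    """Yield one SSE frame per chunk, skipping chunks with empty content."""
    for chunk in chunks:
        if chunk.content:
            yield to_sse_frame(chunk.content)


# Stand-in chunks for illustration only; a real run would pass the object
# returned by crew.kickoff() instead.
demo_chunks = [
    SimpleNamespace(content="Hello"),
    SimpleNamespace(content=""),
    SimpleNamespace(content=" world\n"),
]
frames = list(sse_frames(demo_chunks))
```

A web framework's streaming response can then send each frame as it is produced, so the client sees text while the crew is still running.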
Each chunk provides rich context about the execution:
streaming = crew.kickoff(inputs={"topic": "AI"})

for chunk in streaming:
    print(f"Task: {chunk.task_name} (index {chunk.task_index})")
    print(f"Agent: {chunk.agent_role}")
    print(f"Content: {chunk.content}")
    print(f"Type: {chunk.chunk_type}")  # TEXT or TOOL_CALL
    if chunk.tool_call:
        print(f"Tool: {chunk.tool_call.tool_name}")
        print(f"Arguments: {chunk.tool_call.arguments}")
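Because every chunk carries its task name, the stream can be regrouped per task after the fact. The following is a small sketch under that assumption; `collect_text_by_task` is a hypothetical helper, and the chunks are stand-in objects that expose only the `task_name` and `content` attributes used above.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Dict, Iterable


def collect_text_by_task(chunks: Iterable) -> Dict[str, str]:
    """Concatenate chunk content per task name, preserving arrival order."""
    parts = defaultdict(list)
    for chunk in chunks:
        parts[chunk.task_name].append(chunk.content)
    return {name: "".join(pieces) for name, pieces in parts.items()}


# Stand-in chunks for illustration only.
demo_chunks = [
    SimpleNamespace(task_name="research", content="Found "),
    SimpleNamespace(task_name="research", content="three sources."),
    SimpleNamespace(task_name="summary", content="In short: promising."),
]
text_by_task = collect_text_by_task(demo_chunks)
```

If task names can repeat within a crew, keying on `task_index` instead would keep the groups distinct.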
The CrewStreamingOutput object provides several useful properties:
streaming = crew.kickoff(inputs={"topic": "AI"})

# Iterate and collect chunks
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"All chunks: {len(streaming.chunks)}")
print(f"Final result: {streaming.result.raw}")
For async applications, you can use either akickoff() (native async) or kickoff_async() (thread-based) with async iteration:
akickoff()
The akickoff() method provides true native async execution throughout the entire chain:
import asyncio

async def stream_crew():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
    )

    # Start native async streaming
    streaming = await crew.akickoff(inputs={"topic": "AI"})

    # Async iteration over chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result.raw}")

asyncio.run(stream_crew())
kickoff_async()
For simpler async integration or backward compatibility:
import asyncio

async def stream_crew():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
    )

    # Start thread-based async streaming
    streaming = await crew.kickoff_async(inputs={"topic": "AI"})

    # Async iteration over chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result.raw}")

asyncio.run(stream_crew())
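In an async application the chunks usually go somewhere other than `print`, for example a WebSocket. The sketch below shows one way to relay them to any async sink; `relay_chunks`, `fake_stream`, and `relay_demo` are hypothetical names, and `fake_stream` is a stand-in for the object returned by `akickoff()` or `kickoff_async()`, assumed only to support `async for` and to yield objects with a `content` attribute.

```python
import asyncio
from types import SimpleNamespace
from typing import AsyncIterable, Awaitable, Callable, List, Tuple


async def relay_chunks(
    stream: AsyncIterable, send: Callable[[str], Awaitable[None]]
) -> str:
    """Forward each chunk's text to an async sink and return the joined text."""
    pieces = []
    async for chunk in stream:
        await send(chunk.content)
        pieces.append(chunk.content)
    return "".join(pieces)


async def fake_stream():
    """Hypothetical stand-in for a streaming crew output."""
    for text in ("Streaming ", "works ", "asynchronously."):
        await asyncio.sleep(0)  # hand control back to the event loop between chunks
        yield SimpleNamespace(content=text)


async def relay_demo() -> Tuple[List[str], str]:
    sent: List[str] = []

    async def send(text: str) -> None:
        # A real sink might be websocket.send_text(text).
        sent.append(text)

    full_text = await relay_chunks(fake_stream(), send)
    return sent, full_text
```

Awaiting the sink inside the loop applies natural backpressure: a slow client slows consumption instead of letting text pile up in memory.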
When executing a crew for multiple inputs with kickoff_for_each(), streaming works differently depending on whether you use sync or async:
With synchronous kickoff_for_each(), you get a list of CrewStreamingOutput objects, one for each input:
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True,
)

inputs_list = [
    {"topic": "AI in healthcare"},
    {"topic": "AI in finance"},
]

# Returns list of streaming outputs
streaming_outputs = crew.kickoff_for_each(inputs=inputs_list)

# Iterate over each streaming output
for i, streaming in enumerate(streaming_outputs):
    print(f"\n=== Input {i + 1} ===")
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\n\nResult {i + 1}: {result.raw}")
With async kickoff_for_each_async(), you get a single CrewStreamingOutput that yields chunks from all crews as they arrive concurrently:
import asyncio

async def stream_multiple_crews():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
    )

    inputs_list = [
        {"topic": "AI in healthcare"},
        {"topic": "AI in finance"},
    ]

    # Returns single streaming output for all crews
    streaming = await crew.kickoff_for_each_async(inputs=inputs_list)

    # Chunks from all crews arrive as they're generated
    async for chunk in streaming:
        print(f"[{chunk.task_name}] {chunk.content}", end="", flush=True)

    # Access all results
    results = streaming.results  # List of CrewOutput objects
    for i, result in enumerate(results):
        print(f"\n\nResult {i + 1}: {result.raw}")

asyncio.run(stream_multiple_crews())
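To build intuition for why chunks from different inputs interleave, the sketch below merges several async sources into one stream in arrival order. It illustrates the general pattern only and is not CrewAI's implementation; `numbered_source`, `merge`, and `merge_demo` are hypothetical names, and error propagation from a failing source is deliberately left out.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def numbered_source(label: str, delays: List[float]) -> AsyncIterator[str]:
    """Hypothetical source that emits one item after each delay."""
    for i, delay in enumerate(delays):
        await asyncio.sleep(delay)
        yield f"{label}-{i}"


async def merge(sources: List[AsyncIterator[str]]) -> AsyncIterator[Tuple[int, str]]:
    """Yield (source_index, item) pairs from all sources in arrival order."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking one exhausted source

    async def pump(index: int, source: AsyncIterator[str]) -> None:
        try:
            async for item in source:
                await queue.put((index, item))
        finally:
            queue.put_nowait(done)

    tasks = [asyncio.create_task(pump(i, s)) for i, s in enumerate(sources)]
    remaining = len(tasks)
    try:
        while remaining:
            entry = await queue.get()
            if entry is done:
                remaining -= 1
            else:
                yield entry
    finally:
        # Stop any pumps still running if the consumer leaves early.
        for task in tasks:
            task.cancel()


async def merge_demo() -> List[Tuple[int, str]]:
    sources = [
        numbered_source("a", [0.0, 0.05]),
        numbered_source("b", [0.02, 0.0]),
    ]
    return [pair async for pair in merge(sources)]
```

Items from one source keep their relative order, but nothing is guaranteed about ordering across sources, which is why a consumer of a merged stream should rely on the context carried by each chunk rather than on position.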
Chunks can be of different types, indicated by the chunk_type field:
Standard text content from LLM responses:
from crewai.types.streaming import StreamChunkType

for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TEXT:
        print(chunk.content, end="", flush=True)
Information about tool calls being made:
for chunk in streaming:
    # Guard against a missing tool_call payload before reading its fields
    if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
        print(f"\nCalling tool: {chunk.tool_call.tool_name}")
        print(f"Arguments: {chunk.tool_call.arguments}")
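When a UI treats each chunk type differently, a handler table can replace a growing chain of `if` checks. The sketch below is one possible arrangement, not a CrewAI API: `DemoChunkType` is a hypothetical stand-in for `StreamChunkType` so the example runs without the library, and `dispatch_chunks` is an illustrative helper.

```python
from enum import Enum
from types import SimpleNamespace
from typing import Callable, Dict, Iterable, List


class DemoChunkType(Enum):
    """Hypothetical stand-in for crewai's StreamChunkType."""

    TEXT = "text"
    TOOL_CALL = "tool_call"


def dispatch_chunks(chunks: Iterable, handlers: Dict[Enum, Callable]) -> int:
    """Call the handler registered for each chunk's type; return the handled count."""
    handled = 0
    for chunk in chunks:
        handler = handlers.get(chunk.chunk_type)
        if handler is not None:
            handler(chunk)
            handled += 1
    return handled


text_parts: List[str] = []
tool_names: List[str] = []
handlers = {
    DemoChunkType.TEXT: lambda c: text_parts.append(c.content),
    DemoChunkType.TOOL_CALL: lambda c: tool_names.append(c.tool_call.tool_name),
}

# Stand-in chunks for illustration only.
demo_chunks = [
    SimpleNamespace(chunk_type=DemoChunkType.TEXT, content="Searching", tool_call=None),
    SimpleNamespace(
        chunk_type=DemoChunkType.TOOL_CALL,
        content="",
        tool_call=SimpleNamespace(tool_name="web_search", arguments={"q": "AI"}),
    ),
    SimpleNamespace(chunk_type=DemoChunkType.TEXT, content=" done", tool_call=None),
]
handled = dispatch_chunks(demo_chunks, handlers)
```

Chunks whose type has no registered handler are skipped silently, so the loop keeps working if further chunk types appear later.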
Here's a complete example showing how to build an interactive application with streaming:
import asyncio
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

async def interactive_research():
    # Create crew with streaming enabled
    researcher = Agent(
        role="Research Analyst",
        goal="Provide detailed analysis on any topic",
        backstory="You are an expert researcher with broad knowledge.",
    )

    task = Task(
        description="Research and analyze: {topic}",
        expected_output="A comprehensive analysis with key insights",
        agent=researcher,
    )

    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
        verbose=False,
    )

    # Get user input
    topic = input("Enter a topic to research: ")

    print(f"\n{'='*60}")
    print(f"Researching: {topic}")
    print(f"{'='*60}\n")

    # Start streaming execution
    streaming = await crew.kickoff_async(inputs={"topic": topic})

    current_task = ""
    async for chunk in streaming:
        # Show task transitions
        if chunk.task_name != current_task:
            current_task = chunk.task_name
            print(f"\n[{chunk.agent_role}] Working on: {chunk.task_name}")
            print("-" * 60)

        # Display text chunks
        if chunk.chunk_type == StreamChunkType.TEXT:
            print(chunk.content, end="", flush=True)

        # Display tool calls
        elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
            print(f"\nUsing tool: {chunk.tool_call.tool_name}")

    # Show final result
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("Analysis Complete!")
    print(f"{'='*60}")
    print(f"\nToken Usage: {result.token_usage}")

asyncio.run(interactive_research())
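Streaming is particularly valuable for building interactive applications, giving users feedback while work is in progress, and monitoring long-running processes.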
CrewStreamingOutput supports graceful cancellation so that in-flight work stops promptly when the consumer disconnects.
# Option 1: async context manager
streaming = await crew.akickoff(inputs={"topic": "AI"})

async with streaming:
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

# Option 2: explicit cleanup with try/finally
streaming = await crew.akickoff(inputs={"topic": "AI"})
try:
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)
finally:
    await streaming.aclose()  # async
    # streaming.close()       # sync equivalent
After cancellation, streaming.is_cancelled and streaming.is_completed are both True. Both aclose() and close() are idempotent.
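The cancellation contract described above can be tried out without running a crew. The sketch below uses `DemoStream`, a hypothetical stand-in that mimics only the surface named in this section (`async with`, `aclose()`, `is_cancelled`, `is_completed`); how it behaves when closed after a normal finish is an assumption, not documented CrewAI behavior. `read_at_most` is an illustrative helper that stops early and lets the context manager cancel the rest.

```python
import asyncio


class DemoStream:
    """Hypothetical stand-in mimicking the cancellation surface described above."""

    def __init__(self, items):
        self._items = list(items)
        self._gen = None
        self.is_cancelled = False
        self.is_completed = False
        self.cleanup_runs = 0  # counts calls that actually performed cleanup

    def __aiter__(self):
        self._gen = self._iterate()
        return self._gen

    async def _iterate(self):
        for item in self._items:
            await asyncio.sleep(0)  # simulate waiting for the next chunk
            yield item
        self.is_completed = True

    async def aclose(self):
        """Idempotent: only the first call on an unfinished stream cleans up."""
        if self.is_completed:
            return
        self.is_cancelled = True
        self.is_completed = True
        self.cleanup_runs += 1
        if self._gen is not None:
            await self._gen.aclose()  # stop the producer at its current yield

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.aclose()
        return False  # never swallow exceptions from the body


async def read_at_most(stream, limit: int) -> list:
    """Consume up to `limit` chunks, then let the context manager cancel the rest."""
    seen = []
    async with stream:
        async for chunk in stream:
            seen.append(chunk)
            if len(seen) >= limit:
                break
    return seen
```

Running `asyncio.run(read_at_most(DemoStream(["a", "b", "c", "d"]), 2))` reads two items and leaves the stand-in with both flags set; a later `aclose()` call is a no-op.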
Access the .result property only after you have iterated through all chunks.
For kickoff_for_each_async() with streaming, use .results (plural) to get all outputs.
Handle errors during streaming execution:
streaming = crew.kickoff(inputs={"topic": "AI"})

try:
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\nSuccess: {result.raw}")

except Exception as e:
    print(f"\nError during streaming: {e}")
    if streaming.is_completed:
        print("Streaming completed but an error occurred")
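When a stream fails midway, the text received so far is often still worth showing. The sketch below keeps it alongside the error; `consume_safely` and `failing_stream` are hypothetical names, and the stream is a plain generator standing in for a streaming output that raises during iteration.

```python
from types import SimpleNamespace
from typing import Iterable, Optional, Tuple


def consume_safely(chunks: Iterable) -> Tuple[str, Optional[Exception]]:
    """Collect chunk text until the stream ends or raises; return (text, error)."""
    pieces = []
    try:
        for chunk in chunks:
            pieces.append(chunk.content)
    except Exception as exc:  # broad on purpose in a sketch; narrow it in real code
        return "".join(pieces), exc
    return "".join(pieces), None


def failing_stream():
    """Hypothetical stream that fails after two chunks."""
    yield SimpleNamespace(content="partial ")
    yield SimpleNamespace(content="output")
    raise RuntimeError("connection lost")


text, error = consume_safely(failing_stream())
```

Returning the error instead of re-raising lets the caller decide whether to display the partial text, retry, or both.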
By leveraging streaming, you can build more responsive and interactive applications with CrewAI, providing users with real-time visibility into agent execution and results.