docs/edge/en/learn/streaming-runtime-contract.mdx
CrewAI exposes a frame-based streaming contract for runtimes that need more than plain text chunks. The contract emits ordered `StreamFrame` objects for Flow lifecycle events, direct LLM tokens, tool activity, conversation messages, and custom events.
Use this API when you are building a UI, service bridge, terminal app, or deployment runtime that needs a stable stream of structured events while a Flow, chat turn, or direct LLM call is running.
Every frame has the same envelope:
```python
from crewai.types.streaming import StreamFrame

frame.id           # unique frame id
frame.seq          # execution-local order, when available
frame.type         # source event type, such as "flow_started"
frame.channel      # "llm", "flow", "tools", "messages", "lifecycle", or "custom"
frame.namespace    # source/runtime namespace
frame.timestamp    # event timestamp
frame.parent_id    # parent event id, when available
frame.previous_id  # previous event id, when available
frame.data         # event payload
frame.event        # alias for frame.data
frame.content      # printable text for token-like frames, otherwise ""
```
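A service bridge usually has to move frames across a process boundary. The following is a minimal sketch of flattening the envelope into one JSON line. `FrameStub`, `ENVELOPE_FIELDS`, and `frame_to_json` are hypothetical names used only for illustration. The stub stands in for a real `StreamFrame`, and the sketch assumes that any payload value JSON cannot encode directly (a datetime timestamp, for example) is acceptable as its `str()` form.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Optional

# Envelope fields listed above. frame.event is left out because it aliases frame.data.
ENVELOPE_FIELDS = (
    "id", "seq", "type", "channel", "namespace",
    "timestamp", "parent_id", "previous_id", "data", "content",
)


@dataclass
class FrameStub:
    """Hypothetical stand-in for StreamFrame so the sketch runs without CrewAI."""
    id: str
    type: str
    channel: str
    seq: Optional[int] = None
    namespace: str = ""
    timestamp: Any = None
    parent_id: Optional[str] = None
    previous_id: Optional[str] = None
    data: dict = field(default_factory=dict)
    content: str = ""


def frame_to_json(frame: Any) -> str:
    """Serialize the envelope to one JSON line; unsupported values fall back to str()."""
    payload = {name: getattr(frame, name, None) for name in ENVELOPE_FIELDS}
    return json.dumps(payload, default=str)


line = frame_to_json(FrameStub(id="f1", type="flow_started", channel="flow", seq=0))
print(line)
```

Because the helper reads attributes by name, it should also accept a real frame object, provided the attribute names match the envelope shown above.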
The `channel` field is the fastest way to route frames in consumers:
| Channel | Contains |
|---|---|
| `llm` | Token and thinking chunks from LLM streaming events |
| `flow` | Flow lifecycle, method execution, routing, and pause/resume events |
| `tools` | Tool usage events |
| `messages` | Conversation transcript events |
| `lifecycle` | Runtime lifecycle events that are not specific to another channel |
| `custom` | Events that do not map to a built-in channel |
`frame.type` preserves the source event type, so consumers can handle specific events inside a channel.
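One way to combine the two fields is a small dispatcher that routes on `channel` first and on `type` second. This is a sketch, not part of CrewAI: `FrameRouter` is a hypothetical helper, and `SimpleNamespace` objects stand in for real frames so the example runs on its own.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Callable, Optional


class FrameRouter:
    """Hypothetical helper: dispatch frames by channel, then optionally by type."""

    def __init__(self) -> None:
        # Key is (channel, frame_type); a frame_type of None matches the whole channel.
        self._handlers = defaultdict(list)

    def on(self, channel: str, handler: Callable[[Any], None],
           frame_type: Optional[str] = None) -> None:
        self._handlers[(channel, frame_type)].append(handler)

    def dispatch(self, frame: Any) -> int:
        """Run channel-wide handlers, then type-specific ones; return how many ran."""
        matched = (
            self._handlers.get((frame.channel, None), [])
            + self._handlers.get((frame.channel, frame.type), [])
        )
        for handler in matched:
            handler(frame)
        return len(matched)


tokens, tool_names = [], []
router = FrameRouter()
router.on("llm", lambda f: tokens.append(f.content))
router.on("tools", lambda f: tool_names.append(f.data["tool_name"]),
          frame_type="tool_usage_started")

# Stand-in frames; a real consumer would iterate a stream session instead.
frames = [
    SimpleNamespace(channel="llm", type="llm_stream_chunk", content="Hi", data={}),
    SimpleNamespace(channel="tools", type="tool_usage_started", content="",
                    data={"tool_name": "search"}),
    SimpleNamespace(channel="flow", type="flow_started", content="", data={}),
]
for frame in frames:
    router.dispatch(frame)
```

Frames with no registered handler are ignored, which keeps the consumer tolerant of event types it does not know about.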
Set `stream=True` on a Flow to make `kickoff()` return a stream session:
```python
from crewai.flow import Flow, start


class ReportFlow(Flow):
    @start()
    def generate(self):
        return "done"


flow = ReportFlow(stream=True)
stream = flow.kickoff()

with stream:
    for chunk in stream:
        print(chunk.content, end="", flush=True)
        if chunk.type == "tool_usage_started":
            print(chunk.event["tool_name"])

result = stream.result
```
You must consume the stream before reading `stream.result`. Accessing the result early raises a `RuntimeError` so consumers do not accidentally treat a partial run as complete.
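A consumer can make this ordering hard to get wrong by wrapping it in one helper that drains the stream and only then reads the result. The sketch below runs without CrewAI: `SessionStub` is a hypothetical stand-in that mimics the documented rule (its error message is invented), and `collect` is an illustrative helper name.

```python
from types import SimpleNamespace


class SessionStub:
    """Hypothetical stand-in for a stream session; mimics the documented result rule."""

    def __init__(self, chunks, result):
        self._chunks = list(chunks)
        self._result = result
        self._done = False

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False  # never swallow exceptions

    def __iter__(self):
        for chunk in self._chunks:
            yield chunk
        self._done = True  # reached only when the consumer exhausts the iterator

    @property
    def result(self):
        if not self._done:
            raise RuntimeError("stream has not been fully consumed")
        return self._result


def collect(stream):
    """Drain the stream, then return (streamed_text, final_result)."""
    parts = []
    with stream:
        for chunk in stream:
            parts.append(chunk.content)
    return "".join(parts), stream.result


session = SessionStub(
    [SimpleNamespace(content="do"), SimpleNamespace(content="ne")],
    result="done",
)
text, result = collect(session)
```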
You can also call `flow.stream_events(...)` directly when you want streaming for a single invocation without setting `stream=True` on the Flow instance.
`StreamSession` exposes channel projections. Each projection yields only the frames on its channel, in the same relative order they have in the full stream:
```python
stream = flow.stream_events()

with stream:
    for frame in stream.llm:
        print(frame.content, end="", flush=True)

result = stream.result
```
Available projections are:
| Projection | Frames |
|---|---|
| `stream.events` | All frames |
| `stream.llm` | LLM frames |
| `stream.messages` | Conversation message frames |
| `stream.flow` | Flow frames |
| `stream.tools` | Tool frames |
| `stream.interleave([...])` | A selected set of channels |
Use `stream.interleave(["flow", "llm", "messages"])` when a consumer wants only some channels but still needs their relative order.
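Conceptually, an interleaved projection behaves like an order-preserving filter over the full frame sequence. The sketch below illustrates that idea on plain objects. `select_channels` is a hypothetical helper written for this example and is not how the library implements `interleave`.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Iterator


def select_channels(frames: Iterable[Any], channels: Iterable[str]) -> Iterator[Any]:
    """Yield only frames on the given channels, keeping their original order."""
    wanted = set(channels)
    for frame in frames:
        if frame.channel in wanted:
            yield frame


# Stand-in frames carrying only the fields this example needs.
all_frames = [
    SimpleNamespace(seq=0, channel="flow"),
    SimpleNamespace(seq=1, channel="llm"),
    SimpleNamespace(seq=2, channel="tools"),
    SimpleNamespace(seq=3, channel="llm"),
    SimpleNamespace(seq=4, channel="messages"),
]
selected = list(select_channels(all_frames, ["flow", "llm", "messages"]))
print([f.seq for f in selected])  # [0, 1, 3, 4]
```

The tool frame is dropped while the remaining frames keep the order they had in the full stream.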
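Use `astream()` for async consumers:
```python
import asyncio


async def main():
    flow = ReportFlow()
    stream = flow.astream()

    # "async with" and "async for" are only valid inside a coroutine.
    async with stream:
        async for chunk in stream.events:
            print(chunk.channel, chunk.type, chunk.content)

    return stream.result


result = asyncio.run(main())
```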
The async session has the same projections as the sync session.
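An async consumer often forwards frames to another task, such as a WebSocket writer. The sketch below shows one way to do that with an `asyncio.Queue`. It runs without CrewAI: `fake_events` is a hypothetical async generator standing in for `stream.events`, and `pump` is an illustrative helper name.

```python
import asyncio
from types import SimpleNamespace


async def fake_events():
    """Hypothetical async frame source standing in for stream.events."""
    yield SimpleNamespace(channel="flow", type="flow_started", content="")
    for text in ("Hel", "lo"):
        await asyncio.sleep(0)  # give other tasks a chance to run
        yield SimpleNamespace(channel="llm", type="llm_stream_chunk", content=text)


async def pump(events, queue: asyncio.Queue) -> None:
    """Forward printable text to a queue; None marks the end of the stream."""
    async for frame in events:
        if frame.content:
            await queue.put(frame.content)
    await queue.put(None)


async def main() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(pump(fake_events(), queue))
    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    await producer
    return "".join(received)


text = asyncio.run(main())
```

The `None` sentinel lets the reading side stop without polling the producer task.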
`llm.call(...)` still returns the final assembled result. Use `llm.stream_events(...)` when you want to iterate over chunks as they arrive while keeping the structured event payload:
```python
from crewai import LLM

llm = LLM(model="gpt-4o-mini")

stream = llm.stream_events(
    messages=[
        {
            "role": "user",
            "content": "Explain CrewAI streaming in two short sentences.",
        }
    ]
)

with stream:
    for chunk in stream:
        print(chunk.content, end="", flush=True)

result = stream.result
```
`llm.stream_events(...)` temporarily enables streaming for the wrapped call and restores the LLM's previous stream setting afterward. Provider integrations continue to emit the underlying LLM stream events; this helper provides a common iterator API over those events for every LLM provider.
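The enable-then-restore behavior described above is a common pattern that can be sketched with a context manager. This is an illustration of the pattern only, not CrewAI's actual implementation. `LLMStub` and `streaming_enabled` are hypothetical names, and the sketch assumes the setting is a boolean attribute called `stream`.

```python
from contextlib import contextmanager


class LLMStub:
    """Hypothetical stand-in with a boolean stream attribute (an assumption)."""

    def __init__(self, stream: bool = False) -> None:
        self.stream = stream


@contextmanager
def streaming_enabled(llm):
    """Turn streaming on for the duration of the block, then restore the old value."""
    previous = llm.stream
    llm.stream = True
    try:
        yield llm
    finally:
        # Runs on normal exit and on exceptions, so the setting never leaks.
        llm.stream = previous


llm = LLMStub(stream=False)
with streaming_enabled(llm):
    inside = llm.stream
after = llm.stream
```

Restoring in a `finally` block is what keeps a failed call from leaving streaming switched on for later callers.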
Conversational Flows can stream one user turn with `stream_turn()`:
```python
from crewai import Flow
from crewai.experimental.conversational import ConversationConfig, ConversationState


@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
class ChatFlow(Flow[ConversationState]):
    conversational = True


flow = ChatFlow()
stream = flow.stream_turn("What can you help me with?", session_id="session-1")

with stream:
    for frame in stream.events:
        if frame.channel == "llm" and frame.type == "llm_stream_chunk":
            print(frame.content, end="", flush=True)

reply = stream.result
```
During `stream_turn()`, the built-in conversational answer path enables LLM token streaming for that turn and restores the LLM's previous stream setting afterward. Custom route handlers that create their own agents or LLM instances should configure those LLMs for streaming if they need token-level output.
Use the session as a context manager when possible. If a client disconnects before the stream is exhausted, close the session explicitly:
```python
stream = flow.stream_events()

try:
    for frame in stream.events:
        print(frame.type)
finally:
    if not stream.is_exhausted:
        stream.close()
```
For async streams, use `await stream.aclose()`.
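The async cleanup can follow the same `try`/`finally` shape as the sync version. The sketch below runs without CrewAI: `AsyncSessionStub` is a hypothetical stand-in, `read_first` is an illustrative helper, and the sketch assumes the async session exposes `is_exhausted` in the same way as the sync session.

```python
import asyncio


class AsyncSessionStub:
    """Hypothetical stand-in for an async stream session."""

    def __init__(self, frame_types):
        self._frame_types = list(frame_types)
        self.is_exhausted = False
        self.closed = False

    async def _frames(self):
        for frame_type in self._frame_types:
            await asyncio.sleep(0)
            yield frame_type
        self.is_exhausted = True  # reached only when the consumer reads to the end

    @property
    def events(self):
        return self._frames()

    async def aclose(self):
        self.closed = True


async def read_first(stream, limit):
    """Read at most `limit` frames, closing the session if we stop early."""
    seen = []
    try:
        async for frame_type in stream.events:
            seen.append(frame_type)
            if len(seen) >= limit:
                break  # simulates a client disconnect
    finally:
        if not stream.is_exhausted:
            await stream.aclose()
    return seen


partial = AsyncSessionStub(["flow_started", "llm_stream_chunk", "llm_stream_chunk"])
first = asyncio.run(read_first(partial, 1))
```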
Crew streaming with `stream=True` still returns the chunk-oriented `CrewStreamingOutput` API described in Streaming Crew Execution. Direct `llm.call(...)` still returns the final LLM result. The frame contract is intended for runtimes that need a stable event envelope across Flows, direct LLM calls, conversational turns, tools, and messages.