Back to Crewai

Consuming Streams

docs/edge/en/learn/consuming-streams.mdx

1.15.2a24.4 KB
Original Source

Overview

Use this guide when you want to subscribe to a CrewAI stream and print or route frames as they arrive.

The basic pattern is:

python
stream = flow.stream_events(inputs={"topic": "AI agents"})

with stream:
    for frame in stream:
        ...

result = stream.result

Always consume the stream before reading stream.result.

If you only care about text generated by LLM calls, subscribe to the llm projection and print frame.content:

python
stream = flow.stream_events(inputs={"topic": "AI agents"})

with stream:
    for frame in stream.llm:
        print(frame.content, end="", flush=True)

print()
result = stream.result

frame.content is an empty string for frames that do not carry printable text, so this is also safe:

python
with flow.stream_events(inputs={"topic": "AI agents"}) as stream:
    for frame in stream.events:
        if frame.channel == "llm" and frame.content:
            print(frame.content, end="", flush=True)

result = stream.result

Tool events arrive on the tools channel. Use frame.type to distinguish starts, finishes, and errors.

python
with flow.stream_events(inputs={"topic": "AI agents"}) as stream:
    for frame in stream.events:
        if frame.channel == "llm" and frame.content:
            print(frame.content, end="", flush=True)

        if frame.channel == "tools" and frame.type == "tool_usage_started":
            print(f"\nTool started: {frame.event.get('tool_name')}")

        if frame.channel == "tools" and frame.type == "tool_usage_finished":
            print(f"\nTool finished: {frame.event.get('tool_name')}")

result = stream.result

frame.event is the structured payload for the source event. Use it for metadata such as tool names, arguments, message roles, and runtime identifiers.

Watch Flow Progress

Flow lifecycle and method execution frames arrive on the flow channel:

python
with flow.stream_events(inputs={"topic": "AI agents"}) as stream:
    for frame in stream.flow:
        print(frame.type, frame.namespace)

result = stream.result

Use this when you want a progress log instead of token-level output.

Interleave Selected Channels

Use interleave() when you want a subset of channels while preserving their relative order:

python
with flow.stream_events(inputs={"topic": "AI agents"}) as stream:
    for frame in stream.interleave(["llm", "tools"]):
        if frame.channel == "llm":
            print(frame.content, end="", flush=True)
        elif frame.type == "tool_usage_started":
            print(f"\nTool: {frame.event.get('tool_name')}")

result = stream.result

Stream a Direct LLM Call

Direct llm.call(...) returns the final assembled result. To stream a direct LLM call, use llm.stream_events(...):

python
from crewai import LLM


llm = LLM(model="gpt-4o-mini")
stream = llm.stream_events("Explain streaming in one sentence.")

with stream:
    for frame in stream.llm:
        print(frame.content, end="", flush=True)

print()
result = stream.result

Stream a Conversational Turn

Conversational Flows expose stream_turn() for one user message:

python
stream = flow.stream_turn(
    "What can you help me with?",
    session_id="session-1",
)

with stream:
    for frame in stream.interleave(["llm", "messages"]):
        if frame.channel == "llm":
            print(frame.content, end="", flush=True)
        elif frame.channel == "messages":
            print(f"\n{frame.event.get('role')}: {frame.event.get('content')}")

reply = stream.result

Async Consumers

Async streams use the same channel projections:

python
stream = flow.astream(inputs={"topic": "AI agents"})

async with stream:
    async for frame in stream.llm:
        print(frame.content, end="", flush=True)

result = stream.result

Cleanup

Use the stream as a context manager when possible. If a client disconnects or you stop consuming early, close the stream:

python
stream = flow.stream_events(inputs={"topic": "AI agents"})

try:
    for frame in stream.events:
        print(frame.content, end="", flush=True)
finally:
    if not stream.is_exhausted:
        stream.close()

For async streams, call await stream.aclose().

See Also