libs/sdk-py/MIGRATION.md
client.runs.stream() (v2) remains fully supported. This guide covers how to
adopt the new client.threads.stream() (v3) surface when you want typed
projections, shared SSE fan-out, or WebSocket transport.
v2 — client.runs.stream()
from langgraph_sdk import get_client
client = get_client()
thread = await client.threads.create()
async for chunk in client.runs.stream(
thread["thread_id"],
"agent",
input={"messages": [{"role": "user", "content": "hello"}]},
stream_mode="messages",
):
print(chunk.event, chunk.data)
v3 — client.threads.stream()
from langgraph_sdk import get_client
import asyncio
client = get_client()
async with client.threads.stream(assistant_id="agent") as thread:
await thread.run.start(input={"messages": [{"role": "user", "content": "hello"}]})
async for stream in thread.messages:
print(await stream.text)
v2 client.runs.stream() | v3 client.threads.stream() | |
|---|---|---|
| Thread creation | Explicit client.threads.create() | Lazy (minted client-side if omitted) |
| Connection per run | Yes | No — shared SSE for the session |
| Typed projections | No (raw StreamPart) | Yes (messages, tool_calls, values, …) |
| Subgraph streaming | Not supported | thread.subgraphs / thread.subagents |
| WebSocket transport | No | Yes (transport="websocket", async only) |
| Interrupt handling | Manual polling | thread.interrupted / thread.run.respond() |
| Terminal state | Included in stream | await thread.output |
async with client.threads.stream(
thread_id="existing-thread-id",
assistant_id="agent",
) as thread:
# If the run already completed, thread.output resolves immediately.
result = await thread.output
All projections share one SSE connection. Use asyncio.gather (or
asyncio.TaskGroup) to start multiple consumers before any single projection
has finished — the fan-out task routes events to all subscribers in parallel.
async with client.threads.stream(assistant_id="agent") as thread:
await thread.run.start(input={"messages": [{"role": "user", "content": "hi"}]})
async def collect_messages():
return [s async for s in thread.messages]
async def collect_tool_calls():
return [c async for c in thread.tool_calls]
messages, tool_calls = await asyncio.gather(
collect_messages(),
collect_tool_calls(),
)
async with client.threads.stream(assistant_id="agent") as thread:
await thread.run.start(input={"messages": [{"role": "user", "content": "book a flight"}]})
# Wait for the run to pause at an interrupt node.
# thread.interrupted becomes True when input.requested arrives.
while not thread.interrupted:
await asyncio.sleep(0.1)
# Resume with a human response (unambiguous when only one interrupt is outstanding).
await thread.run.respond("yes, confirm booking")
result = await thread.output
The sync client mirrors the async API without async/await:
from langgraph_sdk import get_sync_client
client = get_sync_client()
with client.threads.stream(assistant_id="agent") as thread:
thread.run.start(input={"messages": [{"role": "user", "content": "hello"}]})
for stream in thread.messages:
print(stream.text)
The sync client uses SSE only (transport="websocket" is not supported).