packages/sdk-python/README.md
Experimental Python SDK for programmatic access to Qwen Code through the
stream-json protocol.
pip install qwen-code-sdk
For preview releases, enable pre-release resolution:
pip install --pre qwen-code-sdk
- Python >=3.10
- `qwen` CLI installed and available in `PATH`

You can also point the SDK at an explicit CLI binary or script with
`path_to_qwen_executable`.
Before using the SDK, verify that the CLI works in the same environment:
qwen --version
import asyncio
from qwen_code_sdk import (
is_sdk_assistant_message,
is_sdk_result_message,
query,
)
def text_from_message(message):
    """Return the concatenated text of a message's text content blocks.

    Args:
        message: Mapping that may hold a nested ``"message"`` mapping whose
            ``"content"`` is a list of content-block dicts.

    Returns:
        The joined ``"text"`` values of every block whose ``"type"`` is
        ``"text"``; ``"[no text content]"`` when no such block exists; or
        ``repr(content)`` when the content is not a list.
    """
    # ``or {}`` also covers an explicit ``"message": None``, which a plain
    # ``.get("message", {})`` default would pass through and crash on.
    content = (message.get("message") or {}).get("content", [])
    if not isinstance(content, list):
        return repr(content)
    texts = [
        # A block may carry ``"text": None``; coerce it so join() cannot fail.
        block.get("text") or ""
        for block in content
        if isinstance(block, dict) and block.get("type") == "text"
    ]
    return "".join(texts) if texts else "[no text content]"
def print_result(message):
    """Print a result message: its result text, or its error on failure.

    When ``message["is_error"]`` is truthy the nested error's ``"message"``
    is printed (``"Unknown error"`` if absent); otherwise ``"result"`` is
    printed, defaulting to an empty string.
    """
    if not message.get("is_error"):
        print(message.get("result", ""))
        return
    error = message.get("error") or {}
    print(f"Error: {error.get('message', 'Unknown error')}")
async def main() -> None:
    """Run one query, printing assistant text and the final result."""
    prompt = "List the top-level packages in this repository."
    options = {
        "cwd": "/path/to/project",
        "path_to_qwen_executable": "qwen",
    }
    async with query(prompt, options) as result:
        async for message in result:
            if is_sdk_assistant_message(message):
                print(text_from_message(message))
            elif is_sdk_result_message(message):
                print_result(message)


asyncio.run(main())
asyncio.run() is appropriate for standalone scripts. If your application
already runs an event loop, such as Jupyter, FastAPI, or pytest-asyncio, call
await main() instead.
from qwen_code_sdk import is_sdk_result_message, query_sync
# Synchronous variant: iterate the session with a plain ``for`` loop.
with query_sync(
    "Say hello",
    {
        "cwd": "/path/to/project",
        "path_to_qwen_executable": "qwen",
    },
) as result:
    for message in result:
        # Only result messages are reported; everything else is skipped.
        if not is_sdk_result_message(message):
            continue
        if message.get("is_error"):
            error = message.get("error") or {}
            print(f"Error: {error.get('message', 'Unknown error')}")
        else:
            print(message.get("result", ""))
- `query(prompt, options=None) -> Query`
- `query_sync(prompt, options=None) -> SyncQuery`
- `Query.close()`, `interrupt()`, `set_model()`, `set_permission_mode()`
- `Query.supported_commands()`, `mcp_server_status()`, `get_session_id()`

`prompt` accepts either a single `str` or an `AsyncIterable[SDKUserMessage]`
for multi-turn sessions.
options = {
    # Working directory used by the CLI.
    "cwd": "/path/to/project",
    # ``qwen``, an absolute binary path, or a ``.js`` CLI bundle.
    "path_to_qwen_executable": "qwen",
    # Model override for this session.
    "model": "qwen-plus",
    "permission_mode": "plan",
    "max_session_turns": 1,
    # Extra environment variables passed to the CLI process.
    "env": {
        "OPENAI_MODEL": "qwen-plus",
    },
    # Seconds for control requests, permission callbacks, and stream close waits.
    "timeout": {
        "control_request": 60,
        "can_use_tool": 60,
        "stream_close": 60,
    },
}
Common fields:
- `cwd`: working directory used by the CLI
- `path_to_qwen_executable`: `qwen`, an absolute binary path, or a `.js` CLI
  bundle
- `model`: model override for this session
- `permission_mode`: one of `default`, `plan`, `auto-edit`, or `yolo`; `yolo`
  auto-approves all tools, so use it only in trusted or sandboxed environments
- `env`: extra environment variables passed to the CLI process
- `system_prompt` / `append_system_prompt`: override or extend the system
  prompt
- `core_tools`, `exclude_tools`, `allowed_tools`: constrain tool availability
- `timeout`: seconds for control requests, permission callbacks, and stream
  close waits

`env` is merged on top of the parent process environment. Set secrets such as
`OPENAI_API_KEY` in the parent environment or a secrets manager rather than
hardcoding them in source.
For multi-turn use cases, pass an async iterable of SDKUserMessage objects.
Use a stable UUID for session_id when you want to correlate messages:
import asyncio
from qwen_code_sdk import SDKUserMessage, is_sdk_result_message, query
SESSION_ID = "123e4567-e89b-12d3-a456-426614174000"


async def prompts():
    """Yield the two user messages of the session, in order.

    Every message carries the shared ``SESSION_ID`` and has no parent tool
    use.
    """
    for text in (
        "Create a short project summary.",
        "Also list the test files.",
    ):
        turn: SDKUserMessage = {
            "type": "user",
            "session_id": SESSION_ID,
            "message": {"role": "user", "content": text},
            "parent_tool_use_id": None,
        }
        yield turn
async def main():
    """Send the ``prompts()`` messages in one session and print each result."""
    options = {
        "cwd": "/path/to/project",
        "path_to_qwen_executable": "qwen",
        "session_id": SESSION_ID,
    }
    async with query(prompts(), options) as result:
        async for message in result:
            # Only result messages are reported; everything else is skipped.
            if not is_sdk_result_message(message):
                continue
            if message.get("is_error"):
                error = message.get("error") or {}
                print(f"Error: {error.get('message', 'Unknown error')}")
            else:
                print(message.get("result", ""))


asyncio.run(main())
All messages in the async iterable must be known upfront. The SDK sends them
sequentially to the CLI but cannot feed a prior response back into the generator.
If you need conversational turn-taking, manage each turn as a separate query()
call.
import asyncio
from pathlib import Path
from qwen_code_sdk import is_sdk_result_message, query
PROJECT_ROOT = Path("/path/to/project").resolve()


def project_path(tool_name, tool_input):
    """Resolve a tool's path argument inside ``PROJECT_ROOT``.

    Args:
        tool_name: Tool being invoked; ``"list_directory"`` reads its path
            from ``"path"``, every other tool from ``"file_path"``.
        tool_input: Mapping of the tool's arguments.

    Returns:
        The resolved path, or ``None`` when the argument is missing, empty,
        not a string, or resolves outside ``PROJECT_ROOT``.
    """
    key = "path" if tool_name == "list_directory" else "file_path"
    raw_path = tool_input.get(key)
    if not (isinstance(raw_path, str) and raw_path):
        return None
    # Resolve before the containment check so ``..`` segments and absolute
    # paths cannot escape the project root.
    candidate = (PROJECT_ROOT / raw_path).resolve()
    return candidate if candidate.is_relative_to(PROJECT_ROOT) else None
async def can_use_tool(tool_name, tool_input, context):
    """Permission callback for file tools restricted to the project.

    ``read_file``, ``list_directory`` and ``write_file`` are allowed only
    for paths that ``project_path`` accepts, and ``write_file`` only for
    ``.md`` targets. Every other tool is denied.

    Returns:
        A dict with ``"behavior"`` set to ``"allow"`` (plus ``"updatedInput"``)
        or ``"deny"`` (plus a ``"message"``).
    """
    if tool_name not in {"read_file", "list_directory", "write_file"}:
        return {
            "behavior": "deny",
            "message": f"{tool_name} is not allowed by this application",
        }

    resolved = project_path(tool_name, tool_input)
    if resolved is None:
        return {
            "behavior": "deny",
            "message": "Only project-local paths are allowed",
        }

    writes_non_markdown = tool_name == "write_file" and resolved.suffix != ".md"
    if writes_non_markdown:
        return {"behavior": "deny", "message": "Only .md files can be written"}
    return {"behavior": "allow", "updatedInput": tool_input}
async def main():
    """Run a query with ``can_use_tool`` as permission callback and print the result."""
    options = {
        "cwd": str(PROJECT_ROOT),
        "path_to_qwen_executable": "qwen",
        "can_use_tool": can_use_tool,
    }
    async with query("Update README.md with a one paragraph summary.", options) as result:
        async for message in result:
            # Only result messages are reported; everything else is skipped.
            if not is_sdk_result_message(message):
                continue
            if message.get("is_error"):
                error = message.get("error") or {}
                print(f"Error: {error.get('message', 'Unknown error')}")
            else:
                print(message.get("result", ""))


asyncio.run(main())
The callback defaults to deny. If it does not return within
timeout.can_use_tool seconds, the SDK auto-denies the tool request. The
default timeout is 60 seconds.
The context argument includes cancel_event, suggestions, and
blocked_path when the CLI provides a path-specific permission target.
`can_use_tool` must be an `async def` callback accepting
`(tool_name, tool_input, context)`. The `stderr` callback must accept a single `str`.
Control methods can be called while a session is active:
import asyncio
from qwen_code_sdk import is_sdk_result_message, query
async def main():
    """Call control methods on an active session, then print its results."""
    options = {
        "cwd": "/path/to/project",
        "path_to_qwen_executable": "qwen",
    }
    prompt = "Inspect this project and wait for my next instruction."
    async with query(prompt, options) as result:
        # Control requests are issued before the message stream is consumed.
        commands = await result.supported_commands()
        print(commands)
        await result.set_permission_mode("plan")
        await result.set_model("qwen-plus")

        async for message in result:
            if not is_sdk_result_message(message):
                continue
            if message.get("is_error"):
                error = message.get("error") or {}
                print(f"Error: {error.get('message', 'Unknown error')}")
            else:
                print(message.get("result", ""))


asyncio.run(main())
Use interrupt() to cancel the current CLI operation and close() to clean up
the underlying process.
import asyncio
from qwen_code_sdk import is_sdk_result_message, query
async def main():
    """Resume a known session by id and print its result."""
    # Resume a known session.
    options = {
        "path_to_qwen_executable": "qwen",
        "resume": "123e4567-e89b-12d3-a456-426614174000",
    }
    async with query("Continue from the previous state.", options) as result:
        async for message in result:
            # Only result messages are reported; everything else is skipped.
            if not is_sdk_result_message(message):
                continue
            if message.get("is_error"):
                error = message.get("error") or {}
                print(f"Error: {error.get('message', 'Unknown error')}")
            else:
                print(message.get("result", ""))


asyncio.run(main())
To continue the latest session instead:
import asyncio
from qwen_code_sdk import is_sdk_result_message, query
async def main():
    """Continue the latest session and print its result."""
    options = {
        "path_to_qwen_executable": "qwen",
        "continue_session": True,
    }
    async with query("Continue the last session.", options) as latest:
        async for message in latest:
            # Only result messages are reported; everything else is skipped.
            if not is_sdk_result_message(message):
                continue
            if message.get("is_error"):
                error = message.get("error") or {}
                print(f"Error: {error.get('message', 'Unknown error')}")
            else:
                print(message.get("result", ""))


asyncio.run(main())
Use only one of resume, continue_session, or session_id in a request. The
SDK raises ValidationError if these session options are combined.
- `ValidationError`: invalid query options or malformed session identifiers
- `ControlRequestTimeoutError`: CLI control operation exceeded timeout
- `ProcessExitError`: `qwen` exited with a non-zero code
- `AbortError`: query or control request was cancelled

from qwen_code_sdk import (
ProcessExitError,
ValidationError,
is_sdk_result_message,
query_sync,
)
# Run a synchronous query and report SDK failures instead of crashing.
try:
    with query_sync("Say hello", {"path_to_qwen_executable": "qwen"}) as result:
        for message in result:
            # Only result messages are reported; everything else is skipped.
            if not is_sdk_result_message(message):
                continue
            if message.get("is_error"):
                error = message.get("error") or {}
                print(f"Error: {error.get('message', 'Unknown error')}")
            else:
                print(message.get("result", ""))
except ValidationError as exc:
    print(f"Invalid SDK options: {exc}")
except ProcessExitError as exc:
    print(f"qwen exited with {exc.exit_code}: {exc}")
0.1.x is intentionally narrow:
- `qwen` CLI via process transport
- stream-json parity with the TypeScript SDK core flow

See developer documentation for more detail.