docs/developers/daemon/01-architecture.md
一个 qwen serve 进程坚持 一 daemon = 一 workspace 的不变式。它内嵌一个 Express HTTP 服务、持有一个 acp-bridge 实例、拉起一个 ACP 子进程(qwen --acp)来跑真正的 agent 运行时。多个客户端(CLI TUI、IDE companion、IM channel 机器人、Web BFF、自定义脚本)通过 HTTP + SSE 连进来,要么共享同一个 ACP session(sessionScope: 'single',默认),要么每个客户端各拿一个('thread')。
在 ACP 子进程内部,MCP server 通过 McpTransportPool(F2)实现工作区内共享:一对 (server name + 配置指纹) 对应一条 MCP transport,不管被几个 session 发现都只起一份。Bridge 的 MultiClientPermissionMediator(F3)在四种策略之一下协调多客户端的权限投票。
本文给出 系统级全景,本文档集其余 18 篇文档都挂在它下面。每条主干流程都给一张 Mermaid 时序图,单个组件的实现细节请看对应的专题文档。
flowchart LR
subgraph clients["Clients"]
WUI["Web UI
(packages/webui/src/daemon)"]
TUI["CLI TUI
(待迁移到 SDK ui/*)"]
IDE["VSCode IDE
(packages/vscode-ide-companion)"]
CH["Channel bots
(DingTalk / WeChat / Telegram)"]
SDK["Any SDK consumer
(packages/sdk-typescript/src/daemon)"]
end
subgraph daemon["qwen serve process (one workspace)"]
EXP["Express app
(packages/cli/src/serve/server.ts)"]
BR["AcpBridge
(packages/acp-bridge/src/bridge.ts)"]
MED["MultiClientPermissionMediator
(F3)"]
EB["EventBus per session
(eventBus.ts)"]
FS["WorkspaceFileSystem
(cli/src/serve/fs/)"]
end
subgraph child["ACP child process (qwen --acp)"]
AGT["QwenAgent runtime"]
POOL["McpTransportPool
(F2, core/src/tools)"]
BDG["WorkspaceMcpBudget"]
end
subgraph external["External"]
MCP1["MCP server A
(stdio)"]
MCP2["MCP server B
(websocket)"]
end
WUI -- "HTTP+SSE" --> EXP
TUI -- "HTTP+SSE" --> EXP
IDE -- "HTTP+SSE (loopback)" --> EXP
CH -- "HTTP+SSE" --> EXP
SDK -- "HTTP+SSE" --> EXP
EXP --> BR
BR --> MED
BR --> EB
EXP --> FS
BR -- "ACP NDJSON over stdio" --> AGT
AGT --> POOL
POOL --> BDG
POOL -- "shared transport" --> MCP1
POOL -- "shared transport" --> MCP2
要点:
AcpChannel 连接,默认是真实的子进程 + 一对管道;inMemoryChannel 用于测试。flowchart TB
subgraph serve["packages/cli/src/serve"]
RQS["runQwenServe.ts
(bootstrap)"]
SRV["server.ts (Express)"]
CAP["capabilities.ts"]
AUTH["auth.ts"]
FSM["fs/ (sandbox)"]
DSP["daemonStatusProvider.ts"]
end
subgraph br["packages/acp-bridge"]
BR2["bridge.ts"]
BC2["bridgeClient.ts"]
EB2["eventBus.ts"]
MED2["permissionMediator.ts"]
ST2["status.ts"]
CH2["channel.ts / spawnChannel.ts"]
end
subgraph core["packages/core/src/tools"]
POOL2["mcp-transport-pool.ts"]
ENT["mcp-pool-entry.ts"]
WBG["mcp-workspace-budget.ts"]
SMV["session-mcp-view.ts"]
end
subgraph sdk["packages/sdk-typescript/src/daemon"]
DC["DaemonClient.ts"]
DSC["DaemonSessionClient.ts"]
EVT["events.ts"]
SSE["sse.ts"]
AUTHF["DaemonAuthFlow.ts"]
UI["ui/* (#4328 + #4353)
normalizer / transcript / store / render"]
end
subgraph adapters["Adapters"]
WUIP["webui/src/daemon/
DaemonSessionProvider.tsx"]
CHB["channels/base/
DaemonChannelBridge.ts"]
DT["channels/dingtalk"]
WX["channels/weixin"]
TG["channels/telegram"]
IDEA["vscode-ide-companion/
daemonIdeConnection.ts"]
end
RQS --> SRV
RQS --> CAP
RQS --> AUTH
RQS --> FSM
RQS --> BR2
BR2 --> BC2
BR2 --> EB2
BR2 --> MED2
BR2 --> CH2
BR2 -.spawns.-> core
POOL2 --> ENT
POOL2 --> WBG
POOL2 --> SMV
WUIP --> DSC
WUIP --> UI
CHB --> DSC
DT --> CHB
WX --> CHB
TG --> CHB
IDEA --> DSC
DSC --> DC
DC --> EVT
DC --> SSE
DC --> AUTHF
UI --> EVT
箭头方向表示编译期依赖:serve/ 启动层依赖 @qwen-code/acp-bridge,bridge 包本身不反向 import serve/。HTTP、auth、filesystem adapter 留在 CLI 包里,ACP session 生命周期和权限协调留在 bridge 包里。
记住三条信任边界:
serve/auth.ts 中间件链。sequenceDiagram
autonumber
participant C as Client (SDK)
participant MW as Middleware
(CORS→host→log→bearer→rate-limit→JSON→telemetry)
participant R as Route handler
participant BR as AcpBridge
participant BC as BridgeClient
participant CH as ACP child
C->>MW: POST /session/:id/prompt
Authorization: Bearer …
X-Qwen-Client-Id: …
MW->>MW: deny/allow Origin CORS
MW->>MW: hostAllowlist (DNS rebinding guard)
MW->>MW: access-log hook (if daemonLog enabled)
MW->>MW: bearerAuth (constant-time compare)
MW->>MW: rateLimit (when enabled)
MW->>MW: express.json body parser
MW->>MW: daemonTelemetryMiddleware
MW->>MW: mutationGate (strict on mutating routes)
MW->>R: req validated
R->>BR: bridge.sendPrompt(sessionId, body, clientId)
BR->>BC: client.sendPrompt(sessionId, …)
BC->>CH: ACP JSON-RPC over stdin
CH-->>BC: ACP response / notifications
BC-->>BR: result
BR-->>R: result
R-->>C: 200 JSON
非流式路由(prompt、cancel、model 切换、metadata、workspace CRUD)以一次 JSON 响应结束。流式输出不是在该 HTTP 连接上以分块方式返回,而是走 SSE 通道;见流程 2。
sequenceDiagram
autonumber
participant C as Client
participant SR as GET /session/:id/events
participant EB as EventBus
(per session)
participant BC as BridgeClient
participant CH as ACP child
C->>SR: GET …/events
Last-Event-ID: 42 (optional)
SR->>EB: subscribe(lastSeenId=42, maxQueued=N)
EB-->>SR: replay frames 43..currentTail
(from ring buffer)
SR-->>C: NDJSON: id=43, type=session_update, …
CH-->>BC: ACP notification (e.g. agent_message_chunk)
BC->>EB: publish({type, data})
EB-->>SR: enqueue id=N
SR-->>C: id=N, type=…, data=…
Note over EB,SR: If subscriber queue >= maxQueued,
EventBus emits client_evicted terminal frame
and closes subscriber.
要点:
eventRingSize,默认 8000)。Last-Event-ID 已经落出环外,会收到合成 catch-up 信号,必须用 loadSession / resumeSession 重建更深层状态。slow_client_warning,达到上限时收到 client_evicted(终态)后被关掉。sequenceDiagram
autonumber
participant CH as ACP child (agent)
participant BC as BridgeClient.requestPermission
participant MED as Mediator (policy)
participant EB as EventBus
participant C1 as Client A
(originator)
participant C2 as Client B
CH->>BC: ACP requestPermission(requestId, options)
BC->>MED: request({requestId, sessionId, originatorClientId, allowedOptionIds}, timeoutMs)
MED->>EB: publish permission_request
(broadcast to subscribers)
EB-->>C1: SSE permission_request
EB-->>C2: SSE permission_request
alt first-responder
C2->>MED: POST /permission/:requestId optionId=allow
MED-->>BC: resolved
BC-->>CH: ACP response
MED->>EB: permission_resolved
C1->>MED: POST /permission/:requestId (late vote)
MED-->>C1: 409 permission_already_resolved
else designated
C2->>MED: vote (clientId != originatorClientId)
MED-->>C2: 403 permission_forbidden
C1->>MED: vote (matches originator)
MED-->>BC: resolved
else consensus (N-of-M)
C1->>MED: vote
MED->>EB: permission_partial_vote (1/N)
C2->>MED: vote
MED->>EB: permission_partial_vote (2/N)
Note over MED: when tally reaches quorum on one option, resolve
else local-only
C2->>MED: vote (remote)
MED-->>C2: 403 permission_forbidden (remote_not_allowed)
Note over MED,CH: blocks until a loopback voter resolves it
end
跨策略「逃生口」:任何客户端都可以投 CANCEL_VOTE_SENTINEL 把请求短路成 cancelled / agent_cancelled。bridge 防止 wire 端通过普通 optionId 字段偷偷塞这个哨兵(InvalidPermissionOptionError)。
四种策略一句话:
first-responder — 第一个有效投票获胜(默认,保留 live 协作 UX)。designated — 只有 originatorClientId 能投,其他客户端收 permission_forbidden。consensus — N-of-M 法定人数,过程中发 permission_partial_vote 让 UI 渲染进度。local-only — 拒绝任何 HTTP 投票,只接受 loopback。sequenceDiagram
autonumber
participant S as Session in ACP child
participant P as McpTransportPool
participant SIF as spawnInFlight (dedup)
participant E as PoolEntry
participant BDG as WorkspaceMcpBudget
participant SRV as MCP server
S->>P: acquire(name, cfg, sessionId)
P->>SIF: check inflight for (name+fingerprint)
alt cached inflight
SIF-->>P: existing promise
else cold start
P->>BDG: tryReserve(name)
BDG-->>P: ok / refused
alt refused
P-->>S: BudgetExhaustedError
else ok
P->>E: new PoolEntry(...)
E->>SRV: connect transport
SRV-->>E: ready
E-->>P: connected
end
end
P->>P: sessionToEntries.add(sessionId, id)
P-->>S: PooledConnection
Note over S,P: Session uses entry, then…
S->>P: release(id, sessionId)
P->>E: detach session
E->>E: arm drain timer (default 30s)
Note over E: refs==0 → drain timer fires → close transport
(MAX_IDLE_MS 5min hard cap survives flap)
Note over S,P: Operator restart flow…
S->>P: restartByName(name, opts?)
P->>E: drain + close
P->>E: spawn replacement
E->>SRV: reconnect
P->>EB: publish mcp_server_restarted
要点:
releaseSession(sessionId) 借助 sessionToEntries 反向索引,以 O(refs) 释放该 session 持有的所有条目。drainAll() 置 draining 标志(拒绝新的 acquire),并以可配置超时等待所有条目关闭。restartByName 可以接 entryIndex 来精确重启某条;池里同名多条目时返回 {entries: RestartResult[]} 形状。sequenceDiagram
autonumber
participant Op as Operator (signal)
participant RQS as runQwenServe
participant APP as Express app
participant BR as AcpBridge
participant CH as ACP child
Op->>RQS: qwen serve --workspace … --token …
RQS->>RQS: validate flags + canonicalize workspace
RQS->>RQS: allocate PermissionAuditRing
RQS->>BR: createHttpAcpBridge(options)
RQS->>APP: createServeApp(bridge, …)
RQS->>APP: listen(host, port)
RQS->>RQS: arm SIGINT / SIGTERM handlers
Op->>RQS: SIGTERM
RQS->>BR: dispose device-flow registry
RQS->>BR: bridge.shutdown()
BR->>CH: send graceful close (10s deadline)
CH-->>BR: exit
RQS->>APP: server.close() (5s force-close timer)
APP->>APP: closeAllConnections() (+2s secondary)
Note over Op,RQS: Second SIGTERM during shutdown →
bridge.killAllSync() + process.exit(1) (orphan prevention)
为什么要分两阶段:
bridge.killAllSync() + process.exit(1),防孤儿。| 关注点 | 文件 |
|---|---|
| Bootstrap | packages/cli/src/serve/runQwenServe.ts |
| Express 应用 | packages/cli/src/serve/server.ts |
| 能力注册表 | packages/cli/src/serve/capabilities.ts |
| Auth 中间件 | packages/cli/src/serve/auth.ts |
| Bridge | packages/acp-bridge/src/bridge.ts |
| BridgeClient | packages/acp-bridge/src/bridgeClient.ts |
| 权限协调器 | packages/acp-bridge/src/permissionMediator.ts |
| EventBus | packages/acp-bridge/src/eventBus.ts |
| MCP transport 池 | packages/core/src/tools/mcp-transport-pool.ts |
| Workspace MCP 预算 | packages/core/src/tools/mcp-workspace-budget.ts |
| Workspace 文件系统 | packages/cli/src/serve/fs/ |
| SDK DaemonClient | packages/sdk-typescript/src/daemon/DaemonClient.ts |
| SDK SessionClient | packages/sdk-typescript/src/daemon/DaemonSessionClient.ts (61-385) |
| 事件 schema | packages/sdk-typescript/src/daemon/events.ts (14-112) |
../../users/qwen-serve.md。../qwen-serve-protocol.md。../../design/f2-mcp-transport-pool.md。