docs/MIGRATION_SUMMARY.md
已完全移除旧的基于SSE的WebChat系统,并替换为基于WebSocket的双向实时通信系统。这是一个内置在LangBot中的完整IM系统,支持流式输出。
src/langbot/pkg/api/http/controller/groups/pipelines/webchat.py - 旧的SSE路由src/langbot/pkg/platform/sources/webchat.py - 旧的WebChat适配器src/langbot/pkg/platform/sources/webchat.yaml - 旧的配置文件1. WebSocket连接管理器
src/langbot/pkg/platform/sources/websocket_manager.py
2. WebSocket适配器
src/langbot/pkg/platform/sources/websocket_adapter.py
reply_message_chunk 方法)3. WebSocket路由控制器
src/langbot/pkg/api/http/controller/groups/pipelines/websocket_chat.py
4. 配置文件
src/langbot/pkg/platform/sources/websocket.yaml
1. WebSocket客户端
web/src/app/infra/websocket/WebSocketClient.ts
2. 更新的组件
web/src/app/home/pipelines/components/debug-dialog/DebugDialog.tsx
3. HTTP客户端更新
web/src/app/infra/http/BackendClient.ts
Python测试客户端
test_websocket_client.py
使用文档
WEBSOCKET_README.md
1. botmgr.py
webchat_proxy_botwebsocket_proxy_botwebsocket而非webchat)2. 适配器注册
# 旧代码(已删除)
webchat_adapter_class = self.adapter_dict['webchat']
self.webchat_proxy_bot = RuntimeBot(...)
# 新代码
websocket_adapter_class = self.adapter_dict['websocket']
self.websocket_proxy_bot = RuntimeBot(
uuid='websocket-proxy-bot',
name='WebSocket',
adapter='websocket',
...
)
1. API调用完全更换
旧代码(已删除):
// SSE流式请求
await fetch(url, {
method: 'POST',
body: JSON.stringify({ is_stream: true })
})
// 手动解析 text/event-stream
新代码:
// WebSocket实时通信
const wsClient = new WebSocketClient(pipelineId, sessionType);
await wsClient.connect();
wsClient.onMessage((message) => {
// 流式消息自动处理
setMessages(prev => [...prev, message]);
});
wsClient.sendMessage(messageChain);
2. 连接状态管理
新增功能:
3. 流式支持
完整的流式消息处理:
wsClient.onMessage((message) => {
if (message.is_final) {
// 最终消息
finalizeBotMessage(message);
} else {
// 中间消息块,实时更新UI
updateBotMessage(message);
}
});
连接
ws://localhost:8000/api/v1/pipelines/<pipeline_uuid>/ws/connect?session_type=<person|group>
消息格式
客户端发送:
{
"type": "message",
"message": [
{"type": "Plain", "text": "你好"}
]
}
服务器响应(流式):
{
"type": "response",
"data": {
"id": 1,
"role": "assistant",
"content": "你好,我是...",
"is_final": false,
"timestamp": "2025-01-28T..."
}
}
| 端点 | 方法 | 说明 |
|---|---|---|
/api/v1/pipelines/<uuid>/ws/messages/<type> | GET | 获取消息历史 |
/api/v1/pipelines/<uuid>/ws/reset/<type> | POST | 重置会话 |
/api/v1/pipelines/<uuid>/ws/connections | GET | 获取连接统计 |
/api/v1/pipelines/<uuid>/ws/broadcast | POST | 广播消息 |
WebSocket Adapter
async def reply_message_chunk(
self,
message_source: platform_events.MessageEvent,
bot_message,
message: platform_message.MessageChain,
quote_origin: bool = False,
is_final: bool = False,
) -> dict:
"""回复消息块 - 流式"""
message_data = WebSocketMessage(
id=-1,
role='assistant',
content=str(message),
message_chain=[component.__dict__ for component in message],
timestamp=datetime.now().isoformat(),
is_final=is_final and bot_message.tool_calls is None,
)
# 发送到队列,由WebSocket连接处理发送
await session.resp_queues[message_id].put(message_data)
return message_data.model_dump()
async def is_stream_output_supported(self) -> bool:
"""WebSocket始终支持流式输出"""
return True
DebugDialog组件
wsClient.onMessage((message) => {
setMessages((prevMessages) => {
const existingIndex = prevMessages.findIndex(
(msg) => msg.role === 'assistant' && msg.content === 'Generating...'
);
if (existingIndex !== -1) {
// 更新正在生成的消息
const updatedMessages = [...prevMessages];
updatedMessages[existingIndex] = message;
return updatedMessages;
} else {
// 添加新消息
return [...prevMessages, message];
}
});
});
此次迁移完全不兼容旧的WebChat系统:
API端点变更
/api/v1/pipelines/<uuid>/chat/sendws://.../<uuid>/ws/connect通信协议变更
流式实现变更
text/event-stream 格式使用新系统需要:
| 特性 | 旧WebChat (SSE) | 新WebSocket |
|---|---|---|
| 双向通信 | ❌ 单向(服务器→客户端) | ✅ 双向 |
| 主动推送 | ❌ 不支持 | ✅ 支持 |
| 连接管理 | ❌ 无状态 | ✅ 有状态,完整生命周期 |
| 流式输出 | ✅ 支持 | ✅ 支持(更优) |
| 心跳机制 | ❌ 无 | ✅ 30秒心跳 |
| 自动重连 | ❌ 无 | ✅ 最多5次 |
| 多连接 | ⚠️ 难以管理 | ✅ 完整支持 |
| 连接状态 | ❌ 不可见 | ✅ 实时显示 |
| 广播功能 | ❌ 不支持 | ✅ 支持 |
# 单连接测试
python test_websocket_client.py <pipeline_uuid>
# 指定会话类型
python test_websocket_client.py <pipeline_uuid> --session-type group
# 多连接并发测试(5个连接)
python test_websocket_client.py <pipeline_uuid> --multi 5
const ws = new WebSocket('ws://localhost:8000/api/v1/pipelines/<uuid>/ws/connect?session_type=person');
ws.onopen = () => {
console.log('已连接');
ws.send(JSON.stringify({
type: 'message',
message: [{type: 'Plain', text: '你好'}]
}));
};
ws.onmessage = (event) => {
console.log('收到:', JSON.parse(event.data));
};
A: 根据需求,不需要考虑任何对老版本的兼容性,彻底迁移可以避免代码冗余和维护负担。
A:
reply_message_chunk发送消息块is_final=true表示最后一块A:
A:
/api/v1/pipelines/<uuid>/ws/broadcast APIonBroadcast回调接收✅ 完成的工作
✅ 核心特性
✅ 技术亮点
🎉 系统已完全迁移到WebSocket,无任何旧代码遗留!