Back to Langbot

WebChat 到 WebSocket 迁移总结

docs/MIGRATION_SUMMARY.md

4.9.69.3 KB
Original Source

WebChat 到 WebSocket 迁移总结

概述

已完全移除旧的基于SSE的WebChat系统,并替换为基于WebSocket的双向实时通信系统。这是一个内置在LangBot中的完整IM系统,支持流式输出。

已删除的文件

后端

  • src/langbot/pkg/api/http/controller/groups/pipelines/webchat.py - 旧的SSE路由
  • src/langbot/pkg/platform/sources/webchat.py - 旧的WebChat适配器
  • src/langbot/pkg/platform/sources/webchat.yaml - 旧的配置文件

前端

  • ❌ BackendClient中所有SSE相关代码已完全移除
  • ❌ DebugDialog中所有SSE相关逻辑已完全替换

新增的文件

后端核心文件

1. WebSocket连接管理器

src/langbot/pkg/platform/sources/websocket_manager.py
  • 管理所有并发WebSocket连接
  • 线程安全的连接池
  • 按流水线、会话类型分组
  • 广播和单播消息功能
  • 连接统计和监控

2. WebSocket适配器

src/langbot/pkg/platform/sources/websocket_adapter.py
  • 实现平台适配器接口
  • 完整流式支持 (reply_message_chunk 方法)
  • 双向消息流处理
  • 消息历史管理
  • 会话管理

3. WebSocket路由控制器

src/langbot/pkg/api/http/controller/groups/pipelines/websocket_chat.py
  • WebSocket端点处理
  • REST API接口
  • 心跳机制
  • 连接生命周期管理

4. 配置文件

src/langbot/pkg/platform/sources/websocket.yaml
  • WebSocket适配器元数据

前端核心文件

1. WebSocket客户端

web/src/app/infra/websocket/WebSocketClient.ts
  • WebSocket连接管理
  • 自动重连(最多5次)
  • 心跳机制(30秒)
  • 事件回调系统

2. 更新的组件

web/src/app/home/pipelines/components/debug-dialog/DebugDialog.tsx
  • 完全重写,使用WebSocket
  • 实时连接状态显示
  • 流式消息支持
  • 自动重连

3. HTTP客户端更新

web/src/app/infra/http/BackendClient.ts
  • 移除所有旧的WebChat API
  • 仅保留WebSocket API

测试工具

Python测试客户端

test_websocket_client.py
  • 单连接交互测试
  • 多连接并发测试
  • 命令行工具

文档

使用文档

WEBSOCKET_README.md
  • 完整的API文档
  • 架构说明
  • 使用示例
  • 故障排查

核心变更

后端变更

1. botmgr.py

  • ❌ 移除 webchat_proxy_bot
  • ✅ 仅保留 websocket_proxy_bot
  • ✅ 更新适配器过滤逻辑(排除websocket而非webchat

2. 适配器注册

python
# 旧代码(已删除)
webchat_adapter_class = self.adapter_dict['webchat']
self.webchat_proxy_bot = RuntimeBot(...)

# 新代码
websocket_adapter_class = self.adapter_dict['websocket']
self.websocket_proxy_bot = RuntimeBot(
    uuid='websocket-proxy-bot',
    name='WebSocket',
    adapter='websocket',
    ...
)

前端变更

1. API调用完全更换

旧代码(已删除):

typescript
// SSE流式请求
await fetch(url, {
  method: 'POST',
  body: JSON.stringify({ is_stream: true })
})
// 手动解析 text/event-stream

新代码:

typescript
// WebSocket实时通信
const wsClient = new WebSocketClient(pipelineId, sessionType);
await wsClient.connect();

wsClient.onMessage((message) => {
  // 流式消息自动处理
  setMessages(prev => [...prev, message]);
});

wsClient.sendMessage(messageChain);

2. 连接状态管理

新增功能:

  • ✅ 实时连接状态指示器(绿色/红色圆点)
  • ✅ 连接/断开toast提示
  • ✅ 自动重连逻辑
  • ✅ 心跳保活

3. 流式支持

完整的流式消息处理:

typescript
wsClient.onMessage((message) => {
  if (message.is_final) {
    // 最终消息
    finalizeBotMessage(message);
  } else {
    // 中间消息块,实时更新UI
    updateBotMessage(message);
  }
});

API对比

WebSocket端点

连接

ws://localhost:8000/api/v1/pipelines/<pipeline_uuid>/ws/connect?session_type=<person|group>

消息格式

客户端发送:

json
{
  "type": "message",
  "message": [
    {"type": "Plain", "text": "你好"}
  ]
}

服务器响应(流式):

json
{
  "type": "response",
  "data": {
    "id": 1,
    "role": "assistant",
    "content": "你好,我是...",
    "is_final": false,
    "timestamp": "2025-01-28T..."
  }
}

REST API

端点方法说明
/api/v1/pipelines/<uuid>/ws/messages/<type>GET获取消息历史
/api/v1/pipelines/<uuid>/ws/reset/<type>POST重置会话
/api/v1/pipelines/<uuid>/ws/connectionsGET获取连接统计
/api/v1/pipelines/<uuid>/ws/broadcastPOST广播消息

流式支持详解

后端流式实现

WebSocket Adapter

python
async def reply_message_chunk(
    self,
    message_source: platform_events.MessageEvent,
    bot_message,
    message: platform_message.MessageChain,
    quote_origin: bool = False,
    is_final: bool = False,
) -> dict:
    """回复消息块 - 流式"""
    message_data = WebSocketMessage(
        id=-1,
        role='assistant',
        content=str(message),
        message_chain=[component.__dict__ for component in message],
        timestamp=datetime.now().isoformat(),
        is_final=is_final and bot_message.tool_calls is None,
    )

    # 发送到队列,由WebSocket连接处理发送
    await session.resp_queues[message_id].put(message_data)
    return message_data.model_dump()

async def is_stream_output_supported(self) -> bool:
    """WebSocket始终支持流式输出"""
    return True

前端流式处理

DebugDialog组件

typescript
wsClient.onMessage((message) => {
  setMessages((prevMessages) => {
    const existingIndex = prevMessages.findIndex(
      (msg) => msg.role === 'assistant' && msg.content === 'Generating...'
    );

    if (existingIndex !== -1) {
      // 更新正在生成的消息
      const updatedMessages = [...prevMessages];
      updatedMessages[existingIndex] = message;
      return updatedMessages;
    } else {
      // 添加新消息
      return [...prevMessages, message];
    }
  });
});

兼容性说明

⚠️ 不兼容旧版本

此次迁移完全不兼容旧的WebChat系统:

  1. API端点变更

    • 旧: /api/v1/pipelines/<uuid>/chat/send
    • 新: ws://.../<uuid>/ws/connect
  2. 通信协议变更

    • 旧: HTTP + SSE (Server-Sent Events)
    • 新: WebSocket (双向)
  3. 流式实现变更

    • 旧: text/event-stream 格式
    • 新: WebSocket JSON消息

迁移要求

使用新系统需要:

  1. ✅ 前端必须支持WebSocket
  2. ✅ 后端必须运行新的WebSocket适配器
  3. ✅ 清除旧的WebChat相关配置

优势对比

特性旧WebChat (SSE)新WebSocket
双向通信❌ 单向(服务器→客户端)✅ 双向
主动推送❌ 不支持✅ 支持
连接管理❌ 无状态✅ 有状态,完整生命周期
流式输出✅ 支持✅ 支持(更优)
心跳机制❌ 无✅ 30秒心跳
自动重连❌ 无✅ 最多5次
多连接⚠️ 难以管理✅ 完整支持
连接状态❌ 不可见✅ 实时显示
广播功能❌ 不支持✅ 支持

测试方式

1. Python测试客户端

bash
# 单连接测试
python test_websocket_client.py <pipeline_uuid>

# 指定会话类型
python test_websocket_client.py <pipeline_uuid> --session-type group

# 多连接并发测试(5个连接)
python test_websocket_client.py <pipeline_uuid> --multi 5

2. 前端测试

  1. 启动LangBot服务器
  2. 访问前端界面
  3. 打开流水线调试对话框
  4. 观察连接状态指示器(左下角圆点)
  5. 发送消息测试流式响应

3. 浏览器控制台测试

javascript
const ws = new WebSocket('ws://localhost:8000/api/v1/pipelines/<uuid>/ws/connect?session_type=person');

ws.onopen = () => {
  console.log('已连接');
  ws.send(JSON.stringify({
    type: 'message',
    message: [{type: 'Plain', text: '你好'}]
  }));
};

ws.onmessage = (event) => {
  console.log('收到:', JSON.parse(event.data));
};

常见问题

Q: 为什么完全删除旧代码而不保留兼容性?

A: 根据需求,不需要考虑任何对老版本的兼容性,彻底迁移可以避免代码冗余和维护负担。

Q: 流式输出如何工作?

A:

  1. 后端通过reply_message_chunk发送消息块
  2. 消息块放入队列
  3. WebSocket连接从队列取出并发送
  4. 前端实时更新UI
  5. is_final=true表示最后一块

Q: 如何确保连接不断开?

A:

  1. 客户端每30秒发送心跳(ping)
  2. 服务器响应pong
  3. 连接断开时自动重连(最多5次)

Q: 如何实现后端主动推送?

A:

  1. 调用 /api/v1/pipelines/<uuid>/ws/broadcast API
  2. 消息会被推送到该流水线的所有连接
  3. 前端通过onBroadcast回调接收

总结

完成的工作

  • 完全移除旧的WebChat/SSE系统
  • 实现完整的WebSocket双向通信系统
  • 支持流式输出
  • 支持多连接并发
  • 实现自动重连和心跳机制
  • 提供完整的测试工具和文档

核心特性

  • 双向实时通信
  • 流式消息支持
  • 多连接管理
  • 自动重连
  • 心跳保活
  • 连接状态可视化
  • 广播消息

技术亮点

  • 异步架构(asyncio)
  • 线程安全的连接管理
  • 独立的消息队列
  • 完整的错误处理
  • 模块化设计

🎉 系统已完全迁移到WebSocket,无任何旧代码遗留!