docs/WEBSOCKET_README.md
这是一个内置在 LangBot 中的完整 IM (即时通讯) 系统,支持:
WebSocketConnectionManager (websocket_manager.py)
WebSocketAdapter (websocket_adapter.py)
WebSocketChatRouterGroup (websocket_chat.py)
ws://localhost:8000/api/v1/pipelines/<pipeline_uuid>/ws/connect?session_type=<person|group>
参数:
pipeline_uuid: 流水线 UUID (必需)session_type: 会话类型,可选 person 或 group (默认: person)连接成功响应:
{
"type": "connected",
"connection_id": "550e8400-e29b-41d4-a716-446655440000",
"pipeline_uuid": "your-pipeline-uuid",
"session_type": "person",
"timestamp": "2025-01-28T12:00:00"
}
发送聊天消息:
{
"type": "message",
"message": [
{
"type": "Plain",
"text": "你好,这是一条测试消息"
}
]
}
发送心跳:
{
"type": "ping"
}
主动断开连接:
{
"type": "disconnect"
}
聊天响应 (流式):
{
"type": "response",
"data": {
"id": 1,
"role": "assistant",
"content": "这是机器人的回复",
"message_chain": [...],
"timestamp": "2025-01-28T12:00:00",
"is_final": false,
"connection_id": "..."
}
}
心跳响应:
{
"type": "pong",
"timestamp": "2025-01-28T12:00:00"
}
广播消息:
{
"type": "broadcast",
"message": "这是一条广播消息",
"timestamp": "2025-01-28T12:00:00"
}
错误消息:
{
"type": "error",
"message": "错误描述"
}
GET /api/v1/pipelines/<pipeline_uuid>/ws/messages/<session_type>
响应:
{
"code": 0,
"msg": "ok",
"data": {
"messages": [...]
}
}
POST /api/v1/pipelines/<pipeline_uuid>/ws/reset/<session_type>
响应:
{
"code": 0,
"msg": "ok",
"data": {
"message": "Session reset successfully"
}
}
GET /api/v1/pipelines/<pipeline_uuid>/ws/connections
响应:
{
"code": 0,
"msg": "ok",
"data": {
"stats": {
"total_connections": 5,
"pipelines": 2,
"connections_by_pipeline": {
"pipeline-1": 3,
"pipeline-2": 2
},
"connections_by_session_type": {
"person": 4,
"group": 1
}
},
"connections": [
{
"connection_id": "...",
"session_type": "person",
"created_at": "2025-01-28T12:00:00",
"last_active": "2025-01-28T12:05:00",
"is_active": true
}
]
}
}
POST /api/v1/pipelines/<pipeline_uuid>/ws/broadcast
Content-Type: application/json
{
"message": "这是一条广播消息"
}
响应:
{
"code": 0,
"msg": "ok",
"data": {
"message": "Broadcast sent successfully"
}
}
使用提供的测试客户端:
# 安装依赖
pip install websockets
# 单个连接测试
python test_websocket_client.py <pipeline_uuid>
# 指定会话类型
python test_websocket_client.py <pipeline_uuid> --session-type group
# 多连接并发测试
python test_websocket_client.py <pipeline_uuid> --multi 5
// 建立 WebSocket 连接
const ws = new WebSocket('ws://localhost:8000/api/v1/pipelines/your-pipeline-uuid/ws/connect?session_type=person');
// 连接建立
ws.onopen = () => {
console.log('WebSocket 连接已建立');
// 发送消息
ws.send(JSON.stringify({
type: 'message',
message: [
{
type: 'Plain',
text: '你好'
}
]
}));
};
// 接收消息
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'connected') {
console.log('连接成功:', data.connection_id);
} else if (data.type === 'response') {
console.log('机器人回复:', data.data.content);
if (data.data.is_final) {
console.log('响应完成');
}
} else if (data.type === 'broadcast') {
console.log('收到广播:', data.message);
}
};
// 连接关闭
ws.onclose = () => {
console.log('WebSocket 连接已关闭');
};
// 错误处理
ws.onerror = (error) => {
console.error('WebSocket 错误:', error);
};
// 发送心跳
setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping' }));
}
}, 30000); // 每 30 秒发送一次心跳
系统支持同时建立多个 WebSocket 连接,每个连接都有唯一的 connection_id。连接按照流水线和会话类型进行分组管理。
支持流式输出,机器人的响应会分块发送,客户端可以实时显示部分响应内容。
支持 person 和 group 两种会话类型,不同类型的会话消息历史互不影响。
客户端可以定期发送 ping 消息,服务器会响应 pong,用于保持连接活跃和检测连接状态。
启用详细日志:
import logging
logging.getLogger('langbot.pkg.platform.sources.websocket_adapter').setLevel(logging.DEBUG)
logging.getLogger('langbot.pkg.platform.sources.websocket_manager').setLevel(logging.DEBUG)
logging.getLogger('langbot.pkg.api.http.controller.groups.pipelines.websocket_chat').setLevel(logging.DEBUG)