docs/IM集成开发文档.md
WeKnora 的 IM 集成模块将企业即时通讯平台(企业微信、飞书、Slack、Telegram、钉钉、Mattermost)接入 WeKnora 知识问答管道,支持在 IM 中直接向 AI 提问并获得实时流式回答。
IM 渠道绑定到 Agent,一个 Agent 可接入多个 IM 渠道,所有配置通过前端 Agent 编辑器管理,存储在数据库中。
企业微信提供两种接入模式,根据你的应用类型选择:
无需公网域名,适合快速验证和内网部署。
第一步:创建智能机器人
第二步:在 WeKnora 中添加 IM 渠道
第三步:验证
保存后 WeKnora 会自动建立到企业微信的 WebSocket 长连接。日志中出现以下内容表示连接成功:
[IM] WeCom WebSocket connecting (bot_id=xxx)...
此时在企业微信中给机器人发消息即可收到 AI 回复。
需要公网可达的回调地址,适合已有自建应用的场景。
第一步:创建自建应用
第二步:在 WeKnora 中添加 IM 渠道
https://你的域名/api/v1/im/callback/{channel_id}第三步:配置企业微信接收消息
第四步:配置可信域名(可选)
如需在群聊中使用,在应用详情页 → 网页授权及 JS-SDK 中添加可信域名。
飞书同样提供两种模式,WebSocket 模式配置更简单。
无需公网域名,无需配置事件加密。
第一步:创建飞书应用
第二步:开通权限与事件
{
"scopes": {
"tenant": [
"aily:file:read",
"aily:file:write",
"application:application.app_message_stats.overview:readonly",
"application:application:self_manage",
"application:bot.menu:write",
"cardkit:card:write",
"contact:user.employee_id:readonly",
"corehr:file:download",
"docs:document.content:read",
"event:ip_list",
"im:chat",
"im:chat.access_event.bot_p2p_chat:read",
"im:chat.members:bot_access",
"im:message",
"im:message.group_at_msg:readonly",
"im:message.group_msg",
"im:message.p2p_msg:readonly",
"im:message:readonly",
"im:message:send_as_bot",
"im:resource",
"sheets:spreadsheet",
"wiki:wiki:readonly"
],
"user": [
"aily:file:read",
"aily:file:write",
"im:chat.access_event.bot_p2p_chat:read"
]
}
}
im.message.receive_v1(接收消息)第三步:发布应用
在 版本管理与发布 中创建版本并提交审核。审核通过后用户才能与机器人交互。
第四步:在 WeKnora 中添加 IM 渠道
启动后日志出现以下内容表示连接成功:
[IM] Feishu WebSocket connecting (app_id=xxx)...
需要公网可达的回调地址。
前置步骤同上(创建应用、开通权限),额外需要:
第一步:在 WeKnora 中添加 IM 渠道
第二步:配置飞书事件订阅
im.message.receive_v1Slack 提供两种接入模式,推荐使用 WebSocket (Socket Mode) 模式,无需公网域名。
无需公网域名,适合快速验证和内网部署。
第一步:创建 Slack App
第二步:生成 App-Level Token
connections:write scope。xapp- 开头),这就是 App Token。第三步:开启 Socket Mode
第四步:配置 Event Subscriptions
app_mention (在频道中 @ 机器人)message.channels (频道消息)message.groups (私有频道消息)message.im (私聊消息)message.mpim (多人私聊消息)第五步:配置权限 (OAuth & Permissions)
app_mentions:readchannels:historychat:writegroups:historyim:historympim:historyfiles:read (用于接收文件)xoxb- 开头),这就是 Bot Token。第六步:在 WeKnora 中添加 IM 渠道
xapp- 开头的 Tokenxoxb- 开头的 Token启动后日志出现以下内容表示连接成功:
[IM] Slack WebSocket connecting...
需要公网可达的回调地址。
第一步:创建 Slack App 并获取凭证
第二步:在 WeKnora 中添加 IM 渠道
xoxb- 开头的 Token第三步:配置 Event Subscriptions
Telegram 提供两种接入模式,推荐使用 WebSocket(长轮询)模式,无需公网域名。
无需公网域名,适合快速验证和内网部署。
第一步:创建 Telegram Bot
/newbot,按提示填写 Bot 名称和用户名123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11)第二步:在 WeKnora 中添加 IM 渠道
启动后日志出现以下内容表示连接成功:
[IM] Telegram long polling connecting...
此时在 Telegram 中给 Bot 发送消息即可收到 AI 回复。
需要公网可达的回调地址(HTTPS)。
第一步:创建 Telegram Bot 并获取凭证
同上,通过 BotFather 创建 Bot 并获取 Bot Token。
第二步:在 WeKnora 中添加 IM 渠道
X-Telegram-Bot-Api-Secret-Token 头第三步:配置 Webhook
通过 Telegram Bot API 设置 Webhook:
curl -X POST "https://api.telegram.org/bot<YOUR_BOT_TOKEN>/setWebhook" \
-H "Content-Type: application/json" \
-d '{"url": "<YOUR_CALLBACK_URL>", "secret_token": "<YOUR_SECRET_TOKEN>"}'
注意:Telegram Webhook 必须使用 HTTPS。
钉钉提供两种接入模式,推荐使用 Stream 模式(WebSocket),无需公网域名。
无需公网域名,适合快速验证和内网部署。
第一步:创建钉钉机器人
第二步:配置机器人
第三步:在 WeKnora 中添加 IM 渠道
启动后日志出现以下内容表示连接成功:
[IM] DingTalk Stream connecting...
关于 AI 卡片:配置 Card Template ID 后,流式回复将通过钉钉 AI 卡片实时展示打字效果;未配置时流式内容将在结束后一次性发送。
需要公网可达的回调地址。
第一步:创建钉钉机器人并获取凭证
同上,创建应用并获取 Client ID 和 Client Secret。
第二步:在 WeKnora 中添加 IM 渠道
第三步:配置钉钉事件订阅
Mattermost 为自建部署,当前仅支持 Webhook 模式:出站 Webhook(Outgoing Webhook) 将用户消息 POST 到 WeKnora,Bot 通过 REST API v4 发回频道/线程回复。与 Slack Events API 类似,回调需快速返回 200,实际回答由 Bot Token 异步调用接口完成。
需要公网或内网可达的回调 URL(Mattermost 服务器能访问 WeKnora 的
/api/v1/im/callback/{channel_id})。若 WeKnora 在内网,请在 Mattermost 中配置 Trusted Internal Connections 或将回调地址加入允许列表。
application/x-www-form-urlencoded,WeKnora 两种均可解析)。https://mattermost.example.com(无尾部 /)bot_identity 去重)在绑定频道发送触发出站 Webhook 的消息,应收到 WeKnora 的回复;回复默认出现在触发帖所在线程(使用 Mattermost 的 root_id 与触发 post_id 对齐)。
参考文档: Outgoing webhooks
IM 渠道在 Agent 编辑器的 IM 集成 标签页中管理(仅编辑模式可见,创建 Agent 时不显示)。
每个渠道以卡片形式展示,包含:
┌──────────────────────────────────────────────────────────────────────────────┐
│ IM 集成架构 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 企业微信 │ │ 飞书 │ │ Slack │ │ Telegram │ │ 钉钉 │ IM层│
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ WH/WS │ WH/WS │ WH/WS │ WH/LP │ WH/Stream │
│ ─────┼─────────────┼────────────┼─────────────┼─────────────┼────────── │
│ ▼ ▼ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ WeCom │ │ Feishu │ │ Slack │ │ Telegram │ │ DingTalk │ 适配 │
│ │ Adapter │ │ Adapter │ │ Adapter │ │ Adapter │ │ Adapter │ 器层 │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │ │
│ ─────┼─────────────┼─────────────┼──────────────┼─────────────┼──────── │
│ └─────────────┼─────────────┼──────────────┘─────────────┘ │
│ │ │
│ ┌────┴────────────┐ │
│ │ Mattermost │ Webhook-only │
│ └────────┬────────┘ │
│ ▼ │
│ ┌────────────────┐ │
│ │ Mattermost │ │
│ │ Adapter │ │
│ └────────┬────────┘ │
│ │ │
│ ──────────────────┴────────────────────────────────────────────────────│
│ ▼ │
│ ┌──────────────────────────────────┐ │
│ │ im.Service │ 服务编排层 │
│ │ │ · IM 渠道管理 (CRUD) │
│ │ ┌────────────────────────────┐ │ · Adapter Factory (动态创建) │
│ │ │ CommandRegistry │ │ · 斜杠指令分发 │
│ │ │ qaQueue (Worker Pool) │ │ · QA 队列调度 (有界, 异步) │
│ │ │ rateLimiter (滑动窗口) │ │ · 滑动窗口限流 │
│ │ │ processedMsgs (去重) │ │ · 消息去重 (MessageID + TTL) │
│ │ │ inflight (取消跟踪) │ │ · 会话映射 (ChannelSession) │
│ │ └────────────────────────────┘ │ · 流式/全量路由 │
│ └──────────────┬───────────────────┘ │
│ │ │
│ ───────────────┼─────────────────────────────────────────────────────── │
│ ▼ │
│ ┌──────────────────────────────────────┐ │
│ │ WeKnora Core (QA Pipeline) │ 核心层 │
│ │ SessionService · MessageService │ │
│ │ TenantService · AgentService │ │
│ │ KnowledgeService (文件保存) │ │
│ └──────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────────┘
设计模式:
| 模式 | 用途 |
|---|---|
| Adapter Pattern | 统一不同 IM 平台的差异,每个平台实现 im.Adapter 接口 |
| Factory Pattern | 通过 AdapterFactory 从数据库渠道配置动态创建 Adapter 实例 |
| Strategy Pattern | StreamSender、FileDownloader 可选接口,按需实现 |
| Command Pattern | Command 接口 + CommandRegistry 实现可插拔的斜杠指令系统 |
| Producer-Consumer | qaQueue 有界队列 + Worker Pool,解耦消息接收与 QA 执行 |
| Event-Driven | 通过 EventBus 解耦 QA 管道与 IM 输出,支持实时块推送 |
IM 渠道配置存储在 im_channels 表中,绑定到 Agent:
CREATE TABLE im_channels (
id VARCHAR(36) PRIMARY KEY,
tenant_id BIGINT NOT NULL,
agent_id VARCHAR(36) NOT NULL, -- 绑定的 Agent ID
platform VARCHAR(20) NOT NULL, -- 'wecom' | 'feishu' | 'slack' | 'telegram' | 'dingtalk' | 'mattermost'
name VARCHAR(255) NOT NULL DEFAULT '',
enabled BOOLEAN NOT NULL DEFAULT true,
mode VARCHAR(20) NOT NULL DEFAULT 'websocket', -- 'webhook' | 'websocket'
output_mode VARCHAR(20) NOT NULL DEFAULT 'stream', -- 'stream' | 'full'
knowledge_base_id VARCHAR(36), -- 可选,绑定知识库以接收文件消息
bot_identity VARCHAR(255), -- 计算字段,防止重复机器人
credentials JSONB NOT NULL DEFAULT '{}', -- 平台凭证
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMPTZ
);
credentials 字段结构:
| 平台 | 模式 | 字段 |
|---|---|---|
| 企业微信 | WebSocket | bot_id, bot_secret |
| 企业微信 | Webhook | corp_id, agent_secret, token, encoding_aes_key, corp_agent_id |
| 飞书 | WebSocket | app_id, app_secret |
| 飞书 | Webhook | app_id, app_secret, verification_token, encrypt_key |
| Slack | WebSocket | app_token, bot_token |
| Slack | Webhook | bot_token, signing_secret |
| Telegram | WebSocket | bot_token |
| Telegram | Webhook | bot_token, secret_token(可选) |
| 钉钉 | WebSocket | client_id, client_secret, card_template_id(可选) |
| 钉钉 | Webhook | client_id, client_secret, card_template_id(可选) |
| Mattermost | Webhook(唯一支持) | site_url, bot_token, outgoing_token(必填);bot_user_id(可选,过滤机器人自身消息) |
mattermost 渠道的 mode 在数据库中固定为 webhook(创建时若未指定,服务端与模型钩子会默认 webhook)。bot_identity 形如 mattermost:wh:{outgoing_token},用于防止同一出站 Webhook 重复绑定多个渠道。
将 IM 渠道中的用户会话映射到 WeKnora 会话:
(im_channel_id, Platform, UserID, ChatID, TenantID) → SessionID
首次交互自动创建,后续消息复用同一会话。/clear 指令会软删除会话记录,下次消息重新创建。
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /api/v1/agents/:id/im-channels | 创建 IM 渠道 |
| GET | /api/v1/agents/:id/im-channels | 列出 Agent 的所有 IM 渠道 |
| PUT | /api/v1/im-channels/:id | 更新 IM 渠道 |
| DELETE | /api/v1/im-channels/:id | 删除 IM 渠道 |
| POST | /api/v1/im-channels/:id/toggle | 启用/停用 IM 渠道 |
| 方法 | 路径 | 说明 |
|---|---|---|
| GET/POST | /api/v1/im/callback/:channel_id | 通用回调(根据 channel_id 自动路由到对应 Adapter) |
Webhook 模式下,每个渠道有唯一的回调地址
/api/v1/im/callback/{channel_id},在前端渠道卡片上可一键复制。回调路由注册在认证中间件之前,由平台签名验证保护。
每个 IM 渠道代表一个 IM 平台机器人与 WeKnora Agent 的绑定关系。一个 Agent 可以绑定多个渠道(如同时接入企业微信、飞书、Slack 与 Mattermost),同一平台也可以创建多个渠道(如不同的企业微信机器人)。
渠道有一个计算字段 BotIdentity,由平台类型、模式和核心凭证推导,用于防止同一机器人被重复创建。
渠道启动时,Service 通过 AdapterFactory 根据平台类型和凭证动态创建对应的 Adapter 实例。
所有平台的消息在解密、解析后被归一化为 IncomingMessage,抹平平台差异:
type IncomingMessage struct {
Platform Platform // "wecom" | "feishu" | "slack" | "telegram" | "dingtalk" | "mattermost"
MessageType MessageType // "text" | "file" | "image"
UserID string // 平台用户标识
UserName string // 显示名 (可选)
ChatID string // 群聊 ID (私聊为空)
ChatType ChatType // "direct" | "group"
Content string // 纯文本内容
MessageID string // 平台消息 ID (用于去重)
FileKey string // 文件标识 (文件/图片消息)
FileName string // 文件名 (文件/图片消息)
FileSize int64 // 文件大小 (字节)
Extra map[string]string // 平台特有字段 (如 req_id、aes_key)
}
type ReplyMessage struct {
Content string // Markdown 文本
IsStreaming bool // 是否为流式块
IsFinal bool // 是否为最后一块
Extra map[string]string // 平台特有字段
}
将 IM 渠道 (渠道 ID + 用户 + 群聊) 映射到 WeKnora 会话,实现对话上下文持续性。首次交互自动创建,后续消息复用同一会话。并发创建通过唯一约束 + fallback 查询处理。存储于 im_channel_sessions 表。
用户在 IM 中发送消息
│
▼
┌─ HTTP Handler / WebSocket 回调 ─────────────────┐
│ 1. 根据 channel_id 查找渠道配置 │
│ 2. 获取对应 Adapter │
│ 3. 签名验证 (VerifyCallback) │
│ 4. URL 验证处理 (HandleURLVerification) │
│ 5. 解密 + 解析 → IncomingMessage (ParseCallback) │
│ 6. 立即返回 HTTP 200 (异步处理) │
└──────────────────────────┬──────────────────────-┘
│ goroutine
▼
┌─ im.Service.HandleMessage ──────────────────────┐
│ 1. 去重检查 (MessageID, 5 分钟 TTL) │
│ 2. 内容长度校验 (≤ 4096 rune,超出截断) │
│ 3. 斜杠指令检测 → 命中则分发到 CommandRegistry │
│ 4. 限流检查 (滑动窗口, 10次/60s) │
│ 5. 从渠道配置获取 agent_id、tenant_id │
│ 6. 解析/创建 ChannelSession │
│ 7. 获取 WeKnora Session │
│ 8. 加载 Agent 配置(获取知识库、模型等信息) │
│ 9. 文件消息?→ 下载并保存到知识库 │
│ 10. 提交到 qaQueue (有界队列, 异步执行) │
└───────────┬─────────────────────────────────────┘
│
▼
┌─ qaQueue Worker ────────────────────────────────┐
│ 从队列取出请求,记录 inflight,判断流式/全量模式 │
└───────────┬─────────────────────┬───────────────┘
│ │
流式模式 ▼ 全量模式 ▼
┌────────────────────┐ ┌─────────────────────┐
│ handleMessageStream│ │ runQA (阻塞收集完整 │
│ │ │ 回答后一次性发送) │
│ · StartStream │ └─────────────────────┘
│ · EventBus 订阅 │
│ · 300ms 批量刷新 │
│ · 工具事件展示 │
│ · SendStreamChunk │
│ · EndStream │
└────────────────────┘
│
▼
消息持久化 (user + assistant)
渠道创建/更新 (前端 UI)
│
▼
┌─ im.Service ──────────────────────────┐
│ 1. 保存渠道配置到数据库 │
│ 2. 如果渠道已启用: │
│ a. AdapterFactory 创建 Adapter │
│ b. WebSocket 模式:建立长连接 │
│ c. Webhook 模式:注册回调处理 │
│ 3. 维护 channels map (channel_id → │
│ channelState{Channel, Adapter}) │
└────────────────────────────────────────┘
服务启动时:
LoadAndStartChannels() → 从 DB 加载所有 enabled 的渠道 → 逐个 StartChannel()
渠道停用/删除时:
StopChannel() → 取消 Adapter 上下文 → 从 map 移除
type Adapter interface {
Platform() Platform
VerifyCallback(c *gin.Context) error
ParseCallback(c *gin.Context) (*IncomingMessage, error)
SendReply(ctx context.Context, incoming *IncomingMessage, reply *ReplyMessage) error
HandleURLVerification(c *gin.Context) bool
}
| 方法 | 职责 |
|---|---|
Platform() | 返回平台标识,用于路由和注册 |
VerifyCallback() | 验证回调请求的签名/Token |
ParseCallback() | 解密并解析回调为 IncomingMessage,非消息事件返回 nil |
SendReply() | 通过平台 API 发送完整回复 |
HandleURLVerification() | 处理平台初始 URL 验证(首次配置时调用) |
type StreamSender interface {
StartStream(ctx context.Context, incoming *IncomingMessage) (streamID string, err error)
SendStreamChunk(ctx context.Context, incoming *IncomingMessage, streamID string, content string) error
EndStream(ctx context.Context, incoming *IncomingMessage, streamID string) error
}
实现此接口后,Service 会自动路由到流式模式。渠道配置 output_mode: "full" 可强制关闭。
type FileDownloader interface {
DownloadFile(ctx context.Context, msg *IncomingMessage) (io.ReadCloser, string, error)
}
实现此接口后,当用户发送文件/图片消息且渠道配置了 knowledge_base_id 时,Service 会自动下载文件并保存到指定知识库。
type AdapterFactory func(ctx context.Context, channel *IMChannel, msgHandler func(*IncomingMessage)) (Adapter, CancelFunc, error)
每个平台注册一个工厂函数,Service 在启动渠道时调用工厂创建 Adapter 实例。工厂函数根据渠道的 mode 和 credentials 决定创建哪种 Adapter。
提供两种连接模式,对应两套适配器实现:
WebhookAdapter)适用于自建应用,需要公网可访问的回调地址。
企业微信服务器 ──HTTP POST──▶ /api/v1/im/callback/{channel_id}
│
解密 (AES-256-CBC)
解析 XML → IncomingMessage
│
处理完成后调用 WeCom REST API 回复
encoding_aes_key Base64 解码得到(32 字节),IV 为 Key 前 16 字节random(16) + msg_len(4) + message + corp_id,PKCS#7 填充sort([token, timestamp, nonce, encrypt])),常量时间比较text(文本)和 image(图片,PicUrl 直接下载或 MediaId 临时素材 API)appchat/send 群聊 API,失败时降级到私聊直发/cgi-bin/message/send 接口发送 Markdown 消息WSAdapter + LongConnClient)适用于智能客服机器人,无需公网域名,由客户端主动建立 WebSocket 长连接。
LongConnClient ══WebSocket══▶ wss://openws.work.weixin.qq.com
│
1. 发送 aibot_subscribe (bot_id + secret)
2. 接收 aibot_msg_callback 消息帧
3. 通过 aibot_respond_msg 回复
4. 每 30s 心跳保活 (ping/pong)
5. 断连自动重连 (指数退避 1s → 30s)
text(文本)、image(图片)、file(文件)、voice(语音,服务端已转文本)、mixed(混合,文本 + 图片)、event(服务器事件)finish=true 标记结束| 文件 | 职责 |
|---|---|
internal/im/wecom/webhook_adapter.go | Webhook 模式:回调解密、签名验证、REST API 回复、群聊发送、Token 缓存、文件下载 |
internal/im/wecom/ws_adapter.go | WebSocket 模式适配器壳,代理到 LongConnClient |
internal/im/wecom/longconn.go | WebSocket 客户端:连接管理、心跳、帧协议、自动重连、多消息类型解析、文件解密 |
统一适配器同时支持 Webhook 和 WebSocket 模式,且原生实现 StreamSender 和 FileDownloader 接口。
飞书服务器 ──HTTP POST──▶ /api/v1/im/callback/{channel_id}
│
解密 (AES-256-CBC,可选)
解析 JSON → IncomingMessage
│
通过飞书 Open API 回复
SHA-256(encrypt_key),IV 为密文前 16 字节im.message.receive_v1 事件,忽略其他事件类型text(文本)、file(文件)、image(图片)、post(富文本,提取标题 + 结构化内容)@_user_xxx 提及前缀通过飞书官方 SDK (github.com/larksuite/oapi-sdk-go) 建立长连接,事件推送与 Webhook 等价,无需公网域名,内置自动重连。
飞书的流式输出基于 CardKit 卡片流式更新,是官方推荐的最佳实践:
StartStream:
1. POST /cardkit/v1/cards → 创建卡片实体 (streaming_mode: true)
2. POST /im/v1/messages → 发送卡片消息到聊天
SendStreamChunk:
3. PUT /cardkit/v1/cards/{id}/elements/{eid}/content → 更新元素内容 (累积全文)
EndStream:
4. PATCH /cardkit/v1/cards/{id}/settings → 设置 streaming_mode: false
每次 SendStreamChunk 发送的是累积全文而非增量,由 feishuStreamState 跟踪完整内容和严格递增的 sequence 序号。
Think 块处理: 流式输出中的 <think>...</think> 块会被转换为飞书 Markdown 引用块格式:
> 💭 **思考过程**
> [thinking content line 1]
> [thinking content line 2]
孤立流清理: 后台协程每 1 分钟扫描超过 5 分钟未关闭的流式卡片,自动调用 EndStream 关闭(防止内存泄漏)。
| 文件 | 职责 |
|---|---|
internal/im/feishu/adapter.go | 事件解析、CardKit 流式实现、Token 缓存、AES 解密、Think 块转换、文件下载 |
internal/im/feishu/longconn.go | WebSocket 长连接(封装飞书 SDK)、事件分发 |
统一适配器同时支持 Webhook 和 WebSocket (Socket Mode) 模式,且原生实现 StreamSender 接口。
Slack 服务器 ──HTTP POST──▶ /api/v1/im/callback/{channel_id}
│
签名验证 (HMAC-SHA256)
解析 JSON → IncomingMessage
│
通过 Slack Web API 回复
signing_secret 对请求体进行 HMAC-SHA256 签名验证,防止伪造请求。message 和 app_mention 事件,忽略机器人自己发送的消息。url_verification challenge 请求。通过 slack-go/slack/socketmode 建立长连接,事件推送与 Webhook 等价,无需公网域名,内置自动重连。
LongConnClient ══WebSocket══▶ wss://wss-primary.slack.com
│
1. 使用 App Token 建立连接
2. 接收 Events API 消息帧
3. 确认消息 (Ack)
4. 通过 Slack Web API 回复
Slack 的流式输出基于消息更新 (chat.update) 实现:
StartStream:
1. POST /chat.postMessage → 发送初始消息,获取 ts (timestamp)
SendStreamChunk:
2. POST /chat.update → 根据 ts 更新消息内容 (累积全文)
EndStream:
3. 无需特殊操作
每次 SendStreamChunk 发送的是累积全文而非增量。
| 文件 | 职责 |
|---|---|
internal/im/slack/adapter.go | 事件解析、签名验证、流式实现、文件下载 |
internal/im/slack/longconn.go | WebSocket 长连接(封装 slack-go Socket Mode) |
统一适配器同时支持 Webhook 和长轮询模式,原生实现 StreamSender 和 FileDownloader 接口。
Telegram 服务器 ──HTTP POST──▶ /api/v1/im/callback/{channel_id}
│
Secret Token 验证(可选)
解析 JSON → IncomingMessage
│
通过 Telegram Bot API 回复
X-Telegram-Bot-Api-Secret-Token 请求头与配置的 secret_token 进行常量时间比较text(文本)、document(文件,通过 file_id 下载)、photo(图片,自动选择最大尺寸)@bot 提及前缀Telegram 的 WebSocket 模式实际使用长轮询(Long Polling)而非真正的 WebSocket,通过持续调用 getUpdates API 获取新消息。
LongConnClient ──HTTP POST──▶ https://api.telegram.org/bot<token>/getUpdates
│
1. 发送 getUpdates (timeout=30s,长轮询)
2. 解析返回的 Update 列表
3. 更新 offset 以确认已处理
4. 循环继续
5. 错误时退避 3 秒重试
offset 参数自动确认已处理的消息Telegram 的流式输出基于消息编辑 (editMessageText) 实现:
StartStream:
1. POST sendMessage → 发送初始 "正在思考..." 消息,获取 message_id
SendStreamChunk:
2. POST editMessageText → 根据 message_id 更新消息内容(累积全文)
EndStream:
3. POST editMessageText → 最终更新,启用 Markdown 解析
每次 SendStreamChunk 发送的是累积全文而非增量,最小编辑间隔 500ms(避免触发 Telegram 速率限制)。
Think 块处理: 流式输出中的 <think>...</think> 块会被转换为 Telegram 引用块格式:
> 💭 *思考过程*
> [thinking content line 1]
> [thinking content line 2]
孤立流清理: 后台协程每 1 分钟扫描超过 5 分钟未关闭的流式消息,自动清理(防止内存泄漏)。
Telegram 支持文件和图片消息的下载,流程:
getFile API 获取文件路径https://api.telegram.org/file/bot<token>/<file_path> 下载文件| 文件 | 职责 |
|---|---|
internal/im/telegram/adapter.go | 事件解析、Secret Token 验证、流式实现(editMessage)、文件下载 |
internal/im/telegram/longconn.go | 长轮询客户端:getUpdates 循环、offset 管理、错误退避 |
统一适配器同时支持 Webhook 和 Stream 模式,原生实现 StreamSender 接口。支持可选的 AI 卡片流式输出。
钉钉服务器 ──HTTP POST──▶ /api/v1/im/callback/{channel_id}
│
HmacSHA256 签名验证
解析 JSON → IncomingMessage
│
通过 sessionWebhook 或 OpenAPI 回复
client_secret 对 timestamp + "\n" + secret 进行 HmacSHA256 签名,Base64 编码后与请求头 Sign 比较,时间戳 1 小时内有效sessionWebhook 回复(适用于群聊场景),失败或不可用时降级到 OpenAPI
/v1.0/robot/groupMessages/send/v1.0/robot/oToMessages/batchSendsampleMarkdown)/v1.0/oauth2/accessToken 获取 Access Token,带缓存(提前 5 分钟刷新)通过钉钉官方 SDK (github.com/open-dingtalk/dingtalk-stream-sdk-go) 建立 Stream 连接,事件推送与 Webhook 等价,无需公网域名,内置自动重连。
LongConnClient ══Stream══▶ 钉钉 Stream 服务
│
1. 使用 ClientID + ClientSecret 建立连接
2. 注册 ChatBot 回调处理
3. 接收消息事件
4. SDK 内置重连和心跳
钉钉的流式输出基于 AI 互动卡片 实现(需配置 card_template_id):
StartStream:
1. POST /v1.0/card/instances/createAndDeliver → 创建并投递 AI 卡片
SendStreamChunk:
2. PUT /v1.0/card/streaming → 流式更新卡片内容(累积全文)
EndStream:
3. PUT /v1.0/card/streaming (isFinalize=true) → 标记流式结束
每次 SendStreamChunk 发送的是累积全文(isFull: true),最小更新间隔 500ms。
无卡片模板时的降级策略: 未配置 card_template_id 时,流式内容在内存中累积,EndStream 时一次性通过 sessionWebhook 或 OpenAPI 发送完整回复。
Think 块处理: 使用与飞书相同的 Markdown 引用块格式(MarkdownThinkStyle):
> 💭 **思考过程**
> [thinking content line 1]
> [thinking content line 2]
孤立流清理: 后台协程每 1 分钟扫描超过 5 分钟未关闭的流,自动清理防止内存泄漏。
| 文件 | 职责 |
|---|---|
internal/im/dingtalk/adapter.go | 事件解析、HmacSHA256 签名验证、AI 卡片流式实现、Access Token 缓存、OpenAPI 调用 |
internal/im/dingtalk/longconn.go | Stream 客户端(封装钉钉 SDK)、ChatBot 消息分发 |
仅支持 Webhook 模式:Mattermost Outgoing Webhook 将请求体 POST 到 WeKnora 统一回调地址;适配器实现 Adapter、StreamSender、FileDownloader(标准库 HTTP 调用 REST API,无第三方 SDK)。
Mattermost 服务器 ──HTTP POST──▶ /api/v1/im/callback/{channel_id}
│
校验 body 中 token = outgoing_token
解析 JSON 或 x-www-form-urlencoded
│
通过 REST API v4 回复(Bearer Bot Token)
token 必须与渠道凭证中的 outgoing_token 一致(创建渠道时工厂会校验 outgoing_token 非空)。application/json 与 application/x-www-form-urlencoded(与 官方文档 一致)。bot_user_id,且 user_id 与之相同,则返回 nil 消息,避免自激。IncomingMessage.Extra 中保存 thread_root_id(有 root_id 时用其值,否则用触发帖 post_id);SendReply / CreatePost 时设置 root_id,使回复出现在同一线程。MessageID 使用触发帖的 post_id。file_ids,取首个文件 ID 作为 FileKey;DownloadFile 先 GET /api/v4/files/{id}/info 再 GET /api/v4/files/{id} 下载内容。与 Slack 类似,通过编辑同一条帖子展示累积全文:
StartStream:
1. POST /api/v4/posts → 创建占位帖(如「正在思考...」),得到 post id
SendStreamChunk:
2. PUT /api/v4/posts/{post_id}/patch → Patch message 字段(累积全文)
EndStream:
3. 最后一次 Patch,与 SendStreamChunk 相同逻辑
流式刷新间隔由 Service 侧 streamFlushInterval(300ms)批量合并,以降低编辑频率、减轻 API 压力。
Mattermost 出站 Webhook 无 Slack/Feishu 类 challenge 流程,HandleURLVerification 恒为 false。
| 文件 | 职责 |
|---|---|
internal/im/mattermost/adapter.go | 出站 Webhook 解析、Token 校验、发帖/补丁流式、文件下载 |
internal/im/mattermost/client.go | REST v4:CreatePost、PatchPostMessage、文件 info/下载 |
internal/im/mattermost/form_parse.go | 表单编码 body 与 file_ids 辅助解析 |
IM 渠道支持斜杠指令(Slash Commands),用户在聊天中输入 /指令名 即可触发,无需经过 QA 管道,且不受限流约束。
| 指令 | 参数 | 说明 |
|---|---|---|
/help | [指令名] | 显示所有可用指令列表;带参数时显示指定指令的详细用法 |
/info | — | 查看当前绑定智能体的名称、角色设定、知识库列表等信息 |
/search | <关键词> | 对绑定的知识库执行混合检索(向量 + 关键词),返回最多 5 条原文片段,不经过 AI 总结 |
/stop | — | 取消当前排队中或正在执行的 QA 请求 |
/clear | — | 清空当前对话记忆(软删除 ChannelSession),下次消息开始全新会话 |
用户消息 ──▶ HandleMessage
│
├─ 以 "/" 开头?
│ │
│ ├─ 已注册指令 → CommandRegistry.Parse → Command.Execute → 回复结果
│ │ │
│ │ ActionClear → 软删除 ChannelSession
│ │ ActionStop → 取消排队/执行中的 QA
│ │
│ └─ LooksLikeCommand() = true 但未注册
│ → 回复 "未知指令,发送 /help 查看"
│ LooksLikeCommand() = false (如 "/api/v2/users")
│ → 当作普通消息,进入 QA 管道
│
└─ 普通消息 → 限流检查 → qaQueue → QA 管道
LooksLikeCommand()通过检查首 token 是否含有/分隔符来区分指令尝试和 URL 路径,避免误拦截。
实现 im.Command 接口并在 Service 初始化时注册到 CommandRegistry:
type Command interface {
Name() string // 指令名 (不含 "/")
Description() string // 一行描述,用于 /help 输出
Execute(ctx context.Context, cmdCtx *CommandContext, args []string) (*CommandResult, error)
}
设计约定:
CommandContext 中CommandResult 返回友好提示,error 仅用于基础设施故障(DB 异常、网络错误等)CommandResult.Action 声明副作用意图(如清空会话),由 Service 执行| 文件 | 职责 |
|---|---|
internal/im/command.go | Command 接口、CommandAction、CommandContext 定义 |
internal/im/command_registry.go | CommandRegistry:指令注册、解析、分发、LooksLikeCommand |
internal/im/cmd_help.go | /help 指令实现 |
internal/im/cmd_info.go | /info 指令实现(展示 Agent 信息、知识库列表) |
internal/im/cmd_search.go | /search 指令实现(混合检索,最多 5 条,内容截断 200 rune) |
internal/im/cmd_stop.go | /stop 指令实现 |
internal/im/cmd_clear.go | /clear 指令实现 |
有界工作池队列管理 QA 请求,防止并发过载:
消息 ──▶ Enqueue ──▶ [ 等待队列 (≤50) ] ──▶ Worker Pool (5 workers) ──▶ QA 管道
│ │
├─ 队列已满 → 拒绝并回复提示 │
├─ 用户排队超限 (≤3) → 拒绝 ├─ 等待超时 (>60s) → 丢弃并通知
└─ /stop → Remove(userKey) 取消 └─ 正常执行 QA
设计要点:
/stop 指令通过 qaQueue.Remove(userKey) 取消排队请求,通过 inflight map 中的 context.CancelFunc 取消执行中请求在消息进入 QA 队列之前,按 channelID:userID:chatID 维度进行滑动窗口限流:
| 参数 | 值 | 说明 |
|---|---|---|
| 窗口大小 | 60s | 滑动时间窗口 |
| 最大请求数 | 10 次/窗口 | 每个用户每分钟最多 10 条消息进入 QA |
| 清理周期 | 1 min | 自动清理过期条目,防止内存泄漏 |
超出限流时回复提示消息,不计入队列。斜杠指令不受限流约束。
| 文件 | 职责 |
|---|---|
internal/im/qaqueue.go | qaQueue:有界队列、Worker Pool、QueueMetrics、指标上报 |
internal/im/ratelimit.go | slidingWindowLimiter:per-key 滑动窗口限流、并发安全清理 |
流式模式通过 EventBus 实时收集 QA 管道产生的内容块,以 300ms 间隔批量推送,在延迟与 API 限频之间取得平衡:
QA 管道 ──chunk──chunk──chunk──▶ EventBus
│
每 300ms 刷新
│
┌───────────▼───────────┐
│ 累积内容 → 完整替换推送 │
│ (非增量,每次发送全文) │
└───────────────────────┘
<think>...</think> 块在飞书和钉钉中转换为 Markdown 引用块展示,在 Telegram 中转换为引用块(斜体标题),在其他平台中过滤⏳ [工具名](包裹在 think 块内)✅ [工具名] · [摘要]⚠️ [工具名] 失败fallbackNonStream)streamReaperInterval(1 分钟)扫描超过 streamOrphanTTL(5 分钟)未关闭的流,自动关闭防止内存泄漏<think> 标签转换为 Markdown 引用块(> 💭 **思考过程**),Telegram 使用斜体标题(> 💭 *思考过程*)当用户在 IM 中发送文件或图片消息时,如果渠道配置了 knowledge_base_id,Service 会自动将文件保存到对应知识库:
用户发送文件/图片消息
│
▼
消息类型 = file/image?
渠道配置了 knowledge_base_id?
Adapter 实现了 FileDownloader?
│ 全部满足
▼
1. adapter.DownloadFile(msg) → io.ReadCloser + fileName
2. 通知用户 "正在处理文件..."
3. knowledgeService.Save(file, knowledgeBaseID)
4. 通知用户 "文件已保存到知识库"
各平台文件下载方式:
| 平台 | 方式 |
|---|---|
| 飞书 | GetMessageResource API(通过 FileKey) |
| 企业微信 Webhook | PicUrl 直接下载 或 MediaId 临时素材 API |
| 企业微信 WebSocket | 加密附件 URL + per-message AES 密钥解密 |
| Telegram | getFile API 获取文件路径 + HTTPS 下载(支持文档和图片) |
| Mattermost | GET /api/v4/files/{file_id}/info + GET /api/v4/files/{file_id}(Bearer Bot Token) |
| 参数 | 值 | 说明 |
|---|---|---|
qaTimeout | 120s | QA 管道最大执行时间 |
dedupTTL | 5 min | 消息去重 ID 保留时长 |
dedupCleanupInterval | 1 min | 去重清理周期 |
maxContentLength | 4096 | 消息最大长度 (rune),超出截断 |
streamFlushInterval | 300ms | 流式内容批量刷新间隔 |
defaultMaxQueueSize | 50 | QA 队列最大容量 |
defaultMaxPerUser | 3 | 单用户最大排队请求数 |
defaultWorkers | 5 | QA 并发 Worker 数 |
queueTimeout | 60s | 请求在队列中的最大等待时间 |
rateLimitWindow | 60s | 限流滑动窗口大小 |
rateLimitMaxRequests | 10 | 每用户每窗口最大请求数 |
metricsLogInterval | 30s | 队列指标日志上报周期 |
streamOrphanTTL | 5 min | 飞书/Telegram/钉钉 孤立流超时时间 |
streamReaperInterval | 1 min | 飞书/Telegram/钉钉 孤立流清理扫描周期 |
| Telegram 编辑间隔 | 500ms | editMessageText 最小调用间隔(避免速率限制) |
| Telegram 长轮询超时 | 30s | getUpdates timeout 参数 |
| Telegram 错误退避 | 3s | getUpdates 失败后等待时间 |
| DingTalk 卡片更新间隔 | 500ms | AI 卡片流式更新最小间隔 |
| DingTalk 签名有效期 | 1h | Webhook 回调签名时间戳有效窗口 |
| WeCom WS 心跳 | 30s | WebSocket 保活频率 |
| WeCom WS 读超时 | 90s | 3 × 心跳间隔,允许一次心跳丢失 |
| WeCom WS 重连退避 | 1s → 30s | 指数退避,上限 30 秒 |
| Token 缓存安全余量 | 5 min | Token 过期前提前刷新 |
| 场景 | 处理策略 |
|---|---|
| 流式初始化失败 | 自动降级到全量模式 (fallbackNonStream) |
| QA 管道异常 | 回复 "抱歉,处理您的问题时出现了异常,请稍后再试。" |
| QA 超时 (>120s) | 标记消息完成,回复超时提示 |
| 空回答 | 回复 "抱歉,我暂时无法回答这个问题。" |
| 空流式内容 | 无可见内容时回退到完整回复 |
| WebSocket 断连 | 指数退避自动重连 |
| 平台重试 | MessageID 去重,5 分钟内自动跳过 |
| 渠道启动失败 | 日志记录错误,不影响其他渠道 |
| QA 队列已满 | 拒绝请求并回复 "当前排队人数较多,请稍后再试。" |
| 用户排队超限 | 拒绝请求并回复提示(单用户 ≤3) |
| 排队等待超时 | 超过 60s 自动丢弃,回复 "您的消息等待超时,请重新发送。" |
| 消息限流 | 滑动窗口内超过 10 次,回复限流提示 |
| 飞书孤立流 | 每分钟扫描,超过 5 分钟未关闭的自动结束 |
| Telegram/钉钉孤立流 | 同飞书,每分钟扫描并自动清理 |
| 企业微信群聊回复失败 | appchat API 失败时降级到用户私聊 |
| 钉钉 AI 卡片创建失败 | 降级到 sessionWebhook 或 OpenAPI 回复 |
| 钉钉 sessionWebhook 不可用 | 降级到 OpenAPI(群聊/私聊分别调用不同端点) |
接入新的 IM 平台只需 3 步:
im.Adapter 接口在 internal/im/<platform>/ 下创建适配器:
package myplatform
type Adapter struct { /* 平台配置 */ }
func (a *Adapter) Platform() im.Platform { return "myplatform" }
func (a *Adapter) VerifyCallback(c *gin.Context) error { /* 签名验证 */ }
func (a *Adapter) ParseCallback(c *gin.Context) (*im.IncomingMessage, error) { /* 解析消息 */ }
func (a *Adapter) SendReply(ctx context.Context, incoming *im.IncomingMessage, reply *im.ReplyMessage) error { /* 发送回复 */ }
func (a *Adapter) HandleURLVerification(c *gin.Context) bool { /* URL 验证 */ }
可选接口:
im.StreamSender 以支持流式输出im.FileDownloader 以支持文件消息自动保存到知识库可参考已有的 Telegram(纯 HTTP API)和 DingTalk(SDK + AI 卡片)适配器作为实现参考。
在 internal/container/container.go 的 registerIMAdapterFactories 中注册工厂函数:
imService.RegisterAdapterFactory("myplatform", func(ctx context.Context, channel *im.IMChannel, msgHandler func(*im.IncomingMessage)) (im.Adapter, im.CancelFunc, error) {
creds := parseCredentials(channel.Credentials)
appKey := getString(creds, "app_key")
appSecret := getString(creds, "app_secret")
adapter := myplatform.NewAdapter(appKey, appSecret)
// WebSocket 模式需要启动长连接
if channel.Mode == "websocket" {
cancelCtx, cancel := context.WithCancel(ctx)
go adapter.StartLongConn(cancelCtx, msgHandler)
return adapter, func() { cancel() }, nil
}
return adapter, func() {}, nil
})
在 IMChannelPanel.vue 中:
在 i18n 文件中添加平台名称翻译。
Service 层 (im.Service) 不需要任何修改 — 渠道管理、指令分发、消息编排、会话管理、QA 调度、限流、流式控制全部由 Service 统一处理。