Back to Fastgpt

微信个人号(ClawBot) - 设计文档

.agents/design/outlink/wechat-clawbot.md

4.15.08.3 KB
Original Source

微信个人号(ClawBot) - 设计文档

1. 架构概览

┌──────────────────── BullMQ ────────────────────────────┐
│                                                         │
│  Queue: wechatPoll                                      │
│  ┌──────┐   ┌──────┐   ┌──────┐                       │
│  │ poll │   │ poll │   │ poll │   ...                   │
│  │ ch_1 │   │ ch_2 │   │ ch_3 │                        │
│  └──┬───┘   └──┬───┘   └──┬───┘                       │
│     │          │          │                             │
│     └──────────┴──────────┘                             │
│                │                                        │
│         Worker (concurrency: 10)                        │
│                │                                        │
│     ┌──────────┴──────────┐                             │
│     │  1. getUpdates()    │                             │
│     │  2. 按用户分组合并   │                             │
│     │  3. outlinkInvokeChat│                            │
│     │  4. sendMessage()   │                             │
│     │  5. 更新 buf        │                             │
│     │  6. 自链: queue.add │  ←── 完成后立刻创建下一个    │
│     └─────────────────────┘                             │
│                                                         │
└─────────────────────────────────────────────────────────┘

多节点部署:
  Node A ──┐
  Node B ──┼── 同一个 Redis ── 同一个 Queue
  Node C ──┘    BullMQ 自动保证同一个 Job 只被一个 Worker 消费

2. 核心流程

2.1 Job 生命周期

渠道上线(扫码登录成功)
    │
    ▼
queue.add('poll', { shareId }, { jobId: `wechat-poll-${shareId}-${ts}` })
    │
    ▼
Worker 消费 Job
    │
    ├── 1. 从数据库读取 buf 和 token
    ├── 2. 检查渠道状态(离线 → 不续链,轮询自然停止)
    ├── 3. 调用 ilink getUpdates(buf)(长轮询,最多 35 秒)
    ├── 4. 收到消息 → groupMessagesByUser → 合并文本
    ├── 5. 对每组调用 outlinkInvokeChat → sendMessage 回复
    ├── 6. 更新 buf 到数据库
    └── 7. 自链: queue.add 创建下一个 Job

2.2 渠道上下线控制

上线: 扫码成功 → status='online' → queue.add(首个 Job)
下线: 用户登出/删除 → status='offline' → Worker 检测后不续链
异常: 连续失败 ≥5 次 → status='error' → 不续链
重连: 用户重新扫码 → 清空 syncBuf → 同上线流程

3. 类型定义

3.1 WechatAppType

typescript
// packages/global/support/outLink/type.ts
export const WechatAppSchema = z.object({
  token: z.string().default(''),
  baseUrl: z.string().default('https://ilinkai.weixin.qq.com'),
  accountId: z.string().default(''),
  userId: z.string().optional(),
  syncBuf: z.string().default(''),
  status: z.enum(['online', 'offline', 'error']).default('offline'),
  loginTime: z.string().optional(),
  lastError: z.string().optional()
});
export type WechatAppType = z.infer<typeof WechatAppSchema>;

3.2 BullMQ Job 数据

typescript
// packages/service/support/outLink/wechat/type.ts
export type WechatPollJobData = { shareId: string };

4. 关键设计决策

4.1 为什么用自链式而不是 Repeatable

Repeatable自链式
消息延迟固定间隔(如 30s)实时(ilink 长轮询)
Job 重叠会(定时无脑创建)不会(处理完才创建下一个)
停止方式需要删除 repeatable key不续链即可,天然停止
多节点安全BullMQ 保证BullMQ 保证

4.2 Worker 参数

参数说明
concurrency10单实例同时处理 10 个渠道(I/O 密集,不占 CPU)
lockDuration120sgetUpdates 35s + 工作流 60s + sendMessage = ~100s,留余量
stalledInterval60s检测 stalled Job
removeOnCompletecount: 0完成即删
removeOnFailcount: 100, age: 7d保留最近 100 条失败记录

4.3 错误处理策略

错误类型处理
网络超时正常(getUpdates 35s 超时),续链
API 返回错误记录失败计数,延迟 10s 续链
连续失败 ≥ 5 次标记 status='error',停止续链
渠道被删除outLink 查不到,不续链
工作流处理失败发送 defaultResponse 给用户,续链继续

4.4 重连时 buf 清空

重连时清空 syncBuf 是正确的。新 token 对应新 session,旧 buf 在 ilink 服务端已失效。清空后首次 getUpdates 会返回新的 buf。

5. 数据库索引

typescript
// packages/service/support/outLink/schema.ts
OutLinkSchemaType.index({ shareId: -1 });
OutLinkSchemaType.index({ teamId: 1, tmbId: 1, appId: 1 });
// 条件索引: 仅索引 wechat online 渠道,用于服务重启恢复
OutLinkSchemaType.index(
  { type: 1, 'app.status': 1 },
  { partialFilterExpression: { type: 'wechat', 'app.status': 'online' } }
);

6. Redis Key 清单

Key用途TTL
publish:wechat:qrcode:${shareId}二维码临时存储480s
publish:wechat:failures:${shareId}连续失败计数300s

7. 文件清单

修改现有文件

文件改动
packages/global/support/outLink/constant.tsPublishChannelEnum 新增 wechat
packages/global/support/outLink/type.ts新增 WechatAppSchema / WechatAppType
packages/global/core/chat/constants.tsChatSourceEnum / ChatSourceMap 新增 wechat
packages/global/support/wallet/usage/constants.tsUsageSourceEnum / UsageSourceMap 新增 wechat
packages/global/support/wallet/usage/tools.tsgetUsageSourceByPublishChannel 新增 case
packages/global/core/chat/utils.tsgetChatSourceByPublishChannel 新增 case
packages/web/i18n/zh-CN/publish.jsonwechat 相关 i18n
packages/web/i18n/en/publish.jsonwechat 相关 i18n
packages/web/i18n/zh-Hant/publish.jsonwechat 相关 i18n
packages/service/common/bullmq/index.tsQueueNames 新增 wechatPoll
packages/service/support/outLink/schema.ts新增条件索引
projects/app/src/pageComponents/app/detail/Publish/index.tsx注册 wechat 渠道入口
projects/app/src/service/common/bullmq/index.ts注册 initWechatPollWorker + resumeAllWechatPolling

新建文件

文件说明
projects/app/src/pageComponents/app/detail/Publish/Wechat/index.tsx渠道列表(含状态、扫码登录入口)
projects/app/src/pageComponents/app/detail/Publish/Wechat/WechatEditModal.tsx创建/编辑弹窗(name + maxUsagePoints)
projects/app/src/pageComponents/app/detail/Publish/Wechat/QRLoginModal.tsx扫码登录弹窗(二维码展示 + 状态轮询)
packages/service/support/outLink/wechat/ilinkClient.tsilink API 客户端(QR 登录 + 消息收发)
packages/service/support/outLink/wechat/type.tsWechatPollJobData 类型
packages/service/support/outLink/wechat/messageParser.ts消息解析纯函数(extractTextFromItem + groupMessagesByUser)
packages/service/support/outLink/wechat/mq.tsBullMQ Worker + 轮询调度
projects/app/src/pages/api/support/outLink/wechat/qrcode/generate.ts二维码生成 API
projects/app/src/pages/api/support/outLink/wechat/qrcode/status.ts扫码状态查询 API(confirmed 时保存 token + 启动轮询)
projects/app/src/pages/api/support/outLink/wechat/logout.ts登出 API(status → offline,清空 token)
test/cases/service/support/outLink/wechat/messageParser.test.ts消息解析单元测试(16 cases)