src/process/channels/ARCHITECTURE.md
Channels 是 AionUi 的多平台 AI 助手框架,将 AionUi 的 AI 能力(Gemini、Claude、Codex)通过即时通讯平台暴露给远程用户。目前支持三个平台:
| 平台 | SDK | 连接方式 | 消息更新 |
|---|---|---|---|
| Telegram | grammY | 长轮询 (Long Polling) | 编辑消息文本 |
| Lark(飞书) | @larksuiteoapi/node-sdk | WebSocket 长连接 | 编辑互动卡片 (Interactive Card) |
| DingTalk(钉钉) | dingtalk-stream | WebSocket Stream | AI Card 流式更新 |
核心设计原则:平台无关的统一消息协议 — 所有平台插件将原生消息转换为 IUnifiedIncomingMessage,所有回复通过 IUnifiedOutgoingMessage 发出,由各平台适配器转回原生格式。
src/channels/
├── index.ts # 模块导出入口
├── types.ts # 全局类型定义与转换工具函数
├── ARCHITECTURE.md # 本文档
│
├── core/ # 核心编排层
│ ├── ChannelManager.ts # 单例编排器,管理子系统生命周期
│ └── SessionManager.ts # 用户会话管理(per-chat 隔离)
│
├── gateway/ # 网关层:消息路由与插件管理
│ ├── ActionExecutor.ts # 消息路由 → Action 执行 → AI 对话
│ ├── PluginManager.ts # 插件注册、生命周期、状态推送
│ └── index.ts
│
├── agent/ # AI Agent 集成层
│ ├── ChannelEventBus.ts # 全局事件总线(Agent → Channel)
│ ├── ChannelMessageService.ts # 消息发送与流回调管理
│ └── index.ts
│
├── actions/ # Action 处理器(命令/按钮响应)
│ ├── types.ts # Action 类型定义与常量
│ ├── SystemActions.ts # 会话管理、帮助、设置、Agent 切换
│ ├── PlatformActions.ts # 配对流程(pairing.show/refresh/check)
│ ├── ChatActions.ts # 对话操作(发送、重新生成、工具确认)
│ └── index.ts
│
├── pairing/ # 用户授权配对子系统
│ ├── PairingService.ts # 配对码生成、审批、过期清理
│ └── index.ts
│
├── plugins/ # 平台插件实现
│ ├── BasePlugin.ts # 抽象基类,定义生命周期状态机
│ ├── index.ts
│ │
│ ├── telegram/ # Telegram 插件
│ │ ├── TelegramPlugin.ts # Bot 实例管理、长轮询、消息分发
│ │ ├── TelegramAdapter.ts # 消息格式转换(Telegram ↔ Unified)
│ │ ├── TelegramKeyboards.ts # Inline Keyboard 与 Reply Keyboard 构建
│ │ └── index.ts
│ │
│ ├── lark/ # Lark(飞书)插件
│ │ ├── LarkPlugin.ts # WebSocket 连接、事件去重、卡片消息
│ │ ├── LarkAdapter.ts # 消息格式转换(Lark ↔ Unified)
│ │ ├── LarkCards.ts # 互动卡片模板(菜单、帮助、配对等)
│ │ └── index.ts
│ │
│ └── dingtalk/ # DingTalk(钉钉)插件
│ ├── DingTalkPlugin.ts # Stream 连接、AI Card 流式、Token 管理
│ ├── DingTalkAdapter.ts # 消息格式转换(DingTalk ↔ Unified)
│ ├── DingTalkCards.ts # AI Card / ActionCard 模板
│ ├── README.md # DingTalk 插件说明
│ └── index.ts
│
└── utils/ # 工具函数
├── credentialCrypto.ts # 凭据 Base64 编解码
└── index.ts
graph TB
subgraph "IM 平台"
TG[Telegram]
LK[Lark / 飞书]
DT[DingTalk / 钉钉]
end
subgraph "Plugin Layer 插件层"
TP["TelegramPlugin
grammY · Long Polling"]
LP["LarkPlugin
larksuiteoapi · WebSocket"]
DP["DingTalkPlugin
dingtalk-stream · AI Card"]
end
subgraph "Gateway Layer 网关层"
PM["PluginManager
插件注册 · 生命周期 · 状态推送"]
AE["ActionExecutor
授权检查 · 路由 · 流式响应"]
end
subgraph "Core Layer 核心层"
CM["ChannelManager
单例编排器"]
SM["SessionManager
per-chat 会话隔离"]
PS["PairingService
配对码管理"]
end
subgraph "Agent Layer AI Agent 层"
CMS["ChannelMessageService
消息发送 · 流回调"]
CEB["ChannelEventBus
全局事件总线"]
end
subgraph "AionUi Core"
WM["WorkerManage
Agent 进程管理"]
AGT["Agent Task
Gemini / ACP / Codex"]
DB[("SQLite Database")]
IPC["ipcBridge
Renderer - Main"]
end
TG <--> TP
LK <--> LP
DT <--> DP
TP --> PM
LP --> PM
DP --> PM
PM --> AE
AE --> SM
AE --> PS
AE --> CMS
CM --> PM
CM --> SM
CM --> PS
CM --> AE
CMS --> WM
WM --> AGT
AGT -->|"ipcBridge + channelEventBus"| CEB
CEB --> CMS
SM --> DB
PS --> DB
PM --> IPC
ChannelManager 是整个 Channel 子系统的单例入口,在应用启动时调用 initialize(),关闭时调用 shutdown()。它创建并持有以下四个核心子组件:
ChannelManager (singleton)
├── PluginManager -- 插件注册/启停/状态监控
├── SessionManager -- 用户会话内存缓存 + DB 持久化
├── PairingService -- 配对码生成/审批/定时清理
└── ActionExecutor -- 消息路由/Action 分发/AI 对话
初始化时的关键连线:
ActionExecutor 注入 PluginManager、SessionManager、PairingServicePluginManager.setMessageHandler() 设置为 ActionExecutor.getMessageHandler()PluginManager.setConfirmHandler() 设置为工具确认回调(经由 ChannelMessageService.confirm())type PluginType = 'telegram' | 'slack' | 'discord' | 'lark' | 'dingtalk';
type PluginStatus = 'created' | 'initializing' | 'ready' | 'starting' | 'running' | 'stopping' | 'stopped' | 'error';
interface IChannelPluginConfig {
id: string;
type: PluginType;
name: string;
enabled: boolean;
credentials?: IPluginCredentials; // 加密存储(Base64)
config?: IPluginConfigOptions; // mode, webhookUrl, rateLimit, requireMention
status: PluginStatus;
lastConnected?: number;
createdAt: number;
updatedAt: number;
}
// 入站(平台 → 系统)
interface IUnifiedIncomingMessage {
id: string;
platform: PluginType;
chatId: string;
user: IUnifiedUser;
content: IUnifiedMessageContent;
timestamp: number;
action?: IMessageAction;
raw?: unknown;
}
// 出站(系统 → 平台)
interface IUnifiedOutgoingMessage {
type: 'text' | 'image' | 'file' | 'buttons';
text?: string;
parseMode?: 'HTML' | 'MarkdownV2' | 'Markdown';
buttons?: IActionButton[][];
replyMarkup?: unknown;
// ...
}
type ActionCategory = 'platform' | 'system' | 'chat';
interface IRegisteredAction {
name: string; // e.g. 'session.new', 'pairing.show'
category: ActionCategory;
description: string;
handler: ActionHandler; // (context, params?) => Promise<IActionResult>
}
type ChannelAgentType = 'gemini' | 'acp' | 'codex';
interface IChannelSession {
id: string;
userId: string;
agentType: ChannelAgentType;
conversationId?: string;
workspace?: string;
chatId?: string; // per-chat 隔离键(如 "user:xxx" 或 "group:xxx")
createdAt: number;
lastActivity: number;
}
sequenceDiagram
participant User as IM 用户
participant Plugin as Platform Plugin
participant AE as ActionExecutor
participant SM as SessionManager
participant PS as PairingService
participant CMS as ChannelMessageService
participant WM as WorkerManage
participant Agent as Agent Task
participant CEB as ChannelEventBus
User->>Plugin: 发送文字消息
Plugin->>Plugin: 转换为 IUnifiedIncomingMessage
Plugin->>AE: messageHandler(message)
Note over AE: 授权检查
AE->>PS: isUserAuthorized(userId, platform)
alt 未授权
PS-->>AE: false
AE->>Plugin: 显示配对码
else 已授权
PS-->>AE: true
AE->>SM: getSession(userId, chatId)
alt 无 Session
AE->>AE: 创建 Conversation + Session
end
Note over AE: 发送 "⏳ Thinking..." 占位消息
AE->>Plugin: sendMessage("⏳ Thinking...")
Plugin-->>AE: thinkingMsgId
AE->>CMS: sendMessage(sessionId, convId, text, onStream)
CMS->>CMS: initialize() / 注册 EventBus 监听
CMS->>WM: getTaskByIdRollbackBuild(convId, {yoloMode})
CMS->>Agent: task.sendMessage({input/content, msg_id})
loop 流式响应
Agent->>CEB: emitAgentMessage(convId, data)
CEB->>CMS: handleAgentMessage(event)
CMS->>CMS: transformMessage + composeMessage
CMS->>AE: onStream(TMessage, isInsert)
Note over AE: 500ms 节流定时器
AE->>Plugin: editMessage(msgId, outgoingMsg)
Plugin->>User: 更新消息内容
end
Agent->>CEB: emitAgentMessage(convId, {type:'finish'})
CEB->>CMS: stream.resolve()
CMS-->>AE: Promise resolved
Note over AE: 添加操作按钮
AE->>Plugin: editMessage(lastMsgId, finalMsg + buttons)
Plugin->>User: 显示完整回复 + 操作按钮
end
ActionExecutor.handleChatMessage() 中实现了基于定时器的节流控制,核心参数:
逻辑:
pendingMessage,重置定时器首条流式消息的特殊处理:始终作为对 "⏳ Thinking..." 消息的编辑(而非插入新消息),避免异步竞态条件。
三类 Action 及其处理器:
| 类别 | 名称 | 处理器 | 说明 |
|---|---|---|---|
| system | session.new | handleSessionNew | 创建新会话(清理旧 Session + Worker) |
session.status | handleSessionStatus | 显示当前会话状态 | |
help.show/features/pairing/tips | handleHelp* | 帮助信息 | |
settings.show | handleSettingsShow | 设置引导 | |
agent.show | handleAgentShow | 显示可用 Agent 列表 | |
agent.select | handleAgentSelect | 切换 Agent 类型 | |
| platform | pairing.show | handlePairingShow | 生成并显示配对码 |
pairing.refresh | handlePairingRefresh | 刷新配对码 | |
pairing.check | handlePairingCheck | 检查配对状态 | |
pairing.help | handlePairingHelp | 配对帮助 | |
| chat | chat.send | handleChatSend | 占位(实际由 ActionExecutor 处理) |
chat.regenerate | handleChatRegenerate | 重新生成回复 | |
chat.continue | handleChatContinue | 继续生成 | |
action.copy | handleCopy | 复制提示 | |
system.confirm | handleToolConfirm | 工具确认 |
Agent 请求确认 → Agent Task 广播 tool_group (status=Confirming)
→ ChannelEventBus → ChannelMessageService → ActionExecutor
→ 根据平台构建确认 UI(Telegram: InlineKeyboard / Lark: 互动卡片 / DingTalk: AI Card 按钮)
→ Plugin 推送给用户
用户点击按钮 → Plugin 解析回调数据
→ Telegram: confirmHandler(userId, 'telegram', callId, value)
→ Lark: extractCardAction → messageHandler → handleToolConfirm
→ DingTalk: extractCardAction → confirmHandler(userId, 'dingtalk', callId, value)
→ ChannelMessageService.confirm(conversationId, callId, value)
→ Agent Task.confirm() → Agent 继续执行
Agent Task(如 GeminiAgentManager)发送消息时同时通过两条路径:
这两条路径互不干扰,实现了 AionUi 桌面端和 IM 端的同步显示。
Plugin 配置存储在 assistant_plugins 表中,config 列以 JSON 格式存储 { credentials, config }。
凭据加密使用 Base64 编码(utils/credentialCrypto.ts):
encryptString(plaintext) → "b64:<base64>"decryptString(encoded) → 原文"enc:" 前缀(旧版 safeStorage)、"plain:" 前缀、无前缀各平台凭据字段:
| 平台 | 凭据字段 |
|---|---|
| Telegram | token (Bot Token) |
| Lark | appId, appSecret, encryptKey(?), verificationToken(?) |
| DingTalk | clientId, clientSecret |
配对流程(PairingService):
assistant_pairing_codes 表,状态为 pendingchannelBridge.pairingRequested.emit() 通知 Settings UIPairingService.approvePairing() 创建 assistant_users 记录channelBridge.userAuthorized.emit() 通知 Settings UISession 使用 复合键 userId:chatId 进行隔离,同一用户在不同群聊中拥有独立会话。
SessionManager.activeSessions: Map<compositeKey, IChannelSession>assistant_sessions 表(含 chat_id 列)cleanupStaleSessions(maxAgeMs = 24h)Conversation 级别也实现了 per-chat 隔离:
conversations 表增加 channel_chat_id 列ActionExecutor 通过 db.findChannelConversation(source, chatId, type, backend) 查找已有会话-- 插件配置(Migration v7, v10, v14 演进)
CREATE TABLE assistant_plugins (
id TEXT PRIMARY KEY,
type TEXT NOT NULL CHECK(type IN ('telegram','slack','discord','lark','dingtalk')),
name TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 0,
config TEXT NOT NULL, -- JSON: { credentials, config }
status TEXT,
last_connected INTEGER,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
-- 授权用户(Migration v7)
CREATE TABLE assistant_users (
id TEXT PRIMARY KEY,
platform_user_id TEXT NOT NULL,
platform_type TEXT NOT NULL,
display_name TEXT,
authorized_at INTEGER NOT NULL,
last_active INTEGER,
session_id TEXT,
UNIQUE(platform_user_id, platform_type)
);
-- 用户会话(Migration v7, v14 增加 chat_id)
CREATE TABLE assistant_sessions (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
agent_type TEXT NOT NULL CHECK(agent_type IN ('gemini','acp','codex')),
conversation_id TEXT,
workspace TEXT,
chat_id TEXT, -- per-chat 隔离键
created_at INTEGER NOT NULL,
last_activity INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES assistant_users(id) ON DELETE CASCADE,
FOREIGN KEY (conversation_id) REFERENCES conversations(id) ON DELETE SET NULL
);
-- 配对码(Migration v7)
CREATE TABLE assistant_pairing_codes (
code TEXT PRIMARY KEY,
platform_user_id TEXT NOT NULL,
platform_type TEXT NOT NULL,
display_name TEXT,
requested_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'pending'
CHECK(status IN ('pending','approved','rejected','expired'))
);
-- conversations 表扩展(Migration v12, v14)
-- source 列: 'aionui' | 'telegram' | 'lark' | 'dingtalk'
-- channel_chat_id 列: per-chat 隔离键
迁移演进:
assistant_plugins.type 约束增加 'lark'conversations.source 约束增加 'lark'channel_chat_id、chat_id)BasePlugin 定义了所有平台插件的生命周期状态机和通用接口:
生命周期状态机:
created → initializing → ready → starting → running → stopping → stopped
↓ ↓ ↓
error ←←←←←←←←←←←←←←←←←←←←←←←←←←←
抽象方法(子类实现):
| 方法 | 说明 |
|---|---|
onInitialize(config) | 平台专有初始化(校验凭据、创建 SDK 客户端) |
onStart() | 连接平台(启动轮询/WebSocket) |
onStop() | 断开连接、清理资源 |
sendMessage(chatId, message) | 发送消息,返回平台消息 ID |
editMessage(chatId, messageId, message) | 编辑已发送的消息(流式更新) |
getActiveUserCount() | 返回活跃用户数 |
getBotInfo() | 返回 Bot 信息 |
回调注册:
onMessage(handler) — 接收消息回调(由 PluginManager 注入 ActionExecutor 的处理函数)onConfirm(handler) — 工具确认回调grammy)bot.start() 内部自动删除 webhook)void this.messageHandler(msg).catch(...) 避免阻塞轮询循环)extractCategory(data) + extractAction(data) 解析 callback_query.data@larksuiteoapi/node-sdk(官方 Node SDK)lark.WSClient,无需公网 URL)processedEvents: Map<eventId, timestamp>,TTL 5 分钟,每分钟清理msg_type: 'interactive'),因为 Lark 仅支持编辑卡片消息im.message.receive_v1 — 接收消息card.action.trigger — 卡片按钮点击(需 3 秒内响应,因此异步处理)application.bot.menu_v6 — Bot 自定义菜单点击ou_ → open*id, oc*→ chat_id,on\_ → union_iddingtalk-streamDWClient,通过 TOPIC_ROBOT 和 TOPIC_CARD 注册回调)accessToken,调用 /v1.0/oauth2/accessToken 获取,缓存至过期前 60 秒POST /v1.0/card/instances,模板 ID 382e4302-...)POST /v1.0/card/instances/deliver)PUT /v1.0/card/streaming,isFull: true)PUT /v1.0/card/instances,flowStatus: '3')sessionWebhook(Markdown),再回退到 Open APIuser:{staffId},群聊 group:{conversationId}https 模块,30 秒超时channelBridge 通过 src/common/ipcBridge.ts 中的 channel 对象定义,提供 Settings UI 与主进程之间的双向通信:
请求-响应(Provider):
| 端点 | 方向 | 说明 |
|---|---|---|
channel.get-plugin-status | UI → Main | 获取所有插件状态 |
channel.enable-plugin | UI → Main | 启用插件(含凭据) |
channel.disable-plugin | UI → Main | 禁用插件 |
channel.test-plugin | UI → Main | 测试插件连接 |
channel.get-pending-pairings | UI → Main | 获取待审批配对请求 |
channel.approve-pairing | UI → Main | 审批配对请求 |
channel.reject-pairing | UI → Main | 拒绝配对请求 |
channel.get-authorized-users | UI → Main | 获取授权用户列表 |
channel.revoke-user | UI → Main | 撤销用户授权 |
channel.get-active-sessions | UI → Main | 获取活跃会话列表 |
事件推送(Emitter):
| 事件 | 方向 | 说明 |
|---|---|---|
channel.pairing-requested | Main → UI | 新配对请求(弹出通知) |
channel.plugin-status-changed | Main → UI | 插件状态变化(实时更新 UI) |
channel.user-authorized | Main → UI | 用户授权成功 |
IPC 处理器注册在 src/process/bridge/channelBridge.ts 的 initChannelBridge() 函数中,在应用启动时调用。
| 模式 | 应用位置 | 说明 |
|---|---|---|
| 单例模式 | ChannelManager, ChannelMessageService, PairingService, channelEventBus | 全局唯一实例,通过 getInstance() 或 get*() 获取 |
| 策略模式 | BasePlugin + 三个具体插件 | 统一接口,不同平台实现 |
| 注册表模式 | PluginManager.pluginRegistry, ActionExecutor.actionRegistry | 动态注册处理器,按名称/类型查找 |
| 观察者模式 | ChannelEventBus (extends EventEmitter) | Agent → Channel 的解耦消息传递 |
| 适配器模式 | TelegramAdapter, LarkAdapter, DingTalkAdapter | 平台原生格式 ↔ 统一格式转换 |
| 状态机模式 | BasePlugin 生命周期 | created→initializing→ready→starting→running→stopping→stopped |
| 复合键模式 | SessionManager.buildKey(userId, chatId) | 支持 per-chat 会话隔离 |
| 节流模式 | ActionExecutor.handleChatMessage() | 500ms 定时器节流流式消息更新 |
| 降级策略 | DingTalkPlugin.sendMessage() | AI Card → sessionWebhook → Open API 三级降级 |
| 事件去重 | LarkPlugin, DingTalkPlugin | processedEvents Map + TTL 清理 |