pkg/channels/README.zh.md
影响范围:
pkg/channels/,pkg/bus/,pkg/media/,pkg/identity/,cmd/picoclaw/internal/gateway/
重构前(main 分支):
pkg/channels/
├── telegram.go # 每个 channel 直接放在 channels 包内
├── discord.go
├── slack.go
├── manager.go # Manager 直接引用各 channel 类型
├── ...
pkg/channels/ 包的顶层switch 或 if-else 链条直接构造各 channelMetadata map[string]string 中重构后(refactor/channel-system 分支):
pkg/channels/
├── base.go # BaseChannel 共享抽象层
├── interfaces.go # 可选能力接口(TypingCapable, MessageEditor, ReactionCapable, PlaceholderCapable, PlaceholderRecorder)
├── README.md # 英文文档
├── README.zh.md # 中文文档
├── media.go # MediaSender 可选接口
├── webhook.go # WebhookHandler, HealthChecker 可选接口
├── errors.go # 错误哨兵值(ErrNotRunning, ErrRateLimit, ErrTemporary, ErrSendFailed)
├── errutil.go # 错误分类帮助函数
├── registry.go # 工厂注册表(RegisterFactory / getFactory)
├── manager.go # 统一编排:Worker 队列、速率限制、重试、Typing/Placeholder、共享 HTTP
├── split.go # 长消息智能分割(保留代码块完整性)
├── telegram/ # 每个 channel 独立子包
│ ├── init.go # 工厂注册
│ ├── telegram.go # 实现
│ └── telegram_commands.go
├── discord/
│ ├── init.go
│ └── discord.go
├── slack/ line/ onebot/ dingtalk/ feishu/ wecom/ qq/ whatsapp/ whatsapp_native/ maixcam/ pico/
│ └── ...
pkg/bus/
├── bus.go # MessageBus(缓冲区 64,安全关闭+排水)
├── types.go # 结构化消息类型(Peer, SenderInfo, MediaPart, InboundMessage, OutboundMessage, OutboundMediaMessage)
pkg/media/
├── store.go # MediaStore 接口 + FileMediaStore 实现(两阶段释放,TTL 清理)
pkg/identity/
├── identity.go # 统一用户身份:规范 "platform:id" 格式 + 向后兼容匹配
┌────────────┐ InboundMessage ┌───────────┐ LLM + Tools ┌────────────┐
│ Telegram │──┐ │ │ │ │
│ Discord │──┤ PublishInbound() │ │ PublishOutbound() │ │
│ Slack │──┼──────────────────────▶ │ MessageBus │ ◀─────────────────── │ AgentLoop │
│ LINE │──┤ (buffered chan, 64) │ │ (buffered chan, 64) │ │
│ ... │──┘ │ │ │ │
└────────────┘ └─────┬─────┘ └────────────┘
│
SubscribeOutbound() │ SubscribeOutboundMedia()
▼
┌───────────────────┐
│ Manager │
│ ├── dispatchOutbound() 路由到 Worker 队列
│ ├── dispatchOutboundMedia()
│ ├── runWorker() 消息分割 + sendWithRetry()
│ ├── runMediaWorker() sendMediaWithRetry()
│ ├── preSend() 停止 Typing + 撤销 Reaction + 编辑 Placeholder
│ └── runTTLJanitor() 清理过期 Typing/Placeholder
└────────┬──────────┘
│
channel.Send() / SendMedia()
│
▼
┌────────────────┐
│ 各平台 API/SDK │
└────────────────┘
| 原则 | 说明 |
|---|---|
| 子包隔离 | 每个 channel 一个独立 Go 子包,依赖 channels 父包提供的 BaseChannel 和接口 |
| 工厂注册 | 各子包通过 init() 自注册,Manager 通过名字查找工厂,消除 import 耦合 |
| 能力发现 | 可选能力通过接口(MediaSender, TypingCapable, ReactionCapable, PlaceholderCapable, MessageEditor, WebhookHandler, HealthChecker)声明,Manager 运行时类型断言发现 |
| 结构化消息 | Peer、MessageID、SenderInfo 从 Metadata 提升为 InboundMessage 的一等字段 |
| 错误分类 | Channel 返回哨兵错误(ErrRateLimit, ErrTemporary 等),Manager 据此决定重试策略 |
| 集中编排 | 速率限制、消息分割、重试、Typing/Reaction/Placeholder 全部由 Manager 和 BaseChannel 统一处理,Channel 只负责 Send |
在 main 分支上,Channel 文件直接位于 pkg/channels/ 顶层,例如:
pkg/channels/telegram.gopkg/channels/discord.go重构后,这些文件已被删除,代码移动到了对应子包:
pkg/channels/telegram/telegram.gopkg/channels/discord/discord.go| main 分支文件 | 重构分支位置 | 变化 |
|---|---|---|
pkg/channels/telegram.go | pkg/channels/telegram/telegram.go + init.go | 包名从 channels 变为 telegram |
pkg/channels/discord.go | pkg/channels/discord/discord.go + init.go | 同上 |
pkg/channels/manager.go | pkg/channels/manager.go | 大幅重写 |
| (不存在) | pkg/channels/base.go | 新增共享抽象层 |
| (不存在) | pkg/channels/registry.go | 新增工厂注册表 |
| (不存在) | pkg/channels/errors.go + errutil.go | 新增错误分类体系 |
| (不存在) | pkg/channels/interfaces.go | 新增可选能力接口 |
| (不存在) | pkg/channels/media.go | 新增 MediaSender 接口 |
| (不存在) | pkg/channels/webhook.go | 新增 WebhookHandler/HealthChecker |
| (不存在) | pkg/channels/whatsapp_native/ | 新增 WhatsApp 原生模式(whatsmeow) |
| (不存在) | pkg/channels/split.go | 新增消息分割(从 utils 迁入) |
| (不存在) | pkg/bus/types.go | 新增结构化消息类型 |
| (不存在) | pkg/media/store.go | 新增媒体文件生命周期管理 |
| (不存在) | pkg/identity/identity.go | 新增统一用户身份 |
以 Telegram 为例,主要改动项:
3a. 包声明和导入
// 旧代码(main 分支)
package channels
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
)
// 新代码(重构分支)
package telegram
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels" // 引用父包
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/identity" // 新增
"github.com/sipeed/picoclaw/pkg/media" // 新增(如需媒体)
)
3b. 结构体嵌入 BaseChannel
// 旧代码:直接持有 bus、config 等字段
type TelegramChannel struct {
bus *bus.MessageBus
config *config.Config
running bool
allowList []string
// ...
}
// 新代码:嵌入 BaseChannel,它提供 bus、running、allowList 等
type TelegramChannel struct {
*channels.BaseChannel // 嵌入共享抽象
bot *telego.Bot
config *config.Config
// ... 只保留 channel 特有字段
}
3c. 构造函数
// 旧代码:直接赋值
func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) {
return &TelegramChannel{
bus: bus,
config: cfg,
allowList: cfg.Channels.Telegram.AllowFrom,
// ...
}, nil
}
// 新代码:使用 NewBaseChannel + 功能选项
func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) {
base := channels.NewBaseChannel(
"telegram", // 名称
cfg.Channels.Telegram, // 原始配置(any 类型)
bus, // 消息总线
cfg.Channels.Telegram.AllowFrom, // 允许列表
channels.WithMaxMessageLength(4096), // 平台消息长度上限
channels.WithGroupTrigger(cfg.Channels.Telegram.GroupTrigger), // 群聊触发配置
channels.WithReasoningChannelID(cfg.Channels.Telegram.ReasoningChannelID), // 思维链路由
)
return &TelegramChannel{
BaseChannel: base,
bot: bot,
config: cfg,
}, nil
}
3d. Start/Stop 生命周期
// 新代码:使用 SetRunning 原子操作
func (c *TelegramChannel) Start(ctx context.Context) error {
// ... 初始化 bot、webhook 等
c.SetRunning(true) // 必须在就绪后调用
go bh.Start()
return nil
}
func (c *TelegramChannel) Stop(ctx context.Context) error {
c.SetRunning(false) // 必须在清理前调用
// ... 停止 bot handler、取消 context
return nil
}
3e. Send 方法的错误返回
// 旧代码:只返回 error
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
if !c.running { return fmt.Errorf("not running") }
// ...
if err != nil { return err }
}
// 新代码:返回投递后的消息 ID,以及供 Manager 判断重试策略的哨兵错误
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return nil, channels.ErrNotRunning // ← Manager 不会重试
}
// ...
if err != nil {
// 使用 ClassifySendError 根据 HTTP 状态码包装错误
return nil, channels.ClassifySendError(statusCode, err)
// 或手动包装:
// return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrRateLimit, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrSendFailed, err)
}
return []string{deliveredID}, nil // 如果拿不到 ID,也可以返回 nil, nil
}
3f. 消息接收(Inbound)
// 旧代码:直接构造 InboundMessage 并发布
msg := bus.InboundMessage{
Channel: "telegram",
SenderID: senderID,
ChatID: chatID,
Content: content,
Metadata: map[string]string{
"peer_kind": "group", // 路由信息埋在 metadata
"peer_id": chatID,
"message_id": msgID,
},
}
c.bus.PublishInbound(ctx, msg)
// 新代码:使用 BaseChannel.HandleMessage,传入结构化字段
sender := bus.SenderInfo{
Platform: "telegram",
PlatformID: strconv.FormatInt(from.ID, 10),
CanonicalID: identity.BuildCanonicalID("telegram", strconv.FormatInt(from.ID, 10)),
Username: from.Username,
DisplayName: from.FirstName,
}
peer := bus.Peer{
Kind: "group", // 或 "direct"
ID: chatID,
}
// HandleMessage 内部调用 IsAllowedSender 检查权限,构建 MediaScope,发布到 bus
c.HandleMessage(ctx, peer, messageID, senderID, chatID, content, mediaRefs, metadata, sender)
3g. 添加工厂注册(必需)
为你的 channel 创建 init.go:
// pkg/channels/telegram/init.go
package telegram
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
)
func init() {
channels.RegisterFactory(config.ChannelTelegram, func(channelName, channelType string, cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) {
bc := cfg.Channels[channelName]
decoded, err := bc.GetDecoded()
if err != nil { return nil, err }
c, ok := decoded.(*config.TelegramSettings)
if !ok { return nil, channels.ErrSendFailed }
return NewTelegramChannel(bc, c, b)
})
}
3h. 在 Gateway 中导入子包
// cmd/picoclaw/internal/gateway/helpers.go
import (
_ "github.com/sipeed/picoclaw/pkg/channels/telegram" // 触发 init() 注册
_ "github.com/sipeed/picoclaw/pkg/channels/discord"
_ "github.com/sipeed/picoclaw/pkg/channels/your_new_channel" // 新增
)
如果你的代码直接读取 InboundMessage.Metadata 中的路由字段:
// 旧代码
peerKind := msg.Metadata["peer_kind"]
peerID := msg.Metadata["peer_id"]
msgID := msg.Metadata["message_id"]
// 新代码
peerKind := msg.Peer.Kind // 一等字段
peerID := msg.Peer.ID // 一等字段
msgID := msg.MessageID // 一等字段
sender := msg.Sender // bus.SenderInfo 结构体
scope := msg.MediaScope // 媒体生命周期作用域
// 旧代码
if !c.isAllowed(senderID) { return }
// 新代码:优先使用结构化检查
if !c.IsAllowedSender(sender) { return }
// 或回退到字符串检查:
if !c.IsAllowed(senderID) { return }
BaseChannel.HandleMessage 方法内部已经处理了这个逻辑,无需在 channel 中重复检查。
Manager 已被完全重写。你的修改需要理解新架构:
| 旧 Manager 职责 | 新 Manager 职责 |
|---|---|
| 直接构造 channel(switch/if-else) | 通过工厂注册表查找并构造 |
| 直接调用 channel.Send | 通过 per-channel Worker 队列 + 速率限制 + 重试 |
| 无消息分割 | 自动根据 MaxMessageLength 分割长消息 |
| 各 channel 自建 HTTP 服务器 | 统一共享 HTTP 服务器 |
| 无 Typing/Placeholder 管理 | 统一 preSend 处理 Typing 停止 + Reaction 撤销 + Placeholder 编辑;入站侧 BaseChannel.HandleMessage 自动编排 Typing/Reaction/Placeholder |
| 无 TTL 清理 | runTTLJanitor 定期清理过期 Typing/Reaction/Placeholder 条目 |
Agent Loop 的主要变化:
agentLoop.SetMediaStore(mediaStore) — Agent 通过 MediaStore 解析工具产生的媒体引用agentLoop.SetChannelManager(channelManager) — Agent 可查询 channel 状态bus.PublishOutboundMedia() 发送媒体消息,而非嵌入文本回复msg.Peer 结构化字段而非 Metadata 查找要添加一个新的聊天平台(例如 matrix),你需要:
pkg/channels/matrix/init.go — 工厂注册matrix.go — Channel 实现pkg/config/ 中添加配置结构体pkg/channels/matrix/init.gopackage matrix
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
)
func init() {
channels.RegisterFactory(config.ChannelMatrix, func(channelName, channelType string, cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) {
bc := cfg.Channels[channelName]
decoded, err := bc.GetDecoded()
if err != nil { return nil, err }
c, ok := decoded.(*config.MatrixSettings)
if !ok { return nil, channels.ErrSendFailed }
return NewMatrixChannel(bc, c, b)
})
}
pkg/channels/matrix/matrix.gopackage matrix
import (
"context"
"fmt"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/identity"
"github.com/sipeed/picoclaw/pkg/logger"
)
// MatrixChannel implements channels.Channel for the Matrix protocol.
type MatrixChannel struct {
*channels.BaseChannel // 必须嵌入
config *config.Config
ctx context.Context
cancel context.CancelFunc
// ... Matrix SDK 客户端等
}
func NewMatrixChannel(cfg *config.Config, msgBus *bus.MessageBus) (*MatrixChannel, error) {
matrixCfg := cfg.Channels.Matrix // 假设配置中有此字段
base := channels.NewBaseChannel(
"matrix", // channel 名称(全局唯一)
matrixCfg, // 原始配置
msgBus, // 消息总线
matrixCfg.AllowFrom, // 允许列表
channels.WithMaxMessageLength(65536), // Matrix 消息长度限制
channels.WithGroupTrigger(matrixCfg.GroupTrigger),
channels.WithReasoningChannelID(matrixCfg.ReasoningChannelID), // 思维链路由(可选)
)
return &MatrixChannel{
BaseChannel: base,
config: cfg,
}, nil
}
// ========== 必须实现的 Channel 接口方法 ==========
func (c *MatrixChannel) Start(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
// 1. 初始化 Matrix 客户端
// 2. 开始监听消息
// 3. 标记为运行中
c.SetRunning(true)
logger.InfoC("matrix", "Matrix channel started")
return nil
}
func (c *MatrixChannel) Stop(ctx context.Context) error {
c.SetRunning(false)
if c.cancel != nil {
c.cancel()
}
logger.InfoC("matrix", "Matrix channel stopped")
return nil
}
func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
// 1. 检查运行状态
if !c.IsRunning() {
return nil, channels.ErrNotRunning
}
// 2. 发送消息到 Matrix
eventID, err := c.sendToMatrix(ctx, msg.ChatID, msg.Content)
if err != nil {
// 3. 必须使用错误分类包装
// 如果你有 HTTP 状态码:
// return nil, channels.ClassifySendError(statusCode, err)
// 如果是网络错误:
// return nil, channels.ClassifyNetError(err)
// 如果需要手动分类:
return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err)
}
return []string{eventID}, nil
}
// ========== 消息接收处理 ==========
func (c *MatrixChannel) handleIncoming(roomID, senderID, displayName, content string, msgID string) {
// 1. 构造结构化发送者身份
sender := bus.SenderInfo{
Platform: "matrix",
PlatformID: senderID,
CanonicalID: identity.BuildCanonicalID("matrix", senderID),
Username: senderID,
DisplayName: displayName,
}
// 2. 确定 Peer 类型(直聊 vs 群聊)
peer := bus.Peer{
Kind: "group", // 或 "direct"
ID: roomID,
}
// 3. 群聊过滤(如适用)
isGroup := peer.Kind == "group"
if isGroup {
isMentioned := false // 根据平台特性检测 @提及
shouldRespond, cleanContent := c.ShouldRespondInGroup(isMentioned, content)
if !shouldRespond {
return
}
content = cleanContent
}
// 4. 处理媒体附件(如有)
var mediaRefs []string
store := c.GetMediaStore()
if store != nil {
// 下载附件到本地 → store.Store() → 获取 ref
// mediaRefs = append(mediaRefs, ref)
}
// 5. 调用 HandleMessage 发布到 bus
// HandleMessage 内部会:
// - 检查 IsAllowedSender/IsAllowed
// - 构建 MediaScope
// - 发布 InboundMessage
c.HandleMessage(
c.ctx,
peer,
msgID, // 平台消息 ID
senderID, // 原始发送者 ID
roomID, // 聊天/房间 ID
content, // 消息内容
mediaRefs, // 媒体引用列表
nil, // 额外 metadata(通常 nil)
sender, // SenderInfo(variadic 参数)
)
}
// ========== 内部方法 ==========
func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) (string, error) {
// 实际的 Matrix SDK 调用
return "event-id", nil
}
根据平台能力,你的 Channel 可以选择性实现以下接口:
// 如果平台支持发送图片/文件/音频/视频
func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return nil, channels.ErrNotRunning
}
store := c.GetMediaStore()
if store == nil {
return nil, fmt.Errorf("no media store: %w", channels.ErrSendFailed)
}
var messageIDs []string
for _, part := range msg.Parts {
localPath, err := store.Resolve(part.Ref)
if err != nil {
logger.ErrorCF("matrix", "Failed to resolve media", map[string]any{
"ref": part.Ref, "error": err.Error(),
})
continue
}
// 根据 part.Type ("image"|"audio"|"video"|"file") 调用对应 API
switch part.Type {
case "image":
// 上传图片到 Matrix
default:
// 上传文件到 Matrix
}
// 如果 API 能返回平台消息 ID,就在这里追加。
// messageIDs = append(messageIDs, uploadedMessageID)
}
return messageIDs, nil
}
// 如果平台支持 "正在输入..." 提示
func (c *MatrixChannel) StartTyping(ctx context.Context, chatID string) (stop func(), err error) {
// 调用 Matrix API 发送 typing 指示器
// 返回的 stop 函数必须是幂等的
stopped := false
return func() {
if !stopped {
stopped = true
// 调用 Matrix API 停止 typing
}
}, nil
}
// 如果平台支持对入站消息添加 emoji 反应(如 Slack 的 👀、OneBot 的表情 289)
func (c *MatrixChannel) ReactToMessage(ctx context.Context, chatID, messageID string) (undo func(), err error) {
// 调用 Matrix API 添加反应到消息
// 返回的 undo 函数移除反应,必须是幂等的
err = c.addReaction(chatID, messageID, "eyes")
if err != nil {
return func() {}, err
}
return func() {
c.removeReaction(chatID, messageID, "eyes")
}, nil
}
// 如果平台支持编辑已发送的消息(用于 Placeholder 替换)
func (c *MatrixChannel) EditMessage(ctx context.Context, chatID, messageID, content string) error {
// 调用 Matrix API 编辑消息
return nil
}
// 如果平台支持发送占位消息(如 "Thinking... 💭"),并且实现了 MessageEditor,
// 则 Manager 的 preSend 会在出站时自动将占位消息编辑为最终回复。
// SendPlaceholder 内部根据 PlaceholderConfig.Enabled 决定是否发送;
// 返回 ("", nil) 表示跳过。
func (c *MatrixChannel) SendPlaceholder(ctx context.Context, chatID string) (string, error) {
cfg := c.config.Channels.Matrix.Placeholder
if !cfg.Enabled {
return "", nil
}
text := cfg.Text
if text == "" {
text = "Thinking... 💭"
}
// 调用 Matrix API 发送占位消息
msg, err := c.sendText(ctx, chatID, text)
if err != nil {
return "", err
}
return msg.ID, nil
}
// 如果 channel 通过 webhook 接收消息(而非长轮询/WebSocket)
func (c *MatrixChannel) WebhookPath() string {
return "/webhook/matrix" // 路径会被注册到共享 HTTP 服务器
}
func (c *MatrixChannel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 处理 webhook 请求
}
func (c *MatrixChannel) HealthPath() string {
return "/health/matrix"
}
func (c *MatrixChannel) HealthHandler(w http.ResponseWriter, r *http.Request) {
if c.IsRunning() {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
}
BaseChannel.HandleMessage 在发布入站消息之前,自动检测 channel 是否实现了 TypingCapable、ReactionCapable 和/或 PlaceholderCapable,并触发相应的指示器。三条管道完全独立,互不干扰:
// BaseChannel.HandleMessage 内部自动执行(无需 channel 手动调用):
if c.owner != nil && c.placeholderRecorder != nil {
// Typing — 独立管道
if tc, ok := c.owner.(TypingCapable); ok {
if stop, err := tc.StartTyping(ctx, chatID); err == nil {
c.placeholderRecorder.RecordTypingStop(c.name, chatID, stop)
}
}
// Reaction — 独立管道
if rc, ok := c.owner.(ReactionCapable); ok && messageID != "" {
if undo, err := rc.ReactToMessage(ctx, chatID, messageID); err == nil {
c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo)
}
}
// Placeholder — 独立管道
if pc, ok := c.owner.(PlaceholderCapable); ok {
if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
}
}
}
这意味着:
TypingCapable 的 channel(Telegram、Discord、LINE、Pico)无需在 handleMessage 中手动调用 StartTyping + RecordTypingStopReactionCapable 的 channel(Slack、OneBot)无需在 handleMessage 中手动调用 AddReaction + RecordTypingStopPlaceholderCapable 的 channel(Telegram、Discord、Pico)无需在 handleMessage 中手动发送占位消息并调用 RecordPlaceholderHandleMessage 会自动完成编排PlaceholderCapable 的 SendPlaceholder 方法内部根据配置的 PlaceholderConfig.Enabled 决定是否发送;返回 ("", nil) 时跳过注册Owner 注入:Manager 在 initChannel 中自动调用 SetOwner(ch) 将具体 channel 注入 BaseChannel,无需开发者手动设置。
当 Agent 处理完消息后,Manager 的 preSend 会自动:
stop() 停止 Typingundo() 撤销 ReactionMessageEditor,尝试编辑 Placeholder 为最终回复(跳过 Send)Channels 现在使用统一的 map 类型配置(map[string]*config.Channel)。
每个 channel 条目将通用字段(enabled、type、allow_from 等)放在顶层,
channel 特定的设置放在 settings 子键中:
{
"channels": {
"matrix": {
"enabled": true,
"type": "matrix",
"allow_from": ["@user:example.com"],
"settings": {
"home_server": "https://matrix.org",
"user_id": "@bot:example.com",
"access_token": "enc://..."
}
}
}
}
安全字段(token、密码、API 密钥)放入 .security.yml:
channels:
matrix:
access_token: "your-matrix-access-token"
Channel 类型必须在 pkg/config/config_channel.go 的 channelSettingsFactory 中注册:
var channelSettingsFactory = map[string]any{
// ... 现有 channels
ChannelMatrix: (MatrixSettings{}),
}
Manager 使用 InitChannelList() 来验证类型和解码设置,
然后通过 bc.Type 查找工厂。不需要在 Manager 中添加每个 channel 的条目——
只需注册工厂和配置条目即可。
注意:如果你的 channel 有多种模式(如 WhatsApp Bridge vs Native), 在
channelSettingsFactory中注册两种类型,并根据配置分支:go// 在 config_channel.go 中: ChannelWhatsApp: (WhatsAppSettings{}), ChannelWhatsAppNative: (WhatsAppSettings{}),
// cmd/picoclaw/internal/gateway/helpers.go
import (
_ "github.com/sipeed/picoclaw/pkg/channels/matrix"
)
文件:pkg/bus/bus.go、pkg/bus/types.go
type MessageBus struct {
inbound chan InboundMessage // 缓冲区 = 64
outbound chan OutboundMessage // 缓冲区 = 64
outboundMedia chan OutboundMediaMessage // 缓冲区 = 64
done chan struct{} // 关闭信号
closed atomic.Bool // 防止重复关闭
}
关键行为:
| 方法 | 行为 |
|---|---|
PublishInbound(ctx, msg) | 检查 closed → 发送到 inbound channel → 阻塞/超时/关闭 |
ConsumeInbound(ctx) | 从 inbound 读取 → 阻塞/关闭/取消 |
PublishOutbound(ctx, msg) | 发送到 outbound channel |
SubscribeOutbound(ctx) | 从 outbound 读取(Manager dispatcher 调用) |
PublishOutboundMedia(ctx, msg) | 发送到 outboundMedia channel |
SubscribeOutboundMedia(ctx) | 从 outboundMedia 读取(Manager media dispatcher 调用) |
Close() | CAS 关闭 → close(done) → 排水所有 channel(不关闭 channel 本身,避免并发 send-on-closed panic) |
设计要点:
Close() 不关闭底层 channel(只关闭 done 信号通道),因为可能有正在并发 Publish 的 goroutine文件:pkg/bus/types.go
// 路由对等体
type Peer struct {
Kind string `json:"kind"` // "direct" | "group" | "channel" | ""
ID string `json:"id"`
}
// 发送者身份信息
type SenderInfo struct {
Platform string `json:"platform,omitempty"` // "telegram", "discord", ...
PlatformID string `json:"platform_id,omitempty"` // 平台原始 ID
CanonicalID string `json:"canonical_id,omitempty"` // "platform:id" 规范格式
Username string `json:"username,omitempty"`
DisplayName string `json:"display_name,omitempty"`
}
// 入站消息
type InboundMessage struct {
Channel string // 来源 channel 名称
SenderID string // 发送者 ID(优先使用 CanonicalID)
Sender SenderInfo // 结构化发送者信息
ChatID string // 聊天/房间 ID
Content string // 消息文本
Media []string // 媒体引用列表(media://...)
Peer Peer // 路由对等体(一等字段)
MessageID string // 平台消息 ID(一等字段)
MediaScope string // 媒体生命周期作用域
SessionKey string // 会话键
Metadata map[string]string // 仅用于 channel 特有扩展
}
// 出站文本消息
type OutboundMessage struct {
Channel string
ChatID string
Content string
}
// 出站媒体消息
type OutboundMediaMessage struct {
Channel string
ChatID string
Parts []MediaPart
}
// 媒体片段
type MediaPart struct {
Type string // "image" | "audio" | "video" | "file"
Ref string // "media://uuid"
Caption string
Filename string
ContentType string
}
文件:pkg/channels/base.go
BaseChannel 是所有 channel 的共享抽象层,提供以下能力:
| 方法/特性 | 说明 |
|---|---|
Name() string | Channel 名称 |
IsRunning() bool | 原子读取运行状态 |
SetRunning(bool) | 原子设置运行状态 |
MaxMessageLength() int | 消息长度限制(rune 计数),0 = 无限制 |
ReasoningChannelID() string | 思维链路由目标 channel ID(空 = 不路由) |
IsAllowed(senderID string) bool | 旧格式允许列表检查(支持 "id|username" 和 "@username" 格式) |
IsAllowedSender(sender SenderInfo) bool | 新格式允许列表检查(委托给 identity.MatchAllowed) |
ShouldRespondInGroup(isMentioned, content) (bool, string) | 统一群聊触发过滤逻辑 |
HandleMessage(...) | 统一入站消息处理:权限检查 → 构建 MediaScope → 自动触发 Typing/Reaction/Placeholder → 发布到 Bus |
SetMediaStore(s) / GetMediaStore() | Manager 注入的媒体存储 |
SetPlaceholderRecorder(r) / GetPlaceholderRecorder() | Manager 注入的占位符记录器 |
SetOwner(ch) | Manager 注入的具体 channel 引用(用于 HandleMessage 内部的 Typing/Reaction/Placeholder 类型断言) |
功能选项:
channels.WithMaxMessageLength(4096) // 设置平台消息长度限制
channels.WithGroupTrigger(groupTriggerCfg) // 设置群聊触发配置
channels.WithReasoningChannelID(id) // 设置思维链路由目标 channel
文件:pkg/channels/registry.go
type ChannelFactory func(channelName, channelType string, cfg *config.Config, bus *bus.MessageBus) (Channel, error)
func RegisterFactory(name string, f ChannelFactory) // 子包 init() 中调用
func getFactory(name string) (ChannelFactory, bool) // Manager 内部调用
func GetRegisteredFactoryNames() []string // 返回所有已注册的工厂名称
为方便使用,RegisterSafeFactory[S any] 提供自动类型安全的设置解码:
// 不使用 RegisterSafeFactory(手动 GetDecoded() + 类型断言):
channels.RegisterFactory(config.ChannelTelegram,
func(channelName, channelType string, cfg *config.Config, b *bus.MessageBus) (Channel, error) {
bc := cfg.Channels[channelName]
decoded, err := bc.GetDecoded()
if err != nil { return nil, err }
c, ok := decoded.(*config.TelegramSettings)
if !ok { return nil, ErrSendFailed }
return NewTelegramChannel(bc, c, b)
})
// 使用 RegisterSafeFactory(同等安全,减少样板代码):
channels.RegisterSafeFactory(config.ChannelTelegram, NewTelegramChannel)
工厂注册表使用 sync.RWMutex 保护,在 init() 阶段注册(进程启动时完成)。Manager 在 initChannel() 中通过名字查找工厂并调用它。
文件:pkg/channels/errors.go、pkg/channels/errutil.go
var (
ErrNotRunning = errors.New("channel not running") // 永久:不重试
ErrRateLimit = errors.New("rate limited") // 固定延迟:1s 后重试
ErrTemporary = errors.New("temporary failure") // 指数退避:500ms * 2^attempt,最大 8s
ErrSendFailed = errors.New("send failed") // 永久:不重试
)
// 根据 HTTP 状态码自动分类
func ClassifySendError(statusCode int, rawErr error) error {
// 429 → ErrRateLimit
// 5xx → ErrTemporary
// 4xx → ErrSendFailed
}
// 网络错误统一包装为临时错误
func ClassifyNetError(err error) error {
// → ErrTemporary
}
sendWithRetry)最大重试次数: 3
速率限制延迟: 1 秒
基础退避: 500 毫秒
最大退避: 8 秒
重试逻辑:
ErrNotRunning → 立即失败,不重试
ErrSendFailed → 立即失败,不重试
ErrRateLimit → 等待 1s → 重试
ErrTemporary → 等待 500ms * 2^attempt(最大 8s) → 重试
其他未知错误 → 等待 500ms * 2^attempt(最大 8s) → 重试
文件:pkg/channels/manager.go
type channelWorker struct {
ch Channel // channel 实例
queue chan bus.OutboundMessage // 出站文本队列(缓冲 16)
mediaQueue chan bus.OutboundMediaMessage // 出站媒体队列(缓冲 16)
done chan struct{} // 文本 worker 完成信号
mediaDone chan struct{} // 媒体 worker 完成信号
limiter *rate.Limiter // per-channel 速率限制器
}
var channelRateConfig = map[string]float64{
"telegram": 20, // 20 msg/s
"discord": 1, // 1 msg/s
"slack": 1, // 1 msg/s
"line": 10, // 10 msg/s
}
// 默认: 10 msg/s
// burst = max(1, ceil(rate/2))
StartAll:
1. 遍历已注册 channels → channel.Start(ctx)
2. 为每个启动成功的 channel 创建 channelWorker
3. 启动 goroutines:
- runWorker (per-channel 出站文本)
- runMediaWorker (per-channel 出站媒体)
- dispatchOutbound (从 bus 路由到 worker 队列)
- dispatchOutboundMedia (从 bus 路由到 media worker 队列)
- runTTLJanitor (每 10s 清理过期 typing/reaction/placeholder)
4. 启动共享 HTTP 服务器(如已配置)
StopAll:
1. 关闭共享 HTTP 服务器(5s 超时)
2. 取消 dispatcher context
3. 关闭 text worker 队列 → 等待排水完成
4. 关闭 media worker 队列 → 等待排水完成
5. 停止每个 channel(channel.Stop)
// Manager 实现 PlaceholderRecorder 接口
func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string)
func (m *Manager) RecordTypingStop(channel, chatID string, stop func())
func (m *Manager) RecordReactionUndo(channel, chatID string, undo func())
// 入站侧:BaseChannel.HandleMessage 自动编排
// BaseChannel.HandleMessage 在 PublishInbound 之前,通过 owner 类型断言自动触发:
// - TypingCapable.StartTyping → RecordTypingStop
// - ReactionCapable.ReactToMessage → RecordReactionUndo
// - PlaceholderCapable.SendPlaceholder → RecordPlaceholder
// 三者独立,互不干扰。Channel 无需手动调用。
// 出站侧:发送前处理
func (m *Manager) preSend(ctx, name, msg, ch) bool {
key := name + ":" + msg.ChatID
// 1. 停止 Typing(调用存储的 stop 函数)
// 2. 撤销 Reaction(调用存储的 undo 函数)
// 3. 尝试编辑 Placeholder(如果 channel 实现了 MessageEditor)
// 成功 → return true(跳过 Send)
// 失败 → return false(继续 Send)
}
Manager 存储完全分离,三条管道互不干扰:
Manager {
typingStops sync.Map // "channel:chatID" → typingEntry ← 管 TypingCapable
reactionUndos sync.Map // "channel:chatID" → reactionEntry ← 管 ReactionCapable
placeholders sync.Map // "channel:chatID" → placeholderEntry
}
TTL 清理:
文件:pkg/channels/split.go
SplitMessage(content string, maxLen int) []string
智能分割策略:
```)\n```\n + header)文件:pkg/media/store.go
type MediaStore interface {
Store(localPath string, meta MediaMeta, scope string) (ref string, err error)
Resolve(ref string) (localPath string, err error)
ResolveWithMeta(ref string) (localPath string, meta MediaMeta, err error)
ReleaseAll(scope string) error
}
FileMediaStore 实现:
media://<uuid>channel:chatID:messageID(由 BuildMediaScope 生成)NewFileMediaStoreWithCleanup → Start() 启动后台清理协程文件:pkg/identity/identity.go
// 构建规范 ID
func BuildCanonicalID(platform, platformID string) string
// → "telegram:123456"
// 解析规范 ID
func ParseCanonicalID(canonical string) (platform, id string, ok bool)
// 匹配允许列表(向后兼容)
func MatchAllowed(sender bus.SenderInfo, allowed string) bool
MatchAllowed 支持的允许列表格式:
| 格式 | 匹配方式 |
|---|---|
"123456" | 匹配 sender.PlatformID |
"@alice" | 匹配 sender.Username |
"123456|alice" | 匹配 PlatformID 或 Username(旧格式兼容) |
"telegram:123456" | 精确匹配 sender.CanonicalID(新格式) |
文件:pkg/channels/manager.go 的 SetupHTTPServer
Manager 创建单一 http.Server,自动发现和注册:
WebhookHandler 的 channel → 挂载到 wh.WebhookPath()HealthChecker 的 channel → 挂载到 hc.HealthPath()health.Server.RegisterOnMux 注册超时配置:ReadTimeout = 30s, WriteTimeout = 30s
错误分类是合约:Channel 的 Send 方法必须返回哨兵错误(或包装它们)。Manager 的重试策略完全依赖 errors.Is 检查。如果返回未分类的错误,Manager 会按"未知错误"处理(指数退避重试)。
SetRunning 是生命周期信号:Start 成功后必须调用 c.SetRunning(true),Stop 开始时必须调用 c.SetRunning(false)。Send 中必须检查 c.IsRunning() 并返回 ErrNotRunning。
HandleMessage 包含权限检查:不要在调用 HandleMessage 之前自行进行权限检查(除非你需要在检查前做平台特定的预处理)。HandleMessage 内部已经调用 IsAllowedSender/IsAllowed。
消息分割由 Manager 处理:Channel 的 Send 方法不需要处理长消息分割。Manager 会在调用 Send 之前根据 MaxMessageLength() 自动分割。Channel 只需通过 WithMaxMessageLength 声明限制。
Typing/Reaction/Placeholder 由 BaseChannel + Manager 自动处理:Channel 的 Send 方法不需要管理 Typing 停止、Reaction 撤销或 Placeholder 编辑。BaseChannel.HandleMessage 在入站侧自动触发 TypingCapable、ReactionCapable 和 PlaceholderCapable(通过 owner 类型断言);Manager 的 preSend 在出站侧自动停止 Typing、撤销 Reaction、编辑 Placeholder。Channel 只需实现对应接口即可。
工厂注册在 init() 中:每个子包必须有 init.go 文件调用 channels.RegisterFactory。Gateway 必须通过 blank import(_ "pkg/channels/xxx")触发注册。
不要再把以下信息放入 Metadata:
peer_kind / peer_id → 使用 InboundMessage.Peermessage_id → 使用 InboundMessage.MessageIDsender_platform / sender_username → 使用 InboundMessage.SenderMetadata 仅用于:
reply_to_message_id)BaseChannel.running:使用 atomic.Bool,线程安全Manager.channels / Manager.workers:使用 sync.RWMutex 保护Manager.placeholders / Manager.typingStops / Manager.reactionUndos:使用 sync.MapMessageBus.closed:使用 atomic.BoolFileMediaStore:使用 sync.RWMutex,两阶段操作减少持锁时间已有测试文件:
pkg/channels/base_test.go — BaseChannel 单元测试pkg/channels/manager_test.go — Manager 单元测试pkg/channels/split_test.go — 消息分割测试pkg/channels/errors_test.go — 错误类型测试pkg/channels/errutil_test.go — 错误分类测试为新 channel 添加测试时:
go test ./pkg/channels/matrix/ -v # 子包测试
go test ./pkg/channels/ -run TestSpecific -v # 框架测试
make test # 全量测试
| 文件 | 职责 |
|---|---|
pkg/channels/base.go | BaseChannel 结构体、Channel 接口、MessageLengthProvider、BaseChannelOption、HandleMessage |
pkg/channels/interfaces.go | TypingCapable、MessageEditor、ReactionCapable、PlaceholderCapable、PlaceholderRecorder 接口 |
pkg/channels/media.go | MediaSender 接口 |
pkg/channels/webhook.go | WebhookHandler、HealthChecker 接口 |
pkg/channels/errors.go | ErrNotRunning、ErrRateLimit、ErrTemporary、ErrSendFailed 哨兵 |
pkg/channels/errutil.go | ClassifySendError、ClassifyNetError 帮助函数 |
pkg/channels/registry.go | RegisterFactory、getFactory 工厂注册表 |
pkg/channels/manager.go | Manager:Worker 队列、速率限制、重试、preSend、共享 HTTP、TTL janitor |
pkg/channels/split.go | SplitMessage 长消息分割 |
pkg/bus/bus.go | MessageBus 实现 |
pkg/bus/types.go | Peer、SenderInfo、InboundMessage、OutboundMessage、OutboundMediaMessage、MediaPart |
pkg/media/store.go | MediaStore 接口、FileMediaStore 实现 |
pkg/identity/identity.go | BuildCanonicalID、ParseCanonicalID、MatchAllowed |
| 子包 | 注册名 | 可选接口 |
|---|---|---|
pkg/channels/telegram/ | "telegram" | TypingCapable, PlaceholderCapable, MessageEditor, MediaSender |
pkg/channels/discord/ | "discord" | TypingCapable, PlaceholderCapable, MessageEditor, MediaSender |
pkg/channels/slack/ | "slack" | ReactionCapable, MediaSender |
pkg/channels/line/ | "line" | TypingCapable, MediaSender, WebhookHandler |
pkg/channels/onebot/ | "onebot" | ReactionCapable, MediaSender |
pkg/channels/dingtalk/ | "dingtalk" | — |
pkg/channels/feishu/ | "feishu" | — (架构特定 build tags: feishu_32.go / feishu_64.go) |
pkg/channels/wecom/ | "wecom" | MediaSender |
pkg/channels/qq/ | "qq" | — |
pkg/channels/whatsapp/ | "whatsapp" | — (Bridge 模式) |
pkg/channels/whatsapp_native/ | "whatsapp_native" | — (原生 whatsmeow 模式) |
pkg/channels/maixcam/ | "maixcam" | — |
pkg/channels/pico/ | "pico" | TypingCapable, PlaceholderCapable, MessageEditor, WebhookHandler |
// ===== 必须实现 =====
type Channel interface {
Name() string
Start(ctx context.Context) error
Stop(ctx context.Context) error
Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error)
IsRunning() bool
IsAllowed(senderID string) bool
IsAllowedSender(sender bus.SenderInfo) bool
ReasoningChannelID() string
}
// ===== 可选实现 =====
type MediaSender interface {
SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error)
}
type TypingCapable interface {
StartTyping(ctx context.Context, chatID string) (stop func(), err error)
}
type ReactionCapable interface {
ReactToMessage(ctx context.Context, chatID, messageID string) (undo func(), err error)
}
type PlaceholderCapable interface {
SendPlaceholder(ctx context.Context, chatID string) (messageID string, err error)
}
type MessageEditor interface {
EditMessage(ctx context.Context, chatID, messageID, content string) error
}
type WebhookHandler interface {
WebhookPath() string
http.Handler
}
type HealthChecker interface {
HealthPath() string
HealthHandler(w http.ResponseWriter, r *http.Request)
}
type MessageLengthProvider interface {
MaxMessageLength() int
}
// ===== 由 Manager 注入 =====
type PlaceholderRecorder interface {
RecordPlaceholder(channel, chatID, placeholderID string)
RecordTypingStop(channel, chatID string, stop func())
RecordReactionUndo(channel, chatID string, undo func())
}
// 1. 创建核心组件
msgBus := bus.NewMessageBus()
provider := providers.CreateProvider(cfg)
agentLoop := agent.NewAgentLoop(cfg, msgBus, provider)
// 2. 创建媒体存储(带 TTL 清理)
mediaStore := media.NewFileMediaStoreWithCleanup(cleanerConfig)
mediaStore.Start()
// 3. 创建 Channel Manager(触发 initChannels → 工厂查找 → 构造 → 注入 MediaStore/PlaceholderRecorder/Owner)
channelManager := channels.NewManager(cfg, msgBus, mediaStore)
// 4. 注入引用
agentLoop.SetChannelManager(channelManager)
agentLoop.SetMediaStore(mediaStore)
// 5. 配置共享 HTTP 服务器
channelManager.SetupHTTPServer(addr, healthServer)
// 6. 启动
channelManager.StartAll(ctx) // 启动 channels + workers + dispatchers + HTTP server
go agentLoop.Run(ctx) // 启动 Agent 消息循环
// 7. 关闭(信号触发)
cancel() // 取消 context
msgBus.Close() // 信号关闭 + 排水
channelManager.StopAll(shutdownCtx) // 停止 HTTP + workers + channels
mediaStore.Stop() // 停止 TTL 清理
agentLoop.Stop() // 停止 Agent
| Channel | 速率 (msg/s) | Burst |
|---|---|---|
| telegram | 20 | 10 |
| discord | 1 | 1 |
| slack | 1 | 1 |
| line | 10 | 5 |
| 其他 | 10 (默认) | 5 |
媒体清理暂时禁用:Agent loop 中的 ReleaseAll 调用被注释掉了(refactor(loop): disable media cleanup to prevent premature file deletion),因为会话边界尚未明确定义。TTL 清理仍然有效。
Feishu 架构特定编译:Feishu channel 使用 build tags 区分 32 位和 64 位架构(feishu_32.go / feishu_64.go)。Feishu 使用 SDK 的 WebSocket 模式(非 HTTP webhook),因此不实现 WebhookHandler。
WeCom 现在只有一个 channel:"wecom" 采用 WebSocket AI Bot 实现,带路由持久化;访问控制走统一的 channel 白名单机制,不再保留旧的 webhook/app 双分支。
Pico Protocol:pkg/channels/pico/ 实现了一个自定义的 PicoClaw 原生协议 channel,通过 WebSocket webhook (/pico/ws) 接收消息。
WhatsApp 有两种模式:"whatsapp"(Bridge 模式,通过外部 bridge URL 通信)和 "whatsapp_native"(原生 whatsmeow 模式,直接连接 WhatsApp)。Manager 根据 WhatsAppConfig.UseNative 决定初始化哪个。
DingTalk 使用 Stream 模式:DingTalk 使用 SDK 的 Stream/WebSocket 模式(非 HTTP webhook),因此不实现 WebhookHandler。
PlaceholderConfig 的配置与实现:PlaceholderConfig 出现在 6 个 channel config 中(Telegram、Discord、Slack、LINE、OneBot、Pico),但只有实现了 PlaceholderCapable + MessageEditor 的 channel(Telegram、Discord、Pico)能真正使用占位消息编辑功能。其余 channel 的 PlaceholderConfig 为预留字段。
ReasoningChannelID:大多数 channel config 都包含 reasoning_channel_id 字段,用于将 LLM 的思维链(reasoning/thinking)路由到指定 channel(WhatsApp、Telegram、Feishu、Discord、MaixCam、QQ、DingTalk、Slack、LINE、OneBot、WeCom)。注意:PicoConfig 目前不包含该字段。BaseChannel 通过 WithReasoningChannelID 选项和 ReasoningChannelID() 方法暴露此配置。