pkg/channels/README.md
Scope:
pkg/channels/, pkg/bus/, pkg/media/, pkg/identity/, cmd/picoclaw/internal/gateway/
Before Refactor (main branch):
pkg/channels/
├── telegram.go # Each channel directly in the channels package
├── discord.go
├── slack.go
├── manager.go # Manager directly references each channel type
├── ...
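- Every channel implementation lived directly in pkg/channels/
- Manager constructed channels through switch or if-else chains
- Routing information was carried in Metadata map[string]string

After Refactor (refactor/channel-system branch):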
pkg/channels/
├── base.go # BaseChannel shared abstraction layer
├── interfaces.go # Optional capability interfaces (TypingCapable, MessageEditor, ReactionCapable, PlaceholderCapable, PlaceholderRecorder)
├── README.md # English documentation
├── README.zh.md # Chinese documentation
├── media.go # MediaSender optional interface
├── webhook.go # WebhookHandler, HealthChecker optional interfaces
├── errors.go # Sentinel errors (ErrNotRunning, ErrRateLimit, ErrTemporary, ErrSendFailed)
├── errutil.go # Error classification helpers
├── registry.go # Factory registry (RegisterFactory / getFactory)
├── manager.go # Unified orchestration: Worker queues, rate limiting, retries, Typing/Placeholder, shared HTTP
├── split.go # Smart long-message splitting (preserves code block integrity)
├── telegram/ # Each channel in its own sub-package
│ ├── init.go # Factory registration
│ ├── telegram.go # Implementation
│ └── telegram_commands.go
├── discord/
│ ├── init.go
│ └── discord.go
├── slack/ line/ onebot/ dingtalk/ feishu/ wecom/ qq/ whatsapp/ whatsapp_native/ maixcam/ pico/
│ └── ...
pkg/bus/
├── bus.go # MessageBus (buffer 64, safe close + drain)
├── types.go # Structured message types (Peer, SenderInfo, MediaPart, InboundMessage, OutboundMessage, OutboundMediaMessage)
pkg/media/
├── store.go # MediaStore interface + FileMediaStore implementation (two-phase release, TTL cleanup)
pkg/identity/
├── identity.go # Unified user identity: canonical "platform:id" format + backward-compatible matching
┌────────────┐   InboundMessage          ┌────────────┐   LLM + Tools         ┌────────────┐
│  Telegram  │──┐                        │            │                       │            │
│  Discord   │──┤ PublishInbound()       │            │ PublishOutbound()     │            │
│  Slack     │──┼──────────────────────▶ │ MessageBus │ ◀──────────────────── │ AgentLoop  │
│  LINE      │──┤ (buffered chan, 64)    │            │ (buffered chan, 64)   │            │
│  ...       │──┘                        │            │                       │            │
└────────────┘                           └─────┬──────┘                       └────────────┘
                                               │
                          SubscribeOutbound()  │  SubscribeOutboundMedia()
                                               ▼
                                     ┌───────────────────┐
                                     │      Manager      │
                                     │  ├── dispatchOutbound()       Route to Worker queues
                                     │  ├── dispatchOutboundMedia()
                                     │  ├── runWorker()              Message split + sendWithRetry()
                                     │  ├── runMediaWorker()         sendMediaWithRetry()
                                     │  ├── preSend()                Stop Typing + Undo Reaction + Edit Placeholder
                                     │  └── runTTLJanitor()          Clean up expired Typing/Placeholder
                                     └─────────┬─────────┘
                                               │
                                 channel.Send() / SendMedia()
                                               │
                                               ▼
                                      ┌────────────────┐
                                      │ Platform APIs  │
                                      └────────────────┘
| Principle | Description |
|---|---|
| Sub-package Isolation | Each channel is a standalone Go sub-package, depending on BaseChannel and interfaces from the channels parent package |
| Factory Registration | Sub-packages self-register via init(); Manager looks up factories by name, eliminating import coupling |
| Capability Discovery | Optional capabilities are declared via interfaces (MediaSender, TypingCapable, ReactionCapable, PlaceholderCapable, MessageEditor, WebhookHandler, HealthChecker), discovered by Manager via runtime type assertions |
| Structured Messages | Peer, MessageID, and SenderInfo promoted from Metadata to first-class fields on InboundMessage |
| Error Classification | Channels return sentinel errors (ErrRateLimit, ErrTemporary, etc.); Manager uses these to determine retry strategy |
| Centralized Orchestration | Rate limiting, message splitting, retries, and Typing/Reaction/Placeholder management are all handled by Manager and BaseChannel; channels only need to implement Send |
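
The factory registration and capability discovery principles can be illustrated with a small standalone sketch. It is not the real registry: the `Factory` type here takes no arguments (the actual `ChannelFactory` receives config and bus parameters), and `demoChannel`, `registerFactory`, and `supportsTyping` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Channel is a pared-down stand-in for channels.Channel.
type Channel interface {
	Name() string
}

// TypingCapable is an optional capability, discovered at runtime.
type TypingCapable interface {
	StartTyping(chatID string) (stop func())
}

// Factory builds a channel. Simplified: the real factory takes config and bus arguments.
type Factory func() (Channel, error)

var (
	mu        sync.RWMutex
	factories = map[string]Factory{}
)

// registerFactory is what a sub-package would call from its init().
func registerFactory(name string, f Factory) {
	mu.Lock()
	defer mu.Unlock()
	factories[name] = f
}

// getFactory looks a factory up by name, so the caller needs no import of the sub-package.
func getFactory(name string) (Factory, bool) {
	mu.RLock()
	defer mu.RUnlock()
	f, ok := factories[name]
	return f, ok
}

// demoChannel is a hypothetical channel that also supports typing indicators.
type demoChannel struct{}

func (demoChannel) Name() string                      { return "demo" }
func (demoChannel) StartTyping(chatID string) func() { return func() {} }

// supportsTyping discovers the optional capability with a type assertion.
func supportsTyping(ch Channel) bool {
	_, ok := ch.(TypingCapable)
	return ok
}

func main() {
	registerFactory("demo", func() (Channel, error) { return demoChannel{}, nil })
	f, ok := getFactory("demo")
	if !ok {
		panic("factory not registered")
	}
	ch, err := f()
	if err != nil {
		panic(err)
	}
	fmt.Println(ch.Name(), supportsTyping(ch))
}
```

The caller only ever sees the `Channel` interface; whether a given channel can show typing indicators is decided by the type assertion, not by a switch on channel names.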
On the main branch, channel files were directly in pkg/channels/ top level, e.g.:
- pkg/channels/telegram.go
- pkg/channels/discord.go

After refactoring, these files have been removed and code moved to corresponding sub-packages:

- pkg/channels/telegram/telegram.go
- pkg/channels/discord/discord.go

| main branch file | Refactored branch location | Changes |
|---|---|---|
| pkg/channels/telegram.go | pkg/channels/telegram/telegram.go + init.go | Package name changed from channels to telegram |
| pkg/channels/discord.go | pkg/channels/discord/discord.go + init.go | Same as above |
| pkg/channels/manager.go | pkg/channels/manager.go | Extensively rewritten |
| (did not exist) | pkg/channels/base.go | New shared abstraction layer |
| (did not exist) | pkg/channels/registry.go | New factory registry |
| (did not exist) | pkg/channels/errors.go + errutil.go | New error classification system |
| (did not exist) | pkg/channels/interfaces.go | New optional capability interfaces |
| (did not exist) | pkg/channels/media.go | New MediaSender interface |
| (did not exist) | pkg/channels/webhook.go | New WebhookHandler/HealthChecker |
| (did not exist) | pkg/channels/whatsapp_native/ | New WhatsApp native mode (whatsmeow) |
| (did not exist) | pkg/channels/split.go | New message splitting (migrated from utils) |
| (did not exist) | pkg/bus/types.go | New structured message types |
| (did not exist) | pkg/media/store.go | New media file lifecycle management |
| (did not exist) | pkg/identity/identity.go | New unified user identity |
Using Telegram as an example, the main changes are:
3a. Package declaration and imports
// Old code (main branch)
package channels
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/config"
)
// New code (refactored branch)
package telegram
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels" // Reference parent package
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/identity" // New
"github.com/sipeed/picoclaw/pkg/media" // New (if media support needed)
)
3b. Struct embeds BaseChannel
// Old code: directly held bus, config, etc. fields
type TelegramChannel struct {
bus *bus.MessageBus
config *config.Config
running bool
allowList []string
// ...
}
// New code: embed BaseChannel, which provides bus, running, allowList, etc.
type TelegramChannel struct {
*channels.BaseChannel // Embed shared abstraction
bot *telego.Bot
config *config.Config
// ... only channel-specific fields
}
3c. Constructor
// Old code: direct assignment
func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) {
return &TelegramChannel{
bus: bus,
config: cfg,
allowList: cfg.Channels.Telegram.AllowFrom,
// ...
}, nil
}
// New code: use NewBaseChannel + functional options
func NewTelegramChannel(cfg *config.Config, bus *bus.MessageBus) (*TelegramChannel, error) {
base := channels.NewBaseChannel(
"telegram", // Name
cfg.Channels.Telegram, // Raw config (any type)
bus, // Message bus
cfg.Channels.Telegram.AllowFrom, // Allow list
channels.WithMaxMessageLength(4096), // Platform message length limit
channels.WithGroupTrigger(cfg.Channels.Telegram.GroupTrigger), // Group trigger config
channels.WithReasoningChannelID(cfg.Channels.Telegram.ReasoningChannelID), // Reasoning chain routing
)
return &TelegramChannel{
BaseChannel: base,
bot: bot,
config: cfg,
}, nil
}
3d. Start/Stop lifecycle
// New code: use SetRunning atomic operation
func (c *TelegramChannel) Start(ctx context.Context) error {
// ... initialize bot, webhook, etc.
c.SetRunning(true) // Must be called after ready
go bh.Start()
return nil
}
func (c *TelegramChannel) Stop(ctx context.Context) error {
c.SetRunning(false) // Must be called before cleanup
// ... stop bot handler, cancel context
return nil
}
3e. Send method error returns
// Old code: returned only error
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) error {
if !c.running { return fmt.Errorf("not running") }
// ...
if err != nil { return err }
}
// New code: return delivered message IDs plus sentinel errors
func (c *TelegramChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
if !c.IsRunning() {
return nil, channels.ErrNotRunning // ← Manager will not retry
}
// ...
if err != nil {
// Use ClassifySendError to wrap error based on HTTP status code
return nil, channels.ClassifySendError(statusCode, err)
// Or manually wrap:
// return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrRateLimit, err)
// return nil, fmt.Errorf("%w: %v", channels.ErrSendFailed, err)
}
return []string{deliveredID}, nil // or return nil, nil if IDs are unavailable
}
3f. Message reception (Inbound)
// Old code: directly construct InboundMessage and publish
msg := bus.InboundMessage{
Channel: "telegram",
SenderID: senderID,
ChatID: chatID,
Content: content,
Metadata: map[string]string{
"peer_kind": "group", // Routing info buried in metadata
"peer_id": chatID,
"message_id": msgID,
},
}
c.bus.PublishInbound(ctx, msg)
// New code: use BaseChannel.HandleMessage with structured fields
sender := bus.SenderInfo{
Platform: "telegram",
PlatformID: strconv.FormatInt(from.ID, 10),
CanonicalID: identity.BuildCanonicalID("telegram", strconv.FormatInt(from.ID, 10)),
Username: from.Username,
DisplayName: from.FirstName,
}
peer := bus.Peer{
Kind: "group", // or "direct"
ID: chatID,
}
// HandleMessage internally calls IsAllowedSender for permission checks, builds MediaScope, and publishes to bus
c.HandleMessage(ctx, peer, messageID, senderID, chatID, content, mediaRefs, metadata, sender)
3g. Add factory registration (required)
Create init.go for your channel:
// pkg/channels/telegram/init.go
package telegram
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
)
func init() {
channels.RegisterFactory(config.ChannelTelegram, func(channelName, channelType string, cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) {
bc := cfg.Channels[channelName]
decoded, err := bc.GetDecoded()
if err != nil { return nil, err }
c, ok := decoded.(*config.TelegramSettings)
if !ok { return nil, channels.ErrSendFailed }
return NewTelegramChannel(bc, c, b)
})
}
3h. Import sub-package in Gateway
// cmd/picoclaw/internal/gateway/helpers.go
import (
_ "github.com/sipeed/picoclaw/pkg/channels/telegram" // Triggers init() registration
_ "github.com/sipeed/picoclaw/pkg/channels/discord"
_ "github.com/sipeed/picoclaw/pkg/channels/your_new_channel" // New addition
)
If your code directly reads routing fields from InboundMessage.Metadata:
// Old code
peerKind := msg.Metadata["peer_kind"]
peerID := msg.Metadata["peer_id"]
msgID := msg.Metadata["message_id"]
// New code
peerKind := msg.Peer.Kind // First-class field
peerID := msg.Peer.ID // First-class field
msgID := msg.MessageID // First-class field
sender := msg.Sender // bus.SenderInfo struct
scope := msg.MediaScope // Media lifecycle scope
// Old code
if !c.isAllowed(senderID) { return }
// New code: prefer structured check
if !c.IsAllowedSender(sender) { return }
// Or fall back to string check:
if !c.IsAllowed(senderID) { return }
BaseChannel.HandleMessage already handles this logic internally — no need to duplicate the check in your channel.
The Manager has been completely rewritten. Your modifications will need to account for the new architecture:
| Old Manager Responsibility | New Manager Responsibility |
|---|---|
| Directly construct channels (switch/if-else) | Look up and construct via factory registry |
| Directly call channel.Send | Per-channel Worker queues + rate limiting + retries |
| No message splitting | Automatic splitting based on MaxMessageLength |
| Each channel runs its own HTTP server | Unified shared HTTP server |
| No Typing/Placeholder management | Unified preSend handles Typing stop + Reaction undo + Placeholder edit; inbound-side BaseChannel.HandleMessage auto-orchestrates Typing/Reaction/Placeholder |
| No TTL cleanup | runTTLJanitor periodically cleans up expired Typing/Reaction/Placeholder entries |
Main changes to the Agent Loop:
- agentLoop.SetMediaStore(mediaStore): Agent resolves media references produced by tools via MediaStore
- agentLoop.SetChannelManager(channelManager): Agent can query channel state
- Media is sent through bus.PublishOutboundMedia() instead of being embedded in text replies
- Routing uses msg.Peer structured fields instead of Metadata lookups

To add a new chat platform (e.g., matrix), you need to:

1. Create the sub-package pkg/channels/matrix/
2. Add init.go: factory registration
3. Add matrix.go: channel implementation
4. Add the channel configuration in pkg/config/

pkg/channels/matrix/init.go
package matrix
import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
)
func init() {
channels.RegisterFactory(config.ChannelMatrix, func(channelName, channelType string, cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) {
bc := cfg.Channels[channelName]
decoded, err := bc.GetDecoded()
if err != nil { return nil, err }
c, ok := decoded.(*config.MatrixSettings)
if !ok { return nil, channels.ErrSendFailed }
return NewMatrixChannel(bc, c, b)
})
}
pkg/channels/matrix/matrix.go
package matrix
import (
"context"
"fmt"
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
"github.com/sipeed/picoclaw/pkg/identity"
"github.com/sipeed/picoclaw/pkg/logger"
)
// MatrixChannel implements channels.Channel for the Matrix protocol.
type MatrixChannel struct {
*channels.BaseChannel // Must embed
config *config.Config
ctx context.Context
cancel context.CancelFunc
// ... Matrix SDK client, etc.
}
func NewMatrixChannel(cfg *config.Config, msgBus *bus.MessageBus) (*MatrixChannel, error) {
matrixCfg := cfg.Channels.Matrix // Assumes this field exists in config
base := channels.NewBaseChannel(
"matrix", // Channel name (globally unique)
matrixCfg, // Raw config
msgBus, // Message bus
matrixCfg.AllowFrom, // Allow list
channels.WithMaxMessageLength(65536), // Matrix message length limit
channels.WithGroupTrigger(matrixCfg.GroupTrigger),
channels.WithReasoningChannelID(matrixCfg.ReasoningChannelID), // Reasoning chain routing (optional)
)
return &MatrixChannel{
BaseChannel: base,
config: cfg,
}, nil
}
// ========== Required Channel Interface Methods ==========
func (c *MatrixChannel) Start(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
// 1. Initialize Matrix client
// 2. Start listening for messages
// 3. Mark as running
c.SetRunning(true)
logger.InfoC("matrix", "Matrix channel started")
return nil
}
func (c *MatrixChannel) Stop(ctx context.Context) error {
c.SetRunning(false)
if c.cancel != nil {
c.cancel()
}
logger.InfoC("matrix", "Matrix channel stopped")
return nil
}
func (c *MatrixChannel) Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error) {
// 1. Check running state
if !c.IsRunning() {
return nil, channels.ErrNotRunning
}
// 2. Send message to Matrix
eventID, err := c.sendToMatrix(ctx, msg.ChatID, msg.Content)
if err != nil {
// 3. Must use error classification wrapping
// If you have an HTTP status code:
// return nil, channels.ClassifySendError(statusCode, err)
// If it's a network error:
// return nil, channels.ClassifyNetError(err)
// If manual classification is needed:
return nil, fmt.Errorf("%w: %v", channels.ErrTemporary, err)
}
return []string{eventID}, nil
}
// ========== Incoming Message Handling ==========
func (c *MatrixChannel) handleIncoming(roomID, senderID, displayName, content string, msgID string) {
// 1. Construct structured sender identity
sender := bus.SenderInfo{
Platform: "matrix",
PlatformID: senderID,
CanonicalID: identity.BuildCanonicalID("matrix", senderID),
Username: senderID,
DisplayName: displayName,
}
// 2. Determine Peer type (direct vs group)
peer := bus.Peer{
Kind: "group", // or "direct"
ID: roomID,
}
// 3. Group chat filtering (if applicable)
isGroup := peer.Kind == "group"
if isGroup {
isMentioned := false // Detect @mentions based on platform specifics
shouldRespond, cleanContent := c.ShouldRespondInGroup(isMentioned, content)
if !shouldRespond {
return
}
content = cleanContent
}
// 4. Handle media attachments (if any)
var mediaRefs []string
store := c.GetMediaStore()
if store != nil {
// Download attachment locally → store.Store() → get ref
// mediaRefs = append(mediaRefs, ref)
}
// 5. Call HandleMessage to publish to bus
// HandleMessage internally will:
// - Check IsAllowedSender/IsAllowed
// - Build MediaScope
// - Publish InboundMessage
c.HandleMessage(
c.ctx,
peer,
msgID, // Platform message ID
senderID, // Raw sender ID
roomID, // Chat/room ID
content, // Message content
mediaRefs, // Media reference list
nil, // Extra metadata (usually nil)
sender, // SenderInfo (variadic parameter)
)
}
// ========== Internal Methods ==========
func (c *MatrixChannel) sendToMatrix(ctx context.Context, roomID, content string) (string, error) {
// Actual Matrix SDK call
return "event-id", nil
}
Depending on platform capabilities, your channel can optionally implement the following interfaces:
// If the platform supports sending images/files/audio/video
func (c *MatrixChannel) SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error) {
if !c.IsRunning() {
return nil, channels.ErrNotRunning
}
store := c.GetMediaStore()
if store == nil {
return nil, fmt.Errorf("no media store: %w", channels.ErrSendFailed)
}
var messageIDs []string
for _, part := range msg.Parts {
localPath, err := store.Resolve(part.Ref)
if err != nil {
logger.ErrorCF("matrix", "Failed to resolve media", map[string]any{
"ref": part.Ref, "error": err.Error(),
})
continue
}
// Call the appropriate API based on part.Type ("image"|"audio"|"video"|"file")
switch part.Type {
case "image":
// Upload image to Matrix
default:
// Upload file to Matrix
}
// Append platform IDs here when the API returns them.
// messageIDs = append(messageIDs, uploadedMessageID)
}
return messageIDs, nil
}
// If the platform supports "typing..." indicators
func (c *MatrixChannel) StartTyping(ctx context.Context, chatID string) (stop func(), err error) {
// Call Matrix API to send typing indicator
// The returned stop function must be idempotent; sync.Once (import "sync")
// also keeps it safe if it is invoked from more than one goroutine
var once sync.Once
return func() {
once.Do(func() {
// Call Matrix API to stop typing
})
}, nil
}
// If the platform supports adding emoji reactions to inbound messages (e.g., Slack's 👀, OneBot's emoji 289)
func (c *MatrixChannel) ReactToMessage(ctx context.Context, chatID, messageID string) (undo func(), err error) {
// Call Matrix API to add reaction to message
// The returned undo function removes the reaction and must be idempotent
err = c.addReaction(chatID, messageID, "eyes")
if err != nil {
return func() {}, err
}
var once sync.Once // guarantees the reaction is removed at most once (import "sync")
return func() {
once.Do(func() { c.removeReaction(chatID, messageID, "eyes") })
}, nil
}
// If the platform supports editing sent messages (used for Placeholder replacement)
func (c *MatrixChannel) EditMessage(ctx context.Context, chatID, messageID, content string) error {
// Call Matrix API to edit message
return nil
}
// If the platform supports sending placeholder messages (e.g. "Thinking... 💭"),
// and the channel also implements MessageEditor, then Manager's preSend will
// automatically edit the placeholder into the final response on outbound.
// SendPlaceholder checks PlaceholderConfig.Enabled internally;
// returning ("", nil) means skip.
func (c *MatrixChannel) SendPlaceholder(ctx context.Context, chatID string) (string, error) {
cfg := c.config.Channels.Matrix.Placeholder
if !cfg.Enabled {
return "", nil
}
text := cfg.Text
if text == "" {
text = "Thinking... 💭"
}
// Call Matrix API to send placeholder message
msg, err := c.sendText(ctx, chatID, text)
if err != nil {
return "", err
}
return msg.ID, nil
}
// If the channel receives messages via webhook (rather than long-polling/WebSocket)
func (c *MatrixChannel) WebhookPath() string {
return "/webhook/matrix" // Path will be registered on the shared HTTP server
}
func (c *MatrixChannel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Handle webhook request
}
func (c *MatrixChannel) HealthPath() string {
return "/health/matrix"
}
func (c *MatrixChannel) HealthHandler(w http.ResponseWriter, r *http.Request) {
if c.IsRunning() {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
}
BaseChannel.HandleMessage automatically detects whether the channel implements TypingCapable, ReactionCapable, and/or PlaceholderCapable before publishing the inbound message, and triggers the corresponding indicators. The three pipelines are completely independent and do not interfere with each other:
// Automatically executed inside BaseChannel.HandleMessage (no manual calls needed):
if c.owner != nil && c.placeholderRecorder != nil {
// Typing — independent pipeline
if tc, ok := c.owner.(TypingCapable); ok {
if stop, err := tc.StartTyping(ctx, chatID); err == nil {
c.placeholderRecorder.RecordTypingStop(c.name, chatID, stop)
}
}
// Reaction — independent pipeline
if rc, ok := c.owner.(ReactionCapable); ok && messageID != "" {
if undo, err := rc.ReactToMessage(ctx, chatID, messageID); err == nil {
c.placeholderRecorder.RecordReactionUndo(c.name, chatID, undo)
}
}
// Placeholder — independent pipeline
if pc, ok := c.owner.(PlaceholderCapable); ok {
if phID, err := pc.SendPlaceholder(ctx, chatID); err == nil && phID != "" {
c.placeholderRecorder.RecordPlaceholder(c.name, chatID, phID)
}
}
}
This means:
- Channels implementing TypingCapable (Telegram, Discord, LINE, Pico) do not need to manually call StartTyping + RecordTypingStop in handleMessage
- Channels implementing ReactionCapable (Slack, OneBot) do not need to manually call ReactToMessage + RecordReactionUndo in handleMessage
- Channels implementing PlaceholderCapable (Telegram, Discord, Pico) do not need to manually send placeholder messages and call RecordPlaceholder in handleMessage
- HandleMessage handles orchestration automatically
- PlaceholderCapable's SendPlaceholder method internally decides whether to send based on the configured PlaceholderConfig.Enabled; returning ("", nil) skips registration

Owner Injection: Manager automatically calls SetOwner(ch) in initChannel to inject the concrete channel into BaseChannel, so no manual setup is required from developers.
When the Agent finishes processing a message, Manager's preSend automatically:
1. Calls the recorded stop() to stop Typing
2. Calls the recorded undo() to undo Reaction
3. If the channel implements MessageEditor, attempts to edit the Placeholder with the final reply (skipping Send)

Channels now use a unified map-based configuration (map[string]*config.Channel).
Each channel entry stores common fields (enabled, type, allow_from, etc.) at
the top level, with channel-specific settings in the settings sub-key:
{
"channels": {
"matrix": {
"enabled": true,
"type": "matrix",
"allow_from": ["@user:example.com"],
"settings": {
"home_server": "https://matrix.org",
"user_id": "@bot:example.com",
"access_token": "enc://..."
}
}
}
}
Secure fields (tokens, passwords, API keys) go into .security.yml:
channels:
matrix:
access_token: "your-matrix-access-token"
Channel types must be registered in channelSettingsFactory in
pkg/config/config_channel.go:
var channelSettingsFactory = map[string]any{
// ... existing channels
ChannelMatrix: (MatrixSettings{}),
}
The Manager uses InitChannelList() to validate types and decode settings,
then looks up factories by bc.Type. No per-channel entry needed in Manager —
just register the factory and the config entry.
Note: If your channel has multiple modes (like WhatsApp Bridge vs Native), register both types in channelSettingsFactory and branch on config:
// In config_channel.go:
ChannelWhatsApp: (WhatsAppSettings{}),
ChannelWhatsAppNative: (WhatsAppSettings{}),
// cmd/picoclaw/internal/gateway/helpers.go
import (
_ "github.com/sipeed/picoclaw/pkg/channels/matrix"
)
Files: pkg/bus/bus.go, pkg/bus/types.go
type MessageBus struct {
inbound chan InboundMessage // buffer = 64
outbound chan OutboundMessage // buffer = 64
outboundMedia chan OutboundMediaMessage // buffer = 64
done chan struct{} // Close signal
closed atomic.Bool // Prevents double-close
}
Key Behaviors:
| Method | Behavior |
|---|---|
| PublishInbound(ctx, msg) | Check closed → send to inbound channel → block/timeout/close |
| ConsumeInbound(ctx) | Read from inbound → block/close/cancel |
| PublishOutbound(ctx, msg) | Send to outbound channel |
| SubscribeOutbound(ctx) | Read from outbound (called by Manager dispatcher) |
| PublishOutboundMedia(ctx, msg) | Send to outboundMedia channel |
| SubscribeOutboundMedia(ctx) | Read from outboundMedia (called by Manager media dispatcher) |
| Close() | CAS close → close(done) → drain all channels (does not close the channels themselves to avoid concurrent send-on-closed panic) |
Design Notes:
- Close() does not close the underlying channels (only closes the done signal channel), because there may be concurrent Publish goroutines

File: pkg/bus/types.go
// Routing peer
type Peer struct {
Kind string `json:"kind"` // "direct" | "group" | "channel" | ""
ID string `json:"id"`
}
// Sender identity information
type SenderInfo struct {
Platform string `json:"platform,omitempty"` // "telegram", "discord", ...
PlatformID string `json:"platform_id,omitempty"` // Platform-native ID
CanonicalID string `json:"canonical_id,omitempty"` // "platform:id" canonical format
Username string `json:"username,omitempty"`
DisplayName string `json:"display_name,omitempty"`
}
// Inbound message
type InboundMessage struct {
Channel string // Source channel name
SenderID string // Sender ID (prefer CanonicalID)
Sender SenderInfo // Structured sender info
ChatID string // Chat/room ID
Content string // Message text
Media []string // Media reference list (media://...)
Peer Peer // Routing peer (first-class field)
MessageID string // Platform message ID (first-class field)
MediaScope string // Media lifecycle scope
SessionKey string // Session key
Metadata map[string]string // Only for channel-specific extensions
}
// Outbound text message
type OutboundMessage struct {
Channel string
ChatID string
Content string
}
// Outbound media message
type OutboundMediaMessage struct {
Channel string
ChatID string
Parts []MediaPart
}
// Media part
type MediaPart struct {
Type string // "image" | "audio" | "video" | "file"
Ref string // "media://uuid"
Caption string
Filename string
ContentType string
}
File: pkg/channels/base.go
BaseChannel is the shared abstraction layer for all channels, providing the following capabilities:
| Method/Feature | Description |
|---|---|
| Name() string | Channel name |
| IsRunning() bool | Atomically read running state |
| SetRunning(bool) | Atomically set running state |
| MaxMessageLength() int | Message length limit (rune count), 0 = unlimited |
| ReasoningChannelID() string | Reasoning chain routing target channel ID (empty = no routing) |
| IsAllowed(senderID string) bool | Legacy allow-list check (supports "id\|username" and "@username" formats) |
| IsAllowedSender(sender SenderInfo) bool | New allow-list check (delegates to identity.MatchAllowed) |
| ShouldRespondInGroup(isMentioned, content) (bool, string) | Unified group chat trigger filtering logic |
| HandleMessage(...) | Unified inbound message handling: permission check → build MediaScope → auto-trigger Typing/Reaction/Placeholder → publish to Bus |
| SetMediaStore(s) / GetMediaStore() | MediaStore injected by Manager |
| SetPlaceholderRecorder(r) / GetPlaceholderRecorder() | PlaceholderRecorder injected by Manager |
| SetOwner(ch) | Concrete channel reference injected by Manager (used for Typing/Reaction/Placeholder type assertions in HandleMessage) |
Functional Options:
channels.WithMaxMessageLength(4096) // Set platform message length limit
channels.WithGroupTrigger(groupTriggerCfg) // Set group trigger configuration
channels.WithReasoningChannelID(id) // Set reasoning chain routing target channel
File: pkg/channels/registry.go
type ChannelFactory func(channelName, channelType string, cfg *config.Config, bus *bus.MessageBus) (Channel, error)
func RegisterFactory(name string, f ChannelFactory) // Called in sub-package init()
func getFactory(name string) (ChannelFactory, bool) // Called internally by Manager
func GetRegisteredFactoryNames() []string // Returns all registered factory names
For convenience, RegisterSafeFactory[S any] provides automatic type-safe settings decoding:
// Instead of manual GetDecoded() + type assertion:
channels.RegisterFactory(config.ChannelTelegram,
func(channelName, channelType string, cfg *config.Config, b *bus.MessageBus) (Channel, error) {
bc := cfg.Channels[channelName]
decoded, err := bc.GetDecoded()
if err != nil { return nil, err }
c, ok := decoded.(*config.TelegramSettings)
if !ok { return nil, ErrSendFailed }
return NewTelegramChannel(bc, c, b)
})
// You can use RegisterSafeFactory (same safety, less boilerplate):
channels.RegisterSafeFactory(config.ChannelTelegram, NewTelegramChannel)
The factory registry is protected by a sync.RWMutex, and registration happens during the init() phase (completed at process startup). Manager looks up factories by name in initChannel() and calls them.
Files: pkg/channels/errors.go, pkg/channels/errutil.go
var (
ErrNotRunning = errors.New("channel not running") // Permanent: do not retry
ErrRateLimit = errors.New("rate limited") // Fixed delay: retry after 1s
ErrTemporary = errors.New("temporary failure") // Exponential backoff: 500ms * 2^attempt, max 8s
ErrSendFailed = errors.New("send failed") // Permanent: do not retry
)
// Automatically classify based on HTTP status code
func ClassifySendError(statusCode int, rawErr error) error {
// 429 → ErrRateLimit
// 5xx → ErrTemporary
// 4xx → ErrSendFailed
}
// Wrap network errors as temporary
func ClassifyNetError(err error) error {
// → ErrTemporary
}
Retry strategy (sendWithRetry):
Max retries: 3
Rate limit delay: 1 second
Base backoff: 500 milliseconds
Max backoff: 8 seconds
Retry logic:
ErrNotRunning → Fail immediately, no retry
ErrSendFailed → Fail immediately, no retry
ErrRateLimit → Wait 1s → retry
ErrTemporary → Wait 500ms * 2^attempt (max 8s) → retry
Other unknown → Wait 500ms * 2^attempt (max 8s) → retry
File: pkg/channels/manager.go
type channelWorker struct {
ch Channel // Channel instance
queue chan bus.OutboundMessage // Outbound text queue (buffered 16)
mediaQueue chan bus.OutboundMediaMessage // Outbound media queue (buffered 16)
done chan struct{} // Text worker completion signal
mediaDone chan struct{} // Media worker completion signal
limiter *rate.Limiter // Per-channel rate limiter
}
var channelRateConfig = map[string]float64{
"telegram": 20, // 20 msg/s
"discord": 1, // 1 msg/s
"slack": 1, // 1 msg/s
"line": 10, // 10 msg/s
}
// Default: 10 msg/s
// burst = max(1, ceil(rate/2))
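
As a worked example of the burst formula, the sketch below computes the rate and burst for a few channel names. `rateAndBurst` and `defaultRate` are hypothetical names; the real Manager passes these values to a `rate.Limiter`, which is left out here to keep the example standard-library only.

```go
package main

import (
	"fmt"
	"math"
)

var channelRateConfig = map[string]float64{
	"telegram": 20,
	"discord":  1,
	"slack":    1,
	"line":     10,
}

const defaultRate = 10.0 // used when a channel has no explicit entry

// rateAndBurst returns the per-second rate and burst = max(1, ceil(rate/2)).
func rateAndBurst(channel string) (float64, int) {
	r, ok := channelRateConfig[channel]
	if !ok {
		r = defaultRate
	}
	burst := int(math.Ceil(r / 2))
	if burst < 1 {
		burst = 1
	}
	return r, burst
}

func main() {
	for _, name := range []string{"telegram", "discord", "line", "matrix"} {
		r, b := rateAndBurst(name)
		fmt.Printf("%s: %.0f msg/s, burst %d\n", name, r, b)
	}
}
```

With these numbers Telegram gets a burst of 10, Discord and Slack a burst of 1, and an unlisted channel such as matrix falls back to 10 msg/s with a burst of 5.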
StartAll:
1. Iterate registered channels → channel.Start(ctx)
2. Create channelWorker for each successfully started channel
3. Start goroutines:
- runWorker (per-channel outbound text)
- runMediaWorker (per-channel outbound media)
- dispatchOutbound (route from bus to worker queues)
- dispatchOutboundMedia (route from bus to media worker queues)
- runTTLJanitor (every 10s clean up expired typing/reaction/placeholder)
4. Start shared HTTP server (if configured)
StopAll:
1. Shut down shared HTTP server (5s timeout)
2. Cancel dispatcher context
3. Close text worker queues → wait for drain to complete
4. Close media worker queues → wait for drain to complete
5. Stop each channel (channel.Stop)
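
Steps 3 and 4 of StopAll (close the queue, then wait for the drain) follow a common Go pattern, sketched below with a hypothetical `worker` type that records messages instead of sending them. The field names echo channelWorker, but the code is an illustration, not the Manager implementation.

```go
package main

import "fmt"

// worker is a simplified stand-in for channelWorker.
type worker struct {
	queue chan string   // outbound queue (buffered 16)
	done  chan struct{} // closed when the worker goroutine has finished
	sent  []string      // stands in for real delivery
}

func startWorker() *worker {
	w := &worker{queue: make(chan string, 16), done: make(chan struct{})}
	go func() {
		defer close(w.done)
		// range ends only after the queue is closed AND fully drained.
		for msg := range w.queue {
			w.sent = append(w.sent, msg)
		}
	}()
	return w
}

// stop closes the queue and blocks until every queued message was handled.
func (w *worker) stop() []string {
	close(w.queue)
	<-w.done // reading w.sent is safe after this point
	return w.sent
}

func main() {
	w := startWorker()
	w.queue <- "first"
	w.queue <- "second"
	fmt.Println(w.stop())
}
```

The dispatcher must be cancelled before the queues are closed, as in the StopAll order above; otherwise it could still try to enqueue into a closed channel.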
// Manager implements PlaceholderRecorder interface
func (m *Manager) RecordPlaceholder(channel, chatID, placeholderID string)
func (m *Manager) RecordTypingStop(channel, chatID string, stop func())
func (m *Manager) RecordReactionUndo(channel, chatID string, undo func())
// Inbound side: BaseChannel.HandleMessage auto-orchestrates
// BaseChannel.HandleMessage, before PublishInbound, auto-triggers via owner type assertions:
// - TypingCapable.StartTyping → RecordTypingStop
// - ReactionCapable.ReactToMessage → RecordReactionUndo
// - PlaceholderCapable.SendPlaceholder → RecordPlaceholder
// All three are independent and do not interfere with each other. Channels don't need to call these manually.
// Outbound side: pre-send processing
func (m *Manager) preSend(ctx, name, msg, ch) bool {
key := name + ":" + msg.ChatID
// 1. Stop Typing (call stored stop function)
// 2. Undo Reaction (call stored undo function)
// 3. Attempt to edit Placeholder (if channel implements MessageEditor)
// Success → return true (skip Send)
// Failure → return false (proceed with Send)
}
Manager storage is fully separated; three pipelines do not interfere:
Manager {
typingStops sync.Map // "channel:chatID" → typingEntry ← manages TypingCapable
reactionUndos sync.Map // "channel:chatID" → reactionEntry ← manages ReactionCapable
placeholders sync.Map // "channel:chatID" → placeholderEntry
}
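The three-step preSend flow can be sketched with `sync.Map.LoadAndDelete`, which fetches and clears an entry in one step so that each stop/undo function runs at most once. This is an assumption-laden illustration, not the Manager's code: `manager`, `editor`, and `fakeChannel` are hypothetical, and the maps hold raw functions and IDs rather than the `typingEntry` / `reactionEntry` / `placeholderEntry` values used by the real implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// editor is a stand-in for the MessageEditor capability.
type editor interface {
	EditMessage(chatID, messageID, content string) error
}

// manager holds the three independent stores keyed by "channel:chatID".
type manager struct {
	typingStops   sync.Map // key -> func()
	reactionUndos sync.Map // key -> func()
	placeholders  sync.Map // key -> placeholder message ID (string)
}

// preSend returns true when the reply was delivered by editing a placeholder,
// in which case the caller should skip the normal Send.
func (m *manager) preSend(channel, chatID, content string, ch any) bool {
	key := channel + ":" + chatID

	// 1. Stop typing, if a stop function was recorded.
	if v, ok := m.typingStops.LoadAndDelete(key); ok {
		v.(func())()
	}
	// 2. Undo the reaction, if one was recorded.
	if v, ok := m.reactionUndos.LoadAndDelete(key); ok {
		v.(func())()
	}
	// 3. Try to turn the placeholder into the final reply.
	if v, ok := m.placeholders.LoadAndDelete(key); ok {
		if ed, ok := ch.(editor); ok {
			if err := ed.EditMessage(chatID, v.(string), content); err == nil {
				return true
			}
		}
	}
	return false
}

// fakeChannel records edits; failEdit forces EditMessage to fail.
type fakeChannel struct {
	failEdit bool
	edited   string
}

func (f *fakeChannel) EditMessage(chatID, messageID, content string) error {
	if f.failEdit {
		return errors.New("edit failed")
	}
	f.edited = messageID + "=" + content
	return nil
}

func main() {
	m := &manager{}
	stopped := false
	m.typingStops.Store("telegram:42", func() { stopped = true })
	m.placeholders.Store("telegram:42", "ph-1")

	ch := &fakeChannel{}
	fmt.Println(m.preSend("telegram", "42", "hello", ch), stopped, ch.edited)
}
```

Because each store is consulted independently, a channel that only supports reactions still has its undo function called even though steps 1 and 3 find nothing.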
TTL Cleanup:
File: pkg/channels/split.go
SplitMessage(content string, maxLen int) []string
Smart splitting strategy:
… ```)\n```\n + header)

File: pkg/media/store.go
type MediaStore interface {
Store(localPath string, meta MediaMeta, scope string) (ref string, err error)
Resolve(ref string) (localPath string, err error)
ResolveWithMeta(ref string) (localPath string, meta MediaMeta, err error)
ReleaseAll(scope string) error
}
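To make the Store / Resolve / ReleaseAll contract concrete, here is a small in-memory sketch. It is not FileMediaStore: `memStore` is hypothetical, `MediaMeta` is reduced to a single made-up field, `ResolveWithMeta` is omitted, no files are deleted, and the reference suffix is random hex rather than a real UUID.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"sync"
)

// MediaMeta is a placeholder here; the real struct's fields are not shown above.
type MediaMeta struct {
	Filename string
}

type entry struct {
	path string
	meta MediaMeta
}

// memStore keeps ref -> entry and scope -> refs in memory.
type memStore struct {
	mu      sync.RWMutex
	refs    map[string]entry
	byScope map[string][]string
}

func newMemStore() *memStore {
	return &memStore{refs: map[string]entry{}, byScope: map[string][]string{}}
}

// Store registers localPath under scope and returns a "media://" reference.
func (s *memStore) Store(localPath string, meta MediaMeta, scope string) (string, error) {
	id := make([]byte, 16)
	if _, err := rand.Read(id); err != nil {
		return "", err
	}
	ref := "media://" + hex.EncodeToString(id)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.refs[ref] = entry{path: localPath, meta: meta}
	s.byScope[scope] = append(s.byScope[scope], ref)
	return ref, nil
}

// Resolve maps a reference back to its local path.
func (s *memStore) Resolve(ref string) (string, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	e, ok := s.refs[ref]
	if !ok {
		return "", errors.New("unknown media ref: " + ref)
	}
	return e.path, nil
}

// ReleaseAll forgets every reference stored under scope.
func (s *memStore) ReleaseAll(scope string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, ref := range s.byScope[scope] {
		delete(s.refs, ref)
	}
	delete(s.byScope, scope)
	return nil
}

func main() {
	s := newMemStore()
	ref, _ := s.Store("/tmp/photo.jpg", MediaMeta{Filename: "photo.jpg"}, "telegram:42:1001")
	path, _ := s.Resolve(ref)
	fmt.Println(ref, path)
}
```

Grouping references by scope is what lets a caller release everything tied to one inbound message with a single call, without tracking individual refs.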
FileMediaStore Implementation:
- Ref: `media://<uuid>`
- Scope: `channel:chatID:messageID` (generated by BuildMediaScope)
- Cleanup: NewFileMediaStoreWithCleanup → Start() launches background cleanup goroutine

File: pkg/identity/identity.go
// Build canonical ID
func BuildCanonicalID(platform, platformID string) string
// → "telegram:123456"
// Parse canonical ID
func ParseCanonicalID(canonical string) (platform, id string, ok bool)
// Match against allow list (backward-compatible)
func MatchAllowed(sender bus.SenderInfo, allowed string) bool
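A minimal sketch of how these three functions might behave, covering a bare ID, an `@username`, the legacy `id|username` pair, and the canonical `platform:id` form. The lowercase function names and the reduced `SenderInfo` struct are stand-ins; the real implementation may normalize case or handle edge cases differently.

```go
package main

import (
	"fmt"
	"strings"
)

// SenderInfo is a reduced stand-in for bus.SenderInfo with only the fields
// needed for allow-list matching.
type SenderInfo struct {
	PlatformID  string
	Username    string
	CanonicalID string
}

// buildCanonicalID joins platform and ID as "platform:id".
func buildCanonicalID(platform, platformID string) string {
	return platform + ":" + platformID
}

// parseCanonicalID splits "platform:id"; ok is false if either part is empty.
func parseCanonicalID(canonical string) (platform, id string, ok bool) {
	platform, id, found := strings.Cut(canonical, ":")
	if !found || platform == "" || id == "" {
		return "", "", false
	}
	return platform, id, true
}

// matchAllowed checks one allow-list entry against a sender.
func matchAllowed(sender SenderInfo, allowed string) bool {
	allowed = strings.TrimSpace(allowed)
	if allowed == "" {
		return false
	}
	// "@alice": username match.
	if strings.HasPrefix(allowed, "@") {
		return sender.Username != "" && sender.Username == allowed[1:]
	}
	// "123456|alice": legacy compound entry, either half may match.
	if id, name, ok := strings.Cut(allowed, "|"); ok {
		return (id != "" && id == sender.PlatformID) ||
			(name != "" && name == sender.Username)
	}
	// "telegram:123456": canonical form, compared exactly.
	if _, _, ok := parseCanonicalID(allowed); ok {
		return allowed == sender.CanonicalID
	}
	// "123456": bare platform ID.
	return allowed == sender.PlatformID
}

func main() {
	sender := SenderInfo{
		PlatformID:  "123456",
		Username:    "alice",
		CanonicalID: buildCanonicalID("telegram", "123456"),
	}
	entries := []string{"123456", "@alice", "123456|alice", "telegram:123456", "discord:123456"}
	for _, e := range entries {
		fmt.Printf("%-18s %v\n", e, matchAllowed(sender, e))
	}
}
```

Checking the `@` and `|` forms before the canonical form keeps old configuration entries working while allowing new entries to pin a sender to one platform.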
MatchAllowed supported allow-list formats:
| Format | Matching |
|---|---|
| `"123456"` | Matches sender.PlatformID |
| `"@alice"` | Matches sender.Username |
| `"123456\|alice"` | Matches PlatformID or Username (legacy format compatibility) |
| `"telegram:123456"` | Exact match on sender.CanonicalID (new format) |
File: pkg/channels/manager.go (SetupHTTPServer)
Manager creates a single http.Server and auto-discovers and registers:
- WebhookHandler → mounted at wh.WebhookPath()
- HealthChecker → mounted at hc.HealthPath()
- health.Server.RegisterOnMux

Timeout configuration: ReadTimeout = 30s, WriteTimeout = 30s
Error classification is a contract: A channel's Send method must return sentinel errors (or wrap them). Manager's retry strategy relies entirely on errors.Is checks. Returning unclassified errors will cause Manager to treat them as "unknown errors" (exponential backoff retry).
SetRunning is a lifecycle signal: Must call c.SetRunning(true) after successful Start, and must call c.SetRunning(false) at the beginning of Stop. Must check c.IsRunning() in Send and return ErrNotRunning.
HandleMessage includes permission checks: Do not perform your own permission checks before calling HandleMessage (unless you need platform-specific preprocessing before the check). HandleMessage already calls IsAllowedSender/IsAllowed internally.
Message splitting is handled by Manager: A channel's Send method does not need to handle long message splitting. Manager automatically splits based on MaxMessageLength() before calling Send. Channels only need to declare the limit via WithMaxMessageLength.
Typing/Reaction/Placeholder is handled by BaseChannel + Manager automatically: A channel's Send method does not need to manage Typing stop, Reaction undo, or Placeholder editing. BaseChannel.HandleMessage auto-triggers TypingCapable, ReactionCapable, and PlaceholderCapable on the inbound side (via owner type assertions); Manager's preSend auto-stops Typing, undoes Reaction, and edits Placeholder on the outbound side. Channels only need to implement the corresponding interfaces.
Factory registration belongs in init(): Each sub-package must have an init.go file calling channels.RegisterFactory. Gateway must trigger registration via blank imports (_ "pkg/channels/xxx").
Do NOT put the following information in Metadata anymore:
- peer_kind / peer_id → Use InboundMessage.Peer
- message_id → Use InboundMessage.MessageID
- sender_platform / sender_username → Use InboundMessage.Sender

Metadata should only be used for:
- Platform-specific extras (e.g. reply_to_message_id)

- BaseChannel.running: Uses atomic.Bool, thread-safe
- Manager.channels / Manager.workers: Protected by sync.RWMutex
- Manager.placeholders / Manager.typingStops / Manager.reactionUndos: Uses sync.Map
- MessageBus.closed: Uses atomic.Bool
- FileMediaStore: Uses sync.RWMutex, two-phase operation to minimize lock-hold time

Existing test files:
- pkg/channels/base_test.go: BaseChannel unit tests
- pkg/channels/manager_test.go: Manager unit tests
- pkg/channels/split_test.go: Message splitting tests
- pkg/channels/errors_test.go: Error type tests
- pkg/channels/errutil_test.go: Error classification tests

To add tests for a new channel:
go test ./pkg/channels/matrix/ -v # Sub-package tests
go test ./pkg/channels/ -run TestSpecific -v # Framework tests
make test # Full test suite
| File | Responsibility |
|---|---|
| `pkg/channels/base.go` | BaseChannel struct, Channel interface, MessageLengthProvider, BaseChannelOption, HandleMessage |
| `pkg/channels/interfaces.go` | TypingCapable, MessageEditor, ReactionCapable, PlaceholderCapable, PlaceholderRecorder interfaces |
| `pkg/channels/media.go` | MediaSender interface |
| `pkg/channels/webhook.go` | WebhookHandler, HealthChecker interfaces |
| `pkg/channels/errors.go` | ErrNotRunning, ErrRateLimit, ErrTemporary, ErrSendFailed sentinels |
| `pkg/channels/errutil.go` | ClassifySendError, ClassifyNetError helpers |
| `pkg/channels/registry.go` | RegisterFactory, getFactory factory registry |
| `pkg/channels/manager.go` | Manager: Worker queues, rate limiting, retries, preSend, shared HTTP, TTL janitor |
| `pkg/channels/split.go` | SplitMessage long-message splitting |
| `pkg/bus/bus.go` | MessageBus implementation |
| `pkg/bus/types.go` | Peer, SenderInfo, InboundMessage, OutboundMessage, OutboundMediaMessage, MediaPart |
| `pkg/media/store.go` | MediaStore interface, FileMediaStore implementation |
| `pkg/identity/identity.go` | BuildCanonicalID, ParseCanonicalID, MatchAllowed |
| Sub-package | Registered Name | Optional Interfaces |
|---|---|---|
| `pkg/channels/telegram/` | "telegram" | TypingCapable, PlaceholderCapable, MessageEditor, MediaSender |
| `pkg/channels/discord/` | "discord" | TypingCapable, PlaceholderCapable, MessageEditor, MediaSender |
| `pkg/channels/slack/` | "slack" | ReactionCapable, MediaSender |
| `pkg/channels/line/` | "line" | TypingCapable, MediaSender, WebhookHandler |
| `pkg/channels/onebot/` | "onebot" | ReactionCapable, MediaSender |
| `pkg/channels/dingtalk/` | "dingtalk" | — |
| `pkg/channels/feishu/` | "feishu" | — (architecture-specific build tags: feishu_32.go / feishu_64.go) |
| `pkg/channels/wecom/` | "wecom" | MediaSender |
| `pkg/channels/qq/` | "qq" | — |
| `pkg/channels/whatsapp/` | "whatsapp" | — (Bridge mode) |
| `pkg/channels/whatsapp_native/` | "whatsapp_native" | — (Native whatsmeow mode) |
| `pkg/channels/maixcam/` | "maixcam" | — |
| `pkg/channels/pico/` | "pico" | TypingCapable, PlaceholderCapable, MessageEditor, WebhookHandler |
// ===== Required =====
type Channel interface {
Name() string
Start(ctx context.Context) error
Stop(ctx context.Context) error
Send(ctx context.Context, msg bus.OutboundMessage) ([]string, error)
IsRunning() bool
IsAllowed(senderID string) bool
IsAllowedSender(sender bus.SenderInfo) bool
ReasoningChannelID() string
}
// ===== Optional =====
type MediaSender interface {
SendMedia(ctx context.Context, msg bus.OutboundMediaMessage) ([]string, error)
}
type TypingCapable interface {
StartTyping(ctx context.Context, chatID string) (stop func(), err error)
}
type ReactionCapable interface {
ReactToMessage(ctx context.Context, chatID, messageID string) (undo func(), err error)
}
type PlaceholderCapable interface {
SendPlaceholder(ctx context.Context, chatID string) (messageID string, err error)
}
type MessageEditor interface {
EditMessage(ctx context.Context, chatID, messageID, content string) error
}
type WebhookHandler interface {
WebhookPath() string
http.Handler
}
type HealthChecker interface {
HealthPath() string
HealthHandler(w http.ResponseWriter, r *http.Request)
}
type MessageLengthProvider interface {
MaxMessageLength() int
}
// ===== Injected by Manager =====
type PlaceholderRecorder interface {
RecordPlaceholder(channel, chatID, placeholderID string)
RecordTypingStop(channel, chatID string, stop func())
RecordReactionUndo(channel, chatID string, undo func())
}
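Optional interfaces are discovered at runtime with type assertions, so a channel opts in simply by implementing the methods. A short sketch, with a hypothetical `typingOnly` channel and `capabilities` helper, and only two of the optional interfaces copied from the reference above:

```go
package main

import (
	"context"
	"fmt"
)

// TypingCapable and MessageEditor are copied from the interface reference.
type TypingCapable interface {
	StartTyping(ctx context.Context, chatID string) (stop func(), err error)
}

type MessageEditor interface {
	EditMessage(ctx context.Context, chatID, messageID, content string) error
}

// typingOnly is a hypothetical channel that implements only TypingCapable.
type typingOnly struct{}

func (typingOnly) StartTyping(ctx context.Context, chatID string) (func(), error) {
	return func() {}, nil // nothing to stop in this stub
}

// capabilities lists which of the two optional interfaces ch satisfies.
func capabilities(ch any) []string {
	var caps []string
	if _, ok := ch.(TypingCapable); ok {
		caps = append(caps, "TypingCapable")
	}
	if _, ok := ch.(MessageEditor); ok {
		caps = append(caps, "MessageEditor")
	}
	return caps
}

func main() {
	fmt.Println(capabilities(typingOnly{}))
}
```

No registration step is needed beyond the methods themselves, which is why adding a capability to a channel does not require touching Manager.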
// 1. Create core components
msgBus := bus.NewMessageBus()
provider := providers.CreateProvider(cfg)
agentLoop := agent.NewAgentLoop(cfg, msgBus, provider)
// 2. Create media store (with TTL cleanup)
mediaStore := media.NewFileMediaStoreWithCleanup(cleanerConfig)
mediaStore.Start()
// 3. Create Channel Manager (triggers initChannels → factory lookup → construct → inject MediaStore/PlaceholderRecorder/Owner)
channelManager := channels.NewManager(cfg, msgBus, mediaStore)
// 4. Inject references
agentLoop.SetChannelManager(channelManager)
agentLoop.SetMediaStore(mediaStore)
// 5. Configure shared HTTP server
channelManager.SetupHTTPServer(addr, healthServer)
// 6. Start
channelManager.StartAll(ctx) // Start channels + workers + dispatchers + HTTP server
go agentLoop.Run(ctx) // Start Agent message loop
// 7. Shutdown (signal-triggered)
cancel() // Cancel context
msgBus.Close() // Signal close + drain
channelManager.StopAll(shutdownCtx) // Stop HTTP + workers + channels
mediaStore.Stop() // Stop TTL cleanup
agentLoop.Stop() // Stop Agent
| Channel | Rate (msg/s) | Burst |
|---|---|---|
| telegram | 20 | 10 |
| discord | 1 | 1 |
| slack | 1 | 1 |
| line | 10 | 5 |
| others | 10 (default) | 5 |
Media cleanup temporarily disabled: The ReleaseAll call in the Agent loop is commented out (refactor(loop): disable media cleanup to prevent premature file deletion) because session boundaries are not yet clearly defined. TTL cleanup remains active.
Feishu architecture-specific compilation: The Feishu channel uses build tags to distinguish 32-bit and 64-bit architectures (feishu_32.go / feishu_64.go). Feishu uses the SDK's WebSocket mode (not HTTP webhook), so it does not implement WebhookHandler.
WeCom is now a single channel: "wecom" is implemented as a WebSocket-based AI Bot channel with route persistence. Access control uses the shared channel allowlist mechanism. It no longer exposes the legacy webhook/app split.
Pico Protocol: pkg/channels/pico/ implements a custom PicoClaw native protocol channel that receives messages via WebSocket webhook (/pico/ws).
WhatsApp has two modes: "whatsapp" (Bridge mode, communicates via external bridge URL) and "whatsapp_native" (native whatsmeow mode, connects directly to WhatsApp). Manager selects which to initialize based on WhatsAppConfig.UseNative.
DingTalk uses Stream mode: DingTalk uses the SDK's Stream/WebSocket mode (not HTTP webhook), so it does not implement WebhookHandler.
PlaceholderConfig vs implementation: PlaceholderConfig appears in 6 channel configs (Telegram, Discord, Slack, LINE, OneBot, Pico), but only channels that implement both PlaceholderCapable + MessageEditor (Telegram, Discord, Pico) can actually use placeholder message editing. The rest are reserved fields.
ReasoningChannelID: Most channel configs include a reasoning_channel_id field to route LLM reasoning/thinking output to a designated channel (WhatsApp, Telegram, Feishu, Discord, MaixCam, QQ, DingTalk, Slack, LINE, OneBot, WeCom). Note: PicoConfig does not currently expose this field. BaseChannel exposes this via the WithReasoningChannelID option and ReasoningChannelID() method.
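Options such as WithReasoningChannelID and WithMaxMessageLength suggest the functional-options pattern. The sketch below shows that pattern in general terms; `baseChannel`, `option`, and the lowercase constructors are hypothetical, the real BaseChannelOption type and constructor signature may differ, and the values passed in `main` are only examples.

```go
package main

import "fmt"

// baseChannel is a reduced stand-in for BaseChannel.
type baseChannel struct {
	name               string
	maxMessageLength   int
	reasoningChannelID string
}

// option mirrors the idea of BaseChannelOption; the real type may differ.
type option func(*baseChannel)

// withMaxMessageLength declares the platform's message length limit.
func withMaxMessageLength(n int) option {
	return func(c *baseChannel) { c.maxMessageLength = n }
}

// withReasoningChannelID sets where reasoning output should be routed.
func withReasoningChannelID(id string) option {
	return func(c *baseChannel) { c.reasoningChannelID = id }
}

// newBaseChannel applies the options in order on top of zero-value defaults.
func newBaseChannel(name string, opts ...option) *baseChannel {
	c := &baseChannel{name: name}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func (c *baseChannel) MaxMessageLength() int { return c.maxMessageLength }

func (c *baseChannel) ReasoningChannelID() string { return c.reasoningChannelID }

func main() {
	c := newBaseChannel("telegram",
		withMaxMessageLength(4096),
		withReasoningChannelID("-100123"),
	)
	fmt.Println(c.name, c.MaxMessageLength(), c.ReasoningChannelID())
}
```

A channel whose config lacks a reasoning channel simply omits the option, and `ReasoningChannelID()` then returns the empty string.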