docs/nats-eventbus-integration-guide-en.md
This document provides a comprehensive guide for integrating NATS as an EventBus in Coze Studio, including architecture design, implementation details, configuration instructions, and usage guidelines.
In Coze Studio's architecture, the EventBus handles asynchronous message delivery for workflow execution, Agent communication, data processing pipelines, and other core functions. NATS, as a lightweight and high-performance messaging system, brings the following core advantages to Coze Studio:
| Feature | NATS | NSQ | Kafka | RocketMQ | Pulsar |
|---|---|---|---|---|---|
| Deployment Complexity | Very Low | Low | Medium | Medium | Medium |
| Performance | Very High | Medium | High | High | High |
| Resource Usage | Very Low | Low | Medium | Medium | Medium |
| Message Persistence | JetStream | Limited | Strong | Strong | Strong |
| Message Ordering | Supported | Weak | Strong | Strong | Strong |
| Horizontal Scaling | Good | Medium | Good | Good | Excellent |
| Operational Complexity | Very Low | Low | High | Medium | Medium |
| Cloud Native Support | Excellent | Medium | Medium | Medium | Good |
Lightweight and High Performance:
Simplicity:
Cloud Native Features:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Coze Studio │ │ NATS Server │ │ JetStream │
│ Application │ │ │ │ Storage │
├─────────────────┤ ├─────────────────┤ ├─────────────────┤
│ Producer │───▶│ Core NATS │ │ Streams │
│ Consumer │◀───│ JetStream │◀───│ Consumers │
│ EventBus │ │ Clustering │ │ Key-Value │
└─────────────────┘ └─────────────────┘ └─────────────────┘
NATS supports two messaging modes in Coze Studio:
Core NATS: For real-time, lightweight message delivery
JetStream: For messages requiring persistence and high reliability
The Producer is responsible for sending messages to NATS, supporting the following features:
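type Producer struct {
	nc     *nats.Conn            // nats.Connect returns a pointer; a Conn must not be copied by value
	js     nats.JetStreamContext // nil when JetStream mode is disabled
	closed bool                  // set by Close; guarded by mu
	mu     sync.RWMutex
}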
func (p *Producer) SendMessage(ctx context.Context, topic string, message []byte) error {
	// Refuse to publish once Close has been called
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.closed {
		return errors.New("nats producer is closed")
	}
	// Supports both Core NATS and JetStream modes
	if p.js != nil {
		// JetStream mode: supports message persistence
		_, err := p.js.Publish(topic, message)
		return err
	}
	// Core NATS mode: lightweight publishing
	return p.nc.Publish(topic, message)
}
The Consumer is responsible for receiving and processing messages from NATS:
func (c *Consumer) RegisterConsumer(serverURL, topic, group string, handler ConsumerHandler) error {
	// No context is passed in, so a background context is assumed here
	ctx := context.Background()
	// Choose JetStream or Core NATS based on configuration
	if c.useJetStream {
		return c.startJetStreamConsumer(ctx, topic, group, handler)
	}
	return c.startCoreConsumer(ctx, topic, group, handler)
}
Add the following NATS-related configurations in docker/.env.example:
# Backend Event Bus
export COZE_MQ_TYPE="nats" # Set message queue type to NATS
export MQ_NAME_SERVER="nats:4222" # NATS server address
# NATS specific configuration
# NATS_SERVER_URL: NATS server connection URL, supports nats:// and tls:// protocols
# For cluster setup, use comma-separated URLs: "nats://nats1:4222,nats://nats2:4222"
# For TLS connection: "tls://nats:4222"
export NATS_SERVER_URL="nats://nats:4222"
# NATS_JWT_TOKEN: JWT token for NATS authentication (leave empty for no auth)
export NATS_JWT_TOKEN=""
# NATS_NKEY_SEED: Path to NATS seed file for NKey authentication (optional)
export NATS_NKEY_SEED=""
# NATS_USERNAME: Username for NATS authentication (optional)
export NATS_USERNAME=""
# NATS_PASSWORD: Password for NATS authentication (optional)
export NATS_PASSWORD=""
# NATS_TOKEN: Token for NATS authentication (optional)
export NATS_TOKEN=""
# NATS_STREAM_REPLICAS: Number of replicas for JetStream streams (default: 1)
export NATS_STREAM_REPLICAS="1"
# NATS_USE_JETSTREAM: Enable JetStream mode for message persistence and reliability (default: false)
export NATS_USE_JETSTREAM="true"
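The comma-separated cluster form of `NATS_SERVER_URL` can be checked before a connection is attempted. The following is a minimal sketch of that check; `ParseServerURLs` is a hypothetical helper written for illustration and is not part of Coze Studio or the nats.go client.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// ParseServerURLs splits a comma-separated NATS_SERVER_URL value and checks
// that each entry uses the nats:// or tls:// scheme and names a host.
// Hypothetical helper for illustration only.
func ParseServerURLs(raw string) ([]string, error) {
	var urls []string
	for _, part := range strings.Split(raw, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue // tolerate trailing commas and stray whitespace
		}
		u, err := url.Parse(part)
		if err != nil {
			return nil, fmt.Errorf("invalid NATS URL %q: %w", part, err)
		}
		if u.Scheme != "nats" && u.Scheme != "tls" {
			return nil, fmt.Errorf("unsupported scheme %q in %q", u.Scheme, part)
		}
		if u.Host == "" {
			return nil, fmt.Errorf("missing host in %q", part)
		}
		urls = append(urls, part)
	}
	if len(urls) == 0 {
		return nil, fmt.Errorf("no NATS server URL configured")
	}
	return urls, nil
}
```

`nats.Connect` accepts the comma-separated string directly, so a check like this is only useful for failing fast with a clearer error message at startup.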
NATS service configuration in docker-compose.yml:
nats:
image: nats:2.10.24-alpine
container_name: nats
restart: unless-stopped
command:
- "--jetstream" # Enable JetStream
- "--store_dir=/data" # Data storage directory
- "--max_memory_store=1GB" # Memory storage limit
- "--max_file_store=10GB" # File storage limit
ports:
- "4222:4222" # Client connection port
- "8222:8222" # HTTP monitoring port
- "6222:6222" # Cluster communication port
volumes:
- ./volumes/nats:/data
networks:
- coze-network
healthcheck:
test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8222/"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
Configure NATS in the Coze Studio application through environment variables:
// Read configuration from environment variables
mqType := os.Getenv("COZE_MQ_TYPE")
natsURL := os.Getenv("NATS_SERVER_URL")
jwtToken := os.Getenv("NATS_JWT_TOKEN")
seedFile := os.Getenv("NATS_NKEY_SEED")
streamReplicas := os.Getenv("NATS_STREAM_REPLICAS")
// Create NATS EventBus
if mqType == "nats" {
config := &nats.Config{
ServerURL: natsURL,
JWTToken: jwtToken,
SeedFile: seedFile,
StreamReplicas: streamReplicas,
}
eventBus, err := nats.NewProducer(config)
if err != nil {
log.Fatal("Failed to create NATS producer:", err)
}
}
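Because `os.Getenv` always returns strings, numeric and boolean settings such as `NATS_STREAM_REPLICAS` and `NATS_USE_JETSTREAM` need to be parsed before use. The sketch below shows one way to do that with the documented defaults; the `NATSConfig` type, its field names, and the choice to require `NATS_SERVER_URL` are assumptions made for this example, not the actual Coze Studio types.

```go
package main

import (
	"fmt"
	"strconv"
)

// NATSConfig mirrors a subset of the environment variables described above.
// The struct and its field names are assumptions made for this sketch.
type NATSConfig struct {
	ServerURL      string
	StreamReplicas int
	UseJetStream   bool
}

// LoadNATSConfig reads settings through getenv (pass os.Getenv in production)
// and applies the documented defaults: 1 replica, JetStream disabled.
func LoadNATSConfig(getenv func(string) string) (NATSConfig, error) {
	cfg := NATSConfig{
		ServerURL:      getenv("NATS_SERVER_URL"),
		StreamReplicas: 1,
		UseJetStream:   false,
	}
	if cfg.ServerURL == "" {
		return cfg, fmt.Errorf("NATS_SERVER_URL is not set")
	}
	if v := getenv("NATS_STREAM_REPLICAS"); v != "" {
		n, err := strconv.Atoi(v)
		if err != nil || n < 1 {
			return cfg, fmt.Errorf("NATS_STREAM_REPLICAS must be a positive integer, got %q", v)
		}
		cfg.StreamReplicas = n
	}
	if v := getenv("NATS_USE_JETSTREAM"); v != "" {
		b, err := strconv.ParseBool(v)
		if err != nil {
			return cfg, fmt.Errorf("NATS_USE_JETSTREAM must be a boolean, got %q", v)
		}
		cfg.UseJetStream = b
	}
	return cfg, nil
}
```

Taking the lookup function as a parameter keeps the parser testable without touching the process environment; in the application it would be called as `LoadNATSConfig(os.Getenv)`.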
Configure Environment Variables:
cp docker/.env.example docker/.env
# Edit .env file, set COZE_MQ_TYPE="nats"
Start Services:
cd docker
docker-compose up -d nats
Verify Deployment:
# Check NATS service status
docker-compose ps nats
# View NATS monitoring interface
curl http://localhost:8222/varz
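The `/varz` endpoint returns a JSON document, so the same check can be automated from Go. The sketch below decodes a handful of its fields; the `Varz` struct covers only a small subset of the response, and `ParseVarz` is a hypothetical helper, not an API of NATS or Coze Studio.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Varz holds a small subset of the fields returned by GET /varz.
type Varz struct {
	ServerID    string `json:"server_id"`
	Version     string `json:"version"`
	Connections int    `json:"connections"`
	InMsgs      int64  `json:"in_msgs"`
	OutMsgs     int64  `json:"out_msgs"`
}

// ParseVarz decodes a /varz response body and rejects payloads that carry
// no server ID, which usually means the response came from something else.
func ParseVarz(body []byte) (Varz, error) {
	var v Varz
	if err := json.Unmarshal(body, &v); err != nil {
		return v, fmt.Errorf("decode /varz: %w", err)
	}
	if v.ServerID == "" {
		return v, fmt.Errorf("/varz response has no server_id")
	}
	return v, nil
}
```

In practice the body would come from `http.Get("http://localhost:8222/varz")` followed by `io.ReadAll` on the response body.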
Deploy NATS using the official Helm Chart:
# Add NATS Helm repository
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
# Install NATS
helm install nats nats/nats --set nats.jetstream.enabled=true
For production environments, the following configuration optimizations are recommended:
Cluster Deployment:
nats:
cluster:
enabled: true
replicas: 3
Persistent Storage:
nats:
jetstream:
fileStore:
pvc:
size: 100Gi
storageClassName: fast-ssd
Resource Limits:
nats:
resources:
limits:
cpu: 2000m
memory: 4Gi
requests:
cpu: 500m
memory: 1Gi
Security Configuration:
nats:
auth:
enabled: true
token: "your-secure-token"
tls:
enabled: true
NATS provides rich monitoring metrics accessible through HTTP endpoints:
GET /varz
GET /connz
GET /subsz
GET /jsz
Performance Metrics:
Resource Metrics:
JetStream Metrics:
NATS supports multiple log levels and output formats:
# Enable debug logging
nats-server --debug
# Log output to file
nats-server --log /var/log/nats.log
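# Timestamped log entries, with a log file size limit (in bytes) that triggers rotation
# (nats-server writes plain-text logs; it has no built-in JSON log format)
nats-server --logtime --log /var/log/nats.log --log_size_limit 104857600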
// Configure connection options
opts := []nats.Option{
nats.MaxReconnects(10),
nats.ReconnectWait(2 * time.Second),
nats.Timeout(5 * time.Second),
}
nc, err := nats.Connect(serverURL, opts...)
// Configure JetStream options
jsOpts := []nats.JSOpt{
nats.PublishAsyncMaxPending(1000),
nats.PublishAsyncErrHandler(func(js nats.JetStream, originalMsg *nats.Msg, err error) {
log.Printf("Async publish error: %v", err)
}),
}
js, err := nc.JetStream(jsOpts...)
// Configure consumer options
// The durable name is passed as the second argument to PullSubscribe,
// so nats.Durable is not set again in the options.
consumerOpts := []nats.SubOpt{
	nats.MaxDeliver(3),
	nats.AckWait(30 * time.Second),
	nats.MaxAckPending(100),
}
sub, err := js.PullSubscribe(topic, "coze-group", consumerOpts...)
Connection Failures:
Message Loss:
Performance Issues:
NATS provides rich debugging tools:
# NATS CLI tools
nats server info
nats stream list
nats consumer list
# Monitor message flow
nats sub "coze.>"
nats pub "coze.test" "hello world"
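The subscription `coze.>` above relies on NATS subject wildcards: `*` matches exactly one token and `>` matches one or more trailing tokens. As a rough illustration of those rules, the sketch below reimplements the matching client-side. `SubjectMatches` is a hypothetical function for explanation only; the server performs the real matching, and this version assumes both arguments are already valid subjects.

```go
package main

import "strings"

// SubjectMatches reports whether subject matches pattern under NATS wildcard
// rules: "*" matches exactly one token, and ">" (only valid as the last
// token) matches one or more remaining tokens.
// It does not validate its inputs (for example, empty tokens are not rejected).
func SubjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last token and needs at least one token to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // subject ran out of tokens
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	// Without a trailing ">", both sides must have the same number of tokens.
	return len(p) == len(s)
}
```

Under these rules `coze.>` receives `coze.test` and `coze.workflow.42.started`, but not the bare subject `coze`.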
Use hierarchical subject names:
coze.workflow.{workflow_id}.{event_type}
coze.agent.{agent_id}.{action}
coze.knowledge.{kb_id}.{operation}
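Since `.` separates subject tokens and `*` and `>` are wildcards, IDs that contain those characters would silently change the hierarchy. A small builder can guard against that. This is a sketch only; `BuildSubject` and `validToken` are hypothetical names, and the fixed `coze` prefix follows the naming scheme above.

```go
package main

import (
	"fmt"
	"strings"
)

// validToken rejects values that would break the subject hierarchy:
// empty strings, whitespace, the "." separator, and the "*" / ">" wildcards.
func validToken(tok string) bool {
	return tok != "" && !strings.ContainsAny(tok, ". \t\r\n*>")
}

// BuildSubject joins a domain (for example "workflow", "agent", "knowledge"),
// an entity ID, and an event name into coze.{domain}.{id}.{event}.
func BuildSubject(domain, id, event string) (string, error) {
	for _, tok := range []string{domain, id, event} {
		if !validToken(tok) {
			return "", fmt.Errorf("invalid subject token %q", tok)
		}
	}
	return strings.Join([]string{"coze", domain, id, event}, "."), nil
}
```

For example, `BuildSubject("workflow", "wf42", "started")` yields `coze.workflow.wf42.started`, while an ID such as `a.b` is rejected instead of producing a five-token subject.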
Implement comprehensive error handling mechanisms:
func (c *Consumer) handleMessage(msg *nats.Msg) {
defer func() {
if r := recover(); r != nil {
log.Printf("Message processing panic: %v", r)
msg.Nak() // Reject message, trigger retry
}
}()
if err := c.processMessage(msg.Data); err != nil {
log.Printf("Message processing error: %v", err)
msg.Nak()
return
}
msg.Ack() // Acknowledge successful message processing
}
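The acknowledge-or-retry decision in the handler above can also be separated from the NATS client so it can be unit-tested without a running server. The sketch below shows one possible shape for that; `Decision`, `Ack`, `Nak`, and `Process` are names invented for this example rather than part of Coze Studio or nats.go.

```go
package main

import "fmt"

// Decision is what the consumer should report to JetStream after processing.
type Decision int

const (
	Ack Decision = iota // processed successfully
	Nak                 // failed; ask the server to redeliver
)

// Process runs handler on data and converts both returned errors and panics
// into a Nak decision, so the caller always has an explicit outcome.
func Process(data []byte, handler func([]byte) error) (d Decision, err error) {
	defer func() {
		if r := recover(); r != nil {
			// Overwrite the named results so the panic surfaces as an error.
			d, err = Nak, fmt.Errorf("handler panic: %v", r)
		}
	}()
	if herr := handler(data); herr != nil {
		return Nak, herr
	}
	return Ack, nil
}
```

A consumer would then call `msg.Ack()` when the result is `Ack` and `msg.Nak()` otherwise, logging the returned error in the second case.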
Properly manage NATS connections and resources:
func (p *Producer) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return nil
}
p.closed = true
if p.nc != nil {
p.nc.Close()
}
return nil
}
NATS as Coze Studio's EventBus solution provides lightweight, high-performance, and easy-to-deploy messaging capabilities. Through JetStream extensions, NATS can also provide enterprise-grade message persistence and stream processing functionality.
Key advantages of choosing NATS:
NATS is particularly suitable for the following scenarios: