This document provides a comprehensive guide for integrating Apache Pulsar as an EventBus in Coze Studio, including architecture design, implementation details, configuration instructions, and usage guidelines.
In Coze Studio's architecture, EventBus plays a critical role in asynchronous message delivery, including workflow execution, Agent communication, data processing pipelines, and other core functions. As user scale grows and business complexity increases, we need a more powerful and flexible message queue solution.
Pulsar, as a next-generation distributed messaging system, brings the following core advantages to Coze Studio:
| Feature | Pulsar | NSQ | Kafka | RocketMQ |
|---|---|---|---|---|
| Deployment Complexity | Medium | Low | Medium | Medium |
| Performance | High | Medium | High | High |
| Multi-tenancy | Native Support | Not Supported | Limited | Limited |
| Message Persistence | Strong | Limited | Strong | Strong |
| Message Ordering | Strong | Weak | Strong | Strong |
| Horizontal Scaling | Excellent | Medium | Good | Good |
| Scaling Speed | Fast | Medium | Slow | Medium |
| Operational Complexity | Medium | Low | High | Medium |
| Ecosystem | Rich | Simple | Very Rich | Rich |
Pulsar's Scaling Advantages:
Comparison with Other MQ Systems:
This excellent scaling capability makes Pulsar particularly suitable for scenarios like Coze Studio with rapid user growth and fluctuating business loads.
```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Coze Studio   │    │     Pulsar      │    │    EventBus     │
│   Application   │───▶│     Client      │───▶│     Manager     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                │
                                ▼
                       ┌─────────────────┐
                       │  Apache Pulsar  │
                       │     Cluster     │
                       └─────────────────┘
```
File Location: backend/infra/impl/eventbus/pulsar/producer.go
Core Functions:
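```go
// Producer publishes message bodies to an EventBus topic.
type Producer interface {
	// Send publishes a single message.
	Send(ctx context.Context, body []byte, opts ...SendOpt) error
	// BatchSend publishes several messages in one call.
	BatchSend(ctx context.Context, bodyArr [][]byte, opts ...SendOpt) error
}

// producerImpl is the Pulsar-backed implementation of Producer.
type producerImpl struct {
	topic    string          // target topic
	client   pulsar.Client   // underlying Pulsar client connection
	producer pulsar.Producer // producer bound to topic
}
```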
Features:
File Location: backend/infra/impl/eventbus/pulsar/consumer.go
Core Functions:
```go
func RegisterConsumer(serviceURL, topic, group string,
	consumerHandler eventbus.ConsumerHandler,
	opts ...eventbus.ConsumerOpt) error
```
Features:
File Location: backend/infra/impl/eventbus/eventbus.go
Integration Point:
```go
case consts.MQTypePulsar:
	return pulsar.NewProducer(nameServer, topic, group)
```
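The integration point is one branch of a switch over the configured MQ type. A self-contained sketch of that dispatch follows. The type strings (`"pulsar"`, `"nsq"`), the trimmed `Producer` interface, and both producer structs are placeholders, not Coze Studio's actual types.

```go
package main

import (
	"fmt"
	"strings"
)

// Producer is trimmed to a single method for this sketch.
type Producer interface {
	Name() string
}

type pulsarProducer struct{ serviceURL, topic, group string }

func (p *pulsarProducer) Name() string { return "pulsar:" + p.topic }

type nsqProducer struct{ addr, topic string }

func (p *nsqProducer) Name() string { return "nsq:" + p.topic }

// newProducer dispatches on the configured MQ type, in the spirit of the
// `case consts.MQTypePulsar:` branch. The type strings are assumptions.
func newProducer(mqType, nameServer, topic, group string) (Producer, error) {
	switch strings.ToLower(strings.TrimSpace(mqType)) {
	case "pulsar":
		return &pulsarProducer{serviceURL: nameServer, topic: topic, group: group}, nil
	case "nsq":
		return &nsqProducer{addr: nameServer, topic: topic}, nil
	default:
		return nil, fmt.Errorf("unsupported mq type %q", mqType)
	}
}

func main() {
	p, err := newProducer("pulsar", "pulsar://localhost:6650", "workflow-events", "workflow-group")
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Name()) // pulsar:workflow-events
}
```

Returning an error for an unknown type, instead of silently falling back to a default backend, means a typo in `COZE_MQ_TYPE` shows up at startup.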
```bash
# Message queue type
COZE_MQ_TYPE=pulsar

# Pulsar service address
MQ_NAME_SERVER=pulsar://localhost:6650

# JWT authentication token (if authentication is enabled)
PULSAR_JWT_TOKEN=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.example_token
```
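These settings can be validated before a client is created. The sketch below assumes the three variables shown above. It checks only that a Pulsar service URL uses the `pulsar://` or `pulsar+ssl://` scheme. `mqConfig` and `loadMQConfig` are hypothetical names. The lookup function is injected so that the check can be tested without modifying the process environment.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// mqConfig holds the three settings from the .env example.
type mqConfig struct {
	Type       string // COZE_MQ_TYPE
	NameServer string // MQ_NAME_SERVER
	JWTToken   string // PULSAR_JWT_TOKEN (optional)
}

// loadMQConfig reads settings through lookup; pass os.LookupEnv in production.
func loadMQConfig(lookup func(string) (string, bool)) (mqConfig, error) {
	get := func(k string) string {
		v, _ := lookup(k)
		return strings.TrimSpace(v)
	}
	cfg := mqConfig{
		Type:       get("COZE_MQ_TYPE"),
		NameServer: get("MQ_NAME_SERVER"),
		JWTToken:   get("PULSAR_JWT_TOKEN"),
	}
	if cfg.Type == "" {
		return cfg, errors.New("COZE_MQ_TYPE is not set")
	}
	if cfg.Type == "pulsar" &&
		!strings.HasPrefix(cfg.NameServer, "pulsar://") &&
		!strings.HasPrefix(cfg.NameServer, "pulsar+ssl://") {
		return cfg, fmt.Errorf("MQ_NAME_SERVER %q must start with pulsar:// or pulsar+ssl://", cfg.NameServer)
	}
	return cfg, nil
}

func main() {
	cfg, err := loadMQConfig(os.LookupEnv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	// Report only whether a token is present; never print the token itself.
	fmt.Printf("type=%s server=%s auth=%t\n", cfg.Type, cfg.NameServer, cfg.JWTToken != "")
}
```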
```yaml
services:
  pulsar:
    image: apachepulsar/pulsar:3.0.12
    container_name: coze-pulsar
    restart: always
    command: >
      sh -c "bin/pulsar standalone"
    ports:
      - "6650:6650" # Pulsar service port
      - "8080:8080" # Pulsar admin port
    volumes:
      - ./data/pulsar:/pulsar/data
    networks:
      - coze-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/admin/v2/clusters"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
```
For production environments, it's recommended to use Pulsar cluster deployment to achieve high availability and better performance. Cluster deployment involves configuring multiple components including ZooKeeper, BookKeeper, and Broker, which can be quite complex.
Production Environment Recommendations:
For detailed cluster deployment configuration, please refer to the Apache Pulsar Official Documentation.
```bash
# Clone the project
git clone https://github.com/coze-dev/coze-studio.git
cd coze-studio
```
Add the Pulsar service to your docker/docker-compose.yml file:
```yaml
services:
  # Add Pulsar service
  pulsar:
    image: apachepulsar/pulsar:3.0.12
    container_name: coze-pulsar
    restart: always
    command: >
      sh -c "bin/pulsar standalone"
    ports:
      - "6650:6650" # Pulsar service port
      - "8080:8080" # Pulsar admin port
    volumes:
      - ./data/pulsar:/pulsar/data
    networks:
      - coze-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/admin/v2/clusters"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s

  # Other existing services...
```
Modify the .env file to configure Coze Studio to use Pulsar:
```bash
# Enter docker directory
cd docker

# Copy environment configuration file
cp .env.example .env

# Edit .env file and add the following configuration:
```

```bash
# Message queue type
COZE_MQ_TYPE=pulsar

# Pulsar service address
MQ_NAME_SERVER=pulsar://pulsar:6650

# JWT authentication token (optional, if authentication is enabled)
# PULSAR_JWT_TOKEN=your_jwt_token_here
```
```bash
# Start complete Coze Studio services including Pulsar
docker-compose up -d

# Check service startup status
docker-compose ps

# Check Pulsar container status
docker ps | grep pulsar

# Check Pulsar health status
curl -f http://localhost:8080/admin/v2/clusters

# View Pulsar logs
docker logs coze-pulsar

# Test Pulsar connection
docker exec -it coze-pulsar bin/pulsar-admin clusters list
```
- http://localhost:3000 (based on actual configuration)
- http://localhost:8080 (Pulsar admin port)

Coze Studio now uses Pulsar as its message queue, and Pulsar handles all EventBus functionality.
For users who need a graphical interface to manage Pulsar clusters, consider using ASP Community Edition. ASP Community Edition is a modern management platform designed specifically for Apache Pulsar, providing an intuitive web interface to manage clusters, tenants, namespaces, topics, and other resources. The platform supports real-time monitoring, performance metrics display, configuration management, and other features that greatly simplify the daily operations of Pulsar clusters.
For more information, please refer to: ASP Community Edition Documentation
Architecture Compatibility Design:
Performance First:
Easy Deployment:
JWT Authentication Support:
```go
// Automatically detect and configure JWT authentication
if jwtToken := os.Getenv(consts.PulsarJWTToken); jwtToken != "" {
	clientOptions.Authentication = pulsar.NewAuthenticationToken(jwtToken)
	logs.Debugf("Using JWT authentication, token length: %d", len(jwtToken))
}
```
Batch Sending Optimization:
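```go
// Asynchronous batch sending for improved performance
for _, body := range bodyArr {
	msg := &pulsar.ProducerMessage{Payload: body}
	if option.ShardingKey != nil {
		// Use the sharding key as the Pulsar message key
		msg.Key = *option.ShardingKey
	}
	// callback is defined outside this excerpt and receives each message's send result
	p.producer.SendAsync(ctx, msg, callback)
}
```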
Graceful Shutdown Handling:
```go
// Listen for system signals and gracefully close resources
safego.Go(context.Background(), func() {
	signal.WaitExit()
	logs.Infof("shutting down pulsar consumer for topic: %s, group: %s", topic, group)
	cancel()
	consumer.Close()
	client.Close()
})
```
Connection Issues:
```bash
# Check Pulsar service status
docker exec -it coze-pulsar bin/pulsar-admin brokers healthcheck

# Check network connectivity
telnet localhost 6650

# View connection configuration
docker exec -it coze-pulsar cat conf/standalone.conf | grep -E "(advertisedAddress|bindAddress)"
```
Authentication Issues:
```bash
# Check JWT Token configuration
echo $PULSAR_JWT_TOKEN

# Verify token validity
docker exec -it coze-pulsar bin/pulsar-admin --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:$PULSAR_JWT_TOKEN \
  clusters list
```
Performance Issues:
```bash
# View topic backlog
docker exec -it coze-pulsar bin/pulsar-admin topics stats persistent://public/default/your-topic

# Adjust batch sending parameters
# Batch size and delay can be configured through SendOpt in code
```
```bash
# View Pulsar service logs
docker logs coze-pulsar

# View Pulsar-related information in application logs
tail -f logs/coze-studio.log | grep -i "pulsar\|eventbus"

# Enable verbose logging: raise the log level to DEBUG in Pulsar's
# logging configuration (conf/log4j2.yaml)
```
```bash
# Get broker metrics
curl http://localhost:8080/metrics/

# Get specific topic metrics
curl http://localhost:8080/admin/v2/persistent/public/default/your-topic/stats

# List the topic's subscriptions; per-subscription consumption lag is
# reported as msgBacklog in the topics stats output
docker exec -it coze-pulsar bin/pulsar-admin topics subscriptions persistent://public/default/your-topic
```
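The JSON returned by `pulsar-admin topics stats` or by the `/stats` REST endpoint above reports consumption lag per subscription as `msgBacklog`. The sketch below parses only that field and lists the subscriptions whose backlog exceeds a threshold. `laggingSubscriptions` is a hypothetical helper. The JSON in `main` is a trimmed, illustrative sample, not real output.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// topicStats models only the fields this sketch needs from the stats output.
type topicStats struct {
	Subscriptions map[string]struct {
		MsgBacklog int64 `json:"msgBacklog"`
	} `json:"subscriptions"`
}

// laggingSubscriptions returns "name=backlog" entries for subscriptions whose
// backlog exceeds threshold, sorted for stable output.
func laggingSubscriptions(raw []byte, threshold int64) ([]string, error) {
	var st topicStats
	if err := json.Unmarshal(raw, &st); err != nil {
		return nil, err
	}
	var out []string
	for name, sub := range st.Subscriptions {
		if sub.MsgBacklog > threshold {
			out = append(out, fmt.Sprintf("%s=%d", name, sub.MsgBacklog))
		}
	}
	sort.Strings(out)
	return out, nil
}

func main() {
	sample := []byte(`{"subscriptions":{"workflow-group":{"msgBacklog":1500},"audit-group":{"msgBacklog":3}}}`)
	lagging, err := laggingSubscriptions(sample, 1000)
	if err != nil {
		panic(err)
	}
	fmt.Println(lagging) // [workflow-group=1500]
}
```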
```bash
# Recommended production environment configuration
COZE_MQ_TYPE=pulsar
# List the scheme once, followed by comma-separated broker host:port pairs
MQ_NAME_SERVER=pulsar://pulsar-broker-1:6650,pulsar-broker-2:6650,pulsar-broker-3:6650
PULSAR_JWT_TOKEN=your-production-jwt-token
```
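Pulsar's multi-broker service URL gives the scheme once, followed by comma-separated `host:port` pairs. A small validator can catch a malformed `MQ_NAME_SERVER` before the client tries to connect. `splitServiceURL` is a hypothetical helper; it is stricter than the client library's own parser and independent of it.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// splitServiceURL breaks scheme://host1:port1,host2:port2 into its scheme and
// host list, rejecting unknown schemes and hosts without a valid port.
func splitServiceURL(u string) (string, []string, error) {
	scheme, rest, ok := strings.Cut(u, "://")
	if !ok || (scheme != "pulsar" && scheme != "pulsar+ssl") {
		return "", nil, fmt.Errorf("unsupported service URL %q", u)
	}
	var hosts []string
	for _, h := range strings.Split(strings.TrimSuffix(rest, "/"), ",") {
		// A repeated "pulsar://" prefix leaves extra colons and fails here.
		if _, _, err := net.SplitHostPort(h); err != nil {
			return "", nil, fmt.Errorf("bad host %q: %w", h, err)
		}
		hosts = append(hosts, h)
	}
	return scheme, hosts, nil
}

func main() {
	s, hosts, err := splitServiceURL("pulsar://pulsar-broker-1:6650,pulsar-broker-2:6650,pulsar-broker-3:6650")
	fmt.Println(s, hosts, err) // pulsar [pulsar-broker-1:6650 pulsar-broker-2:6650 pulsar-broker-3:6650] <nil>
}
```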
```bash
# Pulsar cluster configuration
# Recommend at least 3 Broker nodes
# Recommend at least 3 BookKeeper nodes
# Recommend at least 3 ZooKeeper nodes
```
```bash
# Producer configuration optimization
# Batch size: 1000 messages or 1MB
# Send timeout: 30 seconds
# Compression algorithm: LZ4
```
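The "1000 messages or 1MB" rule sends a batch as soon as either limit is reached. The Pulsar Go client batches internally, through producer options such as `BatchingMaxMessages`, `BatchingMaxSize`, and `BatchingMaxPublishDelay`, so application code does not normally need its own batcher. The sketch below only illustrates the dual-threshold rule with the standard library. `batcher` is a hypothetical type; it treats 1MB as 1 MiB and leaves out the time-based flush that a real producer also performs.

```go
package main

import "fmt"

// batcher groups payloads and emits a batch when either the message-count or
// the byte-size limit is reached.
type batcher struct {
	maxMessages int
	maxBytes    int
	cur         [][]byte
	curBytes    int
	flush       func(batch [][]byte)
}

// Add buffers p, flushing first if p would push the batch over the byte limit.
func (b *batcher) Add(p []byte) {
	if len(b.cur) > 0 && b.curBytes+len(p) > b.maxBytes {
		b.Flush()
	}
	b.cur = append(b.cur, p)
	b.curBytes += len(p)
	if len(b.cur) >= b.maxMessages || b.curBytes >= b.maxBytes {
		b.Flush()
	}
}

// Flush emits whatever is buffered.
func (b *batcher) Flush() {
	if len(b.cur) == 0 {
		return
	}
	b.flush(b.cur)
	b.cur = nil
	b.curBytes = 0
}

func main() {
	var sizes []int
	b := &batcher{
		maxMessages: 1000,
		maxBytes:    1 << 20, // 1 MiB
		flush:       func(batch [][]byte) { sizes = append(sizes, len(batch)) },
	}
	for i := 0; i < 2500; i++ {
		b.Add(make([]byte, 100)) // 100-byte payloads: the count limit triggers first
	}
	b.Flush()
	fmt.Println(sizes) // [1000 1000 500]
}
```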
```bash
# Consumer configuration optimization
# Receive queue size: 1000
# Acknowledgment timeout: 30 seconds
# Consumer type: Exclusive (ensures ordering)
```
```bash
# Enable JWT authentication
PULSAR_JWT_TOKEN=your-jwt-token

# Configure Access Control Lists (ACL)
# Configure topic-level permissions through Pulsar Admin tools
```
The Apache Pulsar EventBus integration in Coze Studio achieves the following goals:
Through this integration, Coze Studio provides users with a high-performance, highly reliable, and easily scalable message queue solution, particularly suitable for scenarios requiring high throughput, low latency, and enterprise-grade features.