docs/advanced-guide/using-publisher-subscriber/page.md
Publisher-Subscriber is an architectural design pattern for asynchronous communication between entities, which may be different applications or different instances of the same application. Messages move between components without the components knowing each other's identities, so the components remain decoupled. This makes the application/system more flexible and scalable, since each component can be scaled and maintained according to its own requirements.
In a GoFr application, users who want the Publisher-Subscriber design can choose from several supported message brokers: Apache Kafka, Google Pub/Sub, MQTT, NATS JetStream, Redis Pub/Sub, Azure Event Hubs, and Amazon SQS. The PubSub client is initialized inside an IoC container, which manages the client dependency. Control thus lies with the framework, promoting modularity, testability, and reusability. A single application can publish and subscribe to multiple topics by providing the topic name, and the container exposes the Publisher and Subscriber interfaces for consuming a single message or publishing one to the message broker.
The container is part of the GoFr Context.
The configs required to set up the PubSub backend are specific to the type of message broker the user wants to use.
`PUBSUB_BACKEND` defines which message broker the application needs to use.
| Name | Description | Required | Default | Example |
|---|---|---|---|---|
| PUBSUB_BACKEND | Message broker to use | Yes | - | KAFKA |
| PUBSUB_BROKER | Address(es) of the Kafka broker(s), comma-separated | Yes | - | localhost:9092 or localhost:8087,localhost:8088,localhost:8089 |
| CONSUMER_ID | Consumer group ID for the application | Yes | - | order-consumer |
| PUBSUB_OFFSET | Offset from which to start reading | No | -1 | 10 |
| KAFKA_BATCH_SIZE | Maximum number of messages per batch | No | 100 | 10 |
| KAFKA_BATCH_BYTES | Maximum size of a batch in bytes | No | 1048576 | 65536 |
| KAFKA_BATCH_TIMEOUT | Time to wait before flushing an incomplete batch, in milliseconds | No | 1000 | 300 |
| KAFKA_SECURITY_PROTOCOL | Security protocol used to connect to the broker | No | PLAINTEXT | SASL_SSL |
| KAFKA_SASL_MECHANISM | SASL authentication mechanism | No | "" | PLAIN |
| KAFKA_SASL_USERNAME | SASL username | No | "" | user |
| KAFKA_SASL_PASSWORD | SASL password | No | "" | password |
| KAFKA_TLS_CERT_FILE | Path to the TLS certificate file | No | "" | /path/to/cert.pem |
| KAFKA_TLS_KEY_FILE | Path to the TLS key file | No | "" | /path/to/key.pem |
| KAFKA_TLS_CA_CERT_FILE | Path to the TLS CA certificate file | No | "" | /path/to/ca.pem |
| KAFKA_TLS_INSECURE_SKIP_VERIFY | Skip TLS certificate verification | No | false | true |
```dotenv
PUBSUB_BACKEND=KAFKA   # using Apache Kafka as the message broker
PUBSUB_BROKER=localhost:9092
CONSUMER_ID=order-consumer
KAFKA_BATCH_SIZE=1000
KAFKA_BATCH_BYTES=1048576
KAFKA_BATCH_TIMEOUT=300
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_USERNAME=user
KAFKA_SASL_PASSWORD=password
KAFKA_TLS_CERT_FILE=/path/to/cert.pem
KAFKA_TLS_KEY_FILE=/path/to/key.pem
KAFKA_TLS_CA_CERT_FILE=/path/to/ca.pem
KAFKA_TLS_INSECURE_SKIP_VERIFY=true
```
```bash
docker run --name kafka-1 -p 9092:9092 \
  -e KAFKA_ENABLE_KRAFT=yes \
  -e KAFKA_CFG_PROCESS_ROLES=broker,controller \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
  -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 \
  -e ALLOW_PLAINTEXT_LISTENER=yes \
  -e KAFKA_CFG_NODE_ID=1 \
  -v kafka_data:/bitnami \
  bitnami/kafka:3.4
```
```dotenv
PUBSUB_BACKEND=GOOGLE                    # using Google Pub/Sub as the message broker
GOOGLE_PROJECT_ID=project-order          # Google project ID where the Pub/Sub is configured
GOOGLE_SUBSCRIPTION_NAME=order-consumer  # unique subscription name to identify the subscribing entity
```
```bash
docker pull gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators

docker run --name=gcloud-emulator -d -p 8086:8086 \
  gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators \
  gcloud beta emulators pubsub start --project=test123 \
  --host-port=0.0.0.0:8086
```
Note: To set `GOOGLE_APPLICATION_CREDENTIALS`, refer {% new-tab-link title="here" href="https://cloud.google.com/docs/authentication/application-default-credentials" /%}.
Note: In Google Pub/Sub, one subscription name can access only one topic, so the framework appends the topic name to the subscription name to form a unique subscription name on the Google client.
```dotenv
PUBSUB_BACKEND=MQTT        # using MQTT as pub/sub
MQTT_HOST=localhost        # broker host URL
MQTT_PORT=1883             # broker port
MQTT_CLIENT_ID_SUFFIX=test # suffix to a randomly generated client ID (UUID v4)

# some additional configs (optional)
MQTT_PROTOCOL=tcp          # protocol for connecting to the broker; can be tcp, tls, ws or wss
MQTT_MESSAGE_ORDER=true    # maintain/retain message publish order; false by default
MQTT_USER=username         # authentication username
MQTT_PASSWORD=password     # authentication password
```
Note: If the `MQTT_HOST` config is not provided, the application will connect to a public broker {% new-tab-link title="EMQX Broker" href="https://www.emqx.com/en/mqtt/public-mqtt5-broker" /%}.
```bash
docker run -d \
  --name mqtt \
  -p 8883:8883 \
  -v <path-to>/mosquitto.conf:/mosquitto/config/mosquitto.conf \
  eclipse-mosquitto:latest
```
Note: find the default mosquitto config file {% new-tab-link title="here" href="https://github.com/eclipse/mosquitto/blob/master/mosquitto.conf" /%}
NATS JetStream is supported as an external PubSub provider, meaning if you're not using it, it won't be added to your binary.
References:
- https://docs.nats.io/
- https://docs.nats.io/nats-concepts/jetstream
- https://docs.nats.io/using-nats/developer/connecting/creds
```dotenv
PUBSUB_BACKEND=NATS
PUBSUB_BROKER=nats://localhost:4222
NATS_STREAM=mystream
NATS_SUBJECTS=orders.*,shipments.*
NATS_MAX_WAIT=5s
NATS_MAX_PULL_WAIT=500ms
NATS_CONSUMER=my-consumer
NATS_CREDS_FILE=/path/to/creds.json
```
To set up NATS JetStream, follow these steps:

Import the external driver for NATS JetStream:

```bash
go get gofr.dev/pkg/gofr/datasource/pubsub/nats
```

Use the `AddPubSub` method to add the NATS JetStream driver to your application:

```go
app := gofr.New()

app.AddPubSub(nats.New(nats.Config{
	Server: "nats://localhost:4222",
	Stream: nats.StreamConfig{
		Stream:   "mystream",
		Subjects: []string{"orders.*", "shipments.*"},
	},
	MaxWait:     5 * time.Second,
	MaxPullWait: 500 * time.Millisecond,
	Consumer:    "my-consumer",
	CredsFile:   "/path/to/creds.json",
}))
```
```bash
docker run -d \
  --name nats \
  -p 4222:4222 \
  -p 8222:8222 \
  -v <path-to>/nats.conf:/nats/config/nats.conf \
  nats:2.9.16
```
| Name | Description | Required | Default | Example |
|---|---|---|---|---|
| PUBSUB_BACKEND | Set to "NATS" to use NATS JetStream as the message broker | Yes | - | NATS |
| PUBSUB_BROKER | NATS server URL | Yes | - | nats://localhost:4222 |
| NATS_STREAM | Name of the NATS stream | Yes | - | mystream |
| NATS_SUBJECTS | Comma-separated list of subjects to subscribe to | Yes | - | orders.*,shipments.* |
| NATS_MAX_WAIT | Maximum wait time for batch requests | No | - | 5s |
| NATS_MAX_PULL_WAIT | Maximum wait time for individual pull requests | No | 0 | 500ms |
| NATS_CONSUMER | Name of the NATS consumer | No | - | my-consumer |
| NATS_CREDS_FILE | Path to the credentials file for authentication | No | - | /path/to/creds.json |
When subscribing or publishing using NATS JetStream, make sure to use the appropriate subject name that matches your stream configuration. For more information on setting up and using NATS JetStream, refer to the official NATS documentation.
Redis Pub/Sub is a lightweight messaging system. GoFr supports two modes: Streams (the default, with at-least-once delivery) and classic Pub/Sub (at-most-once delivery).
Redis Pub/Sub uses the same Redis connection configuration as the Redis datasource (REDIS_HOST, REDIS_PORT, REDIS_DB, TLS, etc.).
See the config reference: https://gofr.dev/docs/references/configs#redis.
`.env`

```dotenv
PUBSUB_BACKEND=REDIS
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_USER=myuser
REDIS_PASSWORD=mypassword
REDIS_DB=0
REDIS_PUBSUB_DB=1
REDIS_TLS_ENABLED=true
REDIS_TLS_CA_CERT=/path/to/ca.pem
REDIS_TLS_CERT=/path/to/cert.pem
REDIS_TLS_KEY=/path/to/key.pem

# Streams mode (default) - requires consumer group
REDIS_STREAMS_CONSUMER_GROUP=my-group
REDIS_STREAMS_CONSUMER_NAME=my-consumer
REDIS_STREAMS_BLOCK_TIMEOUT=5s
REDIS_STREAMS_PEL_RATIO=0.7   # 70% PEL, 30% new messages
REDIS_STREAMS_MAXLEN=1000

# To use PubSub mode instead, set:
# REDIS_PUBSUB_MODE=pubsub
```
```bash
docker run -d \
  --name redis \
  -p 6379:6379 \
  redis:7-alpine
```
For Redis with password authentication:
```bash
docker run -d \
  --name redis \
  -p 6379:6379 \
  redis:7-alpine redis-server --requirepass mypassword
```
The following configs apply specifically to Redis Pub/Sub behavior. For base Redis connection/TLS configs, refer to https://gofr.dev/docs/references/configs#redis.
| Name | Description | Default | Example |
|---|---|---|---|
| PUBSUB_BACKEND | Set to REDIS to use Redis as the Pub/Sub backend | - | REDIS |
| REDIS_PUBSUB_MODE | streams (default, at-least-once) or pubsub (at-most-once) | streams | pubsub |
| REDIS_STREAMS_CONSUMER_GROUP | Consumer group name for Streams mode | - | mygroup |
| REDIS_STREAMS_CONSUMER_NAME | Consumer name within the group | - | consumer-1 |
| REDIS_STREAMS_BLOCK_TIMEOUT | How long a read blocks waiting for new messages | 5s | 2s or 30s |
| REDIS_STREAMS_PEL_RATIO | Fraction of each batch taken from the pending entries list (PEL) | 0.7 | 0.5 or 0.8 |
| REDIS_STREAMS_MAXLEN | Maximum stream length; 0 for unlimited | 0 (unlimited) | 10000 |
| REDIS_PUBSUB_DB | Redis DB for Pub/Sub; keep separate from REDIS_DB when using migrations + streams mode | 15 | 1 |
| REDIS_PUBSUB_BUFFER_SIZE | Size of the subscriber message buffer | 100 | 1000 |
| REDIS_PUBSUB_QUERY_TIMEOUT | Timeout for Pub/Sub queries | 5s | 30s |
| REDIS_PUBSUB_QUERY_LIMIT | Maximum messages fetched per query | 10 | 50 |

Important: If `REDIS_STREAMS_CONSUMER_GROUP` is empty or not provided, an error will occur when attempting to subscribe. Publishing, however, will work correctly without it.
For Redis with TLS:
```bash
docker run -d \
  --name redis \
  -p 6379:6379 \
  -v /path/to/certs:/tls \
  redis:7-alpine redis-server \
  --tls-port 6380 \
  --port 0 \
  --tls-cert-file /tls/redis.crt \
  --tls-key-file /tls/redis.key \
  --tls-ca-cert-file /tls/ca.crt
```
Note: Topics are auto-created on first publish. When using GoFr migrations with Streams mode, keep `REDIS_DB` and `REDIS_PUBSUB_DB` separate (defaults: 0 and 15). For `REDIS_STREAMS_BLOCK_TIMEOUT`: use 1s-2s for real-time processing or 10s-30s for batch processing.
GoFr supports Azure Event Hubs starting with version v1.22.0.
While subscribing, GoFr reads from all the partitions of the consumer group provided in the configuration, reducing the hassle of managing them.
Azure Event Hubs is supported as an external PubSub provider, meaning if you're not using it, it won't be added to your binary.
Import the external driver for Event Hubs using the following command:

```bash
go get gofr.dev/pkg/gofr/datasource/pubsub/eventhub
```
Use the `AddPubSub` method of GoFr's app to connect.

Example:

```go
app := gofr.New()

app.AddPubSub(eventhub.New(eventhub.Config{
	ConnectionString:          "Endpoint=sb://gofr-dev.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<key>",
	ContainerConnectionString: "DefaultEndpointsProtocol=https;AccountName=gofrdev;AccountKey=<key>;EndpointSuffix=core.windows.net",
	StorageServiceURL:         "https://gofrdev.windows.net/",
	StorageContainerName:      "test",
	EventhubName:              "test1",
	ConsumerGroup:             "$Default",
}))
```
While subscribing/publishing with Event Hubs, make sure the topic name is the same as the Event Hub name.
To set up Azure Event Hubs, refer to the official Azure documentation.
Since GoFr manages reading from all the partitions, it needs to store information about what has been read and what is left; for that, GoFr uses an Azure Storage container, which can be set up by following the Azure Storage documentation.
GoFr supports Amazon Simple Queue Service (SQS) as an external PubSub provider. SQS is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications.
Import the external driver for SQS using the following command:

```bash
go get gofr.dev/pkg/gofr/datasource/pubsub/sqs
```
Use the `AddPubSub` method of GoFr's app to connect.

Example:

```go
package main

import (
	"gofr.dev/pkg/gofr"
	"gofr.dev/pkg/gofr/datasource/pubsub/sqs"
)

func main() {
	app := gofr.New()

	app.AddPubSub(sqs.New(&sqs.Config{
		Region:          "us-east-1",
		AccessKeyID:     "your-access-key-id",     // optional if using IAM roles
		SecretAccessKey: "your-secret-access-key", // optional if using IAM roles
		// Endpoint: "http://localhost:4566", // optional: for LocalStack
	}))

	app.Run()
}
```
Note: When using IAM roles (e.g., on EC2 or ECS), you can omit `AccessKeyID` and `SecretAccessKey`. The SDK will automatically use the instance's IAM role credentials.
| Name | Description | Example |
|---|---|---|
| Region | AWS region of the SQS queues | us-east-1 |
| AccessKeyID | AWS access key ID (optional when using IAM roles) | AKIAIOSFODNN7EXAMPLE |
| SecretAccessKey | AWS secret access key (optional when using IAM roles) | wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY |
| SessionToken | Temporary session token (optional) | FwoGZXIvYXdzE... |
| Endpoint | Custom endpoint, e.g. for LocalStack (optional) | http://localhost:4566 |
Note: SQS queues must be created before publishing or subscribing. Use the AWS CLI, the AWS Console, or the `CreateTopic` method in migrations to create queues programmatically. GoFr supports Standard Queues by default; FIFO queues are not currently supported. Advanced features like Dead Letter Queues (DLQ) and broadcast (SNS) can be configured at the infrastructure level.
LocalStack emulates AWS services locally, making it ideal for development and testing without an AWS account.
```bash
docker run -d \
  --name localstack \
  -p 4566:4566 \
  -e SERVICES=sqs \
  localstack/localstack:latest
```
After LocalStack is running, create queues using the AWS CLI:
```bash
aws --endpoint-url=http://localhost:4566 --region us-east-1 \
  sqs create-queue --queue-name order-logs

aws --endpoint-url=http://localhost:4566 --region us-east-1 \
  sqs create-queue --queue-name products
```
When using LocalStack, set the `Endpoint` field in `sqs.Config` to point at LocalStack and use dummy credentials:
```go
app.AddPubSub(sqs.New(&sqs.Config{
	Region:          "us-east-1",
	Endpoint:        "http://localhost:4566",
	AccessKeyID:     "test",
	SecretAccessKey: "test",
}))
```
Adding a subscriber is similar to adding an HTTP handler, which makes it easier to develop scalable applications, as it is decoupled from the sender/publisher. Users can define a subscriber handler to process incoming messages and use `app.Subscribe` to inject the handler into the application. This follows the inversion-of-control pattern, which keeps control with the framework and eases development and debugging.

The subscriber handler has the following signature:

```go
func (ctx *gofr.Context) error
```
The `Subscribe` method of a GoFr app continuously reads messages from the configured `PUBSUB_BACKEND`, which can be KAFKA, GOOGLE, MQTT, or REDIS; for external providers like NATS JetStream, Azure Event Hubs, and Amazon SQS, use `app.AddPubSub()` instead. These can be configured in the configs folder under `.env`. The returned error determines which messages are committed and which are consumed again.
```go
// The first argument is the topic name, followed by a handler that processes
// the published messages continuously and asynchronously.
app.Subscribe("order-status", func(ctx *gofr.Context) error {
	// Handle the pub-sub message here
	return nil
})
```
The context `ctx` provides the user with the following methods:

- `Bind()` - Binds the message value to a given data type. The message can be converted to struct, map[string]any, int, bool, float64, and string types.
- `Param(p string)`/`PathParam(p string)` - Returns the topic when it is passed as the param.

```go
package main

import (
	"gofr.dev/pkg/gofr"
)

func main() {
	app := gofr.New()

	app.Subscribe("order-status", func(c *gofr.Context) error {
		var orderStatus struct {
			OrderId string `json:"orderId"`
			Status  string `json:"status"`
		}

		err := c.Bind(&orderStatus)
		if err != nil {
			c.Logger.Error(err)

			// returning nil here as we would like to ignore the
			// incompatible message and continue reading forward
			return nil
		}

		c.Logger.Info("Received order ", orderStatus)

		return nil
	})

	app.Run()
}
```
Publishing a message is best done at the point where the message is generated. To facilitate this, users can access the publishing interface from the GoFr Context (`ctx`) to publish messages:

```go
ctx.GetPublisher().Publish(ctx, "topic", msg)
```
Users can provide the topic to which the message is to be published. GoFr also supports publishing to multiple topics, which is useful as applications may need to send different kinds of messages to different topics.
```go
package main

import (
	"encoding/json"

	"gofr.dev/pkg/gofr"
)

func main() {
	app := gofr.New()

	app.POST("/publish-order", order)

	app.Run()
}

func order(ctx *gofr.Context) (any, error) {
	type orderStatus struct {
		OrderId string `json:"orderId"`
		Status  string `json:"status"`
	}

	var data orderStatus

	err := ctx.Bind(&data)
	if err != nil {
		return nil, err
	}

	msg, _ := json.Marshal(data)

	err = ctx.GetPublisher().Publish(ctx, "order-logs", msg)
	if err != nil {
		return nil, err
	}

	return "Published", nil
}
```
Check out the following examples on how to publish/subscribe to given topics:
- Subscribing Topics
- Publishing Topics
GoFr automatically traces every publish and subscribe call across Kafka, NATS JetStream, Google Pub/Sub, and Amazon SQS. No user code is required: as long as `TRACE_EXPORTER` is configured (see {% new-tab-link newtab=false title="Observability → Tracing" href="/docs/quick-start/observability#tracing" /%}), the framework wires everything in.
When you call `ctx.GetPublisher().Publish(ctx, topic, msg)`, GoFr starts a span named `<backend>-publish` (for example `kafka-publish`) with `SpanKind=Producer` and the attributes `messaging.system`, `messaging.destination.name`, and `messaging.operation=publish`.

When the message is delivered to a subscriber registered with `app.Subscribe(topic, handler)`, GoFr starts a span named `<backend>-subscribe` with `SpanKind=Consumer`, as a child of the producer's span. This means the consumer span shares the same TraceID as the publisher and lists the publisher's span as its parent.

The result is that an end-to-end flow such as HTTP → publish → subscribe → publish → subscribe shows up as one connected trace in any tracing UI, with the full waterfall visible:
```
[api-gateway         ] POST /order       (root)
[api-gateway         ] kafka-publish     child of POST /order
[order-service       ] kafka-subscribe   child of api-gateway's publish      [+1 link]
[order-service       ] kafka-publish     child of order-service's subscribe
[notification-service] kafka-subscribe   child of order-service's publish    [+1 link]
```
GoFr's tracer uses `ParentBased(TraceIDRatioBased(TRACER_RATIO))` (see `pkg/gofr/otel.go`). Because the consumer span inherits the producer's sampling decision, head-based sampling via `TRACER_RATIO` is consistent across the entire chain: if the producer is sampled out, every downstream consumer span is dropped at creation as well.
For high-throughput pipelines, set TRACER_RATIO below 1.0 (for example 0.1 for 10% sampling) to keep trace volume manageable. For very long-lived async sagas (where one trace stays open for hours), prefer tail-based sampling at the OpenTelemetry Collector tier.
[!NOTE] Distributed tracing for pub/sub is fully transparent — the existing examples in {% new-tab-link title="examples/using-publisher" href="https://github.com/gofr-dev/gofr/tree/main/examples/using-publisher" /%} and {% new-tab-link title="examples/using-subscriber" href="https://github.com/gofr-dev/gofr/tree/main/examples/using-subscriber" /%} already produce connected traces without any tracing-specific code.