internal/docs/pubsub/design.md
# Pubsub Design

This document proposes a new pubsub package for the Go CDK.
A developer designing a new system with cross-cloud portability in mind could choose a messaging system supporting pubsub, such as ZeroMQ, Kafka or RabbitMQ. These pubsub systems run on AWS, Azure, GCP and others, so they pose no obstacle to portability between clouds. They can also be run on-prem. Users wanting managed pubsub could go with Confluent Cloud for Kafka (AWS, GCP), or CloudAMQP for RabbitMQ (AWS, Azure) without losing much in the way of portability.
So what’s missing? The solution described above still means being locked into a particular implementation of pubsub. A similar potential for lock-in arises when building systems directly on cloud-specific services such as AWS SNS+SQS, GCP PubSub or Azure Service Bus.
Developers may wish to compare different pubsub systems in terms of their
performance, reliability, cost or other factors, and they may want the option to
move between these systems without too much friction. A pubsub package in the
Go CDK could lower the cost of such experiments and migrations.
Non-goals:

- Create new topics in the cloud. The Go CDK focuses on developer concerns, but topic creation is an operator concern.
- Create new subscriptions in the cloud. The subscribers are assumed to correspond to components of a distributed system rather than to users of that system.
Pubsub is a frequently requested feature for the Go CDK project [github issue]. A key use case motivating these requests is support for event-driven architectures.
There are several pubsub systems available that could be made to work with the Go CDK by writing drivers for them. Here is a table comparing some of them.
Given a topic that has already been created on the pubsub server, messages can
be sent to that topic by calling acmepubsub.OpenTopic and calling the Send
method of the returned Topic, like this (assuming a fictional pubsub service
called "acme"):
```go
package main

import (
	"context"
	"log"
	"net/http"

	rawacmepubsub "github.com/acme/pubsub"

	"github.com/google/go-cloud/pubsub"
	"github.com/google/go-cloud/pubsub/acmepubsub"
)

func main() {
	log.Fatal(serve())
}

func serve() error {
	ctx := context.Background()
	client, err := rawacmepubsub.NewClient(ctx, "unicornvideohub")
	if err != nil {
		return err
	}
	t, err := acmepubsub.OpenTopic(ctx, client, "user-signup", nil)
	if err != nil {
		return err
	}
	defer t.Close()
	http.HandleFunc("/signup", func(w http.ResponseWriter, r *http.Request) {
		err := t.Send(r.Context(), &pubsub.Message{Body: []byte("Someone signed up")})
		if err != nil {
			log.Println(err)
		}
	})
	return http.ListenAndServe(":8080", nil)
}
```
The call to Send will only return after the message has been sent to the
server or its sending has failed.
Messages can be received from an existing subscription to a topic by calling the
Receive method on a Subscription object returned from
acmepubsub.OpenSubscription, like this:
```go
package main

import (
	"context"
	"fmt"
	"log"

	rawacmepubsub "github.com/acme/pubsub"

	"github.com/google/go-cloud/pubsub/acmepubsub"
)

func main() {
	if err := receive(); err != nil {
		log.Fatal(err)
	}
}

func receive() error {
	ctx := context.Background()
	client, err := rawacmepubsub.NewClient(ctx, "unicornvideohub")
	if err != nil {
		return err
	}
	s, err := acmepubsub.OpenSubscription(ctx, client, "user-signup-minder", nil)
	if err != nil {
		return err
	}
	defer s.Close()
	msg, err := s.Receive(ctx)
	if err != nil {
		return err
	}
	// Do something with msg.
	fmt.Printf("Got message: %s\n", msg.Body)
	// Acknowledge that we handled the message.
	msg.Ack()
	return nil
}
```
A more realistic subscriber client would process messages in a loop, like this:
```go
package main

import (
	"context"
	"log"

	rawacmepubsub "github.com/acme/pubsub"

	"github.com/google/go-cloud/pubsub/acmepubsub"
)

func main() {
	if err := receive(); err != nil {
		log.Fatal(err)
	}
}

func receive() error {
	ctx := context.Background()
	client, err := rawacmepubsub.NewClient(ctx, "unicornvideohub")
	if err != nil {
		return err
	}
	s, err := acmepubsub.OpenSubscription(ctx, client, "signup-minder", nil)
	if err != nil {
		return err
	}
	defer s.Close()
	// Process messages.
	for {
		msg, err := s.Receive(ctx)
		if err != nil {
			return err
		}
		log.Printf("Got message: %s", msg.Body)
		msg.Ack()
	}
}
```
The messages can be processed concurrently with an inverted worker pool, like this:
```go
package main

import (
	"context"
	"log"

	rawacmepubsub "github.com/acme/pubsub"

	"github.com/google/go-cloud/pubsub/acmepubsub"
)

func main() {
	if err := receive(); err != nil {
		log.Fatal(err)
	}
}

func receive() error {
	ctx := context.Background()
	client, err := rawacmepubsub.NewClient(ctx, "unicornvideohub")
	if err != nil {
		return err
	}
	s, err := acmepubsub.OpenSubscription(ctx, client, "user-signup-minder", nil)
	if err != nil {
		return err
	}
	defer s.Close()
	// Process messages.
	const poolSize = 10
	// Use a buffered channel as a semaphore.
	sem := make(chan struct{}, poolSize)
	var recvErr error
	for {
		msg, err := s.Receive(ctx)
		if err != nil {
			recvErr = err
			break
		}
		sem <- struct{}{}
		go func() {
			defer func() { <-sem }()
			log.Printf("Got message: %s", msg.Body)
			msg.Ack()
		}()
	}
	// Wait for the workers to finish by filling the semaphore.
	for n := poolSize; n > 0; n-- {
		sem <- struct{}{}
	}
	return recvErr
}
```
Adding support for a new pubsub system involves the following steps, continuing with the "acme" example:

1. Add `topic` and `subscription` types to `acmepubsub`, implementing the corresponding interfaces in the `github.com/google/go-cloud/pubsub/driver` package.
2. Add a `func OpenTopic(...)` that creates an `acmepubsub.topic` and returns a concrete `pubsub.Topic` made from it.
3. Add a `func OpenSubscription(...)` that creates an `acmepubsub.subscription` and returns a `pubsub.Subscription` made from it.

Here is a sketch of what the acmepubsub package could look like:
```go
package acmepubsub

import (
	"context"

	rawacmepubsub "github.com/acme/pubsub"

	"github.com/google/go-cloud/pubsub"
	"github.com/google/go-cloud/pubsub/driver"
)

// OpenTopic opens an existing topic on the pubsub server and returns a Topic
// that can be used to send messages to that topic.
func OpenTopic(ctx context.Context, client *rawacmepubsub.Client, topicName string) (*pubsub.Topic, error) {
	rt, err := client.Topic(ctx, topicName)
	if err != nil {
		return nil, err
	}
	t := &topic{rawTopic: rt}
	return pubsub.NewTopic(t), nil
}

// OpenSubscription opens an existing subscription on the server and returns a
// Subscription that can be used to receive messages.
func OpenSubscription(ctx context.Context, client *rawacmepubsub.Client, subscriptionName string) (*pubsub.Subscription, error) {
	rs, err := client.Subscription(ctx, subscriptionName)
	if err != nil {
		return nil, err
	}
	s := &subscription{rawSub: rs}
	return pubsub.NewSubscription(s), nil
}

type topic struct {
	rawTopic *rawacmepubsub.Topic
}

func (t *topic) SendBatch(ctx context.Context, ms []*driver.Message) error {
	// ...
}

func (t *topic) Close() error {
	// ...
}

type subscription struct {
	rawSub *rawacmepubsub.Subscription
}

func (s *subscription) ReceiveBatch(ctx context.Context) ([]*driver.Message, error) {
	// ...
}

func (s *subscription) SendAcks(ctx context.Context, ackIDs []driver.AckID) error {
	// ...
}

func (s *subscription) Close() error {
	// ...
}
```
The driver interfaces are batch-oriented because some pubsub systems can more efficiently deal with batches of messages than with one at a time. Streaming was considered but it does not appear to provide enough of a performance gain to be worth the additional complexity of supporting it across different pubsub systems [benchmarks].
The driver interfaces will be located in the
github.com/google/go-cloud/pubsub/driver package and will look something like
this:
```go
package driver

import "context"

type AckID interface{}

type Message struct {
	// Body contains the content of the message.
	Body []byte

	// Attributes has key/value metadata for the message.
	Attributes map[string]string

	// AckID identifies the message on the server.
	// It can be used to ack the message after it has been received.
	AckID AckID
}

// Topic publishes messages.
type Topic interface {
	// SendBatch publishes all the messages in ms.
	SendBatch(ctx context.Context, ms []*Message) error

	// Close disconnects the Topic.
	Close() error
}

// Subscription receives published messages.
type Subscription interface {
	// ReceiveBatch should return a batch of messages that have queued up
	// for the subscription on the server.
	//
	// If there is a transient failure, this method should not retry but
	// should return a nil slice and an error. The concrete API will take
	// care of retry logic.
	//
	// If the service returns no messages for some other reason, this
	// method should return the empty slice of messages and not attempt to
	// retry.
	//
	// ReceiveBatch is only called sequentially for individual
	// Subscriptions.
	ReceiveBatch(ctx context.Context) ([]*Message, error)

	// SendAcks acknowledges the messages with the given ackIDs on the
	// server so that they will not be received again for this
	// subscription. This method returns only after all the ackIDs are
	// sent.
	SendAcks(ctx context.Context, ackIDs []AckID) error

	// Close disconnects the Subscription.
	Close() error
}
```
The developer experience of using Go CDK's pubsub involves sending, receiving and acknowledging one message at a time, all in terms of synchronous calls. Behind the scenes, the driver implementations deal with batches of messages and acks. The concrete API, to be written by the Go CDK team, takes care of creating the batches in the case of Send or Ack, and dealing out messages one at a time in the case of Receive.
The concrete API will be located at github.com/google/go-cloud/pubsub and will
look something like this:
```go
package pubsub

import (
	"context"

	"github.com/google/go-cloud/pubsub/driver"
)

// Message contains data to be published.
type Message struct {
	// Body contains the content of the message.
	Body []byte

	// Attributes contains key/value pairs with metadata about the message.
	Attributes map[string]string

	// ackID is an ID for the message on the server, used for acking.
	ackID AckID

	// sub is the Subscription this message was received from.
	sub *Subscription

	// isAcked is true if Ack has been called on this message.
	isAcked bool
}

type AckID interface{}

// Ack acknowledges the message, telling the server that it does not need to
// be sent again to the associated Subscription. This method returns
// immediately. If Ack has already been called on the message, Ack panics.
func (m *Message) Ack() {
	// Send the ack ID back to the subscription for batching.
	// The ack is sent to the server in a separate goroutine
	// managed by the Subscription from which this message was
	// received.
	// ...
}

// Topic publishes messages to all its subscribers.
type Topic struct {
	driver   driver.Topic
	mcChan   chan msgCtx
	doneChan chan struct{}
}

// msgCtx pairs a Message with the Context of its Send call.
type msgCtx struct {
	msg *Message
	ctx context.Context
}

// Send publishes a message. It only returns after the message has been
// sent, or failed to be sent. The call will fail if ctx is canceled.
// Send can be called from multiple goroutines at once.
func (t *Topic) Send(ctx context.Context, m *Message) error {
	// Send this message over t.mcChan and then wait for the batch including
	// this message to be sent to the server.
	// ...
}

// Close disconnects the Topic.
func (t *Topic) Close() error {
	close(t.doneChan)
	return t.driver.Close()
}

// NewTopic makes a pubsub.Topic from a driver.Topic.
func NewTopic(d driver.Topic) *Topic {
	t := &Topic{
		driver:   d,
		mcChan:   make(chan msgCtx),
		doneChan: make(chan struct{}),
	}
	go func() {
		// Pull messages from t.mcChan and put them in batches. Send the
		// current batch whenever it is large enough or enough time has
		// elapsed since the last send.
		// ...
	}()
	return t
}
```
The subscription side of the concrete API looks like this:

```go
// Subscription receives published messages.
type Subscription struct {
	driver driver.Subscription

	// ackChan conveys ackIDs from Message.Ack to the ack batcher goroutine.
	ackChan chan AckID

	// ackErrChan reports errors back to Message.Ack.
	ackErrChan chan error

	// doneChan tells the goroutine from startAckBatcher to finish.
	doneChan chan struct{}

	// q is the local queue of messages downloaded from the server.
	q []*Message
}

// Receive receives and returns the next message from the Subscription's queue,
// blocking if none are available. This method can be called concurrently from
// multiple goroutines. On systems that support acks, the Ack() method of the
// returned Message must be called once the message has been processed, to
// prevent it from being received again.
func (s *Subscription) Receive(ctx context.Context) (*Message, error) {
	if len(s.q) == 0 {
		// Get the next batch of messages from the server.
		// ...
	}
	m := s.q[0]
	s.q = s.q[1:]
	return m, nil
}

// Close disconnects the Subscription.
func (s *Subscription) Close() error {
	close(s.doneChan)
	return s.driver.Close()
}

// NewSubscription creates a Subscription from a driver.Subscription. Behind
// the scenes, NewSubscription spins up a goroutine to gather acks into
// batches and periodically send them to the server.
func NewSubscription(s driver.Subscription) *Subscription {
	// Details similar to the body of NewTopic should go here.
}
```
Topics will gather messages into batches for sending. The batch size will be dynamically tuned according to how many messages are being sent concurrently.
Subscriptions will gather message acks into batches in the same way, also
dynamically tuning the batch size. If sending acks back to the server fails
transiently, the send will be retried, most likely within a loop in the
concrete API. If an unrecoverable error occurs while sending acks, a flag will
be set on the pubsub.Subscription marking the whole Subscription as no longer
usable; calls to Receive will fail from then on.
In this alternative, the application code sends, receives and acknowledges messages in batches. Here is an example of how it would look from the developer's perspective, in a situation where not too many signups are happening per second.
```go
package main

import (
	"context"
	"log"
	"net/http"

	rawacmepubsub "github.com/acme/pubsub"

	"github.com/google/go-cloud/pubsub"
	"github.com/google/go-cloud/pubsub/acmepubsub"
)

func main() {
	log.Fatal(serve())
}

func serve() error {
	ctx := context.Background()
	client, err := rawacmepubsub.NewClient(ctx, "unicornvideohub")
	if err != nil {
		return err
	}
	t, err := acmepubsub.OpenTopic(ctx, client, "user-signup", nil)
	if err != nil {
		return err
	}
	defer t.Close()
	http.HandleFunc("/signup", func(w http.ResponseWriter, r *http.Request) {
		err := t.Send(r.Context(), []*pubsub.Message{{Body: []byte("Someone signed up")}})
		if err != nil {
			log.Println(err)
		}
	})
	return http.ListenAndServe(":8080", nil)
}
```
For a company experiencing explosive growth or enthusiastic spammers creating more signups than this simple-minded implementation can handle, the app would have to be adapted to create non-singleton batches, like this:
```go
package main

import (
	"context"
	"log"
	"net/http"

	rawacmepubsub "github.com/acme/pubsub"

	"github.com/google/go-cloud/pubsub"
	"github.com/google/go-cloud/pubsub/acmepubsub"
)

const batchSize = 1000

func main() {
	log.Fatal(serve())
}

func serve() error {
	ctx := context.Background()
	client, err := rawacmepubsub.NewClient(ctx, "unicornvideohub")
	if err != nil {
		return err
	}
	t, err := acmepubsub.OpenTopic(ctx, client, "user-signup", nil)
	if err != nil {
		return err
	}
	defer t.Close()
	c := make(chan *pubsub.Message)
	go sendBatches(ctx, t, c)
	http.HandleFunc("/signup", func(w http.ResponseWriter, r *http.Request) {
		c <- &pubsub.Message{Body: []byte("Someone signed up")}
	})
	return http.ListenAndServe(":8080", nil)
}

func sendBatches(ctx context.Context, t *pubsub.Topic, c chan *pubsub.Message) {
	batch := make([]*pubsub.Message, batchSize)
	for {
		for i := 0; i < batchSize; i++ {
			batch[i] = <-c
		}
		if err := t.Send(ctx, batch); err != nil {
			log.Println(err)
		}
	}
}
```
This shows how the complexity of batching has been pushed onto the application code. Removing messages from the batch when HTTP/2 requests are canceled would require the application code to be even more complex, adding more risk of bugs.
In this API, the application code has to either request batches of size 1, meaning more network traffic, or it has to explicitly manage the batches of messages it receives. Here is an example of how this API would be used for serial message processing:
```go
package main

import (
	"context"
	"fmt"
	"log"

	rawacmepubsub "github.com/acme/pubsub"

	"github.com/google/go-cloud/pubsub"
	"github.com/google/go-cloud/pubsub/acmepubsub"
)

const batchSize = 10

func main() {
	if err := receive(); err != nil {
		log.Fatal(err)
	}
}

func receive() error {
	ctx := context.Background()
	client, err := rawacmepubsub.NewClient(ctx, "unicornvideohub")
	if err != nil {
		return err
	}
	s, err := acmepubsub.OpenSubscription(ctx, client, "signup-minder", nil)
	if err != nil {
		return err
	}
	defer s.Close()
	// Process messages.
	for {
		msgs, err := s.Receive(ctx, batchSize)
		if err != nil {
			return err
		}
		acks := make([]pubsub.AckID, 0, batchSize)
		for _, msg := range msgs {
			// Do something with msg.
			fmt.Printf("Got message: %q\n", msg.Body)
			acks = append(acks, msg.AckID)
		}
		if err := s.SendAcks(ctx, acks); err != nil {
			return err
		}
	}
}
```
Here’s what it might look like to use this batch-only API with the inverted worker pool pattern:
```go
package main

import (
	"context"
	"log"

	rawacmepubsub "github.com/acme/pubsub"

	"github.com/google/go-cloud/pubsub"
	"github.com/google/go-cloud/pubsub/acmepubsub"
)

const batchSize = 100
const poolSize = 10

func main() {
	if err := receive(); err != nil {
		log.Fatal(err)
	}
}

func receive() error {
	ctx := context.Background()
	client, err := rawacmepubsub.NewClient(ctx, "unicornvideohub")
	if err != nil {
		return err
	}
	s, err := acmepubsub.OpenSubscription(ctx, client, "user-signup-minder", nil)
	if err != nil {
		return err
	}
	defer s.Close()
	// Receive the messages and forward them to a chan, closing the chan
	// if receiving fails.
	msgsChan := make(chan *pubsub.Message)
	go func() {
		defer close(msgsChan)
		for {
			msgs, err := s.Receive(ctx, batchSize)
			if err != nil {
				log.Println(err)
				return
			}
			for _, m := range msgs {
				msgsChan <- m
			}
		}
	}()
	// Get the acks from a chan and send them back to the
	// server in batches.
	acksChan := make(chan pubsub.AckID)
	go func() {
		for {
			batch := make([]pubsub.AckID, batchSize)
			for i := 0; i < len(batch); i++ {
				batch[i] = <-acksChan
			}
			if err := s.SendAcks(ctx, batch); err != nil {
				/* handle err */
			}
		}
	}()
	// Use a buffered channel as a semaphore.
	sem := make(chan struct{}, poolSize)
	for msg := range msgsChan {
		sem <- struct{}{}
		go func(msg *pubsub.Message) {
			defer func() { <-sem }()
			log.Printf("Got message: %s", msg.Body)
			acksChan <- msg.AckID
		}(msg)
	}
	// Wait for the workers to finish by filling the semaphore.
	for n := poolSize; n > 0; n-- {
		sem <- struct{}{}
	}
	return nil
}
```
Here are some trade-offs of this design:
Pro:
Con:
Here is an example of what application code could look like for a pubsub API
inspired by go-micro's broker package:
```go
b := somepubsub.NewBroker(...)
if err := b.Connect(); err != nil {
	/* handle err */
}
topic := "user-signups"
subID := "user-signups-subscription-1"
s, err := b.Subscription(ctx, topic, subID, func(pub broker.Publication) error {
	fmt.Printf("%s\n", pub.Message.Body)
	return nil
})
if err != nil {
	/* handle err */
}
if err := b.Publish(ctx, topic, &broker.Message{Body: []byte("alice signed up")}); err != nil {
	/* handle err */
}
// Sometime later:
if err := s.Unsubscribe(ctx); err != nil {
	/* handle err */
}
```
Pro:
Con:
In pubsub systems with acknowledgement, messages are kept in a queue associated with the subscription on the server. When a client receives one of these messages, its counterpart on the server is marked as being processed. Once the client finishes processing the message, it sends an acknowledgement (or "ack") to the server and the server removes the message from the subscription queue. There may be a deadline for the acknowledgement, past which the server unmarks the message so that it can be received again for another try at processing.
Redis Pub/Sub and ZeroMQ don’t support acking, but many others do including GCP PubSub, Azure Service Bus, RabbitMQ, and Redis Streams. Given the wide support and usefulness, it makes sense to support message acking in the Go CDK.
As of this writing, it is an open question what should be done about pubsub systems that do not support acks. Some possibilities have been discussed, but no clear best option has emerged yet:

- Ack, and if so then doesn't that unduly complicate the code for apps that never use non-acking systems? This option is also potentially misleading for developers who would naturally assume that un-acked messages would be redelivered.

### Receive method returns an ack func

In this alternative, the application code would look something like this:
```go
msg, ack, err := s.Receive(ctx)
if err != nil {
	return err
}
log.Printf("Received message: %q", msg.Body)
ack(msg)
```
Pro:

- `ack` function is not used.

Con:

- Passing `ack` around along with `msg` is inconvenient.

### Tests for the concrete API (github.com/go-cloud/pubsub)

We can test that the batched sending, receiving and acking work as intended by making mock implementations of the driver interfaces.
At least the following things should be tested:

- Calling `pubsub.Message.Ack` causes `driver.Subscription.SendAcks` to be called.
- Calling `pubsub.Topic.Send` causes `driver.Topic.SendBatch` to be called.
- Calling `pubsub.Subscription.Receive` causes `driver.Subscription.ReceiveBatch` to be called.

### Tests for drivers (github.com/go-cloud/pubsub/acmepubsub)

What is the throughput and latency of the Go CDK's pubsub package, relative to directly using the APIs for various services?