Publish Messages to a Topic

Publishing a message to a topic with the Go CDK takes two steps:

  1. [Open a topic][] with the Pub/Sub provider of your choice (once per topic).
  2. [Send messages][] on the topic.

[Open a topic]: {{< ref "#opening" >}}
[Send messages]: {{< ref "#sending" >}}

<!--more-->

Opening a Topic {#opening}

The first step in publishing messages to a topic is to instantiate a portable [*pubsub.Topic][] for your service.

The easiest way to do so is to use [pubsub.OpenTopic][] and a service-specific URL pointing to the topic, making sure you ["blank import"][] the driver package to link it in.

```go
import (
    "context"
    "fmt"

    "gocloud.dev/pubsub"
    _ "gocloud.dev/pubsub/<driver>"
)
...
ctx := context.Background()
topic, err := pubsub.OpenTopic(ctx, "<driver-url>")
if err != nil {
    return fmt.Errorf("could not open topic: %v", err)
}
// Shutdown flushes pending sends and releases the topic's resources.
defer topic.Shutdown(ctx)
// topic is a *pubsub.Topic; see usage below
...
```

See [Concepts: URLs][] for general background and the [guide below][] for URL usage for each supported service.
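
The blank import matters because a driver package makes its URL scheme available as a side effect of being linked in, much as `database/sql` drivers do. The following is a simplified, standard-library-only model of that registration pattern. The `registry` map, the `demo` scheme, and the function names are hypothetical and are not the Go CDK's own code.

```go
package main

import (
	"fmt"
	"net/url"
)

// opener is a hypothetical stand-in for a driver's URL opener.
type opener func(u *url.URL) (string, error)

// registry maps URL schemes to openers (a much-simplified URL mux).
var registry = map[string]opener{}

// register is what a driver package would call from its init function.
func register(scheme string, o opener) { registry[scheme] = o }

// init simulates the side effect of blank-importing a driver package.
func init() {
	register("demo", func(u *url.URL) (string, error) {
		return "demo topic " + u.Host, nil
	})
}

// open dispatches on the URL scheme and fails if no driver was linked in.
func open(rawURL string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", fmt.Errorf("parse %q: %w", rawURL, err)
	}
	o, ok := registry[u.Scheme]
	if !ok {
		return "", fmt.Errorf("no driver registered for scheme %q", u.Scheme)
	}
	return o(u)
}

func main() {
	for _, raw := range []string{"demo://mytopic", "other://mytopic"} {
		topic, err := open(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("opened:", topic)
	}
}
```

In this model, forgetting the blank import corresponds to the `other://` case: the URL parses, but no opener is registered for its scheme.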

Alternatively, if you need fine-grained control over the connection settings, you can call the constructor function in the driver package directly (like gcppubsubv2.OpenTopic).

```go
import "gocloud.dev/pubsub/<driver>"
...
topic, err := <driver>.OpenTopic(...)
...
```

You may find the [wire package][] useful for managing your initialization code when switching between different backing services.

See the [guide below][] for constructor usage for each supported service.

[guide below]: {{< ref "#services" >}}
[*pubsub.Topic]: https://godoc.org/gocloud.dev/pubsub#Topic
[pubsub.OpenTopic]: https://godoc.org/gocloud.dev/pubsub#OpenTopic
["blank import"]: https://golang.org/doc/effective_go.html#blank_import
[Concepts: URLs]: {{< ref "/concepts/urls.md" >}}
[wire package]: http://github.com/google/wire

Sending Messages on a Topic {#sending}

Sending a message on a Topic looks like this:

{{< goexample src="gocloud.dev/pubsub.ExampleTopic_Send" imports="0" >}}

Note that the semantics of message delivery can vary by backing service.

Other Usage Samples

Supported Pub/Sub Services {#services}

Google Cloud Pub/Sub {#gcp}

The Go CDK can publish to a Google Cloud Pub/Sub topic. The URLs use the project ID and the topic ID.

pubsub.OpenTopic will use Application Default Credentials; if you have authenticated via gcloud auth application-default login, it will use those credentials. See Application Default Credentials to learn about authentication alternatives, including using environment variables.

{{< goexample "gocloud.dev/pubsub/gcppubsubv2.Example_openTopicFromURL" >}}

Google Cloud Pub/Sub Constructor {#gcp-ctor}

The gcppubsubv2.OpenTopic constructor opens a Cloud Pub/Sub topic. You must first obtain GCP credentials and then create a gRPC connection to Cloud Pub/Sub. (This gRPC connection can be reused among topics.)

{{< goexample "gocloud.dev/pubsub/gcppubsubv2.ExampleOpenTopic" >}}

Amazon Simple Notification Service {#sns}

The Go CDK can publish to an Amazon Simple Notification Service (SNS) topic. SNS URLs in the Go CDK use the Amazon Resource Name (ARN) to identify the topic. You should specify the region query parameter to ensure your application connects to the correct region.

pubsub.OpenTopic will create an AWS Config based on the AWS SDK V2; see [AWS V2 Config][] to learn more.

{{< goexample "gocloud.dev/pubsub/awssnssqs.Example_openSNSTopicFromURL" >}}
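
As a rough illustration of how such a URL can be assembled, the sketch below derives the region from the ARN itself and appends it as the region query parameter. It assumes the `awssns:///` prefix (three slashes, because an ARN contains colons and cannot serve as a URL host); the helper name `snsTopicURL` and the sample ARN are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// snsTopicURL builds a topic URL from an SNS topic ARN of the form
// arn:<partition>:sns:<region>:<account-id>:<topic-name>.
// The "awssns:///" prefix is an assumption of this sketch.
func snsTopicURL(arn string) (string, error) {
	parts := strings.Split(arn, ":")
	if len(parts) != 6 || parts[0] != "arn" || parts[2] != "sns" {
		return "", fmt.Errorf("not an SNS topic ARN: %q", arn)
	}
	q := url.Values{}
	q.Set("region", parts[3]) // the fourth ARN field is the region
	return "awssns:///" + arn + "?" + q.Encode(), nil
}

func main() {
	u, err := snsTopicURL("arn:aws:sns:us-east-2:123456789012:mytopic")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(u)
}
```

Taking the region from the ARN keeps the two values from drifting apart when a topic is moved or a URL is copied between environments.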

SNS messages are restricted to UTF-8 clean payloads. If your application sends a message that contains non-UTF-8 bytes, then the Go CDK will automatically Base64 encode the message and add a base64encoded message attribute. When subscribing to messages on the topic through the Go CDK, these will be [automatically Base64 decoded][SQS Subscribe], but if you are receiving messages from a topic in a program that does not use the Go CDK, you may need to manually Base64 decode the message payload.
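
For a consumer that does not use the Go CDK, the decoding step might look like the sketch below. It models the behavior described above using only the standard library: the attribute name `base64encoded` comes from the text, while the attribute value `"true"`, the use of standard (padded) Base64, and the helper names are assumptions of this sketch.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"unicode/utf8"
)

// encodeBody returns the payload to publish and any attributes to attach.
// Valid UTF-8 passes through unchanged; anything else is Base64 encoded
// and flagged with a base64encoded attribute.
func encodeBody(body []byte) (string, map[string]string) {
	if utf8.Valid(body) {
		return string(body), nil
	}
	attrs := map[string]string{"base64encoded": "true"} // value is assumed
	return base64.StdEncoding.EncodeToString(body), attrs
}

// decodeBody reverses encodeBody on the receiving side. It checks only
// for the presence of the attribute, not its value.
func decodeBody(payload string, attrs map[string]string) ([]byte, error) {
	if _, ok := attrs["base64encoded"]; !ok {
		return []byte(payload), nil
	}
	return base64.StdEncoding.DecodeString(payload)
}

func main() {
	raw := []byte{0xff, 0xfe, 'h', 'i'} // not valid UTF-8
	payload, attrs := encodeBody(raw)
	fmt.Printf("payload=%q attrs=%v\n", payload, attrs)

	back, err := decodeBody(payload, attrs)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("decoded=%v\n", back)
}
```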

[SQS Subscribe]: {{< relref "./subscribe.md#sqs" >}}
[AWS V2 Config]: https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/

Amazon SNS Constructor {#sns-ctor}

The awssnssqs.OpenSNSTopic constructor opens an SNS topic. You must first create an AWS Config with the same region as your topic:

{{< goexample "gocloud.dev/pubsub/awssnssqs.ExampleOpenSNSTopic" >}}

Amazon Simple Queue Service {#sqs}

The Go CDK can publish to an Amazon [Simple Queue Service][SQS] (SQS) topic. SQS URLs closely resemble the queue URL, except the leading https:// is replaced with awssqs://. You can specify the region query parameter to ensure your application connects to the correct region; otherwise, pubsub.OpenTopic will use the region found in the environment variables or your AWS CLI configuration.

{{< goexample "gocloud.dev/pubsub/awssnssqs.Example_openSQSTopicFromURL" >}}
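
The conversion from a queue URL can be sketched as a small helper. This is an illustration only: the helper name `sqsTopicURL` and the sample queue URL are hypothetical, and the region parameter is appended only when one is supplied.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// sqsTopicURL swaps the leading https:// of an SQS queue URL for awssqs://
// and optionally appends a region query parameter.
func sqsTopicURL(queueURL, region string) (string, error) {
	const prefix = "https://"
	if !strings.HasPrefix(queueURL, prefix) {
		return "", fmt.Errorf("queue URL %q does not start with %s", queueURL, prefix)
	}
	u := "awssqs://" + strings.TrimPrefix(queueURL, prefix)
	if region != "" {
		u += "?region=" + url.QueryEscape(region)
	}
	return u, nil
}

func main() {
	u, err := sqsTopicURL("https://sqs.us-east-2.amazonaws.com/123456789012/myqueue", "us-east-2")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(u)
}
```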

SQS messages are restricted to UTF-8 clean payloads. If your application sends a message that contains non-UTF-8 bytes, then the Go CDK will automatically Base64 encode the message and add a base64encoded message attribute. When subscribing to messages on the topic through the Go CDK, these will be [automatically Base64 decoded][SQS Subscribe], but if you are receiving messages from a topic in a program that does not use the Go CDK, you may need to manually Base64 decode the message payload.

[SQS Subscribe]: {{< relref "./subscribe.md#sqs" >}}
[SQS]: https://aws.amazon.com/sqs/

Amazon SQS Constructor {#sqs-ctor}

The awssnssqs.OpenSQSTopic constructor opens an SQS topic. You must first create an AWS Config with the same region as your topic:

{{< goexample "gocloud.dev/pubsub/awssnssqs.ExampleOpenSQSTopic" >}}

Azure Service Bus {#azure}

The Go CDK can publish to an Azure Service Bus topic. The URL for publishing is the topic name. pubsub.OpenTopic will use the environment variable SERVICEBUS_CONNECTION_STRING to obtain the Service Bus connection string. The connection string can be obtained from the Azure portal.

{{< goexample "gocloud.dev/pubsub/azuresb.Example_openTopicFromURL" >}}
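
Because the connection string is read from the environment, a missing or truncated value tends to surface only when the topic is opened. A startup check along the following lines can catch that earlier. It is a sketch that assumes the usual `Key=Value;Key=Value` layout of Service Bus connection strings; the helper name `parseConnString` and the sample values in the comments are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// parseConnString splits a connection string such as
// "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=n;SharedAccessKey=k="
// into its fields. Values may themselves contain '=' (for example Base64
// padding), so each segment is split on the first '=' only.
func parseConnString(s string) (map[string]string, error) {
	fields := make(map[string]string)
	for i, part := range strings.Split(s, ";") {
		if part == "" {
			continue // tolerate a trailing semicolon
		}
		key, value, ok := strings.Cut(part, "=")
		if !ok || key == "" {
			// Report the position rather than the text, which may hold a secret.
			return nil, fmt.Errorf("malformed segment at position %d", i)
		}
		fields[key] = value
	}
	if fields["Endpoint"] == "" {
		return nil, errors.New("connection string has no Endpoint")
	}
	return fields, nil
}

func main() {
	raw := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if raw == "" {
		fmt.Println("SERVICEBUS_CONNECTION_STRING is not set")
		return
	}
	fields, err := parseConnString(raw)
	if err != nil {
		fmt.Println("invalid connection string:", err)
		return
	}
	// Print only the endpoint; never log the access key.
	fmt.Println("Service Bus endpoint:", fields["Endpoint"])
}
```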

Azure Service Bus Constructor {#azure-ctor}

The azuresb.OpenTopic constructor opens an Azure Service Bus topic. You must first connect to the topic using the Azure Service Bus library and then pass it to azuresb.OpenTopic. There are also helper functions in the azuresb package to make this easier.

{{< goexample "gocloud.dev/pubsub/azuresb.ExampleOpenTopic" >}}

RabbitMQ {#rabbitmq}

The Go CDK can publish to an AMQP 0.9.1 fanout exchange, the dialect of AMQP spoken by RabbitMQ. A RabbitMQ URL only includes the exchange name. The RabbitMQ server is discovered from the RABBIT_SERVER_URL environment variable (which is something like amqp://guest:guest@localhost:5672/).

{{< goexample "gocloud.dev/pubsub/rabbitpubsub.Example_openTopicFromURL" >}}
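
Since the server address comes from RABBIT_SERVER_URL rather than from the topic URL, it can help to validate that variable at startup. The sketch below uses only `net/url`; the helper name `checkRabbitURL` is hypothetical, and accepting both `amqp` and `amqps` is an assumption of this example rather than a statement about what the driver accepts.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"os"
)

// checkRabbitURL validates an AMQP server URL and returns its host:port.
// Errors deliberately omit the URL text because it may embed credentials.
func checkRabbitURL(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", errors.New("server URL is not a valid URL")
	}
	if u.Scheme != "amqp" && u.Scheme != "amqps" {
		return "", fmt.Errorf("unexpected scheme %q, want amqp or amqps", u.Scheme)
	}
	if u.Host == "" {
		return "", errors.New("server URL has no host")
	}
	return u.Host, nil
}

func main() {
	raw := os.Getenv("RABBIT_SERVER_URL")
	if raw == "" {
		fmt.Println("RABBIT_SERVER_URL is not set")
		return
	}
	host, err := checkRabbitURL(raw)
	if err != nil {
		fmt.Println("invalid RABBIT_SERVER_URL:", err)
		return
	}
	fmt.Println("RabbitMQ server:", host)
}
```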

RabbitMQ Constructor {#rabbitmq-ctor}

The rabbitpubsub.OpenTopic constructor opens a RabbitMQ exchange. You must first create an *amqp.Connection to your RabbitMQ instance.

{{< goexample "gocloud.dev/pubsub/rabbitpubsub.ExampleOpenTopic" >}}

NATS {#nats}

The Go CDK can publish to a NATS subject. A NATS URL only includes the subject name. The NATS server is discovered from the NATS_SERVER_URL environment variable (which is something like nats://nats.example.com).

{{< goexample "gocloud.dev/pubsub/natspubsub.Example_openTopicFromURL" >}}

Because NATS does not natively support metadata, messages sent to NATS will be encoded with gob.
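
To make the idea concrete, the sketch below shows how a body and its metadata can travel together in a single gob-encoded payload. The `envelope` type is hypothetical and is not the Go CDK's actual wire format; it only illustrates why a subscriber that reads the subject without the Go CDK may need to gob-decode what it receives.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// envelope is a hypothetical container pairing a message body with
// metadata, for transports that carry only opaque bytes.
type envelope struct {
	Body     []byte
	Metadata map[string]string
}

// encode serializes the envelope with gob.
func encode(e envelope) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(e); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode restores an envelope from gob-encoded bytes.
func decode(data []byte) (envelope, error) {
	var e envelope
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&e)
	return e, err
}

func main() {
	data, err := encode(envelope{
		Body:     []byte("hello"),
		Metadata: map[string]string{"lang": "en"},
	})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	e, err := decode(data)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("body=%s metadata=%v\n", e.Body, e.Metadata)
}
```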

NATS Constructor {#nats-ctor}

The natspubsub.OpenTopic constructor opens a NATS subject as a topic. You must first create a *nats.Conn to your NATS instance.

{{< goexample "gocloud.dev/pubsub/natspubsub.ExampleOpenTopic" >}}

Kafka {#kafka}

The Go CDK can publish to a Kafka cluster. A Kafka URL only includes the topic name. The brokers in the Kafka cluster are discovered from the KAFKA_BROKERS environment variable (which is a comma-delimited list of hosts, something like 1.2.3.4:9092,5.6.7.8:9092).

{{< goexample "gocloud.dev/pubsub/kafkapubsub.Example_openTopicFromURL" >}}
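
A small helper can turn the comma-delimited KAFKA_BROKERS value into a validated slice before any connection is attempted. This is a sketch using only the standard library; the helper name `parseBrokers` is hypothetical, and requiring every entry to be in `host:port` form is an assumption of this example.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"strings"
)

// parseBrokers splits a comma-delimited broker list and checks that every
// entry looks like host:port. Empty entries and surrounding spaces are ignored.
func parseBrokers(s string) ([]string, error) {
	var brokers []string
	for _, b := range strings.Split(s, ",") {
		b = strings.TrimSpace(b)
		if b == "" {
			continue
		}
		if _, _, err := net.SplitHostPort(b); err != nil {
			return nil, fmt.Errorf("broker %q is not host:port: %w", b, err)
		}
		brokers = append(brokers, b)
	}
	if len(brokers) == 0 {
		return nil, errors.New("no brokers listed")
	}
	return brokers, nil
}

func main() {
	brokers, err := parseBrokers(os.Getenv("KAFKA_BROKERS"))
	if err != nil {
		fmt.Println("invalid KAFKA_BROKERS:", err)
		return
	}
	fmt.Println("brokers:", brokers)
}
```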

Kafka Constructor {#kafka-ctor}

The kafkapubsub.OpenTopic constructor opens a Kafka topic to publish messages to. Depending on your Kafka cluster configuration (see auto.create.topics.enable), you may need to provision the topic beforehand.

In addition to the list of brokers, you'll need a *sarama.Config, which exposes many knobs that can affect performance and semantics; review and set them carefully. kafkapubsub.MinimalConfig provides a minimal config to get you started.

{{< goexample "gocloud.dev/pubsub/kafkapubsub.ExampleOpenTopic" >}}

In-Memory {#mem}

The Go CDK includes an in-memory Pub/Sub provider useful for local testing. The names in mem:// URLs are a process-wide namespace, so subscriptions to the same name will receive messages posted to that topic. This is described in more detail in the [subscription guide][subscribe-mem].

{{< goexample "gocloud.dev/pubsub/mempubsub.Example_openTopicFromURL" >}}
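
The process-wide namespace can be pictured with a toy model: a package-level map from names to topics, so that opening the same name twice yields the same topic. The types and functions below are hypothetical and greatly simplified; they are not how the in-memory driver is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// memTopic is a toy topic that copies each message to every subscriber.
type memTopic struct {
	mu   sync.Mutex
	subs []chan string
}

var (
	registryMu sync.Mutex
	topics     = map[string]*memTopic{} // one namespace for the whole process
)

// openTopic returns the topic registered under name, creating it on first use.
func openTopic(name string) *memTopic {
	registryMu.Lock()
	defer registryMu.Unlock()
	t, ok := topics[name]
	if !ok {
		t = &memTopic{}
		topics[name] = t
	}
	return t
}

// subscribe attaches a new buffered subscriber channel to the topic.
func (t *memTopic) subscribe() <-chan string {
	ch := make(chan string, 16)
	t.mu.Lock()
	t.subs = append(t.subs, ch)
	t.mu.Unlock()
	return ch
}

// send delivers msg to every subscriber; it blocks if a buffer is full.
func (t *memTopic) send(msg string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, ch := range t.subs {
		ch <- msg
	}
}

func main() {
	publisher := openTopic("orders")
	sub := openTopic("orders").subscribe() // same name, same topic
	publisher.send("hello")
	fmt.Println(<-sub)
}
```

Because the namespace is shared across the process, tests that run in parallel are safer with distinct topic names.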

[subscribe-mem]: {{< ref "./subscribe.md#mem" >}}

In-Memory Constructor {#mem-ctor}

To create an in-memory Pub/Sub topic, use the mempubsub.NewTopic function. You can use the returned topic to create in-memory subscriptions, as detailed in the [subscription guide][subscribe-mem-ctor].

{{< goexample "gocloud.dev/pubsub/mempubsub.ExampleNewTopic" >}}

[subscribe-mem-ctor]: {{< ref "./subscribe.md#mem-ctor" >}}