docs/content/pubsubs/aws.md
+++ title = "Amazon AWS SNS/SQS" description = "AWS SQS and SNS are fully-managed message queuing and Pub/Sub-like services that make it easy to decouple and scale microservices, distributed systems, and serverless applications." date = 2024-10-19T15:30:00+02:00 bref = "AWS SQS and SNS are fully-managed message queuing and Pub/Sub-like services that make it easy to decouple and scale microservices, distributed systems, and serverless applications." weight = 10 +++
AWS SQS and SNS are fully-managed message queuing and Pub/Sub-like services that make it easy to decouple and scale microservices, distributed systems, and serverless applications.
Watermill provides a simple way to use AWS SQS and SNS with Go. It handles all the AWS SDK internals and provides a simple API to publish and subscribe messages.
Official Documentation:
You can find a fully functional example with AWS SNS in the Watermill examples:
go get github.com/ThreeDotsLabs/watermill-aws
While both SQS and SNS are messaging services provided by AWS, they serve different purposes and are best suited for different scenarios in your Watermill applications.
To use SNS as a Pub/Sub (to have multiple subscribers receiving the same message), you need to create an SNS topic and subscribe to SQS queues.
When a message is published to the SNS topic, it will be delivered to all subscribed SQS queues.
We implemented this logic in the watermill-aws package out of the box.
When you subscribe to an SNS topic, Watermill AWS creates an SQS queue and subscribes to it.
We can say, that a single SQS queue acts as a consumer group or subscription in other Pub/Sub implementations.
The mechanism is detailed in AWS documentation.
Example use case: Processing user uploads in the background.
Example use case: Notifying multiple services about a new user registration.
Our SNS implementation in Watermill automatically creates and manages SQS queues for each subscriber, simplifying the process of using SNS with multiple SQS queues.
Remember, you can use both in the same application where appropriate. For instance, you might use SNS to broadcast events and SQS to process specific tasks triggered by those events.
To learn how SNS and SQS work together, see the How SNS is connected with SQS section.
| Feature | Implements | Note |
|---|---|---|
| ConsumerGroups | no | it's a queue, for consumer groups-like functionality use SNS |
| ExactlyOnceDelivery | no | yes |
| GuaranteedOrder | yes* | from AWS Docs: "(...) due to the highly distributed architecture, more than one copy of a message might be delivered, and messages may occasionally arrive out of order. Despite this, standard queues make a best-effort attempt to maintain the order in which messages are sent." |
| Persistent | yes |
"sqs:ReceiveMessage""sqs:DeleteMessage""sqs:GetQueueUrl""sqs:CreateQueue""sqs:GetQueueAttributes""sqs:SendMessage""sqs:ChangeMessageVisibility"[todo - verify]
{{% load-snippet-partial file="src-link/watermill-aws/sqs/config.go" first_line_contains="type SubscriberConfig struct " last_line_contains="type GenerateCreateQueueInputFunc" %}}
In the Watermill model, we are normalizing the AWS queue url to topic used in the Publish and Subscribe methods.
To give you flexibility of what you want to use as a topic in Watermill, you can customize resolving the queue URL.
{{% load-snippet-partial file="src-link/watermill-aws/sqs/url_resolver.go" first_line_contains="// QueueUrlResolver" last_line_contains="GenerateQueueUrlResolver" %}}
You can implement your own QueueUrlResolver or use one of the provided resolvers.
By default, GetQueueUrlByNameUrlResolver resolver is used:
{{% load-snippet-partial file="src-link/watermill-aws/sqs/url_resolver.go" first_line_contains="// GetQueueUrlByNameUrlResolver " last_line_contains="NewGetQueueUrlByNameUrlResolver" %}}
There are two more resolvers available:
{{% load-snippet-partial file="src-link/watermill-aws/sqs/url_resolver.go" first_line_contains="// GenerateQueueUrlResolver" last_line_contains="}" %}}
{{% load-snippet-partial file="src-link/watermill-aws/sqs/url_resolver.go" first_line_contains="// TransparentUrlResolver" last_line_contains="}" %}}
You may want to use goaws or localstack for local development or testing.
You can override the endpoint using the OptFns option in the SubscriberConfig or PublisherConfig.
package main
import (
amazonsqs "github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/ThreeDotsLabs/watermill-amazonsqs/sqs"
)
func main() {
// ...
sqsOpts := []func(*amazonsqs.Options){
amazonsqs.WithEndpointResolverV2(sqs.OverrideEndpointResolver{
Endpoint: transport.Endpoint{
URI: *lo.Must(url.Parse("http://localstack:4566")),
},
}),
}
sqsConfig := sqs.SubscriberConfig{
AWSConfig: cfg,
OptFns: sqsOpts,
}
sub, err := sqs.NewSubscriber(sqsConfig, logger)
if err != nil {
panic(fmt.Errorf("unable to create new subscriber: %w", err))
}
// ...
}
| Feature | Implements | Note |
|---|---|---|
| ConsumerGroups | yes | yes |
| ExactlyOnceDelivery | no | yes |
| GuaranteedOrder | yes* | from AWS Docs: "(...) due to the highly distributed architecture, more than one copy of a message might be delivered, and messages may occasionally arrive out of order. Despite this, standard queues make a best-effort attempt to maintain the order in which messages are sent." |
| Persistent | yes |
sns:Subscribesns:ConfirmSubscriptionsns:Receivesns:Unsubscribeand all permissions required for SQS:
sqs:ReceiveMessagesqs:DeleteMessagesqs:GetQueueUrlsqs:CreateQueuesqs:GetQueueAttributessqs:SendMessagesqs:ChangeMessageVisibilitysqs:SetQueueAttributesAdditionally, if sns.SubscriberConfig.DoNotSetQueueAccessPolicy is not enabled, you should have the following:
sqs:SetQueueAttributes{{% load-snippet-partial file="src-link/watermill-aws/sns/config.go" first_line_contains="type SubscriberConfig struct " last_line_contains="type GenerateSqsQueueNameFn" %}}
Additionally, because SNS Subscriber uses SQS queues as "subscriptions", you need to pass SQS configuration as well.
In the Watermill model, we normalise AWS Topic ARN to the topic used in the Publish and Subscribe methods.
{{% load-snippet-partial file="src-link/watermill-aws/sns/topic.go" first_line_contains="// TopicResolver" last_line_contains="}" %}}
We are providing two out-of-the-box resolvers:
{{% load-snippet-partial file="src-link/watermill-aws/sns/topic.go" first_line_contains="// TransparentTopicResolver" last_line_contains="}" %}}
{{% load-snippet-partial file="src-link/watermill-aws/sns/topic.go" first_line_contains="// GenerateArnTopicResolver" last_line_contains="}" %}}
You may want to use goaws or localstack for local development or testing.
You can override the endpoint using the OptFns option in the SubscriberConfig or PublisherConfig.
package main
import (
amazonsns "github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/ThreeDotsLabs/watermill-amazonsns/sns"
)
func main() {
// ...
snsOpts := []func(*amazonsns.Options){
amazonsns.WithEndpointResolverV2(sns.OverrideEndpointResolver{
Endpoint: transport.Endpoint{
URI: *lo.Must(url.Parse("http://localstack:4566")),
},
}),
}
snsConfig := sns.SubscriberConfig{
AWSConfig: cfg,
OptFns: snsOpts,
}
sub, err := sns.NewSubscriber(snsConfig, sqsConfig, logger)
if err != nil {
panic(fmt.Errorf("unable to create new subscriber: %w", err))
}
// ...
}