Requeuing After Error

docs/content/advanced/requeuing-after-error.md

+++
title = "Requeuing After Error"
description = "How to requeue a message after it fails to process"
weight = -20
draft = false
bref = "How to requeue a message after it fails to process"
+++

When a message fails to process (a nack is sent), it usually blocks other messages on the same topic (within the same consumer group or partition).

Depending on your setup, it may be useful to requeue the failed message back to the tail of the queue.

Consider this if:

  • You don't care about the order of messages.
  • Your system isn't resilient to blocked messages.
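The trade-off can be sketched with a plain in-memory queue. This is only an illustration of the idea using the standard library, not Watermill code: `drainWithRequeue`, `failFirstAttemptOfB`, and `errTemporary` are hypothetical names, and messages are assumed to be unique strings. A failed message is appended to the tail, so the messages behind it are processed first and the original order is lost.

```go
package main

import (
	"errors"
	"fmt"
)

// errTemporary is a hypothetical error standing in for a processing failure.
var errTemporary = errors.New("temporary failure")

// handler processes a message; attempt starts at 1 for each message.
type handler func(msg string, attempt int) error

// drainWithRequeue processes msgs in FIFO order. A message that fails is
// appended to the tail of the queue instead of being retried in place, so it
// does not block the messages behind it. After maxAttempts failures the
// message is given up on and returned in dropped.
// Attempts are tracked per message value, so messages must be unique.
func drainWithRequeue(msgs []string, h handler, maxAttempts int) (processed, dropped []string) {
	queue := append([]string(nil), msgs...) // copy, so the caller's slice is untouched
	attempts := make(map[string]int)

	for len(queue) > 0 {
		msg := queue[0]
		queue = queue[1:]
		attempts[msg]++

		if err := h(msg, attempts[msg]); err != nil {
			if attempts[msg] < maxAttempts {
				queue = append(queue, msg) // requeue at the tail
			} else {
				dropped = append(dropped, msg)
			}
			continue
		}
		processed = append(processed, msg)
	}
	return processed, dropped
}

// failFirstAttemptOfB fails only the first attempt at processing "b".
func failFirstAttemptOfB(msg string, attempt int) error {
	if msg == "b" && attempt == 1 {
		return errTemporary
	}
	return nil
}

func main() {
	processed, dropped := drainWithRequeue([]string{"a", "b", "c"}, failFirstAttemptOfB, 3)
	fmt.Println(processed, dropped) // prints "[a c b] []"
}
```

Here "c" is handled before the retry of "b", which is why requeuing only fits when message order does not matter.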

Requeuer

The Requeuer component is a wrapper around the Router that moves messages from one topic to another.

{{% load-snippet-partial file="src-link/components/requeuer/requeuer.go" first_line_contains="type Config" last_line_contains="}" %}}

A trivial usage can look like this. It requeues messages from one topic to the same topic after a delay.

{{< callout "danger" >}} Using the delay this way is not recommended, as it blocks the entire requeue process for the given time. {{< /callout >}}

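```go
req, err := requeuer.NewRequeuer(requeuer.Config{
	Subscriber:     sub,
	SubscribeTopic: "topic",
	Publisher:      pub,
	// Publish every message back to the topic it was consumed from.
	GeneratePublishTopic: func(params requeuer.GeneratePublishTopicParams) (string, error) {
		return "topic", nil
	},
	// Wait this long before requeuing each message.
	Delay: time.Millisecond * 200,
}, logger)
if err != nil {
	return err
}

// Start the Requeuer. Run blocks while it is processing messages.
err = req.Run(context.Background())
if err != nil {
	return err
}
```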

A better way to use the Requeuer is to combine it with the Poison middleware. The middleware moves messages to a separate "poison" topic. Then, the requeuer moves them back to the original topic based on the metadata.
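Picking the target topic from the metadata could look roughly like the sketch below. It uses only the standard library so it runs on its own. The helper `originalTopic` and the error `errMissingTopic` are hypothetical, and the metadata key `topic_poisoned` is an assumption: it is believed to match the Poison middleware's `middleware.PoisonedTopicKey`, but check the constant in your Watermill version.

```go
package main

import (
	"errors"
	"fmt"
)

// poisonedTopicKey is the metadata key assumed to hold the topic the message
// originally failed on. Assumption: this matches middleware.PoisonedTopicKey.
const poisonedTopicKey = "topic_poisoned"

// errMissingTopic is a hypothetical error for messages without that metadata.
var errMissingTopic = errors.New("missing original topic in metadata")

// originalTopic returns the topic a poisoned message should be requeued to,
// based on the metadata attached when it was moved to the poison topic.
func originalTopic(metadata map[string]string) (string, error) {
	topic, ok := metadata[poisonedTopicKey]
	if !ok || topic == "" {
		return "", fmt.Errorf("%w: key %q", errMissingTopic, poisonedTopicKey)
	}
	return topic, nil
}

func main() {
	topic, err := originalTopic(map[string]string{poisonedTopicKey: "orders"})
	fmt.Println(topic, err) // prints "orders <nil>"
}
```

In a real setup, a helper like this would presumably be called from `GeneratePublishTopic` with the metadata of the message passed in its params. Returning an error for a message without the key avoids publishing it to an empty topic name.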

You can combine this with a Pub/Sub that supports delayed messages. See the full example based on PostgreSQL.