Back to Watermill

Fanin

docs/content/advanced/fanin.md

1.5.11.5 KB
Original Source

+++ title = "FanIn (merging topics)" description = "Merging two topics into one with the FanIn component" date = 2023-01-21T12:47:30+01:00 weight = -100 draft = false bref = "Merging two topics into one with the FanIn component" +++

FanIn component

The FanIn component merges two topics into one.

Configuring

{{% load-snippet-partial file="src-link/components/fanin/fanin.go" first_line_contains="type Config struct {" last_line_contains="CloseTimeout time.Duration" padding_after="1" %}}

Running

You need to provide a Publisher and a Subscriber implementation for the FanIn component.

You can find the list of supported Pub/Subs on Supported Pub/Subs page. The Publisher and subscriber can be implemented by different message brokers (for example, you can merge a Kafka topic with a RabbitMQ topic).

go

logger := watermill.NewStdLogger(false, false)

// create Publisher and Subscriber
pub, err := // ...
sub, err := // ...

fi, err := fanin.NewFanIn(
    sub,
    pub,
    fanin.Config{
        SourceTopics: upstreamTopics,
        TargetTopic:  downstreamTopic,
    },
    logger,
)
if err != nil {
    panic(err)
}

if err := fi.Run(context.Background()); err != nil {
    panic(err)
}

Controlling FanIn component

The FanIn component can be stopped by cancelling the context passed to the Run method or by calling the Close method.

{{% load-snippet-partial file="src-link/components/fanin/fanin.go" first_line_contains="func (f *FanIn) Run" last_line_contains=" Close() error" padding_after="2" %}}