publish-subscribe/README.md
Defines a one-to-many dependency between objects, enabling automatic notification of multiple subscribers when a publisher's state changes or an event occurs.
Real-world example
An analogous real-world example of the Publish-Subscribe pattern is a news broadcasting system. A news agency (publisher) broadcasts breaking news stories without knowing who specifically receives them. Subscribers, such as television stations, news websites, or mobile news apps, independently decide which types of news they want to receive (e.g., sports, politics, weather) and are automatically notified whenever relevant events occur. This approach keeps the news agency unaware of subscribers' specifics, allowing flexible and scalable distribution of information.
In plain words
The Publish-Subscribe design pattern allows senders (publishers) to broadcast messages to multiple receivers (subscribers) without knowing who they are, enabling loose coupling and asynchronous communication in a system.
Wikipedia says
In software architecture, publish–subscribe or pub/sub is a messaging pattern where publishers categorize messages into classes that are received by subscribers. This is contrasted to the typical messaging pattern model where publishers send messages directly to subscribers. Similarly, subscribers express interest in one or more classes and only receive messages that are of interest, without knowledge of which publishers, if any, there are. Publish–subscribe is a sibling of the message queue paradigm, and is typically one part of a larger message-oriented middleware system. Most messaging systems support both the pub/sub and message queue models in their API; e.g., Java Message Service (JMS).
Sequence diagram
First, we identify events that trigger the publisher-subscriber interactions. Common examples include:
We start with a simple message class encapsulating the information sent from publishers to subscribers.
public record Message(Object content) {
}
A Topic represents an event category that subscribers can register to and publishers can publish messages to. Each topic has:
Subscribers can dynamically subscribe or unsubscribe.
@Getter
@Setter
@RequiredArgsConstructor
public class Topic {
private final String topicName;
private final Set<Subscriber> subscribers = new CopyOnWriteArraySet<>();
public void addSubscriber(Subscriber subscriber) {
subscribers.add(subscriber);
}
public void removeSubscriber(Subscriber subscriber) {
subscribers.remove(subscriber);
}
public void publish(Message message) {
for (Subscriber subscriber : subscribers) {
CompletableFuture.runAsync(() -> subscriber.onMessage(message));
}
}
}
The Publisher maintains a collection of topics it can publish to.
public class PublisherImpl implements Publisher {
private static final Logger logger = LoggerFactory.getLogger(PublisherImpl.class);
private final Set<Topic> topics = new HashSet<>();
@Override
public void registerTopic(Topic topic) {
topics.add(topic);
}
@Override
public void publish(Topic topic, Message message) {
if (!topics.contains(topic)) {
logger.error("This topic is not registered: {}", topic.getName());
return;
}
topic.publish(message);
}
}
Subscribers implement an interface that handles incoming messages.
public interface Subscriber {
void onMessage(Message message);
}
Subscriber examples:
Here's how all components connect:
public static void main(String[] args) throws InterruptedException {
final String topicWeather = "WEATHER";
final String topicTemperature = "TEMPERATURE";
final String topicCustomerSupport = "CUSTOMER_SUPPORT";
// 1. create the publisher.
Publisher publisher = new PublisherImpl();
// 2. define the topics and register on publisher
Topic weatherTopic = new Topic(topicWeather);
publisher.registerTopic(weatherTopic);
Topic temperatureTopic = new Topic(topicTemperature);
publisher.registerTopic(temperatureTopic);
Topic supportTopic = new Topic(topicCustomerSupport);
publisher.registerTopic(supportTopic);
// 3. Create the subscribers and subscribe to the relevant topics
// weatherSub1 will subscribe to two topics WEATHER and TEMPERATURE.
Subscriber weatherSub1 = new WeatherSubscriber();
weatherTopic.addSubscriber(weatherSub1);
temperatureTopic.addSubscriber(weatherSub1);
// weatherSub2 will subscribe to WEATHER topic
Subscriber weatherSub2 = new WeatherSubscriber();
weatherTopic.addSubscriber(weatherSub2);
// delayedWeatherSub will subscribe to WEATHER topic
// NOTE :: DelayedWeatherSubscriber has a 0.2 sec delay of processing message.
Subscriber delayedWeatherSub = new DelayedWeatherSubscriber();
weatherTopic.addSubscriber(delayedWeatherSub);
// subscribe the customer support subscribers to the CUSTOMER_SUPPORT topic.
Subscriber supportSub1 = new CustomerSupportSubscriber();
supportTopic.addSubscriber(supportSub1);
Subscriber supportSub2 = new CustomerSupportSubscriber();
supportTopic.addSubscriber(supportSub2);
// 4. publish message from each topic
publisher.publish(weatherTopic, new Message("earthquake"));
publisher.publish(temperatureTopic, new Message("23C"));
publisher.publish(supportTopic, new Message("[email protected]"));
// 5. unregister subscriber from TEMPERATURE topic
temperatureTopic.removeSubscriber(weatherSub1);
// 6. publish message under TEMPERATURE topic
publisher.publish(temperatureTopic, new Message("0C"));
/*
* Finally, we wait for the subscribers to consume messages to check the output.
* The output can change on each run, depending on how long the execution on each
* subscriber would take
* Expected behavior:
* - weatherSub1 will consume earthquake and 23C
* - weatherSub2 will consume earthquake
* - delayedWeatherSub will take longer and consume earthquake
* - supportSub1, supportSub2 will consume [email protected]
* - the message 0C will not be consumed because weatherSub1 unsubscribed from TEMPERATURE topic
*/
TimeUnit.SECONDS.sleep(2);
}
Output may vary due to asynchronous subscriber processing:
14:01:45.599 [ForkJoinPool.commonPool-worker-6] INFO com.iluwatar.publish.subscribe.subscriber.CustomerSupportSubscriber -- Customer Support Subscriber: 1416331388 sent the email to: [email protected]
14:01:45.599 [ForkJoinPool.commonPool-worker-4] INFO com.iluwatar.publish.subscribe.subscriber.WeatherSubscriber -- Weather Subscriber: 1949521124 issued message: 23C
14:01:45.599 [ForkJoinPool.commonPool-worker-2] INFO com.iluwatar.publish.subscribe.subscriber.WeatherSubscriber -- Weather Subscriber: 60629172 issued message: earthquake
14:01:45.599 [ForkJoinPool.commonPool-worker-5] INFO com.iluwatar.publish.subscribe.subscriber.CustomerSupportSubscriber -- Customer Support Subscriber: 1807508804 sent the email to: [email protected]
14:01:45.599 [ForkJoinPool.commonPool-worker-1] INFO com.iluwatar.publish.subscribe.subscriber.WeatherSubscriber -- Weather Subscriber: 1949521124 issued message: earthquake
14:01:47.600 [ForkJoinPool.commonPool-worker-3] INFO com.iluwatar.publish.subscribe.subscriber.DelayedWeatherSubscriber -- Delayed Weather Subscriber: 2085808749 issued message: earthquake
This demonstrates:
Benefits:
Trade-offs: