pip/pip-13.md
The consumer needs to handle subscription to topics represented by regular expressions. The scope is namespace in first stage, all topics/patten should be targeted in same namespace, This will make easy authentication and authorization control.
At last, we should add and implementation a serials of new methods in PulsarClient.java
Consumer subscribe(Collection<String> topics, String subscription);
Consumer subscribe(Pattern topicsPattern, String subscription);
The goals the should be achieved are these below, we could achieve it one by one:
This will need a new implementation of ConsumerBase which wrapper over multiple single-topic-consumers, let’s name it as TopicsConsumerImpl.
When user call new method
Consumer subscribe(Collection<String> topics, String subscription);
It will iteratively new a ConsumerImpl for each topic, and return a TopicsConsumerImpl. The main work is:
TopicsConsumerImpl class should provide implementation of abstract methods in ConsumerBase, Should also provide some specific methods such as:// maintain a map for all the <Topic, Consumer>, after we subscribe all the topics.
private final ConcurrentMap<String, ConsumerImpl> consumers = new ConcurrentHashMap<>();
// get topics
Set<String> getTopics();
// get consumers
List<ConsumerImpl> getConsumers();
// subscribe a topic
void subscribeTopic(String topic);
// unSubscribe a topic
void unSubscribeTopic(String topic);
ConsumerBase, because we have to add MessageId with additional String topic or consumer id, Or we may need to change MessageIdData in PulsarApi.proto.As mentioned before, the scope is namespace. The main work is:
TopicsConsumerImpl class, need to keep the Pattern, which was passed in from api for subscription.getList to get a list of Topics.
In interface PersistentTopics :List<String> getList(String namespace) throws PulsarAdminException;
List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException;
Consumer subscribe(String namespace, Pattern topicsPattern, String subscription) should be like this:List<String> getList(String namespace) to get all the topics;topicsPattern to filter out the matched sub-topics-list.TopicsConsumerImpl with the the sub-topics-list.The main work is:
Interface TopicsChangeListener {
// unsubscribe and delete ConsumerImpl in the `consumers` map in `TopicsConsumerImpl` based on added topics.
void onTopicsRemoved(Collection<String> topics);
// subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in `TopicsConsumerImpl`.
void onTopicsAdded(Collection<String> topics);
}
Add a method void registerListener(TopicsChangeListener listener) to TopicsConsumerImpl
List<String> getList(String namespace). And comparing the filtered fresh sub-topics-list with current topics holden in TopicsConsumerImpl, try to get 2 lists: newAddedTopicsList and removedTopicsList.TopicsChangeListener.onTopicsAdded(newAddedTopicsList), and TopicsChangeListener.onTopicsRemoved(removedTopicsList) to do subscribe and unsubscribe, and update consumers map in TopicsConsumerImpl.The changes will be mostly on the surface and on client side:
org.apache.pulsar.client.api.PulsarClient.javaConsumer subscribe(Collection<String> topics, String subscription);
Consumer subscribe(Pattern topicsPattern, String subscription);
Consumer, which is TopicsConsumerImpl , returned by above subscribe method