docs/cn/Example_Compaction_Topic_cn.md
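A compaction topic relies on ordered messages to guarantee consistency, so first enable ordered-message support on the NameServer, then create the topic with its cleanup policy set to COMPACTION: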
$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n localhost:9876 -t ctopic -o true -c DefaultCluster
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+cleanup.policy=COMPACTION}]
Producing data works the same way as for normal messages:
DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String topic = "ctopic";
String tag = "tag1";
String key = "key1";
Message msg = new Message(topic, tag, key, "bodys".getBytes(StandardCharsets.UTF_8));
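// Route by key: messages with the same key always go to the same queue, which keeps them ordered.
SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {
    int select = Math.abs(shardingKey.hashCode());
    // Math.abs(Integer.MIN_VALUE) is still negative, so fall back to 0.
    if (select < 0) {
        select = 0;
    }
    return mqs.get(select % mqs.size());
}, key);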
System.out.printf("%s%n", sendResult);
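The queue selection used by the producer above can be tried in isolation. The following is a minimal sketch with no RocketMQ dependency; the class name `KeyQueueSelectorSketch` and the method `selectIndex` are hypothetical names introduced only for illustration, and the queue count of 8 is taken from the `-w 8 -r 8` options used when the topic was created.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of RocketMQ) that mirrors the selector lambda in the producer example.
final class KeyQueueSelectorSketch {

    // Maps a sharding key to a queue index in the range [0, queueCount).
    static int selectIndex(String shardingKey, int queueCount) {
        int select = Math.abs(shardingKey.hashCode());
        // Math.abs(Integer.MIN_VALUE) is still negative, so clamp it to 0.
        if (select < 0) {
            select = 0;
        }
        return select % queueCount;
    }

    public static void main(String[] args) {
        // "key1" appears twice and is printed with the same queue index both times.
        List<String> keys = Arrays.asList("key1", "key2", "key1");
        for (String key : keys) {
            System.out.printf("%s -> queue %d%n", key, selectIndex(key, 8));
        }
    }
}
```

Because the index depends only on the key and the queue count, every message with a given key lands in the same queue as long as the number of queues does not change, which is what lets compaction keep the latest value per key.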
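Consumer offsets stay the same as they were before compaction. When consuming from a specified offset that no longer exists, the nearest following message is returned. In compaction scenarios, most consumers start from offset 0 and read the complete data set.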
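The offset behaviour described above can be modelled in memory. This is only an illustrative sketch, not how the broker implements compaction: the class `CompactedQueueSketch` and its `resolve` method are hypothetical, each record is reduced to its key, and a record's offset is assumed to be its position in the input list.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of one compacted queue (not RocketMQ code).
final class CompactedQueueSketch {

    // Surviving records after compaction: original offset -> key.
    private final TreeMap<Long, String> survivors = new TreeMap<>();

    // Keeps only the latest record per key; surviving records keep their original offsets.
    CompactedQueueSketch(List<String> keysInOffsetOrder) {
        Map<String, Long> latestOffsetByKey = new HashMap<>();
        for (int i = 0; i < keysInOffsetOrder.size(); i++) {
            latestOffsetByKey.put(keysInOffsetOrder.get(i), (long) i);
        }
        latestOffsetByKey.forEach((key, offset) -> survivors.put(offset, key));
    }

    // Returns the offset that is served for a requested offset:
    // the same offset if it survived, otherwise the nearest later one, or -1 if none exists.
    long resolve(long requestedOffset) {
        Long served = survivors.ceilingKey(requestedOffset);
        return served == null ? -1L : served;
    }

    public static void main(String[] args) {
        CompactedQueueSketch queue =
            new CompactedQueueSketch(Arrays.asList("a", "b", "a", "c", "b"));
        System.out.println(queue.resolve(0)); // 2: offsets 0 and 1 were compacted away
        System.out.println(queue.resolve(3)); // 3: offset 3 survived
        System.out.println(queue.resolve(5)); // -1: nothing at or after offset 5
    }
}
```

In this model, reading from offset 0 visits exactly the surviving records, which matches the usual pattern of consuming a compaction topic from the beginning.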
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.setPullThreadNums(4);
consumer.start();
Collection<MessageQueue> messageQueueList = consumer.fetchMessageQueues("ctopic");
consumer.assign(messageQueueList);
// Start every assigned queue from its earliest available offset.
messageQueueList.forEach(mq -> {
    try {
        consumer.seekToBegin(mq);
    } catch (MQClientException e) {
        e.printStackTrace();
    }
});
// Replay into a map: a later message with the same key overwrites the earlier value.
Map<String, byte[]> kvStore = Maps.newHashMap();
while (true) {
    List<MessageExt> msgList = consumer.poll(1000);
    if (CollectionUtils.isNotEmpty(msgList)) {
        msgList.forEach(msg -> kvStore.put(msg.getKeys(), msg.getBody()));
    }
}
//use the kvStore
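The consumer above rebuilds state by replaying messages into a map. As a rough sketch of why this yields the same state whether or not compaction has already run, the replay step can be isolated as follows. `KvReplaySketch` and `replay` are hypothetical names, and records are simplified to `{key, value}` string pairs rather than `MessageExt` objects.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay helper (not RocketMQ code): last write per key wins.
final class KvReplaySketch {

    // Applies {key, value} records in offset order; a later record overwrites an earlier one.
    static Map<String, String> replay(List<String[]> records) {
        Map<String, String> kvStore = new HashMap<>();
        for (String[] record : records) {
            kvStore.put(record[0], record[1]);
        }
        return kvStore;
    }

    public static void main(String[] args) {
        // Full log: k1 is written twice.
        List<String[]> full = Arrays.asList(
            new String[] {"k1", "v1"},
            new String[] {"k2", "v2"},
            new String[] {"k1", "v3"});
        // Same log with the superseded k1 record removed.
        List<String[]> compacted = Arrays.asList(
            new String[] {"k2", "v2"},
            new String[] {"k1", "v3"});
        System.out.println(replay(full).equals(replay(compacted))); // true
    }
}
```

The sketch assumes that records for one key arrive in offset order, which is the property the key-based queue selection on the producer side is meant to provide.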