docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_设计.md
| 时间 | 主要内容 | 作者 |
|---|---|---|
| 2021-11-01 | 初稿,包括背景、目标、SOT定义与持久化、SOT生命周期、SOT的使用、API逻辑修改、问题与风险 | dongeforever |
| 2021-11-15 | 修改 LogicQueue 的定义,不要引入新字段,完全复用旧的MessageQueue;RemappingStaticTopic时,不要迁移『位点』『幂等数据』等,而是采用Double-Check-Read 的机制 | dongforever |
| 2021-12-01 | 更新问题与风险,增加关于一致性、OutOfRange、拉取中断的详细说明 | dongforever |
| 2021-12-03 | 增加代码走读的说明 | dongforever |
| 2021-12-10 | 引入Scope概念,保留『多集群动态零耦合』的集群设计模型 | dongforever |
| 2021-12-23 | 梳理待完成事项;讨论Admin接口的适配方式 | dongforever |
| 2021-01-05 | Offset存储改成『转换制』,以更好适配原有逻辑 | dongforever |
中文文档在描述特定专业术语时,仍然使用英文。
StaticTopic/LogicQueue 本质上是解决『固定队列数量』的需求。 这个需求是不是必需的呢,如果是做应用集成,则可能不是必需的,但如果是做数据集成,则是必需的。
固定队列数量,首先可以解决『顺序性』的问题。 在应用集成场景下,应用是无需感知到队列的,只要MQ能保证按顺序投递给应用即可,MQ底层队列数量如何变化,对应用来说是不关心。比如,MQ之前的那套『禁读禁写』就是可以玩转的。
但在数据集成场景中,队列或者叫『分片』,是要暴露给客户端的,客户端所有的数据计算场景,都是基于『分片』来进行的,如果『分片』里的数据,发生了错乱,则计算结果都是错误的。比如,计算WordCount,源数据经过预处理之后,按key写入清洗后的Topic,然后计算侧根据清洗的结果,按照分片来并行计算。如果分片发生变化,则整个清洗逻辑,需要重新处理。
有人可能会反驳,说计算组件清洗后,可以以批的方式写入其它存储组件。这当然是可以的,但如果是这样,MQ的价值就纯粹是一个『源头』价值,而不是『通道』价值。
MQ要想成为一个『数据通道』,则必需要具备可以让计算组件『回写』数据的能力,具备存储『Clean Data』的能力,这样才让MQ有可能在数据集成领域站稳脚跟。
如果是 RocketMQ Streams 这种轻量化的组件,则『回写』会更频繁,更重要。
除此之外,『固定队列数据』对于,RocketMQ 自身后续的发展,也是至关重要的:
『固定队列数量』对于RocketMQ挺进『数据集成』这个领域,有着不可或缺的作用。 LogicQueue的思路就是为了解决这一问题。
提供『Static Topic』的特性。 引入以下核心概念:
『Static Topic』拥有固定的分片数量,每个分片称之为『Logic Queue』。 每个『Logic Queue』由多个『Physical Queue』进行纵向分段映射组成。
引入以下非核心概念,对用户无感知,但对于讨论问题非常重要:
单集群固定 和 全网固定,参考 The_Scope_Of_Static_Topic。
在客户端,LogicQueue 与 Physical Queue 使用体感上没有任何区别,使用一样的概念和对象,遵循一样的语义。 在服务端,针对 LogicQueue 去适配相关的API。
RocketMQ Physical Queue 含有以下语义:
LogicQueue 需要保障的语义:
LogicQueue 可以不保障的语义:
offset连续,是一个应该尽量保证的语义,可以允许有少量空洞,但不应该出现大面积不连续的位点。 offset不连续最直接的问题就是:
但只要空洞不是大量频繁出现的,那么也是问题不大的。
单机队列编号连续,除了在注册元数据时,可以简约部分字节外,没有其它实际用处,可以不保证。 当前客户端使用到『单机队列编号连续』这个特点的场景主要有:
当前 MessageQueue 的定义如下
private String topic;
private String brokerName;
private int queueId;
LogicQueue需要对客户直接暴露,为了保证使用习惯一致,采用同样的定义,其中 queueId相当于全局Id,而brokerName 固定如下:
MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME = "__logical_queue_broker__";
此时,brokerName没有实际含义,但可以用来识别是否是LogicQueue。
采用此种定义,对于客户端内部的实现习惯改变如下:
具体改法是MQClientInstance中维护一个映射关系
private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>> topicEndPointsTable = new ConcurrentHashMap<>();
基本目标与定义清楚了,接下来的设计,从 Source of Truth 开始。
LogicQueue 的 Source of Truth 就是 LogicQueue 到 Physical Queue 的映射关系。 只要这个映射关系不丢失,则整个系统的状态都是可以恢复的。 反之,整个系统可能陷入混乱。
这个SOT,命名为 TopicQueueMapping。
{
"version":"1",
"bname": "broker02" //标记这份数据的原始存储位置,如果发送误拷贝,可以利用这个字段来进行标识
"epoch": 0, //标记修改版本,用来做一致性校验
"totalQueues":"50", //当前Topic 总共有多少 LogicQueues
"hostedQueues": { //当前Broker 所拥有的 LogicQueues
"3" : [
{
"queue":"0",
"bname":"broker01"
"gen":"0", //标记切换代次
"logicOffset":"0", //logicOffset的起始位置
"startOffset":"0", // 物理offset的起始位置
"endOffset":"1000" // 可选,物理offset的最大位置,可以根据上下文算出来
"timeOfStart":"1561018349243" //可选,用来优化时间搜索
"timeOfEnd":"1561018349243" //可选,用来优化时间搜索
"updateTime":"1561018349243" //可选,记录更新的时间
},
{
"queue":"0",
"bname":"broker02",
"gen":"1", //可选,标记切换代次
"logicOffset":"1000", //logicOffset的起始位置
"startOffset":"0", // 物理offset的起始位置
"endOffset":"-1" // 可选,物理offset的最大位置,可以根据上下文算出来,最新的一个应该是活跃的
"timeOfStart":"1561018349243" //可选,用来优化时间搜索
"timeOfEnd":"1561018349243" //可选,用来优化时间搜索
"updateTime":"1561018349243" //可选,记录更新的时间
}
]
}
}
上述示例的含义是:
『拥有』的定义是指,Leader Queue 在当前Broker。注意,在实现时,也会把Second Leader Queue存储下来作为备份。
注意以下要点:
RocketMQ 没有中心化的元数据存储,那就遵循『Leader Completeness』原则。 对于每个逻辑队列,把所有映射关系存储在『最新队列所在的Broker上面』,最新队列,其实也是可写队列。 Leader Completeness,避免了数据的切割,对于后续其它操作有极大的便利。
对于每个Static Topic,在每个Broker都应该拥有一份『TopicQueueMapping』,每份都带有Epoch。 在创建和更新时,要对已有数据进行完备性校验,如果发现不完备,则说明上次操作失败,或者部分Broker数据丢失,应该先修复再操作。
注意: 即使当前Broker不拥有任何 LogicQueue 或者 PhysicalQueue,也应该存储一份,以做校验。 假设某个Static Topic只拥有1个Logic Queue,而对应的Broker整好宕机,则此时可以根据其它Broker的信息判断出该Topic不完备。
由于 RocketMQ 很多的运维习惯,都是直接拷贝 Topics.json 到别的机器进行部署的。 而 TopicQueueMapping 是 Broker 相关的,如果把 TopicQueueMapping 从一个Broker拷贝到另一个Broker,则会造成SOT冲突。
在设计上,TopicQueueMapping 采取独立文件,避免冲突。 在格式上,queue 里面要写上 bname,以具备自我识别能力,这样即使误拷贝到另一台机器,可以识别并报错,进行忽略即可。
映射关系的创建,第一期应该只由 MQAdmin 来进行操作。 后续,可以考虑引入自动化组件。 这里的要点是:
更多细节在API逻辑修改里面
按照 『Leader Completeness』原则进行存储。
如果为了保证严格顺序,则应该采取『禁旧再切新』的原则:
如果为了保证最高可用性,则应该采取『切新禁旧再切新』:
切换失败处理:
有两部分信息需要清除
SOT存储在Broker上,所以使用从 Broker开始。
在 RegisterBrokerBody 中,需要带上两个信息:
异常情况需要考虑,假如本 Broker 不拥有任何 LogicQueue 呢?依然需要带上 totalQueueNum 这个信息。 注意,不需要带上所有的映射关系,否则Nameserver很快会被打爆。
原先的 QueueData 增加2个字段:
如果 QueueData 里面 totalQueues 的值 > 0 则认为是逻辑队列,在客户端解析时要进行判断。
遗留问题: 是否需要尊重 readQueueNums 和 writeQueueNums ? 在LogicQueue这里,这个场景是没有意义的,但依然保持尊重。
改动两个方法即可:
注意,逻辑队列要求队列数是固定,如果发现,解析完之后,存在部分队列空洞,要用虚拟Broker值进行补全。 Producer 侧如果要对无 key 场景进行优化,可以通过虚拟Broker值来判断,当前队列是不可用的。 对于key场景,应该让客户端报错。
LogicQueue是为了解决『Static Sharding』的问题。对于客户来说,『LogicQueue』是手段,『Static』才是目的。本着『用户知晓目的,开发者才需要关心手段』的原则,对用户应该只暴露『Static』的概念。所有QueueMapping的生命周期维护,应该都对用户透明。
新增UpdateStaticTopic命令,对应RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC=513,主要参数是:
UpdateStaticTopic 命令会自动计算预期的分布情况,包括但不限于执行以下逻辑:
迁移动作不引入新命令,计算好之后,执行UPDATE_AND_CREATE_STATIC_TOPIC即可. 主要参数:
基本操作流程:
其中第二步,数据量可能会很大,导致迁移周期非常长,且不是并发安全的。 但这些数据,都是覆盖型的,因此可以改造成不迁移的方式,而是在API层做兼容,也即『Double-Read-Check』机制:
将来实现的幂等逻辑,也是类似。
服务端判断『StaticTopic』,禁止该命令进行修改。
复用现有逻辑,对于 StaticTopic,执行必要的清除动作。
复用现有逻辑,同时展现『Logical Queue』和『Physical Queue』。
分段映射,执行远程读,在返回消息时,不进行offset转换,而是返回 OffsetDelta 变量,由客户端进行转换。 这里的方式,类似于Batch。
寻找映射关系,读最早的队列的MinOffset
本机读,转换成logicOffset即可。
需要分段查找。 如果要优化查找速度,应该在映射关系里面,插入时间戳。
Offset的存储,进行转换,存储在对应PhysicalQueue 所在的 Broker上面。
读取时,采取『Double-Read-Check』机制,并进行转换。
这样可以最大程度与 PhysicalQueue 的相关逻辑进行适配,比如 ConsumerProgress 可以看到『最近拉取时间』。
如果要使用StaticTopic,则需要升级Client、Broker、Nameserver。
RocketMQ 没有引入中心化的存储组件,那么如何保证 SOT 的全局一致性呢? 主要利用两个信息
在更新或者切换时,获取所有Broker上的 TopicQueueMapping,校验 epoch 和 totalQueues,并且根据 TopicQueueMapping 可以完整地构建出对应的Logic Queue,则说明数据是完整一致的。
如果发现数据不一致,可能是以下因素引入的:
应该要先修复数据,再执行 更新或切换 操作
Target Brokers 是指拥有 LogicQueue 的 Broker。
考察1个场景,如果某个Topic 只有1个 LogicQueue,而拥有这个 LogicQueue 的 Broker 正好宕机了。此时去更新 Topic,会不会误认为该 Topic 不存在?
解决这个问题的办法是引入 No Target Brokers,也即集群中除去『Target Brokers』之外的 Broker。
对于 No Target Broker,依然需要写入一份 TopicQueueMapping,带上 epoch 和 totalQueues,但不拥有任何 LogicQueue。
有了这个信息之后,在进行一致性校验时,就可以识别出上述场景。
尤其要注意,如果 Nameserver 中没有任何信息,则需要主动去所有 Broker 拉取一遍。
logicOffset的决策,依赖于上一个 PhysicalQueue 的最大位点。
此时,要么跳跃位点,要么等待上一个 PhysicalQueue 确保已经禁写。
当前实现,为了保障高可用,采用『切新禁旧再切新』的方式,同时跳跃位点。
此时,可以写,但返回给 客户端的 offset 也是-1。
此时,不可以读最新 PhysicalQueue。
需要非常小心触发位点被重置:
目前只允许,SendMessage和GetMin时,返回-1。其余场景,要严格校验并报错。
如果允许1个 PhysicalQueue 被重复利用,也即多段映射给多个 LogicQueue,或者从非0开始映射。
会带来以下麻烦:
当前实现,为了保证简洁,禁止 PhysicalQueue 被重复利用,每次更新映射都会让物理层面的 writeQueues++ 和 readQueues++。
后续实现,可以考虑复用已经被清除掉的Physical,也即已经没有数据,位点从0开始。
当前,admin操作都是要求在Master操作的。因此,没有这个问题。
Command操作时,提前预判Master是否存在,如果不存在,则提前报错,减少中间失败率。
以下情况会影响 OutOfRange 的判断
所以,OutOfRange 的判断,遵循以下原则:
如果没有恰当地处理 OFFSET_MOVED,可能造成位点被重置。
需要注意,这个地方,产生了对 PullMessageResponseHeader 中 minOffset 和 maxOffset 的强依赖。 在次此之前,这两个信息,只对客户端的限流有作用,对业务没有实际的影响。
当1个 PhysicalQueue 被拉取干净时,需要修正 nextBeginOffset 到下一个 PhysicalQueue。 如果没有处理好,则直接会导致拉取中断,无法前进。
类似于Batch,由客户端设置,避免服务端解开消息:
在PullResultExt中新增字段 offsetDelta。
User 接口,使用范围广泛如多语言等,应该尽可能简单,把适配逻辑做在服务端,对『客户端』透明。
那么 Admin 接口呢,比如 examineTopicStats,适配逻辑是做在『服务端』还是『客户端』?
一个 Admin 接口,通常需要访问所有 Broker 的所有队列。
如果做在服务端,则可能产生交叉访问,在极端情况下,性能会非常差,举个例子:
100 个 Broker,相互交叉映射过一遍,则Admin客户端首先要向 100 个 Broker 发请求,然后这 100 个 Broker 为了获取远程信息,各自向其余 Broker 发请求。
其实际网络请求数就是 100 * 100 = 10000 个网络请求。放大效应十分明显。
同时,考虑到 Admin 接口,使用范围是有限的,无需考虑多语言适配等问题,可以把适配逻辑做在 Admin 客户端。
从实战经验来看,性能损耗几乎不计。
利用新的创建命令进行隔离。
目前的实现里,消费Send Back,是直接传回Commitlog Pos,这个在LogicQueue里行不通。 需要修改API,改成传回『Logic Queue Offset』。
二阶消息,也即『原始消息』存储在『系统Topic』中,需要经过一轮『Read-ReWrite』逻辑才会被用户看见的消息。 例如,定时消息,事务消息。 二阶消息需要支持远程读写操作。 一期的LogicQueue不支持『二阶消息』。
旧的客户端无法解析逻辑路由,但可以识别物理路由。如果错误使用,则会影响映射关系的准确性。
不兼容 事务消息和定时消息。
LogicQueue 当前不支持Pop模式消费。
目前没有处理Nameserver中Mapping数据的生命周期
可能导致 LogicQueue 无法清除已经过期的 MappingItem。
可能导致 LogicQueue 时间搜索不准确。需要专项修复。
当前的更新机制太慢。且访问『不存在Broker』时,会频繁访问Nameserver,有打爆Nameserver的风险。
位点相关的消息可能不在本机,需要远程访问。
当前没有适配。重置位点,可能会得到不符合预期的结果。
当前只是产生遗留数据,占用一点点存储空间,没有太大影响。
如果将来要实现 物理 Queue 复用,则需要先完善相关逻辑。
主要看两个类:
UpdateStaticTopicSubCommand
RemappingStaticTopicSubCommand
主要看:
TopicQueueMappingManager
重点关注:
MQClientInstance.updateTopicRouteInfoFromNameServer
以 SendMessageProcessor 为例,插桩代码普遍是以下风格:
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return CompletableFuture.completedFuture(rewriteResult);
}
其它Processor类似
rocketmq-test模块,statictopic目录。