docs/high-performance/message-queue/rabbitmq-questions.md
RabbitMQ 作为老牌消息中间件,凭借其成熟的路由机制、丰富的协议支持和完善的可靠性保障,在企业级应用中占据重要地位。但自 RabbitMQ 3.8 引入 Quorum Queue、3.9 引入 Streams、4.0 移除镜像队列以来,其技术架构发生了重大变化,许多传统的最佳实践已不再适用。
本文已针对 RabbitMQ 4.0 进行全面更新,明确标注各特性的版本依赖,特别强调了镜像队列(已移除)、Quorum Queue(推荐)和 Streams(3.9+)的选型差异。
RabbitMQ 是一个在 AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ 是使用 Erlang 编写的一个开源的消息队列,本身支持很多的协议:AMQP、XMPP、SMTP、STOMP,也正是如此,使得它变得非常重量级,更适合于企业级的开发。它同时实现了一个 Broker 构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load Balance)或者数据持久化都有很好的支持。
RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。
RabbitMQ 的整体模型架构如下:
下面我会一一介绍上图中的一些概念。
消息一般由 2 部分组成:消息头(或者说是标签 Label)和 消息体。消息体也可以称为 payload,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。生产者把消息交由 RabbitMQ 后,RabbitMQ 会根据消息头把消息发送给感兴趣的 Consumer(消费者)。
在 RabbitMQ 中,消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。
Exchange(交换器) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉 。这里可以将 RabbitMQ 中的交换器看作一个简单的实体。
RabbitMQ 的 Exchange(交换器) 有 4 种类型,不同的类型对应着不同的路由策略:direct,fanout, topic, 和 headers,不同类型的 Exchange 转发消息的策略有所区别。这个会在介绍 Exchange Types(交换器类型) 的时候介绍到。
注意:AMQP 规范定义了一个默认交换器(Default Exchange),它是一个 pre-declared 的 direct 类型交换器,但创建新交换器时必须显式指定类型,不能省略。
生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
RabbitMQ 中通过 Binding(绑定) 将 Exchange(交换器) 与 Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定键) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图所示。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。
生产者将消息发送给交换器时,需要一个 RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如 fanout 类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中。
Queue(消息队列) 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
RabbitMQ 在经典架构中,消息只能存储在 队列 中,这一点和 Kafka 这种消息中间件相反。Kafka 将消息存储在 topic(主题) 这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
版本说明(3.9+ 重要更新):从 RabbitMQ 3.9 版本开始,官方引入了 Streams 数据结构。Streams 提供了一种类似 Kafka 的 append-only 日志存储模型,支持非破坏性消费、大规模消息堆积以及基于 Offset 的历史数据重放(Replay)。
架构选型建议:
- 普通队列:适用于传统消息队列场景,消息被消费后即删除
- Streams:适用于需要高频重放、海量堆积或事件溯源的场景
- 核心瓶颈差异:使用 Stream 时,磁盘 I/O 吞吐量(MB/s)取代了传统的每秒入队率(msg/s)成为核心瓶颈指标
多个消费者可以订阅同一个队列,默认情况下队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
注意:实际分发策略受
prefetch_count参数影响。默认行为(prefetch_count=0)会尽可能多地分发消息给各 Consumer,可能导致负载不均。推荐设置prefetch_count=1或更高值,让 Consumer 确认后再发送下一条,实现公平分发。
RabbitMQ 不支持队列层面的广播消费,如果有广播消费的需求,需要在其上进行二次开发,这样会很麻烦,不建议这样做。
对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP 规范里还提到两种 Exchange Type,分别为 system 与自定义,这里不予以描述)。
1、fanout(广播模式)
2、direct(直连模式)
error、warning、info)order-service、payment-service)示例:以上图为例,如果发送消息时设置路由键为 "warning",消息会路由到 Queue1 和 Queue2;如果设置路由键为 "info" 或 "debug",消息只会路由到 Queue2。
3、topic(主题模式)
"." 分隔的字符串(如 com.rabbitmq.client、order.china.beijing)"*":匹配一个单词"#":匹配零个或多个单词order.china.* 匹配中国所有地区订单)com.rabbitmq.client、java.util.concurrent)示例:
"com.rabbitmq.client" 的消息会同时路由到绑定 "*.rabbitmq.*" 和 "*.client.#" 的队列"order.china.beijing" 的消息会路由到绑定 "order.china.*" 的队列4、headers(不推荐)
x-match=all(全部匹配)或 x-match=any(任一匹配)RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP、MQTT 等协议)。AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。
RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相应的概念。
版本说明:
- AMQP 0-9-1:RabbitMQ 的传统协议,广泛使用,功能完整
- AMQP 1.0:RabbitMQ 4.x 已将其提升为一等公民协议,显著优化了原生 AMQP 1.0 的解析效率,不再需要像旧版本那样通过复杂的插件转换。这提升了与其他消息中间件(如 ActiveMQ、Service Bus)的互操作性,适合需要跨平台集成的场景
- 新项目可考虑使用 AMQP 1.0 以获得更好的跨平台兼容性
AMQP 协议的三层:
AMQP 模型的三大组件:
生产者:
消费者:
DLX,全称为 Dead-Letter-Exchange(死信交换器),当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
导致的死信的几种原因:
Basic.Reject 或 Basic.Nack)且 requeue = false。延迟队列指的是存储对应的延迟消息,消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:
通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OTP 18.0 及以上。
也就是说,AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能。
RabbitMQ 自 V3.5.0 有优先级队列实现,优先级高的队列会先被消费。
可以通过x-max-priority参数来实现优先级队列。不过,当消费速度大于生产速度且 Broker 没有堆积的情况下,优先级显得没有意义。
由于 TCP 链接的创建和销毁开销较大(三次握手、慢启动等),且并发数受系统资源限制,会造成性能瓶颈,所以 RabbitMQ 使用信道的方式来传输数据。信道(Channel)是生产者、消费者与 RabbitMQ 通信的渠道,信道是建立在 TCP 链接上的虚拟链接。
注意:
- 单个 TCP 连接可承载多个 Channel,但官方建议不超过 100-200 个/连接
- 每个 Channel 有独立的编号,但共享同一 TCP 连接的流量控制
- Channel 不是线程安全的,多线程应使用不同 Channel 实例
消息可能在三个环节丢失:生产者 → Broker、Broker 存储期间、Broker → 消费者
1. 生产者 → Broker
保证生产者端零丢失需要双重机制兜底:
Publisher Confirms(异步确认):确认消息是否到达 Broker
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// 消息已到达 Broker 并落盘/同步到镜像
}, (sequenceNumber, multiple) -> {
// 消息未到达 Broker,记录日志并重试
});
Mandatory + Return Listener(路由失败处理):捕获消息到达 Exchange 但无法路由到 Queue 的情况
// 开启 mandatory 模式
channel.basicPublish("exchange", "routingKey",
true, // mandatory=true
null,
messageBody);
// 配置 Return Listener
channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
// 消息到达 Exchange 但路由失败,记录日志或发送到备用交换器
log.error("Message returned: {}", replyText);
});
关键警告:若仅开启 Confirm 未处理 Return,配置漂移(如误删队列或绑定)会导致生产者认为发送成功,但消息在 Broker 内部被静默丢弃,形成消息黑洞。
2. Broker 存储期间
delivery_mode=2,消息写入磁盘durable=true,重启后队列重建3. Broker → 消费者
basicAck(deliveryTag, multiple),确保消费成功后再确认basicNack 或 basicReject 并 requeue=true以下时序图展示了从生产者到消费者的完整消息流转及各环节的异常处理策略:
sequenceDiagram
participant P as 生产者 (Producer)
participant E as 交换器 (Exchange)
participant DLX as 死信交换器 (DLX)
participant Q as 队列 (Quorum Queue)
participant C as 消费者 (Consumer)
P->>E: 1. 发送消息 (开启 Confirm & Mandatory)
alt 路由成功
E->>Q: 2. 消息进入队列
Q-->>P: 3. Raft 多数派落盘后返回 Confirm Ack
else 路由失败 (无匹配 Queue, mandatory=true)
E-->>P: 2a. 触发 Return Listener 返回消息
Note over P: 记录日志或告警
end
Q->>C: 4. 推送消息 (开启手动 Ack)
alt 消费成功
C-->>Q: 5. 发送 basic.ack
Q->>Q: 6. 标记消息可删除
else 业务异常且可重试
C-->>Q: 5a. basic.nack (requeue=true)
Q->>Q: 6a. 消息重回队列尾部 (注意:顺序破坏)
else 致命异常 / 重试超限
C-->>Q: 5b. basic.reject (requeue=false)
Q->>DLX: 6b. 路由至死信交换机 (DLX)
end
关键路径说明:
注意:Alternate Exchange(备用交换器)是另一种独立的路由失败处理机制,与 Mandatory + Return Listener 互斥。配置 Alternate Exchange 后,路由失败的消息会被转发到备用交换器,生产者收到的是正常的 Confirm Ack 而非 Return。
RabbitMQ 仅保证单个 Queue 内的 FIFO 顺序,但多消费者场景下可能出现乱序。解决方案:
1. 单 Consumer 模式
2. 分区有序(推荐,但需注意失效模式)
失效模式警告:
- 拓扑变更乱序:当后端队列扩缩容导致哈希环发生变化时,同一个业务 Key 的新老消息可能进入不同队列
- 重试乱序:若消费者内部处理失败执行 Nack 并 Requeue,该消息会被重新推入队列尾部,导致后续消息先被消费
- 应用层防护:极端严格顺序场景下,消费者业务表必须设计基于状态机或版本号的幂等与防并发覆盖机制
3. 内部内存队列(慎重)
RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有四种模式:单机模式、普通集群模式、镜像集群模式(已废弃)、Quorum Queue(推荐)。
版本演进说明:
- 3.8 前:镜像队列(Classic Queue Mirroring)是主要高可用方案
- 3.8+:Quorum Queue 作为推荐替代方案,镜像队列被标记为 deprecated
- 3.13:镜像队列仍可用但已废弃
- 4.0+:镜像队列完全移除,Quorum Queue 成为默认高可用方案
网络分区警告(严重):无论是普通集群还是早期的镜像集群,均依赖 Erlang 内部的分布式同步机制,对网络抖动极度敏感。在多机房或跨可用区部署时,极易发生网络分区(Split-brain)。必须在
rabbitmq.conf中明确配置分区恢复策略:
pause_minority:少数派节点自动暂停服务以防数据分化(推荐)autoheal:自动选择一方继续运行(有数据丢失风险)- 对于 3.8 以上版本,强烈建议直接使用基于 Raft 一致性算法的 Quorum Queue,从根本上解决网络分区导致的消息丢失与状态不一致问题
单机模式
Demo 级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式。
普通集群模式
意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。
你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
镜像集群模式(Classic Queue Mirroring,已废弃)
⚠️ 重要警告:镜像队列已在 RabbitMQ 4.0 中被完全移除。RabbitMQ 3.8 引入 Quorum Queue 作为推荐替代方案,3.13 版本镜像队列仍可用但已废弃,4.0 版本正式移除。新项目请使用 Quorum Queue 或 Streams。
这种模式是 RabbitMQ 早期版本的高可用方案。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据。每次写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
工作原理:
优点:
缺点:
Quorum Queue(3.8+ 推荐,4.0 后为默认高可用方案)
基于 Raft 协议的复制队列,是 RabbitMQ 3.8+ 推荐的高可用方案,4.0 后成为默认选项:
声明方式(客户端):
Java:
// Java 客户端声明 Quorum Queue
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum"); // 关键参数,必须在声明时指定
channel.queueDeclare("my-queue", true, false, false, args);
Python:
# Python (pika) 客户端声明 Quorum Queue
channel.queue_declare(
queue='my-queue',
durable=True,
arguments={'x-queue-type': 'quorum'} # 关键参数
)
重要提示:
x-queue-type参数必须在队列声明时由客户端提供,不能通过 Policy 设置或修改。Policy 只能配置 max-length、delivery-limit 等运行时参数。
RabbitMQ 可以设置消息过期时间(TTL)。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 清理掉,导致数据丢失。
批量重导方案(适用于数据可恢复的场景):
当大量消息积压或过期时,可采取以下步骤:
示例场景:
注意事项:
1. 内存水位线告警(严重)
rabbitmq_memory_limit 占比{rabbit, [
{vm_memory_high_watermark, 0.4}, % 内存高水位 40%
{vm_memory_high_watermark_paging_ratio, 0.5} % 开始分页的比例
]}
2. 文件句柄消耗
ulimit -n 100000)3. Channel Churn Rate
4. 消息积压深度
5. 磁盘空间与 I/O
误区 1:Quorum Queue 是银弹,能解决所有问题
delivery_mode=1 的非持久化消息)强制持久化存储到磁盘误区 2:Prefetch Count 设置越大越好
channel.basicQos(20); // 推荐起始值
误区 3:延迟队列插件可以无限制使用
误区 4:网络分区不会发生在我们环境
cluster_partition_handling = pause_minority误区 5:开启了事务机制就万无一失
重要说明:RabbitMQ 3.7+ 使用新的
rabbitmq.conf格式(sysctl 风格),而非旧的advanced.config(Erlang 术语格式)。以下配置适用于rabbitmq.conf:
# rabbitmq.conf 生产环境推荐配置
# 内存管理
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.5
# 磁盘管理
disk_free_limit.absolute = 5GB
# 连接与通道
channel_max = 200
connection_max = infinity
# 心跳检测(秒)
heartbeat = 60
# 网络分区处理(重要)
cluster_partition_handling = pause_minority
# 默认用户(生产环境请修改或删除)
default_user = guest
default_pass = guest
loopback_users = none
# 管理插件监听端口
management.tcp.port = 15672
如需使用 Erlang 术语格式(高级配置),请使用 advanced.config 文件,但不要与 rabbitmq.conf 混用。
本文系统梳理了 RabbitMQ 的核心知识点,从基础概念到生产实践,涵盖了面试和实际应用中最重要的内容。让我们回顾一下关键要点:
| 版本里程碑 | 重要变化 | 生产影响 |
|---|---|---|
| 3.8 前 | 镜像队列(Classic Queue Mirroring)时代 | 主从复制,脑裂风险 |
| 3.8+ | Quorum Queue 引入 | 基于 Raft,推荐用于高可靠场景 |
| 3.9+ | Streams 引入 | Kafka-like 架构,支持事件溯源 |
| 4.0+ | 镜像队列完全移除 | 新项目必须使用 Quorum Queue 或 Streams |
必知必会:
order.china.*)常见追问:
1. 队列类型选型
高可靠性需求 → Quorum Queue(默认推荐)
高吞吐量需求 → Classic Queue(单节点)或 Streams(3.9+)
事件溯源需求 → Streams(支持非破坏性消费)
2. 消息可靠性配置
// 生产者端:双重保障
channel.confirmSelect(); // Confirm
channel.basicPublish(exchange, routingKey, true, ...); // Mandatory
channel.addReturnListener(...); // Return Listener
// 消费者端:手动确认
channel.basicQos(20); // Fair dispatch
channel.basicConsume(queue, false, ...); // Manual ack
3. 高可用配置要点
# 网络分区处理(跨机房部署必配)
cluster_partition_handling = pause_minority
# 使用 Quorum Queue(客户端声明)
arguments.put("x-queue-type", "quorum");
4. 监控告警指标