# 1 专业术语

  • Producer 消息生产者,负责产生消息,一般由业务系统负责产生消息。

  • Consumer 消息消费者,负责消费消息,一般是后台系统负责异步消费。

  • PushConsumer Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。

  • PullConsumer

    Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。

  • ProducerGroup

    一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。

  • ConsumerGroup

    一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。

  • Broker

    消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。

  • 广播消费

    一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。

  • 集群消费

    一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个 Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那举每个实例只消费其中的 3 条消息。

  • 顺序消息 消费消息的顺序要同发送消息的顺序一致,在 RocketMQ 中,主要是指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送,且发送到同一个队列,这样 Consumer 就可以按照 Producer 发送的顺序去消费消息。

  • Message Queue 在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100 年内不会溢出,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。也可以认为 Message Queue 是一个长度无限的数组,offset 就是下标。

# 2 核心功能

本节阐述消息中间件通常需要解决哪些问题,在解决这些问题当中会遇到什么困难,RocketMQ 是否可以解决,规范中如何定义这些问题。

# 2.1 发布订阅

点对点(P2P)和发布订阅(Pub/Sub)是两种常见的消息队列模式,它们用于满足不同通信需求。

  1. 点对点(P2P)模式:
    • 在点对点模式中,消息发送者(生产者)将消息发送到一个特定的队列,而消息接收者(消费者)从该队列中接收消息。
    • 消息在队列中存储,一旦一个消息被消费者接收,它就从队列中移除,这确保了每个消息只被一个消费者处理。
    • 这种模式适用于一对一的通信,其中一个生产者向一个特定的消费者发送消息,确保消息的可靠传递和处理。
  2. 发布订阅(Pub/Sub)模式:
    • 在发布订阅模式中,消息发送者将消息发布到一个主题(topic),而消息订阅者则订阅感兴趣的主题。
    • 每个主题可以有多个订阅者,因此消息会被广播到所有订阅了相同主题的消费者。
    • 这种模式适用于一对多或多对多的通信,允许多个消费者同时接收和处理相同主题的消息。
    • 发布订阅模式通常用于构建实时事件处理系统、日志处理、通知系统等,其中多个消费者需要订阅相同类型的消息并进行处理。

点对点模式适用于一对一的通信,确保消息的可靠传递给一个特定的消费者,而发布订阅模式适用于一对多或多对多的通信,允许多个消费者同时接收相同主题的消息,用于构建实时事件系统和广播通信。

# 2.2 消息优先级

规范中描述的优先级是指在一个消息队列中,每条消息都有不同的优先级,一般用整数来描述,优先级高的消 息先投递,如果消息完全在一个内存队列中,那么在投递前可以按照优先级排序,令优先级高的先投递。

由于 RocketMQ 所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大,因此 RocketMQ 没有特意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级 的队列, 将不同优先级发送到不同队列即可。

对于优先级问题,可以归纳为 2 类:

  1. 只要达到优先级目的即可,不是严格意义上的优先级,通常将优先级划分为高、中、低,或者再多几个级别。每个优先级可以用不同的 topic 表示,发消息时,指定不同的 Topic 来表示优先级,这种方式可以解决绝大部分的优先级问题,但是对业务的优先级精确性做了妥协。
  2. 严格的优先级,优先级用整数表示,例如 0 ~ 65535,这种优先级问题一般使用不同 topic 解决就非常不合适。如果要让 MQ 解决此问题,会对 MQ 的性能造成非常大的影响。这里要确保一点,业务上是否确实需

要这种严格的优先级,如果将优先级压缩成几个,对业务的影响有多大 ?

# 2.3 消息有序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创 建,订单付款,订单完成。消费时,要按照这个顺序消费才能有意义。但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

# 2.4 消息过滤

  • Broker 端消息过滤 在 Broker 中,按照 Consumer 的要求做过滤,优点是减少了对于 Consumer 无用消息的网络传输。 缺点是增加了 Broker 的负担,实现相对复杂。

  • Consumer 端消息过滤

    这种过滤方式可由应用完全自定义实现,但是缺点是很多无用的消息要传输到 Consumer 端。

# 2.5 消息持久化

消息中间件通常采用的几种持久化方式:

  1. 持久化到数据库,例如 Mysql 。
  2. 持久化到 KV 存储,例如 levelDB、伯克利 DB 等 KV 存储系统。
  3. 文件记录形式持久化,例如 Kafka,RocketMQ 。
  4. 对内存数据做一个持久化镜像,例如 beanstalkd,VisiNotify 。

1,2, 3 三种持久化方式都具有将内存队列 Buffer 进行扩展的能力,4 只是一个内存的镜像,作用是当 Broker 挂掉重启后仍然能将之前内存的数据恢复出来。

JMS 与 CORBA Notification 规范没有明确说明如何持久化,但是持久化部分的性能直接决定了整个消息中间件 的性能。

RocketMQ 参考了 Kafka 的持久化方式,充分利用 Linux 文件系统内存 cache 来提高性能。

# 2.6 消息可靠性

影响消息可靠性的几种情况:

  1. Broker 正常关闭

  2. Broker 异常 Crash

  3. OS Crash

  4. 机器掉电,但是能立即恢复供电情况。

  1. 机器无法开机(可能是cpu、主板、内存等关键设备损坏)
  2. 磁盘设备损坏。

1、2、3、4 四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

5、6 属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。

RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点, 同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与金额相关的应用。

# 2.7 低延迟消费

在消息不堆积情况下,消息到达 Broker 后,能立刻到达 Consumer。 RocketMQ 使用长轮询 Pull 方式,可保证消息非常实时,消息实时性不低于 Push。

# 2.8 At least Once

是指每个消息必须投递一次。RocketMQ Consumer 先 pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会 ack 消息, 所以 RocketMQ 可以很好的支持此特性。

# 2.9 Exactly Only Once

1、发送消息阶段,不允许发送重复的消息。 2、消费消息阶段,不允许消费重复的消息。 只有以上两个条件都满足情况下,才能认为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以 RocketMQ 为了追求高性能,并不保证此特性,要求在业务上进行去重, 也就是说消费消息要做到幂等性。RocketMQ 虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消 费情况,只有网络异常,Consumer 启停等异常情况下会出现消息重复。

此问题的本质原因是网络调用存在不确定性,即既不成功也不失败的第三种状态,所以才产生了消息重复性问题。

# 2.10 Broker Buffer 满了怎么办?

Broker 的 Buffer 通常指的是 Broker 中一个队列的内存 Buffer 大小,这类 Buffer 通常大小有限,如果 Buffer 满 了以后怎么办?

下面是 CORBA Notification 规范中处理方式:

(1). RejectNewEvents 拒绝新来的消息,向 Producer 返回 RejectNewEvents 错误码。

(2). 按照特定策略丢弃已有消息

  1. AnyOrder - Any event may be discarded on overflow. This is the default setting for this property.

  2. FifoOrder - The first event received will be the first discarded.

  3. LifoOrder - The last event received will be the first discarded.

  4. PriorityOrder - Events should be discarded in priority order, such that lower priority events will be discarded before higher priority events.

  5. DeadlineOrder - Events should be discarded in the order of shortest expiry deadline first. RocketMQ 没有内存 Buffer 概念,RocketMQ 的队列都是持久化磁盘,数据定期清除。

对于此问题的解决思路,RocketMQ 同其他 MQ 有非常显著的区别,RocketMQ 的内存 Buffer 抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker 会定期删除过期的数据,例如 Broker 只保存 3 天的消息,那么这个 Buffer 虽然长度无限,但是 3 天前的数据会被从队尾删除。

# 2.11 回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障, 恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。

RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

# 2.12 消息堆积

消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要 求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况:

  1. 消息堆积在内存 Buffer,一旦超过内存 Buffer,可以根据一定的丢弃策略来丢弃消息,如 CORBA Notification 规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存 Buffer 大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。
  2. 消息堆积到持久化存储系统中,例如 DB ,KV 存储,文件记录形式。 当消息不能在内存 Cache 命中时,要不可避免的访问磁盘,会产生大量读 IO,读 IO 的吞吐量直接决定了 消息堆积后的访问能力。

评估消息堆积能力主要有以下四点:

  1. 消息能堆积多少条,多少字节 ? 即消息的堆积容量。

  2. 消息堆积后,发消息的吞吐量大小,是否会受堆积影响 ?

  3. 消息堆积后,正常消费的 Consumer 是否会受影响 ?

  4. 消息堆积后,访问堆积在磁盘的消息时,吞吐量有多大 ?

# 2.13 分布式事务

已知的几个分布式事务规范,如 XA,JTA 等。其中 XA 规范被各大数据库厂商广泛支持,如 Oracle,Mysql 等。 其中 XA 的 TM 实现佼佼者如 Oracle Tuxedo,在金融、电信等领域被广泛应用。

分布式事务涉及到两阶段提交问题,在数据存储方面的方面必然需要 KV 存储的支持,因为第二阶段的提交回滚需要修改消息状态,一定涉及到根据 Key 去查找 Message 的动作。RocketMQ 在第二阶段绕过了根据 Key 去查找 Message 的问题,采用第一阶段发送 Prepared 消息时,拿到了消息的 Offset,第二阶段通过 Offset 去访问消息, 并修改状态,Offset 就是数据的地址。

RocketMQ 这种实现事务方式,没有通过 KV 存储做,而是通过 Offset 方式,存在一个显著缺陷,即通过 Offset 更改数据,会令系统的脏页过多,需要特别关注。

# 2.14 定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能 被消费。

如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不 可避免的产生巨大性能开销。

RocketMQ 支持定时消息,但是不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等。

# 2.15 消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。

消费失败通常可以认为有以下几种情况

  1. 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。

这种错误通常需要跳过这条消息,再消费其他消息,而这条失败的消息即使立刻重试消费,99%也不成功,

所以最好提供一种定时重试机制,即过 10s 秒后再重试。

  1. 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。

遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s ,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

# 3 架构概览

我们先对 RocketMQ 4.9.X 架构做一个概览。

整体架构中包含四种角色 :

1、NameServer

名字服务是是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。它是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper ,支持 Broker 的动态注册与发现。

2、BrokerServer

Broker 主要负责消息的存储、投递和查询以及服务高可用保证 。

3、Producer

消息发布的角色,Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

4、Consumer

消息消费的角色,支持以 push 推,pull 拉两种模式对消息进行消费。

RocketMQ 集群工作流程:

1、启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer 、Consumer 连上来,相当于一个路由控制中心。

2、Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker信息( IP+端口等 )以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。

3、收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。

4、Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。

5、Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息

上次更新: 2026/6/28