RocketMQ 消费者
RocketMQ 是笔者非常喜欢的消息队列,4.9.X 版本是目前使用最广泛的版本,但它的消费逻辑相对较重,很多同学学习起来没有头绪。
这篇文章,笔者梳理了 RocketMQ 的消费逻辑,希望对大家有所启发。
1 架构概览
在展开集群消费逻辑细节前,我们先对 RocketMQ 4.X 架构做一个概览。
整体架构中包含四种角色 :
1、NameServer
名字服务是是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。它是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper ,支持 Broker 的动态注册与发现。
2、BrokerServer
Broker 主要负责消息的存储、投递和查询以及服务高可用保证 。
3、Producer
消息发布的角色,Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
4、Consumer
消息消费的角色,支持以 push 推,pull 拉两种模式对消息进行消费。
RocketMQ 集群工作流程:
1、启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer 、Consumer 连上来,相当于一个路由控制中心。
2、Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker信息( IP+端口等 )以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
3、收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
4、Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
5、Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。
2 发布订阅
RocketMQ 的传输模型是:发布订阅模型 。
发布订阅模型具有如下特点:
消费独立
相比队列模型的匿名消费方式,发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。
一对多通信
基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。
RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。
集群消费:同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。
广播消费:当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。
为了实现这种发布订阅模型 , RocketMQ 精心设计了它的存储模型。先进入 Broker 的文件存储目录。
RocketMQ 采用的是混合型的存储结构。
1、Broker 单个实例下所有的队列共用一个数据文件(commitlog)来存储
生产者发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 commitlog 文件中。只要消息被刷盘持久化至磁盘文件 commitlog 中,那么生产者发送的消息就不会丢失。
单个文件大小默认 1G , 文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0 ,文件大小为1 G = 1073741824 。
这种设计有两个优点:
充分利用顺序写,大大提升写入数据的吞吐量;
快读定位消息。
因为消息是一条一条写入到 commitlog 文件 ,写入完成后,我们可以得到这条消息的物理偏移量。
每条消息的物理偏移量是唯一的, commitlog 文件名是递增的,可以根据消息的物理偏移量通过二分查找,定位消息位于那个文件中,并获取到消息实体数据。
2、Broker 端的后台服务线程会不停地分发请求并异步构建 consumequeue(消费文件)和 indexfile(索引文件)
进入消费文件存储目录 :
1、消费文件按照主题存储,每个主题下有不同的队列,图中主题 my-mac-topic 有 16 个队列 (0 到 15) ;
2、每个队列目录下 ,存储 consumequeue 文件,每个 consumequeue 文件也是顺序写入,数据格式见下图。
每个 consumequeue 文件包含 30 万个条目,每个条目大小是 20 个字节,每个文件的大小是 30 万 * 20 = 60万字节,每个文件大小约 5.72M 。
和 commitlog 文件类似,consumequeue 文件的名称也是以偏移量来命名的,可以通过消息的逻辑偏移量定位消息位于哪一个文件里。
消费文件按照主题-队列来保存 ,这种方式特别适配发布订阅模型。
消费者从 Broker 获取订阅消息数据时,不用遍历整个 commitlog 文件,只需要根据逻辑偏移量从 consumequeue 文件查询消息偏移量 , 最后通过定位到 commitlog 文件, 获取真正的消息数据。
要实现发布订阅模型,还需要一个重要文件:消费进度文件。原因有两点:
不同消费组之间相互独立,不会相互影响 ;
消费者下次拉取数据时,需要知道从哪个进度开始拉取 ,就像我们小时候玩单机游戏存盘一样。
因此消费进度文件需要保存消费组所订阅主题的消费进度。
我们浏览下集群消费场景下的 Broker 端的消费进度文件 consumerOffset.json 。
在进度文件 consumerOffset.json 里,数据以 key-value 的结构存储,key 表示:主题@消费者组 , value 是 consumequeue 中每个队列对应的逻辑偏移量 。
写到这里,我们粗糙模拟下 RocketMQ 存储模型如何满足发布订阅模型 。
1、发送消息:生产者发送消息到 Broker ;
2、保存消息:Broker 将消息存储到 commitlog 文件 ,异步线程会构建消费文件 consumequeue ;
3、消费流程:消费者启动后,会通过负载均衡分配对应的队列,然后向 Broker 发送拉取消息请求。Broker 收到消费者拉取请求之后,根据订阅组,消费者编号,主题,队列名,逻辑偏移量等参数 ,从该主题下的 consumequeue 文件查询消息消费条目,然后从 commitlog 文件中获取消息实体。消费者在收到消息数据之后,执行消费监听器,消费完消息;
4、保存进度:消费者将消费进度提交到 Broker ,Broker 会将该消费组的消费进度存储在进度文件里。
3 消费流程
我们重点讲解下集群消费的消费流程 ,因为集群消费是使用最普遍的消费模式,理解了集群消费,广播消费也就能顺理成章的掌握了。
集群消费示例代码里,启动消费者,我们需要配置三个核心属性:消费组名、订阅主题、消息监听器,最后调用 start 方法启动。
首先进入 DefaultMQPushConsumerImpl
类的 start
方法 。
消费者启动后,我们可以将整个流程简化成:
4 负载均衡
消费端的负载均衡是指将 Broker 端中多个队列按照某种算法分配给同一个消费组中的不同消费者,负载均衡是客户端开始消费的起点。
RocketMQ 负载均衡的核心设计理念是
消费队列在同一时间只允许被同一消费组内的一个消费者消费
一个消费者能同时消费多个消息队列
负载均衡是每个客户端独立进行计算,那么何时触发呢 ?
消费端启动时,立即进行负载均衡;
消费端定时任务每隔 20 秒触发负载均衡;
消费者上下线,Broker 端通知消费者触发负载均衡。
负载均衡流程如下:
1、发送心跳
消费者启动后,它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包(消息消费分组名称、订阅关系集合、消息通信模式和客户端实例编号等信息)。
Broker 端在收到消费者的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量 consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量 channelInfoTable 中,为之后做消费端的负载均衡提供可以依据的元数据信息。
2、启动负载均衡服务
负载均衡服务会根据消费模式为”广播模式”还是“集群模式”做不同的逻辑处理,这里主要来看下集群模式下的主要处理流程:
(1) 获取该主题下的消息消费队列集合;
(2) 查询 Broker 端获取该消费组下消费者 Id 列表;
(3) 先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列;
这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range ,最后遍历整个 range 而计算出当前消费端应该分配到的记录。
(4) 分配到的消息队列集合与 processQueueTable 做一个过滤比对操作。
消费者实例内 ,processQueueTable 对象存储着当前负载均衡的队列 ,以及该队列的处理队列 processQueue (消费快照)。
标红的 Entry 部分表示与分配到的消息队列集合互不包含,则需要将这些红色队列 Dropped 属性为 true , 然后从 processQueueTable 对象中移除。
绿色的 Entry 部分表示与分配到的消息队列集合的交集,processQueueTable 对象中已经存在该队列。
黄色的 Entry 部分表示这些队列需要添加到 processQueueTable 对象中,为每个分配的新队列创建一个消息拉取请求
pullRequest
, 在消息拉取请求中保存一个处理队列processQueue
(队列消费快照),内部是红黑树(TreeMap
),用来保存拉取到的消息。
最后创建拉取消息请求列表,并将请求分发到消息拉取服务,进入拉取消息环节。
5 长轮询
在负载均衡这一小节,我们已经知道负载均衡触发了拉取消息的流程。
消费者启动的时候,会创建一个拉取消息服务 PullMessageService ,它是一个单线程的服务。
核心流程如下:
1、负载均衡服务将消息拉取请求放入到拉取请求队列 pullRequestQueue , 拉取消息服务从队列中获取拉取消息请求 ;
2、拉取消息服务向 Brorker 服务发送拉取请求 ,拉取请求的通讯模式是异步回调模式 ;
消费者的拉取消息服务本身就是一个单线程,使用异步回调模式,发送拉取消息请求到 Broker 后,拉取消息线程并不会阻塞 ,可以继续处理队列 pullRequestQueue 中的其他拉取任务。
3、Broker 收到消费者拉取消息请求后,从存储中查询出消息数据,然后返回给消费者;
4、消费者的网络通讯层会执行拉取回调函数相关逻辑,首先会将消息数据存储在队列消费快照 processQueue 里;
消费快照使用红黑树 msgTreeMap 存储拉取服务拉取到的消息 。
5、回调函数将消费请求提交到消息消费服务 ,而消息消费服务会异步的消费这些消息;
6、回调函数会将处理中队列的拉取请放入到定时任务中;
7、定时任务再次将消息拉取请求放入到队列 pullRequestQueue 中,形成了闭环:负载均衡后的队列总会有任务执行拉取消息请求,不会中断。
细心的同学肯定有疑问:既然消费端是拉取消息,为什么是长轮询呢 ?
虽然拉模式的主动权在消费者这一侧,但是缺点很明显。
因为消费者并不知晓 Broker 端什么时候有新的消息 ,所以会不停地去 Broker 端拉取消息,但拉取频率过高, Broker 端压力就会很大,频率过低则会导致消息延迟。
所以要想消费消息的延迟低,服务端的推送必不可少。
下图展示了 RocketMQ 如何通过长轮询减小拉取消息的延迟。
核心流程如下:
1、Broker 端接收到消费者的拉取消息请求后,拉取消息处理器开始处理请求,根据拉取请求查询消息存储 ;
2、从消息存储中获取消息数据 ,若存在新消息 ,则将消息数据通过网络返回给消费者。若无新消息,则将拉取请求放入到拉取请求表 pullRequestTable 。
3、长轮询请求管理服务 pullRequestHoldService 每隔 5 秒从拉取请求表中判断拉取消息请求的队列是否有新的消息。
判定标准是:拉取消息请求的偏移量是否小于当前消费队列最大偏移量,如果条件成立则说明有新消息了。
若存在新的消息 , 长轮询请求管理服务会触发拉取消息处理器重新处理该拉取消息请求。
4、当 commitlog 中新增了新的消息,消息分发服务会构建消费文件和索引文件,并且会通知长轮询请求管理服务,触发拉取消息处理器重新处理该拉取消息请求。
6 消费消息
在拉取消息的流程里, Broker 端返回消息数据,消费者的通讯框架层会执行回调函数。
回调线程会将数据存储在队列消费快照 processQueue(内部使用红黑树 msgTreeMap)里,然后将消息提交到消费消息服务,消费消息服务会异步消费这些消息。
消息消费服务有两种类型:并发消费服务和顺序消费服务 。
6.1 并发消费
并发消费是指消费者将并发消费消息,消费的时候可能是无序的。
消费消息并发服务启动后,会初始化三个组件:消费线程池、清理过期消息定时任务、处理失败消息定时任务。
核心流程如下:
0、通讯框架回调线程会将数据存储在消费快照里,然后将消息列表 msgList 提交到消费消息服务
1、 消息列表 msgList 组装成消费对象
2、将消费对象提交到消费线程池
我们看到10 条消息被组装成三个消费请求对象,不同的消费线程会执行不同的消费请求对象。
3、消费线程执行消息监听器
执行完消费监听器,会返回消费结果。
4、处理异常消息
当消费异常时,异常消息将重新发回 Broker 端的重试队列( RocketMQ 会为每个 topic 创建一个重试队列,以 %RETRY% 开头),达到重试时间后将消息投递到重试队列中进行消费重试。
我们将在重试机制这一节重点讲解 RocketMQ 如何实现延迟消费功能 。
假如异常的消息发送到 Broker 端失败,则重新将这些失败消息通过处理失败消息定时任务重新提交到消息消费服务。
5、更新本地消费进度
消费者消费一批消息完成之后,需要保存消费进度到进度管理器的本地内存。
首先我们会从队列消费快照 processQueue 中移除消息,返回消费快照 msgTreeMap 第一个偏移量 ,然后调用消费消息进度管理器 offsetStore 更新消费进度。
待更新的偏移量是如何计算的呢?
场景1:快照中1001(消息1)到1010(消息10)消费了,快照中没有了消息,返回已消费的消息最大偏移量 + 1 也就是1011。
场景2:快照中1001(消息1)到1008(消息8)消费了,快照中只剩下两条消息了,返回最小的偏移量 1009。
- 场景3:1001(消息1)在消费对象中因为某种原因一直没有被消费,即使后面的消息1005-1010都消费完成了,返回的最小偏移量是1001。
在场景3,RocketMQ 为了保证消息肯定被消费成功,消费进度只能维持在1001(消息1),直到1001也被消费完,本地的消费进度才会一下子更新到1011。
假设1001(消息1)还没有消费完成,消费者实例突然退出(机器断电,或者被 kill ),就存在重复消费的风险。
因为队列的消费进度还是维持在1001,当队列重新被分配给新的消费者实例的时候,新的实例从 Broker 上拿到的消费进度还是维持在1001,这时候就会又从1001开始消费,1001-1010这批消息实际上已经被消费过还是会投递一次。
所以业务必须要保证消息消费的幂等性。
写到这里,我们会有一个疑问:假设1001(消息1)因为加锁或者消费监听器逻辑非常耗时,导致极长时间没有消费完成,那么消费进度就会一直卡住 ,怎么解决呢 ?
RocketMQ 提供两种方式一起配合解决:
拉取服务根据并发消费间隔配置限流
拉取消息服务在拉取消息时候,会判断当前队列的 processQueue 消费快照里消息的最大偏移量 - 消息的最小偏移量大于消费并发间隔(2000)的时候 , 就会触发流控 , 这样就可以避免消费者无限循环的拉取新的消息。
清理过期消息
消费消息并发服务启动后,会定期扫描所有消费的消息,若当前时间减去开始消费的时间大于消费超时时间,首先会将过期消息发送 sendMessageBack 命令发送到 Broker ,然后从快照中删除该消息。
6.2 顺序消费
顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
顺序消息分为分区顺序消息和全局顺序消息。
1、分区顺序消息
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
适用场景:适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
示例:电商的订单创建,以订单 ID 作为 Sharding Key ,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
2、全局顺序消息
对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
适用场景:适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
示例:在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。
全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同。
因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。
消息的顺序需要由两个阶段保证:
消息发送
如上图所示,A1、B1、A2、A3、B2、B3 是订单 A 和订单 B 的消息产生的顺序,业务上要求同一订单的消息保持顺序,例如订单 A 的消息发送和消费都按照 A1、A2、A3 的顺序。
如果是普通消息,订单A 的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时 RocketMQ 支持将 Sharding Key 相同(例如同一订单号)的消息序路由到同一个队列中。
下图是生产者发送顺序消息的封装,原理是发送消息时,实现 MessageQueueSelector 接口, 根据 Sharding Key 使用 Hash 取模法来选择待发送的队列。
消息消费
消费者消费消息时,需要保证单线程消费每个队列的消息数据,从而实现消费顺序和发布顺序的一致。
顺序消费服务的类是 ConsumeMessageOrderlyService ,在负载均衡阶段,并发消费和顺序消费并没有什么大的差别。
最大的差别在于:顺序消费会向 Borker 申请锁 。消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔20秒会重新尝试。
顺序消费核心流程如下:
1、 组装成消费对象
2、 将请求对象提交到消费线程池
和并发消费不同的是,这里的消费请求包含消费快照 processQueue ,消息队列 messageQueue 两个对象,并不对消息列表做任何处理。
3、 消费线程内,对消费队列加锁
顺序消费也是通过线程池消费的,synchronized 锁用来保证同一时刻对于同一个队列只有一个线程去消费它
4、 从消费快照中取得待消费的消息列表
消费快照 processQueue 对象里,创建了一个红黑树对象 consumingMsgOrderlyTreeMap 用于临时存储的待消费的消息。
5、 执行消息监听器
消费快照的消费锁 consumeLock 的作用是:防止负载均衡线程把当前消费的 MessageQueue 对象移除掉。
6、 处理消费结果
消费成功时,首先计算需要提交的偏移量,然后更新本地消费进度。
消费失败时,分两种场景:
假如已消费次数小于最大重试次数,则将对象 consumingMsgOrderlyTreeMap 中临时存储待消费的消息,重新加入到消费快照红黑树 msgTreeMap 中,然后使用定时任务尝试重新消费。
假如已消费次数大于等于最大重试次数,则将失败消息发送到 Broker ,Broker 接收到消息后,会加入到死信队列里 , 最后计算需要提交的偏移量,然后更新本地消费进度。
我们做一个关于顺序消费的总结 :
顺序消费需要由两个阶段消息发送和消息消费协同配合,底层支撑依靠的是 RocketMQ 的存储模型;
顺序消费服务启动后,队列的数据都会被消费者实例单线程的执行消费;
假如消费者扩容,消费者重启,或者 Broker 宕机 ,顺序消费也会有一定几率较短时间内乱序,所以消费者的业务逻辑还是要保障幂等。
7 保存进度
RocketMQ 消费者消费完一批数据后, 会将队列的进度保存在本地内存,但还需要将队列的消费进度持久化。
1、 集群模式
集群模式下,分两种场景:
拉取消息服务会在拉取消息时,携带该队列的消费进度,提交给 Broker 的拉取消息处理器。
消费者定时任务,每隔5秒将本地缓存中的消费进度提交到 Broker 的消费者管理处理器。
Broker 的这两个处理器都调用消费者进度管理器 consumerOffsetManager 的 commitOffset 方法,定时任务异步将消费进度持久化到消费进度文件 consumerOffset.json 中。
2、 广播模式
广播模式消费进度存储在消费者本地,定时任务每隔 5 秒通过 LocalFileOffsetStore 持久化到本地文件offsets.json
,数据格式为 MessageQueue:Offset
。
广播模式下,消费进度和消费组没有关系,本地文件 offsets.json
存储在配置的目录,文件中包含订阅主题中所有的队列以及队列的消费进度。
8 重试机制
集群消费下,重试机制的本质是 RocketMQ 的延迟消息功能。
消费消息失败后,消费者实例会通过 CONSUMER_SEND_MSG_BACK 请求,将失败消息发回到 Broker 端。
Broker 端会为每个 topic 创建一个重试队列 ,队列名称是:%RETRY% + 消费者组名 ,达到重试时间后将消息投递到重试队列中进行消费重试(消费者组会自动订阅重试 Topic)。最多重试消费 16 次,重试的时间间隔逐渐变长,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
开源 RocketMQ 4.X 支持延迟消息,默认支持18 个 level 的延迟消息,这是通过 broker 端的 messageDelayLevel 配置项确定的,如下:
Broker 在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟 level 的个数,创建对应数量的队列,也就是说18个 level 对应了18个队列。
我们先梳理下延迟消息的实现机制。
1、生产者发送延迟消息
Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);
2、Broker端存储延迟消息
延迟消息在 RocketMQ Broker 端的流转如下图所示:
第一步:修改消息 Topic 名称和队列信息
Broker 端接收到生产者的写入消息请求后,首先都会将消息写到 commitlog 中。假如是正常非延迟消息,MessageStore 会根据消息中的 Topic 信息和队列信息,将其转发到目标 Topic 的指定队列 consumequeue 中。
但由于消息一旦存储到 consumequeue 中,消费者就能消费到,而延迟消息不能被立即消费,所以 RocketMQ 将 Topic 的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。
同时,还会将消息原来要发送到的目标 Topic 和队列信息存储到消息的属性中。
第二步:构建 consumequeue 文件时,计算并存储投递时间
上图是 consumequeue 文件一条消息的格式,最后 8 个字节存储 Tag 的哈希值,此时存储消息的投递时间。
第三步:定时调度服务启动
ScheduleMessageService 类是一个定时调度服务,读取 SCHEDULE_TOPIC_XXXX 队列的消息,并将消息投递到目标 Topic 中。
定时调度服务启动时,创建一个定时调度线程池 ,并根据延迟级别的个数,启动对应数量的 HandlePutResultTask ,每个 HandlePutResultTask 负责一个延迟级别的消费与投递。
第四步:投递时间到了,将消息数据重新写入到 commitlog
消息到期后,需要投递到目标 Topic 。第一步已经记录了原来的 Topic 和队列信息,这里需要重新设置,再存储到 commitlog 中。
第五步:将消息投递到目标 Topic 中
Broker 端的后台服务线程会不停地分发请求并异步构建 consumequeue(消费文件)和 indexfile(索引文件)。因此消息会直接投递到目标 Topic 的 consumequeue 中,之后消费者就可以消费到这条消息。
回顾了延迟消息的机制,消费消息失败后,消费者实例会通过 CONSUMER_SEND_MSG_BACK 请求,将失败消息发回到 Broker 端。
Broker 端 SendMessageProcessor 处理器会调用 asyncConsumerSendMsgBack 方法。
首先判断消息的当前重试次数是否大于等于最大重试次数,如果达到最大重试次数,或者配置的重试级别小于0,则重新创建 Topic ,规则是 %DLQ% + consumerGroup,后续处理消息发送到死信队列。
正常的消息会进入 else 分支,对于首次重试的消息,默认的 delayLevel 是 0 ,RocketMQ 会将 delayLevel + 3,也就是加到 3 ,这就是说,如果没有显示的配置延时级别,消息消费重试首次,是延迟了第三个级别发起的重试,也就是距离首次发送 10s 后重试,其主题的默认规则是 %RETRY% + consumerGroup。
当延时级别设置完成,刷新消息的重试次数为当前次数加 1 ,Broker 端将该消息刷盘,逻辑如下:
延迟消息写入到 commitlog 里 ,这里其实和延迟消息机制的第一步类似,后面按照延迟消息机制的流程执行即可(第二步到第六步)。
9 总结
下图展示了集群模式下消费者并发消费流程 :
核心流程如下:
消费者启动后,触发负载均衡服务 ,负载均衡服务为消费者实例分配对应的队列 ;
分配完队列后,负载均衡服务会为每个分配的新队列创建一个消息拉取请求
pullRequest
, 拉取请求保存一个处理队列processQueue
,内部是红黑树(TreeMap
),用来保存拉取到的消息 ;拉取消息服务单线程从拉取请求队列
pullRequestQueue
中弹出拉取消息,执行拉取任务 ,拉取请求是异步回调模式,将拉取到的消息放入到处理队列;拉取请求在一次拉取消息完成之后会复用,重新被放入拉取请求队列
pullRequestQueue
中 ;拉取完成后,调用消费消息服务
consumeMessageService
的submitConsumeRequest
方法 ,消费消息服务内部有一个消费线程池;消费线程池的消费线程从消费任务队列中获取消费请求,执行消费监听器
listener.consumeMessage
;消费完成后,若消费成功,则更新偏移量
updateOffset
,先更新到内存offsetTable
,定时上报到 Broker ;若消费失败,则将失败消费发送到 Broker 。Broker 端接收到请求后, 调用消费进度管理器的
commitOffset
方法修改内存的消费进度,定时刷盘到consumerOffset.json
。
RocketMQ 4.9.X 的消费逻辑有两个非常明显的特点:
客户端代码逻辑较重。假如要支持一种新的编程语言,那么客户端就必须实现完整的负载均衡逻辑,此外还需要实现拉消息、位点管理、消费失败后将消息发回 Broker 重试等逻辑。这给多语言客户端的支持造成很大的阻碍。
保证幂等非常重要。当客户端升级或者下线时,或者 Broker 宕机,都要进行负载均衡操作,可能造成消息堆积,同时有一定几率造成重复消费。