跳至主要內容
RocketMQ 消费幂等

这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:消费幂等

1 基础概念

消费幂等是指:当出现 RocketMQ 消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。

例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为100元。


勇哥大约 11 分钟RocketMQRocketMQ消息队列
RocketMQ 整体架构

1 专业术语

  • Producer 消息生产者,负责产生消息,一般由业务系统负责产生消息。

  • Consumer 消息消费者,负责消费消息,一般是后台系统负责异步消费。

  • PushConsumer Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。

  • PullConsumer

    Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。

  • ProducerGroup

    一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。

  • ConsumerGroup

    一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。

  • Broker

    消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。

  • 广播消费

    一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。

  • 集群消费

    一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个 Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那举每个实例只消费其中的 3 条消息。

  • 顺序消息 消费消息的顺序要同发送消息的顺序一致,在 RocketMQ 中,主要是指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送,且发送到同一个队列,这样 Consumer 就可以按照 Producer 发送的顺序去消费消息。

  • Message Queue 在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100 年内不会溢出,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。也可以认为 Message Queue 是一个长度无限的数组,offset 就是下标。


勇哥大约 15 分钟RocketMQRocketMQ消息队列
序言

大家好,我是勇哥 。

20231024 , 程序员节,圆了我一个小小的梦。

花了半年时间,我写了一本电子书 ,书名是:《RocketMQ4.X设计精要》,我想在今天分享给各位。

这本书一共包含十五章,接近 10 万字,180 张图,按照 RocketMQ 的知识体系一章一章展开。


勇哥大约 3 分钟RocketMQRocketMQ消息队列
RocketMQ 网络通讯

RocketMQ 的网络通讯模块负责生产者、消费者与 Broker 之间的网络通信。

笔者学习 RocketMQ 也是从通讯模块源码开始的,并且从源码里汲取了很多营养。

1 网络协议

客户端和服务端之间完成数据交互,需要约定数据协议。数据协议如下图:


勇哥大约 7 分钟RocketMQ消息队列RocketMQ
RocketMQ 名字服务

NameServer 是专为 RocketMQ 设计的轻量级名字服务,它的源码非常精简,八个类 ,少于1000行代码。

这篇文章, 笔者会从基础概念Broker发送心跳包NameServer 维护路由Zookeeper vs NameServer 四个模块揭秘名字服务的设计精髓。


勇哥大约 5 分钟RocketMQRocketMQ消息队列
RocketMQ 生产者

这篇文章,我们从源码的角度探寻 RocketMQ Producer 的实现机制。

1 基础配置

我们先展示生产者发送消息的示例代码。

// 1. 初始化默认生产者,传递参数生产者组名
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
// 2. 设置名字服务地址 
producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
// 3. 启动生产者服务 
producer.start();
// 4. 定义消息对象 
Message msg = new Message(*TOPIC* /* Topic */,
        *TAG* /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.*DEFAULT_CHARSET*) /* Message body */
);
msg.setKeys("");
// 5. 发送消息
// 示例普通消息
SendResult sendResult = producer.send(msg);
// 示例异步回调
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // do something
    }
    @Override
    public void onException(Throwable e) {
        // do something
    }
});
// 示例oneway发送
producer.sendOneway(msg);

勇哥大约 11 分钟RocketMQRocketMQ消息队列
RocketMQ 存储模型

RocketMQ 优异的性能表现,必然绕不开其优秀的存储模型 。

这篇文章,笔者按照自己的理解 , 尝试分析 RocketMQ 的存储模型,希望对大家有所启发。

1 整体概览

首先温习下 RocketMQ 架构。


勇哥大约 8 分钟RocketMQRocketMQ消息队列
RocketMQ 消费者

RocketMQ 是笔者非常喜欢的消息队列,4.9.X 版本是目前使用最广泛的版本,但它的消费逻辑相对较重,很多同学学习起来没有头绪。

这篇文章,笔者梳理了 RocketMQ 的消费逻辑,希望对大家有所启发。

1 架构概览

在展开集群消费逻辑细节前,我们先对 RocketMQ 4.X 架构做一个概览。


勇哥大约 28 分钟RocketMQRocketMQ消息队列
RocketMQ 广播消费

这篇文章我们聊聊广播消费,因为广播消费在某些场景下真的有奇效。笔者会从基础概念实现机制实战案例三个方面一一展开,希望能帮助到大家。

1 基础概念

RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。

集群消费

同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。


勇哥大约 7 分钟RocketMQRocketMQ消息队列
RocketMQ 主从同步

RocketMQ 主从复制是 RocketMQ 高可用机制之一,数据可以从主节点复制到一个或多个从节点。

这篇文章,我们聊聊 RocketMQ 的主从复制,希望大家读完之后,能够理解主从复制的精髓。

1 同步与异步

在 RocketMQ 的集群模式中,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。


勇哥大约 6 分钟RocketMQRocketMQ消息队列
2