这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:消费幂等。
1 基础概念
消费幂等是指:当出现 RocketMQ 消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。
例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为100元。
这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:消费幂等。
消费幂等是指:当出现 RocketMQ 消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。
例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为100元。
谈起消息队列,内心还是会有些波澜。
消息队列,缓存,分库分表是高并发解决方案三剑客,而消息队列是我最喜欢,也是思考最多的技术。
我想按照下面的四个阶段分享我与消息队列的故事,同时也是对我技术成长经历的回顾。
Producer 消息生产者,负责产生消息,一般由业务系统负责产生消息。
Consumer 消息消费者,负责消费消息,一般是后台系统负责异步消费。
PushConsumer Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。
PullConsumer
Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。
ProducerGroup
一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。
ConsumerGroup
一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。
Broker
消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。
广播消费
一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。
集群消费
一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个 Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那举每个实例只消费其中的 3 条消息。
顺序消息 消费消息的顺序要同发送消息的顺序一致,在 RocketMQ 中,主要是指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送,且发送到同一个队列,这样 Consumer 就可以按照 Producer 发送的顺序去消费消息。
Message Queue 在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100 年内不会溢出,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。也可以认为 Message Queue 是一个长度无限的数组,offset 就是下标。
大家好,我是勇哥 。
20231024 , 程序员节,圆了我一个小小的梦。
花了半年时间,我写了一本电子书 ,书名是:《RocketMQ4.X设计精要》,我想在今天分享给各位。
这本书一共包含十五章,接近 10 万字,180 张图,按照 RocketMQ 的知识体系一章一章展开。
RocketMQ 的网络通讯模块负责生产者、消费者与 Broker 之间的网络通信。
笔者学习 RocketMQ 也是从通讯模块源码开始的,并且从源码里汲取了很多营养。
客户端和服务端之间完成数据交互,需要约定数据协议。数据协议如下图:
NameServer 是专为 RocketMQ 设计的轻量级名字服务,它的源码非常精简,八个类 ,少于1000行代码。
这篇文章, 笔者会从基础概念、Broker发送心跳包、NameServer 维护路由、Zookeeper vs NameServer 四个模块揭秘名字服务的设计精髓。
这篇文章,我们从源码的角度探寻 RocketMQ Producer 的实现机制。
我们先展示生产者发送消息的示例代码。
// 1. 初始化默认生产者,传递参数生产者组名
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
// 2. 设置名字服务地址
producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
// 3. 启动生产者服务
producer.start();
// 4. 定义消息对象
Message msg = new Message(*TOPIC* /* Topic */,
*TAG* /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.*DEFAULT_CHARSET*) /* Message body */
);
msg.setKeys("");
// 5. 发送消息
// 示例普通消息
SendResult sendResult = producer.send(msg);
// 示例异步回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// do something
}
@Override
public void onException(Throwable e) {
// do something
}
});
// 示例oneway发送
producer.sendOneway(msg);
RocketMQ 优异的性能表现,必然绕不开其优秀的存储模型 。
这篇文章,笔者按照自己的理解 , 尝试分析 RocketMQ 的存储模型,希望对大家有所启发。
首先温习下 RocketMQ 架构。
RocketMQ 是笔者非常喜欢的消息队列,4.9.X 版本是目前使用最广泛的版本,但它的消费逻辑相对较重,很多同学学习起来没有头绪。
这篇文章,笔者梳理了 RocketMQ 的消费逻辑,希望对大家有所启发。
在展开集群消费逻辑细节前,我们先对 RocketMQ 4.X 架构做一个概览。
这篇文章我们聊聊广播消费,因为广播消费在某些场景下真的有奇效。笔者会从基础概念、实现机制、实战案例三个方面一一展开,希望能帮助到大家。
RocketMQ 支持两种消息模式:集群消费
( Clustering )和广播消费
( Broadcasting )。
集群消费:
同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。