网站首页 > 编程文章 正文
引言
RocketMQ的消息是存储于指定的MQ队列中,而消费端在消费消息时,也消费端在处理消息时,一个MQ队列,也只会被一个消费端订阅,同一个消费端可以处理同一个topic下的多个队列,当订阅的队列中有数据时,就会将获取到的数据提交到消费线程池进行处理,处理完成后,进行更新每个消费组对应的topic的偏移量,那在异步更新的逻辑中如何保证这个偏移量的值的顺序呢?
分析
当前主要集中于源代码 ConsumeRequest中的 run方法,因为ConsumeRequest是对于消息消费的封装。在消息消费成功会直接调用如下的代码:
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
processConsumeResult: 主要功能就是处理当前消息的执行结果,这里会根据处理的消息结果是否成功来决定如何处理消息。当前只分析成功的情况,会执行以下的代码
// 获取当前消息的偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
// 当前的队列没有移除,则进行更新偏移量
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// 更新的偏移量,会进行比较,以最大值写入,最后统一由具体的定时任务进行提交
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
执行到最后,会调用
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset来更新偏移量,那具体的逻辑就集中于updateOffset方法,由 RemoteBrokerOffsetStore来实现:
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
RemoteBrokerOffsetStore维护了一个变量 offsetTable来处理每个MQ队列的偏移量,当处理完成一个消息队列的消息时,首先获取当前队列对应的MQ对应的值,如果没有则直接设置。否则确认是否根据increaseOnly字段来递增更新当前的值,从 processConsumeResult中知道,increaseOnly 为true, 则调用
MixAll.compareAndIncreaseOnly来更新值。
public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) {
long prev = target.get();
// 循环确认,当前的值是否大于已经存在的值
while (value > prev) {
boolean updated = target.compareAndSet(prev, value);
if (updated)
return true;
prev = target.get();
}
return false;
}
如此之后,就更新到对应的MQ的偏移量了。
上面的流程,仅是将数据更新到内存,并未进行持久化,那是如何触发的呢?
其实在 MQClientInstance实例启动的时候,会启动一个定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
private void persistAllConsumerOffset() {
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
impl.persistConsumerOffset();
}
}
该任务在启动后10,每5秒钟执行一次 persistAllConsumerOffset方法,同时会执行对应消费端方法 persistConsumerOffset,而实现类为
DefaultMQPushConsumerImpl.
public void persistConsumerOffset() {
try {
this.makeSureStateOK();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
this.offsetStore.persistAll(mqs);
} catch (Exception e) {
log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
}
}
该方法会将分配的每个队列的偏移量同步至MQ服务端。
注意点
- 当一个队列因为rebalance,而不在当前的消费端时,有可能当前的偏移量不会被更新到服务端,导致该消息会被重新被新的消费端所消费。
- 因为偏移量是先更新到内存,再通过定时任务更新至服务端,所以也有可能因为消费端的宕机或是重启,也有可能导致偏移量数据的更新失败,从而导致消息被重复消费。
猜你喜欢
- 2025-06-12 从零开始搭建AI网站(6):如何使用响应式编程
- 2025-06-12 Windows下Ollama安装目录迁移到D盘
- 2025-06-12 阿里p8手把手教学,如何写出简洁又规范的单元测试?
- 2025-06-12 Seata源码—7.Seata TCC模式的事务处理
- 2025-06-12 Seata源码—4.全局事务拦截与开启事务处理二
- 2025-06-12 Seata源码—3.全局事务注解扫描器的初始化一
- 2025-06-12 Seata源码—6.Seata AT模式的数据源代理二
- 2025-06-12 Seata源码—6.Seata AT模式的数据源代理三
- 2025-06-12 写出更优雅的代码:搞懂 Python 协议与抽象基类的核心区别
- 2025-06-12 Seata源码—6.Seata AT模式的数据源代理一
你 发表评论:
欢迎- 06-13边缘计算网关如何实现系统高效运维?
- 06-13虚拟专用网络VPN连接配置
- 06-13WIN10/11下配置VPN(解决L2TP/IPsec无法连接的问题)
- 06-13公共网络上的私有安全通道——VPN
- 06-13远程办公如何访问公司内网办公系统和内部资源?
- 06-13VBRAS场景测试方法—如何高效验证网络设备的性能与稳定性
- 06-13Win10系统如何使用VPN远程办公
- 06-13iPhone轻松实现远程访问公司局域网电脑上的共享文件
- 最近发表
- 标签列表
-
- spire.doc (70)
- instanceclient (62)
- system.data.oracleclient (61)
- 按键小精灵源码提取 (66)
- pyqt5designer教程 (65)
- 联想刷bios工具 (66)
- c#源码 (64)
- graphics.h头文件 (62)
- mysqldump下载 (66)
- sqljdbc4.jar下载 (56)
- libmp3lame (60)
- maven3.3.9 (63)
- 二调符号库 (57)
- git.exe下载 (68)
- diskgenius_winpe (72)
- pythoncrc16 (57)
- solidworks宏文件下载 (59)
- qt帮助文档中文版 (73)
- satacontroller (66)
- hgcad (64)
- bootimg.exe (69)
- android-gif-drawable (62)
- axure9元件库免费下载 (57)
- libmysqlclient.so.18 (58)
- springbootdemo (64)
本文暂时没有评论,来添加一个吧(●'◡'●)