Rocketmq消息不丢失
一、前言
RocketMQ可以理解成一个特殊的存储系统,这个存储系统特殊之处数据是一般只会被使用一次,这种情况下,如何保证这个被消费一次的消息不丢失是非常重要的。本文将分析RocketMQ从哪些方面来保证消息的不丢失。
二、消息什么情况会丢失?
由于消息从生产者,到broker,最后被消费者消费,中间最少经历了3个应用,2次rpc调用,由于rpc调用会存在成功失败外的第三种情况,因此消息会存在不可靠性。
那么,我们有哪些手段来提升消息的可靠性呢?本文将分别从生产者端,消费者端,broker端来分析保证消息不丢失的手段。
三、如何保障消息不丢失?
3.1、生产者端
生产者发送消息,有同步发送和异步发送到Broker。
我们如果对消息可靠性要求比较高,我们可以选择同步发送。在RocketMQ的客户端,同步发送自带重试机制,如果同步模式发送失败,则轮转到下一个Broker。
如果重试都发送失败了怎么办呢?
这时候我们要考虑发送失败的兜底方案-业务系统自己实现,业务系统可以将消息存储起来,使用定时任务等机制来重发消息。
3.2、Broker端
作为Broker,他的主要职责就是将消息持久化存储起来,同时最少把消息投递到消费者端一次。
由于消息是存在磁盘上的,因此持久化机制就会涉及到刷盘机制。RocketMQ支持同步刷盘和异步刷盘机制。
RocketMQ处理发送消息请求时默认写入缓冲区,不会立即同步落盘,通过定时5s进行刷新落盘
SYNC_FLUSH,同步刷盘,刷盘完成再返回给客户端,超时5s
ASYNC_FLUSH,异步刷盘,200ms刷新一次,性能高
上面的机制可以保证存储可靠性,Broker除了保证存储消息可靠外,broker还需要保证消息能够投递给消费者消费一次。Broker如何保证消息一定会投递给消费者呢?
Broker端设计了重试机制。如果消息消费失败了,会将消息写到重试topic下的队列,会最大重试16次发送到消费者端。
如果16次之后,消息还是没有消费成功,Broker端会将消息写入死信队列。
3.3、消费者端
消息投递到了消费者端,消费者如果消费不成功,不能给broker端返回ack。一般需要设置为手动提交ack机制,消费者消费消息不成功,不返回CONSUME_SUCCESS,返回RECONSUME_LATER表示需要broker再次投递该消息。
这里需要注意的是,由于broker保证消息不丢失有重试机制,可能导致消息重复投递,因此消费者端需要做幂等性处理,一般会根据业务规则处理。
事务消息实现
腾讯云事务消息
RocketMQ 提供了事务消息的支持,使得在分布式系统中可以实现分布式事务的一致性。下面是事务消息的基本实现原理和使用方法:
- 实现原理:
- RocketMQ 的事务消息实现基于两阶段提交协议(Two-Phase-Commit Protocol)。发送事务消息时,首先向消息服务器发送半消息(Half Message),消息服务器会将半消息存储在 Broker 中,但不会立即投递给消费者。
- 然后消息发送方需要执行本地事务逻辑,在本地事务执行成功后,再通知 RocketMQ 服务端进行提交或回滚操作。
- 如果消息发送方在规定时间内未通知 RocketMQ 服务端进行提交或回滚操作,RocketMQ 会根据设定的超时策略来处理半消息。
- 如果消息发送方通知 RocketMQ 服务端进行提交操作,RocketMQ 服务端会将半消息标记为可投递状态,同时消息会被投递给消费者。如果通知进行回滚操作,RocketMQ 服务端会删除半消息。
- 事务消息的使用方法:
- 在 RocketMQ 中发送事务消息,需要实现事务监听器接口 TransactionListener,该接口包含两个方法:executeLocalTransaction 和 checkLocalTransaction。
- executeLocalTransaction 方法用于执行本地事务逻辑,在该方法中执行本地事务操作,并根据执行结果返回事务状态。
- checkLocalTransaction 方法用于检查本地事务状态,通常在该方法中根据本地事务的执行状态来判断是否提交或回滚事务。
- 发送事务消息时,需要指定事务监听器,在消息发送前会调用 executeLocalTransaction 方法执行本地事务,然后根据返回的事务状态来处理事务消息的提交或回滚。
- 使用示例:
// 实现事务监听器接口
public class TransactionListenerImpl implements TransactionListener {
// 执行本地事务逻辑
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务逻辑,根据执行结果返回事务状态
// 返回 COMMIT_MESSAGE 或 ROLLBACK_MESSAGE 或 UNKNOW
}
// 检查本地事务状态
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态,根据本地事务的执行状态来判断是否提交或回滚事务
// 返回 COMMIT_MESSAGE 或 ROLLBACK_MESSAGE 或 UNKNOW
}
}
// 发送事务消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, transactionListener);
以上是关于 RocketMQ 事务消息的基本实现原理和使用方法。在使用事务消息时,需要确保本地事务的一致性和可靠性,并根据本地事务的执行结果来处理事务消息的提交或回滚。
客服分配示例
- 本地数据库分配
- 分配成功后执行发送消息逻辑
- 发送失败回滚本地事务
检查本地事务
@Slf4j
@Component
public class AcdMessageLocalTransactionChecker implements LocalTransactionChecker {
@Autowired
private IChatSessionService chatSessionService;
/**
* 回查本地事务,Broker回调Producer,将未结束的事务发给Producer,由Producer来再次决定事务是提交还是回滚
*
* @param msg 消息
* @return {@link TransactionStatus} 事务状态, 包含提交事务、回滚事务、未知状态
*/
@Override
public TransactionStatus check(Message msg) {
try {
PushNoteMessage pushNoteMessage = MessageBodyUtils.convertByteArray2Object(msg.getBody(), PushNoteMessage.class);
log.info("acdMessageLocalTransactionChecker, message: {}", JSON.toJSONString(pushNoteMessage));
if (pushNoteMessage == null) {
return TransactionStatus.RollbackTransaction;
}
String body = pushNoteMessage.getBody();
if (StringUtils.isBlank(body)) {
return TransactionStatus.RollbackTransaction;
}
AcdMessage acdMessage = JSON.parseObject(body, AcdMessage.class);
if (acdMessage == null) {
return TransactionStatus.RollbackTransaction;
}
boolean checkResult = chatSessionService.checkSessionState(acdMessage.getSessionId());
log.info("acdMessageLocalTransactionChecker, checkResult: {}, {}", acdMessage.getSessionId(), checkResult);
return checkResult ? TransactionStatus.CommitTransaction : TransactionStatus.RollbackTransaction;
} catch (Exception e) {
log.error("acdMessageLocalTransactionChecker error");
}
return TransactionStatus.RollbackTransaction;
}
}
订单示例
好的,让我举一个更具体的例子来说明 RocketMQ 事务消息的使用方法。
假设我们有一个在线商城系统,用户下单后需要扣减库存和生成订单。下单过程需要保证库存扣减和订单生成的一致性,这时可以使用 RocketMQ 事务消息来实现。
首先,我们需要实现事务监听器接口 TransactionListener,并在其中编写本地事务的逻辑。
public class OrderTransactionListener implements TransactionListener {
// 执行本地事务逻辑
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 扣减库存
boolean deductionResult = deductInventory(msg);
// 生成订单
boolean orderResult = generateOrder(msg);
if (deductionResult && orderResult) {
// 本地事务执行成功,提交事务
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事务执行失败,回滚事务
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
// 出现异常,回滚事务
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 检查本地事务状态
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根据订单状态表或其他方式检查本地事务的执行状态
// 如果订单状态为已完成,则提交事务,否则回滚事务
if (checkOrderStatus(msg)) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 扣减库存
private boolean deductInventory(Message msg) {
// 执行库存扣减逻辑
// 返回扣减结果
}
// 生成订单
private boolean generateOrder(Message msg) {
// 执行订单生成逻辑
// 返回生成结果
}
// 检查订单状态
private boolean checkOrderStatus(MessageExt msg) {
// 查询订单状态表或其他方式检查订单是否已完成
// 返回订单状态
}
}
然后,在发送订单的时候,我们将消息发送为事务消息:
// 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("your_group_name");
// 设置NameServer地址
producer.setNamesrvAddr("your_namesrv_addr");
// 设置事务监听器
producer.setTransactionListener(new OrderTransactionListener());
try {
// 启动事务消息生产者
producer.start();
// 构造订单消息
Message orderMessage = new Message("OrderTopic", "OrderTag", "order_id", orderData.getBytes());
// 发送事务消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(orderMessage, null);
// 处理发送结果
if (sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
// 事务提交成功
System.out.println("订单发送成功");
} else {
// 事务提交失败或未知状态
System.out.println("订单发送失败");
}
} catch (Exception e) {
// 发送异常,处理错误
e.printStackTrace();
} finally {
// 关闭事务消息生产者
producer.shutdown();
}
上述代码中,我们创建了一个事务消息生产者 TransactionMQProducer,并设置了事务监听器为 OrderTransactionListener。然后,我们构造订单消息并调用 sendMessageInTransaction 方法发送事务消息。发送结果通过 TransactionSendResult 对象获取,根据事务状态来判断事务是否提交成功。
通过以上示例,你可以看到使用 RocketMQ 事务消息可以很方便地实现分布式事务的一致性。当本地事务执行成功后,再根据本地事务状态来提交或回滚事务消息。这样可以保证订单库存扣减和订单生成的一致性。
如何保证顺序
分区顺序消息
对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
- 适用场景
- 适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
- 示例
- 用户注册需要发送验证码,以用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
- 电商的订单创建,以订单ID作为Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
阿里巴巴集团内部电商系统均使用分区顺序消息,既保证业务的顺序,同时又能保证业务的高性能。
全局顺序消息
对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
- 适用场景
- 适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
- 示例
- 在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。
如何实现顺序消息
全局顺序消息和分区顺序消息原理一样,下文以分区顺序消息为例介绍在云消息队列 RocketMQ 版中如何保证消息收发的顺序。
在云消息队列 RocketMQ 版中,消息的顺序需要由以下三个阶段保证:
- 消息发送
- 如上图所示,A1、B1、A2、A3、B2、B3是订单A和订单B的消息产生的顺序,业务上要求同一订单的消息保持顺序,例如订单A的消息发送和消费都按照A1、A2、A3的顺序。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时云消息队列 RocketMQ 版支持将Sharding Key相同(例如同一订单号)的消息序路由到一个队列中。
- 云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照同一生产者发送消息的时序。不同生产者、不同线程并发产生的消息,云消息队列 RocketMQ 版服务端无法判定消息的先后顺序。
- 消息存储
- 如上图所示,顺序消息的Topic中,每个逻辑队列对应一个物理队列,当消息按照顺序发送到Topic中的逻辑队列时,每个分区的消息将按照同样的顺序存储到对应的物理队列中。
- 消息消费
- 云消息队列 RocketMQ 版按照存储的顺序将消息投递给Consumer,Consumer收到消息后也不对消息顺序做任何处理,按照接收到的顺序进行消费。
- Consumer消费消息时,同一Sharding Key的消息使用单线程消费,保证消息消费顺序和存储顺序一致,最终实现消费顺序和发布顺序的一致。
场景
- 我们发送事件给客户端通过sessionId分区做到一个消息发送到一个队列中保证顺序。
延时队列
阿里云死信消息
说明:rocketmq实现的延时队列只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时
具体实现:rocketmq发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早
流程图
源码分析:
如果想要深入了解的可以看一下ScheduleMessageService这个类
delayLevelTable定义了延迟级别和延迟时间的对应关系,offsetTable存放延延迟级别对应的队列消费的offset
使用timer定时器启动了一个定时任务,把每个扫描队列封装成一个任务,然后加入到timer中
每个扫描任务主要是把队列中所有到期的消息都拿出来,并发送到指定的topic下,并把延迟队列中的消息删除
总结
优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性
缺点:定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况
改进点:可以在每个延迟队列上各采用一个timer,或者使用timer进行扫描,加一个线程池对消息进行处理,这样可以提供效率
死信队列
阿里云死信消息
对于消费失败且重试后依然失败的消息,云消息队列 RocketMQ 版不会立丢弃,而是将消息转发至指定的队列中,即死信队列,这些消息即为死信消息。当消费失败的原因排查并解决后,您可以重发这些死信消息,让消费者重新消费;若您暂时无法处理这些死信消息,为避免到期后死信消息被删除,您也可以先将死信消息导出进行保存。
特性说明
死信消息具有以下特性:
- 不会再被消费者正常消费。
- 有效期与正常消息相同,默认为3天,3天后会被自动删除。因此,请在死信消息产生后的3天内及时处理。
死信队列具有以下特性:
- 一个死信队列对应一个Group ID, 而不是对应单个消费者实例。
- 如果一个Group ID未产生死信消息,云消息队列 RocketMQ 版不会为其创建相应的死信队列。
- 一个死信队列包含了对应Group ID产生的所有死信消息,不论该消息属于哪个Topic。
云消息队列 RocketMQ 版控制台提供对死信消息的查询、导出和重发的功能。
消息堆积如何处理
- 止损:先看下堆积具体影响的业务场景,如果影响较大,寻找止损方案,例如是否可以增加消费者解决?
- 寻找根因
- 思考下次是否还会出现堆积
处理消息堆积的方法
消息堆积是指消息在消息队列中积压过多,导致消费者无法及时消费的情况。处理消息堆积的方法可以从以下几个方面考虑:
- 增加消费者数量:可以增加消费者的数量来提高消息消费的速度,从而减少消息堆积的情况。
- 提高消费者的消费能力:可以通过优化消费者的消费逻辑、提升消费者的处理能力来提高消息消费的速度。
- 调整消息处理的并发度:可以根据实际情况调整消息处理的并发度,通过增加并发处理的线程数或者调整线程池的参数来提高消息处理的速度。
- 增加消息队列的容量:可以通过增加消息队列的容量来减少消息堆积的情况。需要注意的是,增加容量可能会增加系统的负载,需要根据实际情况进行权衡。
- 设置消息的过期时间:可以根据业务需求设置消息的过期时间,过期的消息可以直接丢弃,避免消息堆积的情况。
- 监控消息堆积情况:可以通过监控系统定时检查消息队列中的消息堆积情况,及时发现问题并采取相应的处理措施。
- 异常消息处理:对于消费失败的消息,可以进行异常处理,例如重试、记录错误日志等,以确保消息能够被正常消费。
需要根据具体的业务需求和系统情况选择合适的处理方法,综合考虑系统性能、数据一致性和可用性等因素。
具体场景举例
- 我们有个储存会话的业务,是异步保存到ES的,这块堆积之后影响根据ES查询会话将会出现延迟,刚开始有些C端业务场景也适用了ES查询的方式,但是经常出现延迟情况,后来改成了查询mysql,只有B端业务采用了ES查询的方式.
- 当时紧急止损是先通过调整代码,改为MySQL查询,无法通过增加消费者处理,主要原因是有个DTS(canel)吞吐量瓶颈。
- 后期优化同步逻辑,增加吞吐量。
- 产生的原因:业务量徒增,当时有个营销短信异常发送到好多用户,然后引来好多用户进线咨询。
消息幂等如何处理
概念
幂等简单点讲,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会产生任何副作用。幂等分很多种,比如接口的幂等、消息的幂等,它是分布式系统设计时必须要考虑的一个方面。
消息重复出现的场景
发送时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者生产者宕机,导致服务端对生产者应答失败。 如果此时生产者 Producer 意识到消息发送失败并尝试再次发送消息,消费者 Consumer 后续会收到两条内容相同的消息。
投递时消息重复 消息消费的场景下,消息已投递到消费者 Consumer 并完成业务处理,当消费者给服务端反馈应答的时候网络闪断,为了保证消息至少被消费一次,消息队列的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者 Consumer 后续会收到两条内容相同的消息;
负载均衡时消息重复 当消息队列的服务端或消费者重启、扩容或缩容时,都有可能会触发 rebalance,此时消费者 Consumer 可能会收到重复消息。
实际项目遇到的场景
- 我们有个消费MQ落数据的场景经常在重启过程出现主键冲突,当然这个是数据库层面会直接抛出异常,算是天然幂等也没有做特殊处理。
- 创建会话场景:客户端经常会重复请求connect可以理解为创建会话,这样导致就会出现创建多个会话。
- 分配客服:
- 由于分配客服会网络超时,经常会重复请求。(dubbo超时设置调整)
- 前端存在多次点击,增加分布式锁解决。
解决方式
- 悲观锁,select for update
- 乐观锁
- 分布式锁,通过核心参数设置
MQ 消息怎么路由?
消息路由
在RocketMQ的系统架构里,由于服务器端(Broker)会根据实时压力实施弹性扩缩容等发生变动,客户端为了做负载均衡,就需要有注册中心来提供Broker的信息:
注册中心的作用是及时发现Broker服务器的变化,并将存活的Broker信息返回给客户端做负载均衡。
获取Topic
获取路由信息函数
// DefaultMQProducerImpl#tryToFindTopicPublishInfo
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
发送消息前,必须先从注册中心里获取Broker服务器信息,包括Topic、队列、IP,然后采取负载均衡算法发送消息。
常见的负载均衡算法:
1.轮询法:将请求按照顺序轮流地分配到各个服务器上。
2.加权轮询法:在轮询算法的基础上添加了权重的条件
3.随机法
4.加权随机法
5.最小连接法:哪个服务器的连接数少,就分配给哪个服务器新的请求
6.哈希法:计算哈希值,映射到服务器上
tryToFindTopicPublishInfo
/**
根据topic获取路由信息
@param topic
@return
*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 1 先从本地 topicPublishInfoTable 中获取路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 2 路由信息或 messageQueueList 为空
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
// 2.1 添加空路由对象
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());// 2.2 更新路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);// 2.3 从更新后的路由表中获取路由信息
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 2.4 获取到了就返回
if (topicPublishInfo.isHaveTopicRouterInfo() || (topicPublishInfo != null && topicPublishInfo.ok())){
return topicPublishInfo;
} else {
// 3 没有获取到路由信息则从注册中心获取
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
从上面的源码可以看出获取路由信息的步骤如下:
1.先从本地topicPublishInfoTable中获取路由信息
2.如果路由信息或messageQueueList为空,则尝试本地更新一下路由信息
3.本地更新PublishInfo路由信息,并尝试获取
4.如果此时能获取到路由信息了,则返回TopicPublishInfo对象
5.本地无法获取到路由信息,则从注册中心尝试获取并更新本地缓存
Topic 路由信息表
上述过程的第一步就是获取路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
其中路由信息存储在TopicPublishInfo对象里:
各个字段含义如下:
? orderTopic:Topic是否支持排序 ? haveTopicRouterInfo:是否存在路由信息 ? messageQueueList:消息队列List ? sendWhichQueue:生产者发送消息到哪个队列的索引 ? topicRouteData:路由数据,包括队列、Broker地址、Broker数据
此外,TopicPublishInfo类还提供了选择某个队列发送消息的默认负载均衡策略:
/**
* 默认【轮询】策略选择一个MessageQueue
*
* @param lastBrokerName
* @return
*/
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
/**
* 选择一个消息队列
*
* @return
*/
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
从上面代码可以看出,默认的选择策略是采用轮询的方法:
lastBrokerName == null时,说明在此之前还没有进行过选择,直接返回第一个可用的消息队列 lastBrokerName != null时,且当前轮询到的消息队列不是上一次使用的,则返回当前队列,否则轮询至下一个
更新路由信息 两个子方法 根据tryToFindTopicPublishInfo的源码,接下来会进行更新路由信息的步骤,访问的主要是MQClientInstance类下的updateTopicRouteInfoFromNameServer方法,该方法又调用了两个关键的方法,分别是topicRouteData2TopicPublishInfo和topicRouteData2TopicSubscribeInfo
- topicRouteData2TopicPublishInfo方法的作用是将TopicRouteData类转换成TopicPublishInfo,并过滤掉Master挂了的Slave的MessageQueue
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);// 如果指定了Topic的Queue的发送顺序if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() 0) {// 解析配置文件,创建消息队列
String[] brokers = route.getOrderTopicConf().split(";");for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);}}// 设置Topic是有序的(消息的发送顺序按配置来)
info.setOrderTopic(true);} else {
List<QueueData qds = route.getQueueDatas();
Collections.sort(qds);// 找到每个QueueData的BrokerDatafor (QueueData qd : qds) {if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;for (BrokerData bd : route.getBrokerDatas()) {if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;break;}}if (null == brokerData) {continue;}// 如果BrokerData中没有Master节点id,可能Master挂了,此时不处理消息if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {continue;}// 创建消息队列for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);}}}// 设置Topic消息发送是无序的
info.setOrderTopic(false);}return info;}
topicRouteData2TopicSubscribeInfo方法作用是提取TopicRouteData内的QueueData字段,生成消息队列,也就是订阅了该Topic的队列
public static Set<MessageQueue topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set<MessageQueue mqList = new HashSet<MessageQueue();
List<QueueData qds = route.getQueueDatas();for (QueueData qd : qds) {// QueueData是否可读,只有是可读的才能被订阅if (PermName.isReadable(qd.getPerm())) {for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);}}}return mqList;}
介绍完了updateTopicRouteInfoFromNameServer方法里调用的两个子方法之后,下面就来看一下updateTopicRouteInfoFromNameServer的代码。
updateTopicRouteInfoFromNameServer
更新路由信息是消息投递过程中非常重要的一环,为了防止并发修改注册信息导致数据不一致,这里使用了ReentrantLock可重入锁。
对于路由消息,就需要注意它可能不存在这种情况
- 路由消息不存在
第一次访问时,生产者还没有在Broker中创建Topic和消息队列时会发生,此时的解决方案是:如果满足isDefault && defaultMQProducer != null,则使用默认Topic来获取路由消息TopicRouteData
由上面两张图可以清楚看到,默认Topic名称为TBW102
但如果默认主题获取到的TopicRouteData实例为空呢?此时就要根据Topic名称从注册中心查询了,如果还查询不出来的话就会返回false
- 路由消息不存在,但是从注册中心获取到了
此时就需要判断本地的路由表和注册中心获取到的路由信息是否有差异,如果差异存在话就把本地路由信息更新为最新版本
上面所有文字部分对应的源码如下:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {try {
TopicRouteData topicRouteData;if (isDefault && defaultMQProducer != null) {// 使用默认的TopicKey尝试获取TopicRouteData// 当Broker开启自动创建Topic时,会自动进行创建
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData != null) {for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);}if (topicRouteData != null) {// 判断本地路由表存放的信息和远端注册中心存放的信息
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) {// 克隆的原因是topicRouteData要被设置到下面的publishInfo和subscribeInfo里
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();// 更新Broker相关信息,当某个Broker心跳超时后,会被从BrokerData的BrokerAddrs中移除// brokerAddrTable也存有Slave的BrokerAddrfor (BrokerData bd : topicRouteData.getBrokerDatas()) {this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Map.Entry<String, MQProducerInner it = this.producerTable.entrySet().iterator();while (it.hasNext()) {
Map.Entry<String, MQProducerInner entry = it.next();
MQProducerInner impl = entry.getValue();if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{
Set<MessageQueue subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Map.Entry<String, MQConsumerInner it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {
Map.Entry<String, MQConsumerInner entry = it.next();
MQConsumerInner impl = entry.getValue();if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}
log.info("topicRouteTable.put TopicRouteData[{}]", cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);}} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} finally {this.lockNamesrv.unlock();}} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LockTimeoutMillis);}} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false;}
消息过滤
功能介绍
消息过滤功能指消息生产者向 Topic 中发送消息时,设置消息属性对消息进行分类,消费者订阅 Topic 时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。
消费者订阅 Topic 时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic 中的所有消息都将被投递到消费端进行消费。
应用场景
通常,一个 Topic 中存放的是相同业务属性的消息,例如交易流水 Topic 包含了下单流水、支付流水、发货流水等,业务若只想消费者其中一种类别的流水,可在客户端进行过滤,但这种过滤方式会带来带宽的资源浪费。
针对上述场景,TDMQ 提供 Broker 端过滤的方式,用户可在生产消息时设置一个或者多个 Tag 标签,消费时指定 Tag 订阅。
使用方式
TAG 过滤
发送消息
发送消息时,每条消息必须指明 Tag。
Message msg = new Message("TOPIC","TagA","Hello world".getBytes());
订阅消息
订阅所有 Tag:消费者如需订阅某 Topic 下所有类型的消息,Tag 用星号()表示。
* consumer.subscribe("TOPIC", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
订阅单个 Tag:消费者如需订阅某 Topic 下某一种类型的消息,请明确标明 Tag。
consumer.subscribe("TOPIC", "TagA", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
订阅多个 Tag:消费者如需订阅某 Topic 下多种类型的消息,请在多个 Tag 之间用两个竖线(||)分隔。
consumer.subscribe("TOPIC", "TagA||TagB", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
SQL 过滤
发送消息
发送代码和简单的消息没有区别 主要是在构造消息体的时候,带上自定义属性,允许多个。
int totalMessagesToSend = 5;
for (int i = 0; i < totalMessagesToSend; i++) {
Message msg = new Message(TOPIC_NAME,"Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
msg.putUserProperty("key1","value1");
// 发送消息
SendResult sendResult = producer.send(message);
System.out.println("sendResult = " + sendResult);
}
订阅消息
对于消费消息,主要是订阅的时候,带上对应的SQL表达式,其他的和普通的消费消息流程没有区别。
//订阅所有消息
pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));
// 订阅topic 订阅单个key的sql,最常用
//pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));
//订阅多个属性
//pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL AND key1='value1' AND key2='value2'"));
// 注册回调实现类来处理从broker拉取回来的消息
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 消息处理逻辑
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费, 根据消费情况,返回处理状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
pushConsumer.start();
参考文档:
阿里云RocketMQ文档:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/user-guide/query-a-message-trace?spm=a2c4g.11186623.0.0.74fb245dkXSt9l
腾讯云RocketMQ文档:https://cloud.tencent.com/document/product/1493/61583
本文暂时没有评论,来添加一个吧(●'◡'●)