网站首页 > 编程文章 正文
本篇开始,我们将逐步介绍消息是如何被客户端消费的,在被消费的过程中,客户端以及Broker端都经历了哪些环节,各自付出了怎么样的努力,才保障消息能被顺利的消息,来最终完成一条消息的使命。
本篇开端
我们以一段构建消费者的代码开始本篇文章。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test_name");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
复制代码
这是一段很常见的代码,主要内容就是创建消费者并制定组名、设置NameServer地址、订阅Topic、注册回调业务处理器、启动消费者。
这段代码默认使用了DefaultMQPushConsumer,这个也是RocketMQ推荐使用的构建消费者实例。从名称来看Push是一个推类型的消费者,但是内部依赖是采用了拉模式,置于为何内部是拉模式,这一点后面文章中会详细的介绍到。
当调用DefaultMQPushConsumer的start方法时,消费者开始启动工作了,在获取到消息的时候,会主动回调业务监听器,进行业务上面的处理。
本篇文章,我们将探讨start方法背后的故事,看一下客户端在拉取消息之前,都做了哪些准备工作,来支撑客户端的运行。
消费者客户端内部类关系图
我们先来整体看一下涉及客户端消费几个重要的类以及他们之间的构成关系,以图的形式展示一下,先有个整体的认知印象,然后在拆开来一个一个从源码的角度介绍一下具体的内容。
图中箭头指引的方向表示存在内部引用关系。
图片罗列了客户端涉及的重要类以及主要功能介绍,接下来我们以源码的角度,开始从start方法入手,深入分析一下具体的功能实现,一步一步拆解客户端在拉取消息前的准备工作。
源码解析篇
DefaultMQPushConsumer的start方法内部调用的其实是DefaultMQPushConsumerImpl的start方法,DefaultMQPushConsumer内部持有一个DefaultMQPushConsumerImpl的实例,在构造方法中创建。
traceDispatcher这个涉及轨迹的记录,不是本篇分析的内容,知晓一下就可以。
DefaultMQPushConsumerImpl的start方法涉及的内比较多,内部封装了复杂的业务逻辑,串联了消费者客户端涉及的各个子模块。
从DefaultMQPushConsumerImpl中衍生出了几个重要的类,比如MQClientInstance、RebalanceImpl、pullAPIWrapper、OffsetStore、ConsumeMessageService。这几个类的创建以及初始化就代表着客户端在拉取消息前做的准备工作。
我们重点介绍一下MQClientInstance类。
MQClientInstance介绍
MQClientInstance的创建是根据客户端的clientId来判断是否应该创建新的实例还有获取已经存在的实例。
因为clientId的不同,MQClientInstance存在一个或者多个。一个MQClientInstance实例封装的逻辑是很重的,之所以设计成多个可能是考虑到各个消费者或者生产者不互相影响,也可能涉及到不同NameServer集群不同Broker集群在同一客户端实现的可能性。
factoryTable结构的定义如下,是一个ConcurrentMap。
clientId的的构成主要是由ClientIP以及InstanceName构成,而在DefaultMQPushConsumerImpl中start方法中介绍了InstanceName的构成。
MQClientInstance内部也涉及start方法,用于处理更加底层的一些服务,涉及到网络通信的API、定时调度任务、启动消息拉取服务、启动队列分配服务、启动内部生产者。
分析一下startScheduledTask涉及的调度服务都有哪些,从上到下一次是:
- 如果没有在启动客户时设置NameServer地址,则会从一个域名地址以HTTP GET方式请求NameServer地址。
- 从NameServer定时更新Topic的路由信息,涉及消费者和生产者。
- 清除离线的Broker服务器以及想Broker服务器发送心跳包。
- 持久化消费者offset,广播模式存储在本地,集群模式存储在远程Broker上面
- 动态调整线程池线程数,暂时没有实现。
这些调度服务伴随着客户端的启动而启动,也属于是拉取消息前准备工作的一部分。
本篇总结
本篇主要介绍了消息消费者在拉取消息前做的一些准备工作,准备工作主要是在DefaultMQPushConsumerImpl中来完成的,这个类伴随着DefaultMQPushConsumer的创建而创建,内部封装了消费者客户端涉及的方方面面,要分析消费者实现源码可以着重从这里入手。
随着内部各个模块均已初始化启动,客户端拉取消息的准备工作完成。消费者可以开始执行消息拉取的流程了。拉取消息的流程主要涉及PullMessageService、PullMessageService、RebalanceImpl这几个类,我们在下一篇文章中重点分析一下。
来源:
https://juejin.cn/post/7139008656233725960
猜你喜欢
- 2025-06-12 从零开始搭建AI网站(6):如何使用响应式编程
- 2025-06-12 Windows下Ollama安装目录迁移到D盘
- 2025-06-12 RocketMQ的偏移量更新原理(rocketmq迁移)
- 2025-06-12 阿里p8手把手教学,如何写出简洁又规范的单元测试?
- 2025-06-12 Seata源码—7.Seata TCC模式的事务处理
- 2025-06-12 Seata源码—4.全局事务拦截与开启事务处理二
- 2025-06-12 Seata源码—3.全局事务注解扫描器的初始化一
- 2025-06-12 Seata源码—6.Seata AT模式的数据源代理二
- 2025-06-12 Seata源码—6.Seata AT模式的数据源代理三
- 2025-06-12 写出更优雅的代码:搞懂 Python 协议与抽象基类的核心区别
你 发表评论:
欢迎- 06-1540套Solidworks草图素材分享,全部画出来你的草图模块就没问题了
- 06-15solidworks零件练习(法兰盘)(solidworks中的法兰是什么意思)
- 06-15solidworks零件拆图练习轴承(如何运用solidworks制作轴承)
- 06-15solidworks钣金练习25.6.11(solidworks钣金技巧)
- 06-1533篇 Solidworks公差查询(solidworks公差表)
- 06-15Solidworks的入门学习方法(solidworks入门基础教程视频)
- 06-15用SolidWorks画一个长相奇特的杯子
- 06-1540篇 Solidworks插件-迈迪工具集(solidworks2016迈迪插件)
- 最近发表
-
- 40套Solidworks草图素材分享,全部画出来你的草图模块就没问题了
- solidworks零件练习(法兰盘)(solidworks中的法兰是什么意思)
- solidworks零件拆图练习轴承(如何运用solidworks制作轴承)
- solidworks钣金练习25.6.11(solidworks钣金技巧)
- 33篇 Solidworks公差查询(solidworks公差表)
- Solidworks的入门学习方法(solidworks入门基础教程视频)
- 用SolidWorks画一个长相奇特的杯子
- 40篇 Solidworks插件-迈迪工具集(solidworks2016迈迪插件)
- 用SolidWorks画一个夹具,所有零件都在装配体里画的
- SolidWorks快速【正视于】的5种方法
- 标签列表
-
- spire.doc (70)
- instanceclient (62)
- solidworks (78)
- system.data.oracleclient (61)
- 按键小精灵源码提取 (66)
- pyqt5designer教程 (65)
- 联想刷bios工具 (66)
- c#源码 (64)
- graphics.h头文件 (62)
- mysqldump下载 (66)
- libmp3lame (60)
- maven3.3.9 (63)
- 二调符号库 (57)
- git.exe下载 (68)
- diskgenius_winpe (72)
- pythoncrc16 (57)
- solidworks宏文件下载 (59)
- qt帮助文档中文版 (73)
- satacontroller (66)
- hgcad (64)
- bootimg.exe (69)
- android-gif-drawable (62)
- axure9元件库免费下载 (57)
- libmysqlclient.so.18 (58)
- springbootdemo (64)
本文暂时没有评论,来添加一个吧(●'◡'●)