序
本文主要研究一下carrera的RocketMQProduceOffsetFetcher
RocketMQProduceOffsetFetcher
DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/lag/offset/RocketMQProduceOffsetFetcher.java
public class RocketMQProduceOffsetFetcher {
? private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProduceOffsetFetcher.class);
?
? private DefaultMQAdminExt defaultMQAdminExt;
?
? private DefaultMQPullConsumer defaultMQPullConsumer;
?
? private String namesrvAddr;
?
? public RocketMQProduceOffsetFetcher(String namesrvAddr) {
? ? ? this.defaultMQAdminExt = new DefaultMQAdminExt();
? ? ? defaultMQAdminExt.setNamesrvAddr(namesrvAddr);
? ? ? defaultMQAdminExt.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
?
? ? ? this.defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
? ? ? defaultMQPullConsumer.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
? ? ? defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
? ? ? this.namesrvAddr = namesrvAddr;
? }
?
? public String getNamesrvAddr() {
? ? ? return namesrvAddr;
? }
?
? public void start() throws MQClientException {
? ? ? defaultMQAdminExt.start();
? ? ? defaultMQPullConsumer.start();
? ? ? defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);
? }
?
? public void shutdown() {
? ? ? defaultMQAdminExt.shutdown();
? ? ? defaultMQPullConsumer.shutdown();
? }
?
? public ConsumeStats getConsumeStats(String group, String topic) throws Exception {
? ? ? return defaultMQAdminExt.examineConsumeStats(group, topic);
? }
?
? public TopicStatsTable getProduceStats(String topic) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
? ? ? return defaultMQAdminExt.examineTopicStats(topic);
? }
?
? public PullResult queryMsgByOffset(MessageQueue mq, long offset) throws Exception {
? ? ? return defaultMQPullConsumer.pull(mq, "*", offset, 1);
? }
}
- RocketMQProduceOffsetFetcher的构造器接收namesrvAddr,然后创建DefaultMQAdminExt及DefaultMQPullConsumer
- 其start方法会执行defaultMQAdminExt.start()、defaultMQPullConsumer.start()及defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);其shutdown执行defaultMQAdminExt.shutdown()及defaultMQPullConsumer.shutdown()
- 其getConsumeStats方法执行的是defaultMQAdminExt.examineConsumeStats(group, topic);其getProduceStats方法执行的是defaultMQAdminExt.examineTopicStats(topic);其queryMsgByOffset方法执行的是defaultMQPullConsumer.pull(mq, "*", offset, 1)
DefaultMQAdminExt
DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
? private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
? private String adminExtGroup = "admin_ext_group";
? private String createTopicKey = MixAll.DEFAULT_TOPIC;
? private long timeoutMillis = 5000;
?
? //......
?
? @Override
? public ConsumeStats examineConsumeStats(String consumerGroup,
? ? ? String topic) throws RemotingException, MQClientException,
? ? ? InterruptedException, MQBrokerException {
? ? ? return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);
? }
?
? @Override
? public TopicStatsTable examineTopicStats(
? ? ? String topic) throws RemotingException, MQClientException, InterruptedException,
? ? ? MQBrokerException {
? ? ? return defaultMQAdminExtImpl.examineTopicStats(topic);
? }
?
? //......
}
- examineConsumeStats及examineTopicStats都委托给了defaultMQAdminExtImpl
DefaultMQAdminExtImpl
DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
? private final Logger log = ClientLogger.getLog();
? private final DefaultMQAdminExt defaultMQAdminExt;
? private ServiceState serviceState = ServiceState.CREATE_JUST;
? private MQClientInstance mqClientInstance;
? private RPCHook rpcHook;
? private long timeoutMillis = 20000;
? private Random random = new Random();
?
? //......
?
? @Override
? public ConsumeStats examineConsumeStats(String consumerGroup,
? ? ? String topic) throws RemotingException, MQClientException,
? ? ? InterruptedException, MQBrokerException {
? ? ? String queryTopic = topic == null ? MixAll.getRetryTopic(consumerGroup) : topic;
? ? ? TopicRouteData topicRouteData = this.examineTopicRouteInfo(queryTopic);
? ? ? ConsumeStats result = new ConsumeStats();
?
? ? ? for (BrokerData bd : topicRouteData.getBrokerDatas()) {
? ? ? ? ? String addr = bd.selectBrokerAddr();
? ? ? ? ? if (addr != null) {
? ? ? ? ? ? ? ConsumeStats consumeStats =
? ? ? ? ? ? ? ? ? this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
? ? ? ? ? ? ? result.getOffsetTable().putAll(consumeStats.getOffsetTable());
? ? ? ? ? ? ? double value = result.getConsumeTps() + consumeStats.getConsumeTps();
? ? ? ? ? ? ? result.setConsumeTps(value);
? ? ? ? ? }
? ? ? }
?
? ? ? if (result.getOffsetTable().isEmpty()) {
? ? ? ? ? throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
? ? ? ? ? ? ? "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
? ? ? }
?
? ? ? return result;
? }
?
? @Override
? public TopicStatsTable examineTopicStats(
? ? ? String topic) throws RemotingException, MQClientException, InterruptedException,
? ? ? MQBrokerException {
? ? ? TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
? ? ? TopicStatsTable topicStatsTable = new TopicStatsTable();
?
? ? ? for (BrokerData bd : topicRouteData.getBrokerDatas()) {
? ? ? ? ? String addr = bd.selectBrokerAddr();
? ? ? ? ? if (addr != null) {
? ? ? ? ? ? ? TopicStatsTable tst = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);
? ? ? ? ? ? ? topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
? ? ? ? ? }
? ? ? }
?
? ? ? if (topicStatsTable.getOffsetTable().isEmpty()) {
? ? ? ? ? throw new MQClientException("Not found the topic stats info", null);
? ? ? }
?
? ? ? return topicStatsTable;
? }
?
? //......
} ? ?
- examineConsumeStats方法先确定queryTopic(topic为null时取MixAll.getRetryTopic(consumerGroup),否则取topic),通过examineTopicRouteInfo(queryTopic)方法获取topicRouteData,然后遍历topicRouteData.getBrokerDatas()并通过selectBrokerAddr()获取brokerAddr,之后通过mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3)获取consumeStats,合并各broker的offsetTable并累加consumeTps,若最终offsetTable为空则抛出MQClientException;examineTopicStats方法也是先通过examineTopicRouteInfo(topic)方法获取topicRouteData,然后遍历topicRouteData.getBrokerDatas()获取brokerAddr,之后通过mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis)获取topicStatsTable并合并其offsetTable,若最终offsetTable为空同样抛出MQClientException
小结
- RocketMQProduceOffsetFetcher的构造器接收namesrvAddr,然后创建DefaultMQAdminExt及DefaultMQPullConsumer
- 其start方法会执行defaultMQAdminExt.start()、defaultMQPullConsumer.start()及defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);其shutdown执行defaultMQAdminExt.shutdown()及defaultMQPullConsumer.shutdown()
- 其getConsumeStats方法执行的是defaultMQAdminExt.examineConsumeStats(group, topic);其getProduceStats方法执行的是defaultMQAdminExt.examineTopicStats(topic);其queryMsgByOffset方法执行的是defaultMQPullConsumer.pull(mq, "*", offset, 1)
doc
- RocketMQProduceOffsetFetcher
本文暂时没有评论,来添加一个吧(●'◡'●)