程序员开发实例大全宝库

网站首页 > 编程文章 正文

聊聊carrera的RocketMQProduceOffsetFetcher

zazugpt 2024-08-13 13:15:23 编程文章 25 ℃ 0 评论

本文主要研究一下carrera的RocketMQProduceOffsetFetcher



RocketMQProduceOffsetFetcher

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/lag/offset/RocketMQProduceOffsetFetcher.java

public class RocketMQProduceOffsetFetcher {
 ?  private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProduceOffsetFetcher.class);
?
 ?  private DefaultMQAdminExt defaultMQAdminExt;
?
 ?  private DefaultMQPullConsumer defaultMQPullConsumer;
?
 ?  private String namesrvAddr;
?
 ?  public RocketMQProduceOffsetFetcher(String namesrvAddr) {
 ? ? ?  this.defaultMQAdminExt = new DefaultMQAdminExt();
 ? ? ?  defaultMQAdminExt.setNamesrvAddr(namesrvAddr);
 ? ? ?  defaultMQAdminExt.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
?
 ? ? ?  this.defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
 ? ? ?  defaultMQPullConsumer.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
 ? ? ?  defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
 ? ? ?  this.namesrvAddr = namesrvAddr;
 ?  }
?
 ?  public String getNamesrvAddr() {
 ? ? ?  return namesrvAddr;
 ?  }
?
 ?  public void start() throws MQClientException {
 ? ? ?  defaultMQAdminExt.start();
 ? ? ?  defaultMQPullConsumer.start();
 ? ? ?  defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);
 ?  }
?
 ?  public void shutdown() {
 ? ? ?  defaultMQAdminExt.shutdown();
 ? ? ?  defaultMQPullConsumer.shutdown();
 ?  }
?
 ?  public ConsumeStats getConsumeStats(String group, String topic) throws Exception {
 ? ? ?  return defaultMQAdminExt.examineConsumeStats(group, topic);
 ?  }
?
 ?  public TopicStatsTable getProduceStats(String topic) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
 ? ? ?  return defaultMQAdminExt.examineTopicStats(topic);
 ?  }
?
 ?  public PullResult queryMsgByOffset(MessageQueue mq, long offset) throws Exception {
 ? ? ?  return defaultMQPullConsumer.pull(mq, "*", offset, 1);
 ?  }
}
  • RocketMQProduceOffsetFetcher的构造器接收namesrvAddr,然后创建DefaultMQAdminExt及DefaultMQPullConsumer
  • 其start方法会执行defaultMQAdminExt.start()、defaultMQPullConsumer.start()及defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);其shutdown执行defaultMQAdminExt.shutdown()及defaultMQPullConsumer.shutdown()
  • 其getConsumeStats方法执行的是defaultMQAdminExt.examineConsumeStats(group, topic);其getProduceStats方法执行的是defaultMQAdminExt.examineTopicStats(topic);其queryMsgByOffset方法执行的是defaultMQPullConsumer.pull(mq, "*", offset, 1)

DefaultMQAdminExt

DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java

public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
 ?  private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
 ?  private String adminExtGroup = "admin_ext_group";
 ?  private String createTopicKey = MixAll.DEFAULT_TOPIC;
 ?  private long timeoutMillis = 5000;
?
 ?  //......
?
 ?  @Override
 ?  public ConsumeStats examineConsumeStats(String consumerGroup,
 ? ? ?  String topic) throws RemotingException, MQClientException,
 ? ? ?  InterruptedException, MQBrokerException {
 ? ? ?  return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);
 ?  }
?
 ?  @Override
 ?  public TopicStatsTable examineTopicStats(
 ? ? ?  String topic) throws RemotingException, MQClientException, InterruptedException,
 ? ? ?  MQBrokerException {
 ? ? ?  return defaultMQAdminExtImpl.examineTopicStats(topic);
 ?  }
?
 ?  //......
}
  • examineConsumeStats及examineTopicStats都委托给了defaultMQAdminExtImpl

DefaultMQAdminExtImpl

DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java

public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 ?  private final Logger log = ClientLogger.getLog();
 ?  private final DefaultMQAdminExt defaultMQAdminExt;
 ?  private ServiceState serviceState = ServiceState.CREATE_JUST;
 ?  private MQClientInstance mqClientInstance;
 ?  private RPCHook rpcHook;
 ?  private long timeoutMillis = 20000;
 ?  private Random random = new Random();
?
 ?  //......
?
 ?  @Override
 ?  public ConsumeStats examineConsumeStats(String consumerGroup,
 ? ? ?  String topic) throws RemotingException, MQClientException,
 ? ? ?  InterruptedException, MQBrokerException {
 ? ? ?  String queryTopic = topic == null ? MixAll.getRetryTopic(consumerGroup) : topic;
 ? ? ?  TopicRouteData topicRouteData = this.examineTopicRouteInfo(queryTopic);
 ? ? ?  ConsumeStats result = new ConsumeStats();
?
 ? ? ?  for (BrokerData bd : topicRouteData.getBrokerDatas()) {
 ? ? ? ? ?  String addr = bd.selectBrokerAddr();
 ? ? ? ? ?  if (addr != null) {
 ? ? ? ? ? ? ?  ConsumeStats consumeStats =
 ? ? ? ? ? ? ? ? ?  this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
 ? ? ? ? ? ? ?  result.getOffsetTable().putAll(consumeStats.getOffsetTable());
 ? ? ? ? ? ? ?  double value = result.getConsumeTps() + consumeStats.getConsumeTps();
 ? ? ? ? ? ? ?  result.setConsumeTps(value);
 ? ? ? ? ?  }
 ? ? ?  }
?
 ? ? ?  if (result.getOffsetTable().isEmpty()) {
 ? ? ? ? ?  throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
 ? ? ? ? ? ? ?  "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
 ? ? ?  }
?
 ? ? ?  return result;
 ?  }
?
 ?  @Override
 ?  public TopicStatsTable examineTopicStats(
 ? ? ?  String topic) throws RemotingException, MQClientException, InterruptedException,
 ? ? ?  MQBrokerException {
 ? ? ?  TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
 ? ? ?  TopicStatsTable topicStatsTable = new TopicStatsTable();
?
 ? ? ?  for (BrokerData bd : topicRouteData.getBrokerDatas()) {
 ? ? ? ? ?  String addr = bd.selectBrokerAddr();
 ? ? ? ? ?  if (addr != null) {
 ? ? ? ? ? ? ?  TopicStatsTable tst = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);
 ? ? ? ? ? ? ?  topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
 ? ? ? ? ?  }
 ? ? ?  }
?
 ? ? ?  if (topicStatsTable.getOffsetTable().isEmpty()) {
 ? ? ? ? ?  throw new MQClientException("Not found the topic stats info", null);
 ? ? ?  }
?
 ? ? ?  return topicStatsTable;
 ?  }
?
 ?  //......
} ? ?
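  • examineConsumeStats方法先通过examineTopicRouteInfo(queryTopic)方法获取topicRouteData(topic为null时queryTopic取MixAll.getRetryTopic(consumerGroup)),然后遍历topicRouteData.getBrokerDatas(),对每个BrokerData调用selectBrokerAddr()获取brokerAddr,addr不为null时通过mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3)获取consumeStats,并把各broker的offsetTable合并、consumeTps累加到result中,若最终offsetTable为空则抛出MQClientException;examineTopicStats方法也是先通过examineTopicRouteInfo(topic)方法获取topicRouteData,然后遍历topicRouteData.getBrokerDatas()并通过selectBrokerAddr()获取brokerAddr,之后通过mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis)获取各broker的topicStatsTable并合并其offsetTable,若合并结果为空同样抛出MQClientException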

小结

  • RocketMQProduceOffsetFetcher的构造器接收namesrvAddr,然后创建DefaultMQAdminExt及DefaultMQPullConsumer
  • 其start方法会执行defaultMQAdminExt.start()、defaultMQPullConsumer.start()及defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);其shutdown执行defaultMQAdminExt.shutdown()及defaultMQPullConsumer.shutdown()
  • 其getConsumeStats方法执行的是defaultMQAdminExt.examineConsumeStats(group, topic);其getProduceStats方法执行的是defaultMQAdminExt.examineTopicStats(topic);其queryMsgByOffset方法执行的是defaultMQPullConsumer.pull(mq, "*", offset, 1)

doc

  • RocketMQProduceOffsetFetcher

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表