跳至主要内容
版本: 5.0

RocketMQ Prometheus 导出器

介绍

Rocketmq-exporter 是一个用于监控 RocketMQ 代理和客户端所有相关指标的系统,它将通过 mqAdmin 从代理端获取的指标值打包到 87 个缓存中。

注意

在之前的版本中,存在 87 个并发 HashMap,但由于 Map 不会删除过期的指标,一旦出现标签变更,就会生成新的指标,而旧的、未使用的指标无法自动删除,最终会导致内存溢出。然而,使用缓存结构可以实现过期删除,并且可以配置过期时间。

Rocketmq-exporter 获取监控指标的过程如下图所示。导出器通过 MQAdminExt 向 MQ 集群请求数据,并将请求到的数据通过 MetricService 标准化为 Prometheus 所需的格式,然后通过 /metrics 接口暴露给 Promethus。 4586095434

指标结构

Metric 类位于 org.apache.rocketmq.expoter.model.metrics 包中,本质上是一组实体类,每个实体类代表一种指标类型,总共 14 个 Metric 类。这些类充当 87 个缓存的键,并通过不同的标签值进行区分。

实体类包含三个维度的标签:代理、消费者、生产者
  • 与代理相关的指标类:BrokerRuntimeMetric、BrokerMetric、DLQTopicOffsetMetric、TopicPutNumMetric
  • 与消费者相关的类:ConsumerRuntimeConsumeFailedMsgsMetric 、ConsumerRuntimeConsumeFailedTPSMetric 、ConsumerRuntimeConsumeOKTPSMetric、ConsumerRuntimeConsumeRTMetric、ConsumerRuntimePullRTMetric、ConsumerRuntimePullTPSMetric、ConsumerCountMetric、ConsumerMetric、ConsumerTopicDiffMetric
  • 与生产者相关的指标类:ProducerMetric

Prometheus 拉取指标

RocketMQ-exporter 项目和 Prometheus 相当于服务器-客户端关系,其中 RocketMQ-exporter 项目引入了 Prometheus 客户端包,该包在项目的 MetricFamilySamples 类中指定了要获取的信息类型。Prometheus 从导出器请求指标,导出器将信息打包到相应的类型后返回给 Prometheus。

rocketmq-exporter 项目启动后,会将 rocketmq 的各种指标收集到 mfs 对象中。当浏览器或 Prometheus 访问相应的接口时,mfs 对象中的样本将通过服务生成成 Prometheus 支持的格式化数据。主要包括以下步骤

浏览器访问 ip:5557/metrics 调用 RMQMetricsController 类中的 metrics 方法,其中 ip 是运行 rocketmq-exporter 项目的主机的 IP。

private void metrics(HttpServletResponse response) throws IOException {
StringWriter writer = new StringWriter();
metricsService.metrics(writer);
response.setHeader("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
response.getOutputStream().print(writer.toString());
}

通过创建一个新的 StringWriter 对象来收集指标,导出器中的指标通过 MetricsService 类中的 metrics 方法收集到 writer 对象中,然后将收集到的指标输出到网页。

收集到的指标格式为

<metric name>{<label name>=<label value>, ...} <metric value>

示例:

rocketmq_group_diff{group="rmq_group_test_20220114",topic="fusion_console_tst",countOfOnlineConsumers="0",msgModel="1",} 23.0

MetricCollectTask 类中的 5 个计划任务

MetricCollectTask 类有五个计划任务:collectTopicOffset、collectConsumerOffset、collectBrokerStatsTopic、collectBrokerStats 和 collectBrokerRuntimeStats。它们用于收集消费者偏移量信息和代理状态信息等。它的 cron 表达式为:cron: 15 0/1 * * * ?, 这意味着它将每分钟收集一次。它的核心功能是通过 mqAdminExt 对象从集群中的代理获取信息,然后将其添加到相应的 87 个监控指标中,以 collectTopicOffset 为例

  1. 首先,初始化 TopicList 对象,并通过 mqAdminExt.fetchAllTopicList() 方法获取集群中的所有主题信息。

    TopicList topicList = null;
    try {
    topicList = mqAdminExt.fetchAllTopicList();
    } catch (Exception ex) {
    log.error(String.format("collectTopicOffset-exception comes getting topic list from namesrv, address is %s",
    JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
    return;
    }
  2. 将主题添加到 topicSet 中,并遍历每个主题,通过 mqAdminExt.examineTopicStats(topic) 函数检查主题状态。

    Set < String > topicSet = topicList != null ? topicList.getTopicList() : null;
    for (String topic: topicSet) {
    TopicStatsTable topicStats = null;
    try {
    topicStats = mqAdminExt.examineTopicStats(topic);
    } catch (Exception ex) {
    log.error(String.format("collectTopicOffset-getting topic(%s) stats error. the namesrv address is %s",
    topic,
    JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
    continue;}
  3. 初始化主题状态集、按代理划分的主题信息偏移量哈希表 brokerOffsetMap,以及以代理名称为键的哈希表 brokerUpdateTimestampMap 来存储更新时间戳。

    Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStats.getOffsetTable().entrySet();
    HashMap<String, Long> brokerOffsetMap = new HashMap<>();
    HashMap<String, Long> brokerUpdateTimestampMap = new HashMap<>();
    for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
    MessageQueue q = topicStatusEntry.getKey();
    TopicOffset offset = topicStatusEntry.getValue();
    if (brokerOffsetMap.containsKey(q.getBrokerName())) {
    brokerOffsetMap.put(q.getBrokerName(), brokerOffsetMap.get(q.getBrokerName()) + offset.getMaxOffset());
    } else {
    brokerOffsetMap.put(q.getBrokerName(), offset.getMaxOffset());
    }
    if (brokerUpdateTimestampMap.containsKey(q.getBrokerName())) {
    if (offset.getLastUpdateTimestamp() > brokerUpdateTimestampMap.get(q.getBrokerName())) {
    brokerUpdateTimestampMap.put(q.getBrokerName(), offset.getLastUpdateTimestamp());
    }
    } else {
    brokerUpdateTimestampMap.put(q.getBrokerName(),
    offset.getLastUpdateTimestamp());
    }
    }

  4. 最后,通过遍历 brokerOffsetMap 中的每个项目,从 metricsService 获取 metricCollector 对象,并调用 RMQMetricsCollector 类中的 addTopicOffsetMetric 方法,将相应的值添加到 RMQMetricsCollector 类中 87 个指标的其中一个缓存中。

     Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
    for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
    metricsService.getCollector().addTopicOffsetMetric(clusterName, brokerOffsetEntry.getKey(), topic,
    brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
    }
    }
    log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
    }

Rocketmq-exporter 收集指标流程图

95680412354

快速入门

配置 application.yml

application.yml 中的重要配置包括

  • server.port 设置 Prometheus 监听 rocketmq-exporter 的端口,默认值为 5557。

  • rocketmq.config.webTelemetryPath 配置 Prometheus 获取指标的路径,默认值为 /metrics。可以使用默认值。

  • rocketmq.config.enableACL 如果 RocketMQ 集群启用了 ACL 验证,则需要将其设置为 true,并在 accessKey 和 secretKey 中配置相应的 ak 和 sk。

  • rocketmq.config.outOfTimeSeconds 用于配置存储指标及其值的过期时间。如果超过此时间,并且缓存中的键没有进行写入更改,则会将其删除。通常,可以将其配置为 60s(Prometheus 获取指标的时间间隔应根据过期时间进行合理配置,只要过期时间大于或等于 Prometheus 收集指标的时间间隔即可)。

  • task..cron 配置导出器通过计划任务从代理拉取指标的时间间隔,默认值为 "15 0/1 * * ?", 这意味着它将每分钟的 15 秒拉取一次指标。

启动导出器应用程序

根据其官方网站上的配置启动 Prometheus

将 Prometheus 的 static_config: -targets 配置为导出器的启动 IP 和端口,例如:localhost:5557。

访问 Prometheus 页面

如果 localhost 在默认的 localhost:9090 上启动,则可以查看收集到的指标值,如下图所示

90671925984

提示

为了获得更好的可视化效果并观察指标值变化的趋势,最好将 Prometheus 与 Grafana 一起使用!

可观察性指标

可观察性指标主要包括两类:服务器端指标和客户端指标。服务器端指标由服务器直接生成,客户端指标在客户端生成,并通过向客户端发送 RPC 请求从服务器获取。客户端指标可以进一步分为生产者指标和消费者指标。所有 87 个可观察性指标及其主要含义如下

服务器指标

服务器指标

指标名称定义对应代理指标名称
rocketmq_broker_tps代理级生产 TPS
rocketmq_broker_qps代理级消费 QPS
rocketmq_broker_commitlog_diff从从节点同步的消息大小落后于主节点
rocketmq_brokeruntime_pmdt_0ms写入请求从服务器端处理开始到完成的时间(0ms)putMessageDistributeTime
rocketmq_brokeruntime_pmdt_0to10ms写入请求从服务器端处理开始到完成的时间(0~10ms)
rocketmq_brokeruntime_pmdt_10to50ms写入请求从服务器端处理开始到完成的时间(10~50ms)
rocketmq_brokeruntime_pmdt_50to100ms写入请求从服务器端处理开始到完成的时间(50~100ms)
rocketmq_brokeruntime_pmdt_100to200ms写入请求从服务器端处理开始到完成的时间(100~200ms)
rocketmq_brokeruntime_pmdt_200to500ms写入请求从服务器端处理开始到完成的时间(200~500ms)
rocketmq_brokeruntime_pmdt_500to1s写入请求从服务器端处理开始到完成的时间(500~1000ms)
rocketmq_brokeruntime_pmdt_1to2s写入请求从服务器端处理开始到完成的时间(1~2s)
rocketmq_brokeruntime_pmdt_2to3s写入请求从服务器端处理开始到完成的时间(2~3s)
rocketmq_brokeruntime_pmdt_3to4s写入请求从服务器端处理开始到完成的时间(3~4s)
rocketmq_brokeruntime_pmdt_4to5s写入请求从服务器端处理开始到完成的时间(4~5s)
rocketmq_brokeruntime_pmdt_5to10s写入请求从服务器端处理开始到完成的时间(5~10s)
rocketmq_brokeruntime_pmdt_10stomore写入请求从服务器端处理开始到完成的时间(> 10s)
rocketmq_brokeruntime_dispatch_behind_bytes尚未分发的消息字节数(例如构建索引的操作)dispatchBehindBytes
rocketmq_brokeruntime_put_message_size_total写入到代理的消息大小的总和putMessageSizeTotal
rocketmq_brokeruntime_put_message_average_size写入到代理的消息的平均大小putMessageAverageSize
rocketmq_brokeruntime_remain_transientstore_buffer_numbsTransientStorePool 中队列的容量remainTransientStoreBufferNumbs
rocketmq_brokeruntime_earliest_message_timestamp代理存储的消息的 earliest timestampearliestMessageTimeStamp
rocketmq_brokeruntime_putmessage_entire_time_max代理启动后写入消息的最大时间putMessageEntireTimeMax
rocketmq_brokeruntime_start_accept_sendrequest_time代理开始接受发送请求的时间startAcceptSendRequestTimeStamp
rocketmq_brokeruntime_putmessage_times_total写入代理的消息的总次数putMessageTimesTotal
rocketmq_brokeruntime_getmessage_entire_time_max代理启动后处理消息拉取的最大时间getMessageEntireTimeMax
rocketmq_brokeruntime_pagecache_lock_time_millspageCacheLockTimeMills
rocketmq_brokeruntime_commitlog_disk_ratiocommitLog 所在磁盘的使用率commitLogDiskRatio
rocketmq_brokeruntime_dispatch_maxbuffer代理未计算的值,保持为 0dispatchMaxBuffer
rocketmq_brokeruntime_pull_threadpoolqueue_capacity处理拉取请求的线程池队列的容量。pullThreadPoolQueueCapacity
rocketmq_brokeruntime_send_threadpoolqueue_capacity处理拉取请求的线程池中队列的容量sendThreadPoolQueueCapacity
rocketmq_brokeruntime_query_threadpool_queue_capacity处理查询请求的线程池中队列的容量queryThreadPoolQueueCapacity
rocketmq_brokeruntime_pull_threadpoolqueue_size处理拉取请求的线程池中队列的实际大小pullThreadPoolQueueSize
rocketmq_brokeruntime_query_threadpoolqueue_size处理查询请求的线程池中队列的实际大小queryThreadPoolQueueSize
rocketmq_brokeruntime_send_threadpool_queue_size处理发送请求的线程池中队列的实际大小sendThreadPoolQueueSize
rocketmq_brokeruntime_pull_threadpoolqueue_headwait_timemills处理拉取请求的线程池中队列中头部任务的等待时间pullThreadPoolQueueHeadWaitTimeMills
rocketmq_brokeruntime_query_threadpoolqueue_headwait_timemills处理查询请求的线程池中队列中头部任务的等待时间queryThreadPoolQueueHeadWaitTimeMills
rocketmq_brokeruntime_send_threadpoolqueue_headwait_timemills处理发送请求的线程池中队列中头部任务的等待时间sendThreadPoolQueueHeadWaitTimeMills
rocketmq_brokeruntime_msg_gettotal_yesterdaymorning昨晚午夜之前读取消息的总次数msgGetTotalYesterdayMorning
rocketmq_brokeruntime_msg_puttotal_yesterdaymorning昨晚午夜之前写入消息的总次数msgPutTotalYesterdayMorning
rocketmq_brokeruntime_msg_gettotal_todaymorning今晚午夜之前读取消息的总次数msgGetTotalTodayMorning
rocketmq_brokeruntime_msg_puttotal_todaymorning今晚午夜之前写入消息的总次数putMessageTimesTotal
rocketmq_brokeruntime_msg_put_total_today_now到目前为止写入每个代理的消息数量。msgPutTotalTodayNow
rocketmq_brokeruntime_msg_gettotal_today_now到目前为止从每个代理读取的消息数量。msgGetTotalTodayNow
rocketmq_brokeruntime_commitlogdir_capacity_free存储 commitLog 的目录的可用空间。commitLogDirCapacity
rocketmq_brokeruntime_commitlogdir_capacity_total存储 commitLog 的目录的总空间。
rocketmq_brokeruntime_commitlog_maxoffsetcommitLog 的最大偏移量。commitLogMaxOffset
rocketmq_brokeruntime_commitlog_minoffsetcommitLog 的最小偏移量。commitLogMinOffset
rocketmq_brokeruntime_remain_howmanydata_toflushremainHowManyDataToFlush
rocketmq_brokeruntime_getfound_tps600过去 600 秒内 getMessage 期间接收的消息的平均 TPS。getFoundTps
rocketmq_brokeruntime_getfound_tps60过去 60 秒内 getMessage 期间接收的消息的平均 TPS。
rocketmq_brokeruntime_getfound_tps10过去 10 秒内 getMessage 期间接收的消息的平均 TPS。
rocketmq_brokeruntime_gettotal_tps600过去 600 秒内 getMessage 调用的平均 TPS。getTotalTps
rocketmq_brokeruntime_gettotal_tps60过去 60 秒内 getMessage 调用的平均 TPS。
rocketmq_brokeruntime_gettotal_tps10过去 10 秒内 getMessage 调用的平均 TPS。
rocketmq_brokeruntime_gettransfered_tps600getTransferedTps
rocketmq_brokeruntime_gettransfered_tps60
rocketmq_brokeruntime_gettransfered_tps10
rocketmq_brokeruntime_getmiss_tps600过去 600 秒内未获取到消息的 getMessage 的平均 TPSgetMissTps
rocketmq_brokeruntime_getmiss_tps60过去 60 秒内未获取到消息的 getMessage 的平均 TPS
rocketmq_brokeruntime_getmiss_tps10过去 10 秒内未获取到消息的 getMessage 的平均 TPS
rocketmq_brokeruntime_put_tps600过去 600 秒内消息写入操作的平均 TPSputTps
rocketmq_brokeruntime_put_tps60过去 60 秒内消息写入操作的平均 TPS
rocketmq_brokeruntime_put_tps10过去 10 秒内消息写入操作的平均 TPS
生产者指标

生产者指标

指标名称定义
rocketmq_producer_offset当前主题的最大偏移量
rocketmq_topic_retry_offset当前重试主题的最大偏移量
rocketmq_topic_dlq_offset当前死信主题的最大偏移量
rocketmq_producer_tpsBroker 组上主题的生产 TPS
rocketmq_producer_message_sizeBroker 组上主题的生产消息大小的 TPS
rocketmq_queue_producer_tps队列级别的生产 TPS
rocketmq_queue_producer_message_size队列级别的生产消息大小的 TPS
消费者指标

消费者指标

指标名称定义
rocketmq_group_diff消费者组消息累积消息计数
rocketmq_group_retrydiff消费者组重试队列累积消息计数
rocketmq_group_dlqdiff消费者组死信队列累积消息计数
rocketmq_group_count消费者组中的消费者数量
rocketmq_client_consume_fail_msg_count消费者组中消费者在过去 1 小时内消费失败的次数
rocketmq_client_consume_fail_msg_tps消费者组消费者失败 TPS
rocketmq_client_consume_ok_msg_tps消费者组消费者成功 TPS
rocketmq_client_consume_rt消息被拉取后被消费的时间
rocketmq_client_consumer_pull_rt客户端拉取消息的时间
rocketmq_client_consumer_pull_tps客户端拉取消息 TPS
rocketmq_consumer_tps每个 Broker 组上订阅组的消费 TPS
rocketmq_group_consume_tps订阅组的当前消费 TPS(按 broker 聚合,用于 rocketmq_consumer_tps)
rocketmq_consumer_offset订阅组在 Broker 组中的当前消费偏移量
rocketmq_group_consume_total_offset订阅组的当前消费偏移量(按 broker 聚合,用于 rocketmq_consumer_offset)
rocketmq_consumer_message_size订阅组在 Broker 组中消费消息大小的 TPS
rocketmq_send_back_nums订阅组在 Broker 组中消费失败并写入重试消息的次数
rocketmq_group_get_latency_by_storetime消费者组的消费延迟,当前时间与导出器获取消息的时间之差。