跳过主内容
版本: 5.0

RocketMQ Prometheus Exporter

简介

Rocketmq-exporter 是一个用于监控 RocketMQ broker 和 client 侧所有相关指标的系统,它将通过 mqAdmin 从 broker 侧获取到的指标值封装到 87 个缓存中。

注意

在之前的版本中,有 87 个 concurrentHashMaps,但由于 Map 不会删除过期的指标,一旦标签发生变化,就会生成新的指标,而旧的、未使用的指标无法自动删除,最终导致内存溢出。但是,使用 Cache 结构可以实现过期删除,并且可以配置过期时间。

`Rocketmq-exporter` 获取监控指标的流程如下图所示。Exporter 通过 MQAdminExt 从 MQ 集群请求数据,请求到的数据通过 MetricService 标准化为 Prometheus 所需的格式,然后通过 `/metrics` 接口暴露给 Prometheus。4586095434

指标结构

Metric 类位于 **`org.apache.rocketmq.expoter.model.metrics`** 包中,本质上是一组实体类,每个实体类代表一种指标类型,总共有 14 个 Metric 类。这些类是 87 个缓存的键,并通过不同的标签值进行区分。

实体类包含三个维度的标签:broker、consumer、producer
  • Broker 相关指标类 : BrokerRuntimeMetric、BrokerMetric、DLQTopicOffsetMetric、TopicPutNumMetric
  • 消费者相关类 : ConsumerRuntimeConsumeFailedMsgsMetric 、ConsumerRuntimeConsumeFailedTPSMetric 、ConsumerRuntimeConsumeOKTPSMetric、ConsumerRuntimeConsumeRTMetric、ConsumerRuntimePullRTMetric、ConsumerRuntimePullTPSMetric、ConsumerCountMetric、ConsumerMetric、ConsumerTopicDiffMetric
  • 生产者相关指标类: ProducerMetric

Prometheus 拉取指标

RocketMQ-exporter 项目和 `Prometheus` 相当于服务器-客户端关系,其中 `RocketMQ-exporter` 项目引入了 Prometheus 客户端包,该包在项目的 MetricFamilySamples 类中指定要获取的信息类型。Prometheus 从 exporter 请求指标,exporter 将信息打包成相应的类型后返回给 Prometheus。

rocketmq-exporter 项目启动后,会将 RocketMQ 的各种指标收集到 mfs 对象中。当浏览器或 Prometheus 访问相应的接口时,mfs 对象中的样本将通过服务生成 Prometheus 支持的格式化数据。主要包括以下步骤:

浏览器访问 ip:5557/metrics 以调用 RMQMetricsController 类中的 metrics 方法,其中 ip 是运行 rocketmq-exporter 项目的主机 IP。

private void metrics(HttpServletResponse response) throws IOException {
StringWriter writer = new StringWriter();
metricsService.metrics(writer);
response.setHeader("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
response.getOutputStream().print(writer.toString());
}

通过创建一个新的 StringWriter 对象来收集指标,exporter 中的指标通过 MetricsService 类中的 metrics 方法收集到 writer 对象中,然后将收集到的指标输出到网页。

收集到的指标格式为:

<metric name>{<label name>=<label value>, ...} <metric value>

示例:

rocketmq_group_diff{group="rmq_group_test_20220114",topic="fusion_console_tst",countOfOnlineConsumers="0",msgModel="1",} 23.0

MetricCollectTask 类中的 5 个定时任务

MetricCollectTask 类有五个定时任务:collectTopicOffset、collectConsumerOffset、collectBrokerStatsTopic、collectBrokerStats 和 collectBrokerRuntimeStats。它们用于收集消费者偏移量信息和 Broker 状态信息等。其 cron 表达式为:`cron: 15 0/1 * * * ?`,这意味着它将每分钟收集一次。其核心功能是通过 mqAdminExt 对象从集群中的 broker 获取信息,然后将其添加到相应的 87 个监控指标中,以 collectTopicOffset 为例

  1. 首先,初始化 TopicList 对象,并通过 mqAdminExt.fetchAllTopicList() 方法获取集群中的所有 topic 信息。

    TopicList topicList = null;
    try {
    topicList = mqAdminExt.fetchAllTopicList();
    } catch (Exception ex) {
    log.error(String.format("collectTopicOffset-exception comes getting topic list from namesrv, address is %s",
    JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
    return;
    }
  2. 将 topic 添加到 topicSet 中,并遍历每个 topic,通过 mqAdminExt.examineTopicStats(topic) 函数检查 topic 状态。

    Set < String > topicSet = topicList != null ? topicList.getTopicList() : null;
    for (String topic: topicSet) {
    TopicStatsTable topicStats = null;
    try {
    topicStats = mqAdminExt.examineTopicStats(topic);
    } catch (Exception ex) {
    log.error(String.format("collectTopicOffset-getting topic(%s) stats error. the namesrv address is %s",
    topic,
    JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
    continue;}
  3. 初始化 topic 状态集、按 broker 划分 topic 信息偏移量的哈希表 brokerOffsetMap,以及以 broker 名称为键存储更新时间戳的哈希表 brokerUpdateTimestampMap。

    Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStats.getOffsetTable().entrySet();
    HashMap<String, Long> brokerOffsetMap = new HashMap<>();
    HashMap<String, Long> brokerUpdateTimestampMap = new HashMap<>();
    for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
    MessageQueue q = topicStatusEntry.getKey();
    TopicOffset offset = topicStatusEntry.getValue();
    if (brokerOffsetMap.containsKey(q.getBrokerName())) {
    brokerOffsetMap.put(q.getBrokerName(), brokerOffsetMap.get(q.getBrokerName()) + offset.getMaxOffset());
    } else {
    brokerOffsetMap.put(q.getBrokerName(), offset.getMaxOffset());
    }
    if (brokerUpdateTimestampMap.containsKey(q.getBrokerName())) {
    if (offset.getLastUpdateTimestamp() > brokerUpdateTimestampMap.get(q.getBrokerName())) {
    brokerUpdateTimestampMap.put(q.getBrokerName(), offset.getLastUpdateTimestamp());
    }
    } else {
    brokerUpdateTimestampMap.put(q.getBrokerName(),
    offset.getLastUpdateTimestamp());
    }
    }

  4. 最后,通过遍历 brokerOffsetMap 中的每个项,通过 metricsService 获取 metricCollector 对象,并调用 RMQMetricsCollector 类中的 addTopicOffsetMetric 方法,将相应的值添加到 RMQMetricsCollector 类中 87 个指标的其中一个缓存中。

     Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
    for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
    metricsService.getCollector().addTopicOffsetMetric(clusterName, brokerOffsetEntry.getKey(), topic,
    brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
    }
    }
    log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
    }

Rocketmq-exporter 指标收集流程图

95680412354

快速入门

配置 `application.yml`

`application.yml` 中的重要配置包括:

  • server.port 设置 Prometheus 监听 rocketmq-exporter 的端口,默认值为 5557。

  • rocketmq.config.webTelemetryPath 配置 Prometheus 获取指标的路径,默认值为 `/metrics`。可以使用默认值。

  • rocketmq.config.enableACL 如果 RocketMQ 集群启用了 ACL 验证,则需要将其设置为 true,并在 accessKey 和 secretKey 中配置相应的 ak 和 sk。

  • rocketmq.config.outOfTimeSeconds 用于配置存储指标及其值的过期时间。如果超过此时间且缓存中的键未发生写入更改,则将被删除。通常可以配置为 60s(Prometheus 获取指标的时间间隔应根据过期时间合理配置,只要过期时间大于等于 Prometheus 收集指标的时间间隔即可)。

  • task.*.cron 配置 exporter 通过定时任务从 broker 拉取指标的时间间隔,默认值为 `15 0/1 * * * ?`,表示每分钟的第 15 秒拉取一次指标。

启动 exporter 应用

根据 Prometheus 官网配置启动 Prometheus

将 Prometheus 的 `static_config: -targets` 配置为 exporter 的启动 IP 和端口,例如:`localhost:5557`。

访问 Prometheus 页面

如果 localhost 以默认的 `localhost:9090` 启动,您可以查看收集到的指标值,如下图所示

90671925984

提示

为了更好的可视化效果和观察指标值变化趋势,Prometheus 最好与 Grafana 配合使用!

可观测性指标

可观测性指标主要包括两类:服务端指标和客户端指标。服务端指标由服务器直接生成,客户端指标在客户端生成并通过 RPC 请求由服务器获取。客户端指标可进一步分为生产者指标和消费者指标。所有 87 个可观测性指标及其主要含义如下:

服务端指标

服务端指标

指标名称定义对应 Broker 指标名称
rocketmq_broker_tpsBroker 级生产 TPS
rocketmq_broker_qpsBroker 级消费 QPS
rocketmq_broker_commitlog_diffBroker 组从从节点同步落后消息大小
rocketmq_brokeruntime_pmdt_0ms服务端写入请求到完成写入的处理时间(0ms)putMessageDistributeTime
rocketmq_brokeruntime_pmdt_0to10ms服务端写入请求到完成写入的处理时间(0~10ms)
rocketmq_brokeruntime_pmdt_10to50ms服务端写入请求到完成写入的处理时间(10~50ms)
rocketmq_brokeruntime_pmdt_50to100ms服务端写入请求到完成写入的处理时间(50~100ms)
rocketmq_brokeruntime_pmdt_100to200ms服务端写入请求到完成写入的处理时间(100~200ms)
rocketmq_brokeruntime_pmdt_200to500ms服务端写入请求到完成写入的处理时间(200~500ms)
rocketmq_brokeruntime_pmdt_500to1s服务端写入请求到完成写入的处理时间(500~1000ms)
rocketmq_brokeruntime_pmdt_1to2s服务端写入请求到完成写入的处理时间(1~2s)
rocketmq_brokeruntime_pmdt_2to3s服务端写入请求到完成写入的处理时间(2~3s)
rocketmq_brokeruntime_pmdt_3to4s服务端写入请求到完成写入的处理时间(3~4s)
rocketmq_brokeruntime_pmdt_4to5s服务端写入请求到完成写入的处理时间(4~5s)
rocketmq_brokeruntime_pmdt_5to10s服务端写入请求到完成写入的处理时间(5~10s)
rocketmq_brokeruntime_pmdt_10stomore服务端写入请求到完成写入的处理时间(> 10s)
rocketmq_brokeruntime_dispatch_behind_bytes尚未分发的消息字节数(例如构建索引等操作)dispatchBehindBytes
rocketmq_brokeruntime_put_message_size_total写入 broker 的消息总大小putMessageSizeTotal
rocketmq_brokeruntime_put_message_average_size写入 broker 的消息平均大小putMessageAverageSize
rocketmq_brokeruntime_remain_transientstore_buffer_numbsTransientStorePool 中队列的容量remainTransientStoreBufferNumbs
rocketmq_brokeruntime_earliest_message_timestampbroker 存储的最早消息时间戳earliestMessageTimeStamp
rocketmq_brokeruntime_putmessage_entire_time_max自 broker 运行以来,写入消息的最大耗时putMessageEntireTimeMax
rocketmq_brokeruntime_start_accept_sendrequest_timebroker 开始接受发送请求的时间startAcceptSendRequestTimeStamp
rocketmq_brokeruntime_putmessage_times_total写入 broker 的消息总次数putMessageTimesTotal
rocketmq_brokeruntime_getmessage_entire_time_max自 broker 运行以来,处理消息拉取请求的最大耗时getMessageEntireTimeMax
rocketmq_brokeruntime_pagecache_lock_time_millspageCacheLockTimeMills
rocketmq_brokeruntime_commitlog_disk_ratiocommitLog 所在磁盘的使用率commitLogDiskRatio
rocketmq_brokeruntime_dispatch_maxbufferbroker 未计算且保持为 0 的值dispatchMaxBuffer
rocketmq_brokeruntime_pull_threadpoolqueue_capacity处理拉取请求的线程池队列容量。pullThreadPoolQueueCapacity
rocketmq_brokeruntime_send_threadpoolqueue_capacity处理发送请求的线程池队列容量sendThreadPoolQueueCapacity
rocketmq_brokeruntime_query_threadpool_queue_capacity处理查询请求的线程池队列容量queryThreadPoolQueueCapacity
rocketmq_brokeruntime_pull_threadpoolqueue_size处理拉取请求的线程池队列实际大小pullThreadPoolQueueSize
rocketmq_brokeruntime_query_threadpoolqueue_size处理查询请求的线程池队列实际大小queryThreadPoolQueueSize
rocketmq_brokeruntime_send_threadpool_queue_size处理发送请求的线程池队列实际大小sendThreadPoolQueueSize
rocketmq_brokeruntime_pull_threadpoolqueue_headwait_timemills处理拉取请求的线程池队列中头部任务的等待时间pullThreadPoolQueueHeadWaitTimeMills
rocketmq_brokeruntime_query_threadpoolqueue_headwait_timemills处理查询请求的线程池队列中头部任务的等待时间queryThreadPoolQueueHeadWaitTimeMills
rocketmq_brokeruntime_send_threadpoolqueue_headwait_timemills处理发送请求的线程池队列中头部任务的等待时间sendThreadPoolQueueHeadWaitTimeMills
rocketmq_brokeruntime_msg_gettotal_yesterdaymorning截至昨晚午夜,消息读取的总次数msgGetTotalYesterdayMorning
rocketmq_brokeruntime_msg_puttotal_yesterdaymorning截至昨晚午夜,消息写入的总次数msgPutTotalYesterdayMorning
rocketmq_brokeruntime_msg_gettotal_todaymorning截至今晚午夜,消息读取的总次数msgGetTotalTodayMorning
rocketmq_brokeruntime_msg_puttotal_todaymorning截至今晚午夜,消息写入的总次数putMessageTimesTotal
rocketmq_brokeruntime_msg_put_total_today_now目前写入每个 broker 的消息数量。msgPutTotalTodayNow
rocketmq_brokeruntime_msg_gettotal_today_now目前从每个 broker 读取的消息数量。msgGetTotalTodayNow
rocketmq_brokeruntime_commitlogdir_capacity_freecommitLog 存储目录中的可用空间。commitLogDirCapacity
rocketmq_brokeruntime_commitlogdir_capacity_totalcommit logs 存储目录中的总空间。
rocketmq_brokeruntime_commitlog_maxoffsetcommitLog 的最大偏移量。commitLogMaxOffset
rocketmq_brokeruntime_commitlog_minoffsetcommitLog 的最小偏移量。commitLogMinOffset
rocketmq_brokeruntime_remain_howmanydata_toflushremainHowManyDataToFlush
rocketmq_brokeruntime_getfound_tps600过去 600 秒内 getMessage 期间接收到的消息的平均 TPS。getFoundTps
rocketmq_brokeruntime_getfound_tps60过去 60 秒内 getMessage 期间接收到的消息的平均 TPS。
rocketmq_brokeruntime_getfound_tps10过去 10 秒内 getMessage 期间接收到的消息的平均 TPS。
rocketmq_brokeruntime_gettotal_tps600过去 600 秒内 getMessage 调用平均 TPS。getTotalTps
rocketmq_brokeruntime_gettotal_tps60过去 60 秒内 getMessage 调用平均 TPS。
rocketmq_brokeruntime_gettotal_tps10过去 10 秒内 getMessage 调用平均 TPS。
rocketmq_brokeruntime_gettransfered_tps600getTransferedTps
rocketmq_brokeruntime_gettransfered_tps60
rocketmq_brokeruntime_gettransfered_tps10
rocketmq_brokeruntime_getmiss_tps600过去 600 秒内 getMessage 未获取到消息的平均 TPSgetMissTps
rocketmq_brokeruntime_getmiss_tps60过去 60 秒内 getMessage 未获取到消息的平均 TPS
rocketmq_brokeruntime_getmiss_tps10过去 10 秒内 getMessage 未获取到消息的平均 TPS
rocketmq_brokeruntime_put_tps600过去 600 秒内消息写入操作的平均 TPSputTps
rocketmq_brokeruntime_put_tps60过去 60 秒内消息写入操作的平均 TPS
rocketmq_brokeruntime_put_tps10过去 10 秒内消息写入操作的平均 TPS
生产者指标

生产者指标

指标名称定义
rocketmq_producer_offset当前 Topic 的最大偏移量
rocketmq_topic_retry_offset当前重试 Topic 的最大偏移量
rocketmq_topic_dlq_offset当前死信 Topic 的最大偏移量
rocketmq_producer_tpsTopic 在 Broker 组上的生产 TPS
rocketmq_producer_message_sizeTopic 在 Broker 组上的生产消息大小 TPS
rocketmq_queue_producer_tps队列级生产 TPS
rocketmq_queue_producer_message_size队列级生产消息大小 TPS
消费者指标

消费者指标

指标名称定义
rocketmq_group_diff消费组消息堆积数量
rocketmq_group_retrydiff消费组重试队列堆积数量
rocketmq_group_dlqdiff消费组死信队列堆积数量
rocketmq_group_count消费组内的消费者数量
rocketmq_client_consume_fail_msg_count消费组内消费者过去 1 小时消费失败次数
rocketmq_client_consume_fail_msg_tps消费组消费失败 TPS
rocketmq_client_consume_ok_msg_tps消费组消费成功 TPS
rocketmq_client_consume_rt消息拉取后被消费的耗时
rocketmq_client_consumer_pull_rt客户端拉取消息的耗时
rocketmq_client_consumer_pull_tps客户端拉取消息 TPS
rocketmq_consumer_tps订阅组在各 Broker 组上的消费 TPS
rocketmq_group_consume_tps订阅组当前消费 TPS(由 Broker 聚合 rocketmq_consumer_tps)
rocketmq_consumer_offset订阅组在 Broker 组中的当前消费偏移量
rocketmq_group_consume_total_offset订阅组当前消费偏移量(由 Broker 聚合 rocketmq_consumer_offset)
rocketmq_consumer_message_size订阅组在 Broker 组中消费消息大小的 TPS
rocketmq_send_back_nums订阅组在 Broker 组中消费失败并写入重试消息的次数
rocketmq_group_get_latency_by_storetime消费组的消费延迟,即当前时间与 exporter 获取消息时的时间差。