RocketMQ Prometheus 导出器
介绍
Rocketmq-exporter
是一个用于监控 RocketMQ 代理和客户端所有相关指标的系统,它将通过 mqAdmin 从代理端获取的指标值打包到 87 个缓存中。
在之前的版本中,存在 87 个并发 HashMap,但由于 Map 不会删除过期的指标,一旦出现标签变更,就会生成新的指标,而旧的、未使用的指标无法自动删除,最终会导致内存溢出。然而,使用缓存结构可以实现过期删除,并且可以配置过期时间。
Rocketmq-exporter
获取监控指标的过程如下图所示。导出器通过 MQAdminExt 向 MQ 集群请求数据,并将请求到的数据通过 MetricService 标准化为 Prometheus 所需的格式,然后通过 /metrics
接口暴露给 Promethus。
指标结构
Metric 类位于 org.apache.rocketmq.expoter.model.metrics
包中,本质上是一组实体类,每个实体类代表一种指标类型,总共 14 个 Metric 类。这些类充当 87 个缓存的键,并通过不同的标签值进行区分。
- 与代理相关的指标类:BrokerRuntimeMetric、BrokerMetric、DLQTopicOffsetMetric、TopicPutNumMetric
- 与消费者相关的类:ConsumerRuntimeConsumeFailedMsgsMetric 、ConsumerRuntimeConsumeFailedTPSMetric 、ConsumerRuntimeConsumeOKTPSMetric、ConsumerRuntimeConsumeRTMetric、ConsumerRuntimePullRTMetric、ConsumerRuntimePullTPSMetric、ConsumerCountMetric、ConsumerMetric、ConsumerTopicDiffMetric
- 与生产者相关的指标类:ProducerMetric
Prometheus 拉取指标
RocketMQ-exporter
项目和 Prometheus
相当于服务器-客户端关系,其中 RocketMQ-exporter 项目引入了 Prometheus 客户端包,该包在项目的 MetricFamilySamples 类中指定了要获取的信息类型。Prometheus 从导出器请求指标,导出器将信息打包到相应的类型后返回给 Prometheus。
rocketmq-exporter 项目启动后,会将 rocketmq 的各种指标收集到 mfs 对象中。当浏览器或 Prometheus 访问相应的接口时,mfs 对象中的样本将通过服务生成成 Prometheus 支持的格式化数据。主要包括以下步骤
浏览器访问 ip:5557/metrics 调用 RMQMetricsController 类中的 metrics 方法,其中 ip 是运行 rocketmq-exporter 项目的主机的 IP。
private void metrics(HttpServletResponse response) throws IOException {
StringWriter writer = new StringWriter();
metricsService.metrics(writer);
response.setHeader("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
response.getOutputStream().print(writer.toString());
}
通过创建一个新的 StringWriter 对象来收集指标,导出器中的指标通过 MetricsService 类中的 metrics 方法收集到 writer 对象中,然后将收集到的指标输出到网页。
收集到的指标格式为
<metric name>{<label name>=<label value>, ...} <metric value>
示例:
rocketmq_group_diff{group="rmq_group_test_20220114",topic="fusion_console_tst",countOfOnlineConsumers="0",msgModel="1",} 23.0
MetricCollectTask 类中的 5 个计划任务
MetricCollectTask 类有五个计划任务:collectTopicOffset、collectConsumerOffset、collectBrokerStatsTopic、collectBrokerStats 和 collectBrokerRuntimeStats。它们用于收集消费者偏移量信息和代理状态信息等。它的 cron 表达式为:cron: 15 0/1 * * * ?, 这意味着它将每分钟收集一次。它的核心功能是通过 mqAdminExt 对象从集群中的代理获取信息,然后将其添加到相应的 87 个监控指标中,以 collectTopicOffset 为例
首先,初始化 TopicList 对象,并通过 mqAdminExt.fetchAllTopicList() 方法获取集群中的所有主题信息。
TopicList topicList = null;
try {
topicList = mqAdminExt.fetchAllTopicList();
} catch (Exception ex) {
log.error(String.format("collectTopicOffset-exception comes getting topic list from namesrv, address is %s",
JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
return;
}将主题添加到 topicSet 中,并遍历每个主题,通过 mqAdminExt.examineTopicStats(topic) 函数检查主题状态。
Set < String > topicSet = topicList != null ? topicList.getTopicList() : null;
for (String topic: topicSet) {
TopicStatsTable topicStats = null;
try {
topicStats = mqAdminExt.examineTopicStats(topic);
} catch (Exception ex) {
log.error(String.format("collectTopicOffset-getting topic(%s) stats error. the namesrv address is %s",
topic,
JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
continue;}初始化主题状态集、按代理划分的主题信息偏移量哈希表 brokerOffsetMap,以及以代理名称为键的哈希表 brokerUpdateTimestampMap 来存储更新时间戳。
Set<Map.Entry<MessageQueue, TopicOffset>> topicStatusEntries = topicStats.getOffsetTable().entrySet();
HashMap<String, Long> brokerOffsetMap = new HashMap<>();
HashMap<String, Long> brokerUpdateTimestampMap = new HashMap<>();
for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : topicStatusEntries) {
MessageQueue q = topicStatusEntry.getKey();
TopicOffset offset = topicStatusEntry.getValue();
if (brokerOffsetMap.containsKey(q.getBrokerName())) {
brokerOffsetMap.put(q.getBrokerName(), brokerOffsetMap.get(q.getBrokerName()) + offset.getMaxOffset());
} else {
brokerOffsetMap.put(q.getBrokerName(), offset.getMaxOffset());
}
if (brokerUpdateTimestampMap.containsKey(q.getBrokerName())) {
if (offset.getLastUpdateTimestamp() > brokerUpdateTimestampMap.get(q.getBrokerName())) {
brokerUpdateTimestampMap.put(q.getBrokerName(), offset.getLastUpdateTimestamp());
}
} else {
brokerUpdateTimestampMap.put(q.getBrokerName(),
offset.getLastUpdateTimestamp());
}
}最后,通过遍历 brokerOffsetMap 中的每个项目,从 metricsService 获取 metricCollector 对象,并调用 RMQMetricsCollector 类中的 addTopicOffsetMetric 方法,将相应的值添加到 RMQMetricsCollector 类中 87 个指标的其中一个缓存中。
Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
metricsService.getCollector().addTopicOffsetMetric(clusterName, brokerOffsetEntry.getKey(), topic,
brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
}
}
log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
}
Rocketmq-exporter 收集指标流程图
快速入门
配置 application.yml
application.yml
中的重要配置包括
server.port 设置 Prometheus 监听 rocketmq-exporter 的端口,默认值为 5557。
rocketmq.config.webTelemetryPath 配置 Prometheus 获取指标的路径,默认值为 /metrics。可以使用默认值。
rocketmq.config.enableACL 如果 RocketMQ 集群启用了 ACL 验证,则需要将其设置为 true,并在 accessKey 和 secretKey 中配置相应的 ak 和 sk。
rocketmq.config.outOfTimeSeconds 用于配置存储指标及其值的过期时间。如果超过此时间,并且缓存中的键没有进行写入更改,则会将其删除。通常,可以将其配置为 60s(Prometheus 获取指标的时间间隔应根据过期时间进行合理配置,只要过期时间大于或等于 Prometheus 收集指标的时间间隔即可)。
task..cron 配置导出器通过计划任务从代理拉取指标的时间间隔,默认值为 "15 0/1 * * ?", 这意味着它将每分钟的 15 秒拉取一次指标。
启动导出器应用程序
根据其官方网站上的配置启动 Prometheus
将 Prometheus 的 static_config: -targets 配置为导出器的启动 IP 和端口,例如:localhost:5557。
访问 Prometheus 页面
如果 localhost 在默认的 localhost:9090 上启动,则可以查看收集到的指标值,如下图所示
为了获得更好的可视化效果并观察指标值变化的趋势,最好将 Prometheus 与 Grafana 一起使用!
可观察性指标
可观察性指标主要包括两类:服务器端指标和客户端指标。服务器端指标由服务器直接生成,客户端指标在客户端生成,并通过向客户端发送 RPC 请求从服务器获取。客户端指标可以进一步分为生产者指标和消费者指标。所有 87 个可观察性指标及其主要含义如下
服务器指标
服务器指标
指标名称 | 定义 | 对应代理指标名称 |
---|---|---|
rocketmq_broker_tps | 代理级生产 TPS | |
rocketmq_broker_qps | 代理级消费 QPS | |
rocketmq_broker_commitlog_diff | 从从节点同步的消息大小落后于主节点 | |
rocketmq_brokeruntime_pmdt_0ms | 写入请求从服务器端处理开始到完成的时间(0ms) | putMessageDistributeTime |
rocketmq_brokeruntime_pmdt_0to10ms | 写入请求从服务器端处理开始到完成的时间(0~10ms) | |
rocketmq_brokeruntime_pmdt_10to50ms | 写入请求从服务器端处理开始到完成的时间(10~50ms) | |
rocketmq_brokeruntime_pmdt_50to100ms | 写入请求从服务器端处理开始到完成的时间(50~100ms) | |
rocketmq_brokeruntime_pmdt_100to200ms | 写入请求从服务器端处理开始到完成的时间(100~200ms) | |
rocketmq_brokeruntime_pmdt_200to500ms | 写入请求从服务器端处理开始到完成的时间(200~500ms) | |
rocketmq_brokeruntime_pmdt_500to1s | 写入请求从服务器端处理开始到完成的时间(500~1000ms) | |
rocketmq_brokeruntime_pmdt_1to2s | 写入请求从服务器端处理开始到完成的时间(1~2s) | |
rocketmq_brokeruntime_pmdt_2to3s | 写入请求从服务器端处理开始到完成的时间(2~3s) | |
rocketmq_brokeruntime_pmdt_3to4s | 写入请求从服务器端处理开始到完成的时间(3~4s) | |
rocketmq_brokeruntime_pmdt_4to5s | 写入请求从服务器端处理开始到完成的时间(4~5s) | |
rocketmq_brokeruntime_pmdt_5to10s | 写入请求从服务器端处理开始到完成的时间(5~10s) | |
rocketmq_brokeruntime_pmdt_10stomore | 写入请求从服务器端处理开始到完成的时间(> 10s) | |
rocketmq_brokeruntime_dispatch_behind_bytes | 尚未分发的消息字节数(例如构建索引的操作) | dispatchBehindBytes |
rocketmq_brokeruntime_put_message_size_total | 写入到代理的消息大小的总和 | putMessageSizeTotal |
rocketmq_brokeruntime_put_message_average_size | 写入到代理的消息的平均大小 | putMessageAverageSize |
rocketmq_brokeruntime_remain_transientstore_buffer_numbs | TransientStorePool 中队列的容量 | remainTransientStoreBufferNumbs |
rocketmq_brokeruntime_earliest_message_timestamp | 代理存储的消息的 earliest timestamp | earliestMessageTimeStamp |
rocketmq_brokeruntime_putmessage_entire_time_max | 代理启动后写入消息的最大时间 | putMessageEntireTimeMax |
rocketmq_brokeruntime_start_accept_sendrequest_time | 代理开始接受发送请求的时间 | startAcceptSendRequestTimeStamp |
rocketmq_brokeruntime_putmessage_times_total | 写入代理的消息的总次数 | putMessageTimesTotal |
rocketmq_brokeruntime_getmessage_entire_time_max | 代理启动后处理消息拉取的最大时间 | getMessageEntireTimeMax |
rocketmq_brokeruntime_pagecache_lock_time_mills | pageCacheLockTimeMills | |
rocketmq_brokeruntime_commitlog_disk_ratio | commitLog 所在磁盘的使用率 | commitLogDiskRatio |
rocketmq_brokeruntime_dispatch_maxbuffer | 代理未计算的值,保持为 0 | dispatchMaxBuffer |
rocketmq_brokeruntime_pull_threadpoolqueue_capacity | 处理拉取请求的线程池队列的容量。 | pullThreadPoolQueueCapacity |
rocketmq_brokeruntime_send_threadpoolqueue_capacity | 处理拉取请求的线程池中队列的容量 | sendThreadPoolQueueCapacity |
rocketmq_brokeruntime_query_threadpool_queue_capacity | 处理查询请求的线程池中队列的容量 | queryThreadPoolQueueCapacity |
rocketmq_brokeruntime_pull_threadpoolqueue_size | 处理拉取请求的线程池中队列的实际大小 | pullThreadPoolQueueSize |
rocketmq_brokeruntime_query_threadpoolqueue_size | 处理查询请求的线程池中队列的实际大小 | queryThreadPoolQueueSize |
rocketmq_brokeruntime_send_threadpool_queue_size | 处理发送请求的线程池中队列的实际大小 | sendThreadPoolQueueSize |
rocketmq_brokeruntime_pull_threadpoolqueue_headwait_timemills | 处理拉取请求的线程池中队列中头部任务的等待时间 | pullThreadPoolQueueHeadWaitTimeMills |
rocketmq_brokeruntime_query_threadpoolqueue_headwait_timemills | 处理查询请求的线程池中队列中头部任务的等待时间 | queryThreadPoolQueueHeadWaitTimeMills |
rocketmq_brokeruntime_send_threadpoolqueue_headwait_timemills | 处理发送请求的线程池中队列中头部任务的等待时间 | sendThreadPoolQueueHeadWaitTimeMills |
rocketmq_brokeruntime_msg_gettotal_yesterdaymorning | 昨晚午夜之前读取消息的总次数 | msgGetTotalYesterdayMorning |
rocketmq_brokeruntime_msg_puttotal_yesterdaymorning | 昨晚午夜之前写入消息的总次数 | msgPutTotalYesterdayMorning |
rocketmq_brokeruntime_msg_gettotal_todaymorning | 今晚午夜之前读取消息的总次数 | msgGetTotalTodayMorning |
rocketmq_brokeruntime_msg_puttotal_todaymorning | 今晚午夜之前写入消息的总次数 | putMessageTimesTotal |
rocketmq_brokeruntime_msg_put_total_today_now | 到目前为止写入每个代理的消息数量。 | msgPutTotalTodayNow |
rocketmq_brokeruntime_msg_gettotal_today_now | 到目前为止从每个代理读取的消息数量。 | msgGetTotalTodayNow |
rocketmq_brokeruntime_commitlogdir_capacity_free | 存储 commitLog 的目录的可用空间。 | commitLogDirCapacity |
rocketmq_brokeruntime_commitlogdir_capacity_total | 存储 commitLog 的目录的总空间。 | |
rocketmq_brokeruntime_commitlog_maxoffset | commitLog 的最大偏移量。 | commitLogMaxOffset |
rocketmq_brokeruntime_commitlog_minoffset | commitLog 的最小偏移量。 | commitLogMinOffset |
rocketmq_brokeruntime_remain_howmanydata_toflush | remainHowManyDataToFlush | |
rocketmq_brokeruntime_getfound_tps600 | 过去 600 秒内 getMessage 期间接收的消息的平均 TPS。 | getFoundTps |
rocketmq_brokeruntime_getfound_tps60 | 过去 60 秒内 getMessage 期间接收的消息的平均 TPS。 | |
rocketmq_brokeruntime_getfound_tps10 | 过去 10 秒内 getMessage 期间接收的消息的平均 TPS。 | |
rocketmq_brokeruntime_gettotal_tps600 | 过去 600 秒内 getMessage 调用的平均 TPS。 | getTotalTps |
rocketmq_brokeruntime_gettotal_tps60 | 过去 60 秒内 getMessage 调用的平均 TPS。 | |
rocketmq_brokeruntime_gettotal_tps10 | 过去 10 秒内 getMessage 调用的平均 TPS。 | |
rocketmq_brokeruntime_gettransfered_tps600 | getTransferedTps | |
rocketmq_brokeruntime_gettransfered_tps60 | ||
rocketmq_brokeruntime_gettransfered_tps10 | ||
rocketmq_brokeruntime_getmiss_tps600 | 过去 600 秒内未获取到消息的 getMessage 的平均 TPS | getMissTps |
rocketmq_brokeruntime_getmiss_tps60 | 过去 60 秒内未获取到消息的 getMessage 的平均 TPS | |
rocketmq_brokeruntime_getmiss_tps10 | 过去 10 秒内未获取到消息的 getMessage 的平均 TPS | |
rocketmq_brokeruntime_put_tps600 | 过去 600 秒内消息写入操作的平均 TPS | putTps |
rocketmq_brokeruntime_put_tps60 | 过去 60 秒内消息写入操作的平均 TPS | |
rocketmq_brokeruntime_put_tps10 | 过去 10 秒内消息写入操作的平均 TPS |
生产者指标
生产者指标
指标名称 | 定义 |
---|---|
rocketmq_producer_offset | 当前主题的最大偏移量 |
rocketmq_topic_retry_offset | 当前重试主题的最大偏移量 |
rocketmq_topic_dlq_offset | 当前死信主题的最大偏移量 |
rocketmq_producer_tps | Broker 组上主题的生产 TPS |
rocketmq_producer_message_size | Broker 组上主题的生产消息大小的 TPS |
rocketmq_queue_producer_tps | 队列级别的生产 TPS |
rocketmq_queue_producer_message_size | 队列级别的生产消息大小的 TPS |
消费者指标
消费者指标
指标名称 | 定义 |
---|---|
rocketmq_group_diff | 消费者组消息累积消息计数 |
rocketmq_group_retrydiff | 消费者组重试队列累积消息计数 |
rocketmq_group_dlqdiff | 消费者组死信队列累积消息计数 |
rocketmq_group_count | 消费者组中的消费者数量 |
rocketmq_client_consume_fail_msg_count | 消费者组中消费者在过去 1 小时内消费失败的次数 |
rocketmq_client_consume_fail_msg_tps | 消费者组消费者失败 TPS |
rocketmq_client_consume_ok_msg_tps | 消费者组消费者成功 TPS |
rocketmq_client_consume_rt | 消息被拉取后被消费的时间 |
rocketmq_client_consumer_pull_rt | 客户端拉取消息的时间 |
rocketmq_client_consumer_pull_tps | 客户端拉取消息 TPS |
rocketmq_consumer_tps | 每个 Broker 组上订阅组的消费 TPS |
rocketmq_group_consume_tps | 订阅组的当前消费 TPS(按 broker 聚合,用于 rocketmq_consumer_tps) |
rocketmq_consumer_offset | 订阅组在 Broker 组中的当前消费偏移量 |
rocketmq_group_consume_total_offset | 订阅组的当前消费偏移量(按 broker 聚合,用于 rocketmq_consumer_offset) |
rocketmq_consumer_message_size | 订阅组在 Broker 组中消费消息大小的 TPS |
rocketmq_send_back_nums | 订阅组在 Broker 组中消费失败并写入重试消息的次数 |
rocketmq_group_get_latency_by_storetime | 消费者组的消费延迟,当前时间与导出器获取消息的时间之差。 |