跳到主要内容
版本: 5.0

基本最佳实践

生产者

发送消息注意事项

Tag 的使用

一个应用可以被认为是一个 Topic,消息的子类型可以通过 Tag 来标识。Tag 可以由应用自由设置。只有生产者在发送消息时设置了 Tag,消费者才能在订阅消息时通过 Broker 使用 Tag 来过滤消息。5.x SDK 可以调用 messageBuilder.setTag("messageTag"),历史版本可以调用 message.setTags("messageTag")。

Key 的使用

在服务级别,建议将每条消息映射到一个唯一的业务标识符,并设置到 keys 字段,以便未来定位消息丢失问题。服务器为每条消息创建索引(哈希索引),应用程序可以通过 topic 和 key 查询消息内容以及消息被谁消费。由于是哈希索引,请确保 key 尽可能唯一,以避免潜在的哈希冲突。常见的设置策略使用离散的唯一标识符,如订单 ID、用户 ID 和请求 ID。

日志打印

消息发送成功或者失败,都需要打印消息日志,方便排查业务故障。发送 指的是只要不抛出异常就表示消息发送成功。

消息发送失败的处理方式

Producer 的 send 方法本身支持内部重试,5.x 重试逻辑参考 发送重试策略

上述策略也一定程度上保证了消息发送的成功。如果业务要求消息不丢失,依然需要对可能存在的异常进行兜底,比如当调用 send 同步方法发送失败时,需要尝试将消息存储到 DB,并由后台线程定时重试,以保证消息能够到达 Broker。

上述 DB 重试方式不集成到 MQ 客户端,而要求应用自行完成的原因主要基于以下考虑:首先,MQ 客户端设计为无状态模式,便于任意水平扩容,且机器资源的消耗只有 CPU、内存、网络。其次,如果 MQ 客户端内部集成 KV 存储模块,只有同步落盘数据才能可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘,又因为应用关闭过程不受 MQ 运维人员控制,可能经常发生 kill -9 等暴力关闭,导致数据未能及时落盘而丢失。第三,Producer 所在机器可靠性较低,通常为虚拟机,不适合存储重要数据。综上所述,推荐将重试过程由应用控制。

消费者

消费过程幂等

RocketMQ 无法避免消息重复(Exactly Once),所以如果业务对消费重复非常敏感,务必在业务层进行去重处理。这可以借助关系型数据库来实现。首先,您需要为消息确定一个唯一键,可以是 msgId,也可以是消息内容中的唯一标识字段,例如订单 ID。在消费前,判断关系型数据库中是否存在该唯一键。如果不存在,则插入并消费,否则跳过。(实际处理中应考虑原子性问题,判断是否存在主键冲突,若插入失败,则直接跳过)

MsgId 必须是全局唯一标识符,但在实际应用中,可能会出现同一条消息拥有两个不同 MsgId 的情况(消费者主动重传、客户端重投机制导致的重复等),这使得业务字段的重复消费成为必要。

消费过程过慢

提高消费并行度

绝大部分消息消费都是 IO 密集型,即可能在操作数据库或者调用 RPC,这类消费的速率取决于后端数据库或者外部系统的吞吐量。通过提高消费并行度,可以提升总体的消费吞吐量,但当并行度增加到一定程度,其反而会下降。所以应用必须设置合理的并行度。修改消费并行度有几种方式:

  • 在同一个 ConsumerGroup 内,增加 Consumer 实例的数量来提高并行度(注意 Consumer 实例数量超过订阅队列数是无效的)。可以增加机器,或者在已有机器上启动多个进程。
  • 提高单个 Consumer 的消费并行线程,5.x PushConsumer SDK 可以通过 PushConsumerBuilder.setConsumptionThreadCount() 设置线程数,SimpleConsumer 可自由从业务线程提高并发,底层线程安全;历史版本 SDK PushConsumer 可通过修改参数 consumeThreadMin 和 consumeThreadMax 实现。

批量消费

如果某些业务流程支持批量消费,则可以大幅度提升消费吞吐量。例如订单扣减的应用,一次处理一个订单需要 1 s,而一次处理 10 个订单可能只需要 2 s,这样就可以大幅度提升消费吞吐量。推荐使用 5.x SDK 的 SimpleConsumer,设置每次接口调用批量大小,一次拉取多条消息进行消费。

重置位点跳过非重要消息

在消息堆积的情况下,如果消费速率跟不上投递速率,并且业务对数据不强求足够实时,可以选择丢弃不重要的消息。建议使用重置位点功能,直接将消费位点调整到指定时间或位置。

优化单条消息消费过程

例如,一条消息的消费过程如下:

  • 查询[数据 1]根据消息从数据库中查询
  • 查询[数据 2]根据消息从数据库中查询
  • 复杂业务计算
  • 插入[数据 3]到数据库中
  • 插入[数据 4]到数据库中

这条消息的消费过程中有四次与 DB 的交互。如果每次交互计算为 5ms,则总时间为 20ms。假设业务计算耗时 5ms,则总时间为 25ms。因此,如果四次 DB 交互能够优化为两次,总时间可以优化到 15ms,这意味着整体性能提升 40%。所以,如果应用对延迟敏感,可以将 DB 部署在 SSD 盘上。与 SCSI 盘相比,前者的 RT 要小得多。

消费打印日志

如果消息量较小,建议在消费入口方法中打印消息,以便排查耗时长的消费情况。

   new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
LOGGER.info("Consume message={}", messageView);
//Do your consume process
return ConsumeResult.SUCCESS;
}
}

如果能打印每条消息的消费耗时,则排查线上消费过慢等问题会更加方便。

Broker

Broker 角色

Broker 角色分为 ASYNC_MASTER、SYNC_MASTER 和 SLAVE。如果对消息可靠性有严格要求,部署 SYNC_MASTER 加 SLAVE。如果不需要消息可靠性,部署 ASYNC_MASTER 加 SLAVE。如果只是为了测试方便,可以选择只部署 ASYNC_MASTER 或只部署 SYNC_MASTER。

FlushDiskType

与 ASYNC_FLUSH 相比,SYNC_FLUSH 会带来性能损失,但更可靠。因此,必须根据实际服务场景进行权衡。

Broker 配置

参数默认值描述
listenPort10911接受客户端连接的监听端口
namesrvAddrnullNameServer 地址
brokerIP1网络 InetAddressBroker 当前监听的 IP 地址
brokerIP2与 brokerIP1 相同当存在主从 Broker 时,如果在 Broker 主节点配置了 brokerIP2 属性,则 Broker 从节点会连接主节点上配置的 brokerIP2 进行同步。
brokerNamenullBroker 名称
brokerClusterNameDefaultCluster此 Broker 所属的集群名称
brokerId0Broker ID,0 表示主节点,其他正整数表示从节点
storePathCommitLog$HOME/store/commitlog/存储 Commit Log 的路径
storePathConsumerQueue$HOME/store/consumequeue/存储消费队列的路径
mappedFileSizeCommitLog1024 * 1024 *1024(1G)Commit Log 映射文件大小
deleteWhen04一天中哪个时间点删除文件保留时间已超的 Commit Log
fileReservedTime72文件保留时间,单位为小时
brokerRoleASYNC_MASTERSYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskTypeASYNC_FLUSHSYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 Broker 保证在收到生产者的确认前刷新消息。ASYNC_FLUSH Broker 使用刷新模式,以提高性能来刷新一组消息。

Broker 日志管理

Broker 的默认日志路径位于 ${user.home}/logs/rocketmqlogs/。您可以通过编辑二进制包 conf 目录下的 xx.logback.xml 文件来更改日志级别和路径。

注意:请确保您的日志已妥善保护,以防止敏感信息泄露。