基本最佳实践
生产者
发送消息注意事项
Tag 的使用
一个应用可以被认为是一个 Topic,消息的子类型可以通过 Tag 来标识。Tag 可以由应用自由设置。只有生产者在发送消息时设置了 Tag,消费者才能在订阅消息时通过 Broker 使用 Tag 来过滤消息。5.x SDK 可以调用 messageBuilder.setTag("messageTag"),历史版本可以调用 message.setTags("messageTag")。
Key 的使用
在服务级别,建议将每条消息映射到一个唯一的业务标识符,并设置到 keys 字段,以便未来定位消息丢失问题。服务器为每条消息创建索引(哈希索引),应用程序可以通过 topic 和 key 查询消息内容以及消息被谁消费。由于是哈希索引,请确保 key 尽可能唯一,以避免潜在的哈希冲突。常见的设置策略使用离散的唯一标识符,如订单 ID、用户 ID 和请求 ID。
日志打印
消息发送成功或者失败,都需要打印消息日志,方便排查业务故障。发送 指的是只要不抛出异常就表示消息发送成功。
消息发送失败的处理方式
Producer 的 send 方法本身支持内部重试,5.x 重试逻辑参考 发送重试策略:
上述策略也一定程度上保证了消息发送的成功。如果业务要求消息不丢失,依然需要对可能存在的异常进行兜底,比如当调用 send 同步方法发送失败时,需要尝试将消息存储到 DB,并由后台线程定时重试,以保证消息能够到达 Broker。
上述 DB 重试方式不集成到 MQ 客户端,而要求应用自行完成的原因主要基于以下考虑:首先,MQ 客户端设计为无状态模式,便于任意水平扩容,且机器资源的消耗只有 CPU、内存、网络。其次,如果 MQ 客户端内部集成 KV 存储模块,只有同步落盘数据才能可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘,又因为应用关闭过程不受 MQ 运维人员控制,可能经常发生 kill -9 等暴力关闭,导致数据未能及时落盘而丢失。第三,Producer 所在机器可靠性较低,通常为虚拟机,不适合存储重要数据。综上所述,推荐将重试过程由应用控制。
消费者
消费过程幂等
RocketMQ 无法避免消息重复(Exactly Once),所以如果业务对消费重复非常敏感,务必在业务层进行去重处理。这可以借助关系型数据库来实现。首先,您需要为消息确定一个唯一键,可以是 msgId,也可以是消息内容中的唯一标识字段,例如订单 ID。在消费前,判断关系型数据库中是否存在该唯一键。如果不存在,则插入并消费,否则跳过。(实际处理中应考虑原子性问题,判断是否存在主键冲突,若插入失败,则直接跳过)
MsgId 必须是全局唯一标识符,但在实际应用中,可能会出现同一条消息拥有两个不同 MsgId 的情况(消费者主动重传、客户端重投机制导致的重复等),这使得业务字段的重复消费成为必要。
消费过程过慢
提高消费并行度
绝大部分消息消费都是 IO 密集型,即可能在操作数据库或者调用 RPC,这类消费的速率取决于后端数据库或者外部系统的吞吐量。通过提高消费并行度,可以提升总体的消费吞吐量,但当并行度增加到一定程度,其反而会下降。所以应用必须设置合理的并行度。修改消费并行度有几种方式:
- 在同一个 ConsumerGroup 内,增加 Consumer 实例的数量来提高并行度(注意 Consumer 实例数量超过订阅队列数是无效的)。可以增加机器,或者在已有机器上启动多个进程。
- 提高单个 Consumer 的消费并行线程,5.x PushConsumer SDK 可以通过 PushConsumerBuilder.setConsumptionThreadCount() 设置线程数,SimpleConsumer 可自由从业务线程提高并发,底层线程安全;历史版本 SDK PushConsumer 可通过修改参数 consumeThreadMin 和 consumeThreadMax 实现。
批量消费
如果某些业务流程支持批量消费,则可以大幅度提升消费吞吐量。例如订单扣减的应用,一次处理一个订单需要 1 s,而一次处理 10 个订单可能只需要 2 s,这样就可以大幅度提升消费吞吐量。推荐使用 5.x SDK 的 SimpleConsumer,设置每次接口调用批量大小,一次拉取多条消息进行消费。
重置位点跳过非重要消息
在消息堆积的情况下,如果消费速率跟不上投递速率,并且业务对数据不强求足够实时,可以选择丢弃不重要的消息。建议使用重置位点功能,直接将消费位点调整到指定时间或位置。
优化单条消息消费过程
例如,一条消息的消费过程如下:
- 查询[数据 1]根据消息从数据库中查询
- 查询[数据 2]根据消息从数据库中查询
- 复杂业务计算
- 插入[数据 3]到数据库中
- 插入[数据 4]到数据库中
这条消息的消费过程中有四次与 DB 的交互。如果每次交互计算为 5ms,则总时间为 20ms。假设业务计算耗时 5ms,则总时间为 25ms。因此,如果四次 DB 交互能够优化为两次,总时间可以优化到 15ms,这意味着整体性能提升 40%。所以,如果应用对延迟敏感,可以将 DB 部署在 SSD 盘上。与 SCSI 盘相比,前者的 RT 要小得多。
消费打印日志
如果消息量较小,建议在消费入口方法中打印消息,以便排查耗时长的消费情况。
new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
LOGGER.info("Consume message={}", messageView);
//Do your consume process
return ConsumeResult.SUCCESS;
}
}
如果能打印每条消息的消费耗时,则排查线上消费过慢等问题会更加方便。
Broker
Broker 角色
Broker 角色分为 ASYNC_MASTER、SYNC_MASTER 和 SLAVE。如果对消息可靠性有严格要求,部署 SYNC_MASTER 加 SLAVE。如果不需要消息可靠性,部署 ASYNC_MASTER 加 SLAVE。如果只是为了测试方便,可以选择只部署 ASYNC_MASTER 或只部署 SYNC_MASTER。
FlushDiskType
与 ASYNC_FLUSH 相比,SYNC_FLUSH 会带来性能损失,但更可靠。因此,必须根据实际服务场景进行权衡。
Broker 配置
参数 | 默认值 | 描述 |
---|---|---|
listenPort | 10911 | 接受客户端连接的监听端口 |
namesrvAddr | null | NameServer 地址 |
brokerIP1 | 网络 InetAddress | Broker 当前监听的 IP 地址 |
brokerIP2 | 与 brokerIP1 相同 | 当存在主从 Broker 时,如果在 Broker 主节点配置了 brokerIP2 属性,则 Broker 从节点会连接主节点上配置的 brokerIP2 进行同步。 |
brokerName | null | Broker 名称 |
brokerClusterName | DefaultCluster | 此 Broker 所属的集群名称 |
brokerId | 0 | Broker ID,0 表示主节点,其他正整数表示从节点 |
storePathCommitLog | $HOME/store/commitlog/ | 存储 Commit Log 的路径 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 存储消费队列的路径 |
mappedFileSizeCommitLog | 1024 * 1024 *1024(1G) | Commit Log 映射文件大小 |
deleteWhen | 04 | 一天中哪个时间点删除文件保留时间已超的 Commit Log |
fileReservedTime | 72 | 文件保留时间,单位为小时 |
brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 Broker 保证在收到生产者的确认前刷新消息。ASYNC_FLUSH Broker 使用刷新模式,以提高性能来刷新一组消息。 |
Broker 日志管理
Broker 的默认日志路径位于 ${user.home}/logs/rocketmqlogs/。您可以通过编辑二进制包 conf 目录下的 xx.logback.xml 文件来更改日志级别和路径。
注意:请确保您的日志已妥善保护,以防止敏感信息泄露。