跳至主要内容
版本: 5.0

基本最佳实践

生产者

发送消息的注意事项

标签的使用

一个应用程序可以被标识为一个主题,消息子类型可以被标识为标签。标签可以由应用程序自由设置。只有当生产者在发送消息时设置标签时,消费者才能在订阅消息时使用标签通过代理过滤消息。5.x SDK 可以调用 messageBuilder.setTag("messageTag"),历史版本可以调用 message.setTags("messageTag")。

键的使用

在服务级别,建议将每条消息映射到一个唯一的服务标识符,并将其设置为键字段,以便将来定位消息丢失问题。服务器为每条消息创建一个索引(哈希索引),应用程序可以通过主题和键查询消息内容,以及消息由谁消费。由于它是哈希索引,请确保键尽可能唯一,以避免潜在的哈希冲突。常见的设置策略使用离散的唯一标识符,例如订单 ID、用户 ID 和请求 ID。

打印日志

如果消息发送成功或失败,您需要打印消息日志以进行服务故障排除。发送表示只要没有抛出异常,消息就已成功发送。

消息发送失败的处理方法

生产者的 send 方法本身支持内部重试,5.x 重试逻辑参考 发送重试策略

上述策略在一定程度上也保证了消息发送的成功。如果业务要求消息无损发送,仍然需要覆盖可能的异常,例如当调用 send 同步方法时,发送失败,则尝试将消息存储到 db 中,并由后台线程定期重试,以确保消息到达 Broker。

上述 DB 重试方法没有集成到 MQ 客户端,而是需要应用程序自行完成,主要基于以下考虑:首先,MQ 客户端被设计为无状态模式,方便任意水平扩展,机器资源消耗仅为 cpu、内存、网络。其次,如果 MQ 客户端内部集成了 KV 存储模块,只有同步磁盘落盘才能保证数据可靠,而同步磁盘落盘本身性能开销很大,因此通常使用异步磁盘落盘,并且由于应用程序关闭过程不受 MQ 运维人员控制,因此经常会发生 kill -9 这样的暴力关闭。导致数据没有及时落盘而丢失。第三,生产者所在的机器可靠性低,一般是虚拟机,不适合存储重要数据。综上所述,建议由应用程序控制重试过程。

消费者

消费过程是幂等的

RocketMQ 无法避免消息重复(Exactly Once),因此如果业务对消费重复非常敏感,则务必在业务层进行去重处理。这可以通过关系型数据库的帮助来实现。您首先需要确定消息的唯一键,可以是 msgId,也可以是消息内容中的唯一标识字段,例如订单 ID。在消费之前确定唯一键是否存在于关系型数据库中。如果不存在,则插入并消费,否则跳过。(实际过程需要考虑原子性问题,判断是否存在主键冲突,则插入失败,直接跳过)

MsgId 必须是全局唯一的标识符,但在实际应用中,可能会出现同一条消息有两个不同 msgId 的情况(消费者主动重传、客户端重投机制导致重复等),这需要对业务字段进行重复消费。

消费过程缓慢

提高消费并行度

绝大多数消息消费都是 IO 密集型,即可能在数据库上进行操作或调用 RPC,这种消费的消费速度取决于后端数据库或外部系统的吞吐量。通过提高消费并行度,可以提高总的消费吞吐量,但当并行度提高到一定程度后,就会下降。因此,应用程序必须设置合理的并行度。有几种方法可以修改消费并行度

  • 在同一个 ConsumerGroup 中,我们增加 Consumer 实例的数量来提高并行度(注意超过订阅队列的 Consumer 实例无效)。您可以添加一台机器,或者在现有机器上启动多个进程。
  • 提高单个 Consumer 的消费并行线程,5.x PushConsumer SDK 可以 PushConsumerBuilder.setConsumptionThreadCount() 设置线程数量,SimpleConsumer 可以自由地从业务线程中增加并发,底层线程是安全的;历史 SDK PushConsumer 可以通过修改参数 consumeThreadMin 和 consumeThreadMax 来实现。

批量消费

如果某些业务流程支持批量消费,可以大大提高消费吞吐量。例如,订单扣减的应用一次处理一个订单需要 1 秒,而一次处理 10 个订单可能只需要 2 秒,因此可以大大提高消费吞吐量。建议使用 5.x SDK 中的 SimpleConsumer,设置每次接口调用的批量大小,一次拉取多条消息。

重置位置以跳过不重要的消息

在消息堆积的情况下,如果消费速度跟不上投递速度,并且业务对数据要求不高,可以选择丢弃不重要的消息。建议使用重置位置功能,直接将消费位置调整到指定时间或位置。

优化每条消息的消费过程

例如,一条消息的消费过程如下

  • 查询[数据 1]从 DB 中根据消息
  • 查询[数据 2]从 DB 中根据消息
  • 复杂的业务计算
  • 插入[数据 3]到 DB 中
  • 插入[数据 4]到 DB 中

在消费这条消息的过程中,与 DB 有四次交互。如果我们将每次交互计算为 5ms,那么总时间为 20ms。假设服务计算需要 5ms,那么总时间为 25ms。因此,如果可以将四次 DB 交互优化为两次,那么总时间可以优化到 15ms,这意味着整体性能提高了 40%。因此,如果应用程序对延迟敏感,则可以将 DB 部署在 SSD 磁盘上。与 SCSI 磁盘相比,前者的 RT 要小得多。

消费打印日志

如果消息数量较少,建议在消费入口方法中打印消息,这需要很长时间才能消费。

   new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
LOGGER.info("Consume message={}", messageView);
//Do your consume process
return ConsumeResult.SUCCESS;
}
}

如果可以打印每条消息的消费时间,将更方便地排查消费缓慢等线上问题。

代理

代理角色

代理角色分为 ASYNC_MASTER、SYNC_MASTER 和 SLAVE。如果您对消息可靠性有严格的要求,请部署 SYNC_MASTER 加 SLAVE。如果不需要消息可靠性,请部署 ASYNC_MASTER 加 SLAVE。如果测试仅方便,可以选择仅部署 ASYNC_MASTER 或仅部署 SYNC_MASTER。

FlushDiskType

与 ASYNC_FLUSH 相比,SYNC_FLUSH 会造成性能损失,但更可靠。因此,必须根据实际服务场景进行权衡。

代理配置

参数默认值描述
listenPort10911接受客户端连接的监听端口
namesrvAddrnullName Server 地址
brokerIP1网络 InetAddressBroker 当前监听的 IP 地址
brokerIP2与 brokerIP1 相同当存在主/从 Broker 时,如果在 Broker 主节点上配置了 brokerIP2 属性,则 Broker 从节点将连接到主节点上配置的 brokerIP2 进行同步
brokerNamenullBroker 名称
brokerClusterNameDefaultCluster此 Broker 所属的集群名称
brokerId0Broker ID,0 表示主节点,其他正整数表示从节点
storePathCommitLog$HOME/store/commitlog/存储 Commit Log 的路径
storePathConsumerQueue$HOME/store/consumequeue/存储消费队列的路径
mapedFileSizeCommitLog1024 * 1024 *1024(1G)Commit Log 映射文件大小
deleteWhen04应在一天中的什么时间删除文件保留时间已超过的 Commit Log
fileReservedTime72文件保留时间(以小时为单位)
brokerRoleASYNC_MASTERSYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskTypeASYNC_FLUSHSYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 Broker 保证在收到生产者确认之前刷新消息。ASYNC_FLUSH Broker 使用刷新模式来刷新一组消息,以获得更好的性能。