跳到主要内容
版本: 5.0

消费者进度管理

Apache RocketMQ 使用消费者位点(consumer offsets)管理消费者的进度。本文档描述了 Apache RocketMQ 的消费者进度管理机制。

背景

在 Apache RocketMQ 中,消息可以在被消费者订阅之前或之后生成。那么,消费者如何知道从哪里开始消费消息,以及如何标记已消费的消息?为了克服这一挑战,Apache RocketMQ 开发了消费者进度管理机制。

Apache RocketMQ 的消费者进度管理机制解决了以下问题:

  • 客户端启动后从哪里开始消费消息?

  • 如何标记已消费的消息以确保不会重复处理?

  • 如果发生服务异常,同一客户端是否可以再次消费消息?

工作机制

消息位点

在 Apache RocketMQ 中,消息按照到达顺序在主题中排队,并被分配一个唯一的 Long 类型坐标。这也被称为消息的位点(offset)。有关这些概念的单独定义的更多信息,请参阅主题消息队列

理论上讲,一个消息队列可以存储无限数量的消息。因此,位点的取值范围是从 0 到 Long.MAX_VALUE。您可以根据消息的主题、队列和位点来定位任何消息。下图显示了这三个概念之间的关系。Offset

在 Apache RocketMQ 中,队列中最早消息的位点称为最小位点(MinOffset),最新消息的位点称为最大位点(MaxOffset)。尽管消息队列理论上可以容纳无限数量的消息,但存储它们的物理机器空间有限。因此,Apache RocketMQ 会动态删除队列中最早的消息,并且队列的 MinOffset 和 MaxOffset 值会不断增加。Consumer offset update

消费者位点

Apache RocketMQ 遵循发布-订阅模式。多个消费者组可以订阅同一个队列。在这种情况下,当一个消费者在消费消息后将其删除时,其他消费者就无法消费该消息了。

为了防止这种情况发生,Apache RocketMQ 使用消费者位点来管理不同消息的消费进度。Apache RocketMQ 不会在消息被消费后立即删除它。相反,Apache RocketMQ 维护着一个消费者组消费的最新消息的记录,这也被称为消费者位点。

如果客户端重启,消费者能够根据服务器中保存的消费者位点继续处理消息。如果消费者位点过期并被删除,则使用服务器中保存的队列的 MinOffset 值作为消费者位点。

info

消费者位点保存在 Apache RocketMQ 服务器中并从其中恢复,与任何特定消费者无关。因此,Apache RocketMQ 可以在不同消费者之间恢复消费进度。

下图显示了消息队列中最小位点、最大位点和消费者位点之间的关系。Consumer progress

  • 消费者位点总是小于或等于最大位点。

    • 如果消息的生产和消费速度相同,并且队列中没有未消费的消息,则消费者位点与最大位点相同。

    • 如果消息的消费速度慢于生产速度,则队列中存在未消费的消息。因此,消费者位点小于最大位点,差值就是未消费消息的数量。

  • 通常,消费者位点大于或等于最小位点。如果消费者位点小于最小位点,则消费者无法消费消息。在这种情况下,服务器会将正确的消费者位点恢复给消费者。

初始消费者位点

初始消费者位点是当消费者组第一次开始消费消息队列时,服务器中保存的消费者位点。

Apache RocketMQ 将消费者首次从队列获取消息时消息队列的最大位点作为初始消费者位点。换句话说,消费者从队列中的最新消息开始消费。

重置消费者位点

如果初始或当前消费者位点与您的业务状态不符,您可以重置消费者位点以调整您的消费进度。

场景

  • 初始消费者位点不当:初始消费者位点是队列的最大位点,即客户端从最新消息开始消费。如果您需要消费更早的消息,可以重置消费者位点到更早的消息。

  • 消费者滞后:如果消费者无法跟上消息生成的速度,可能会累积大量消息。如果累积的消息不是关键任务,您可以将消费者位点调整为更大的值以跳过这些消息,从而减轻下游负担。

  • 业务回溯和纠错处理:如果您想重新消费由于业务错误而错误消费的消息,可以将消费者位点设置为更小的值。

消费者位点重置功能

Apache RocketMQ 的消费者位点重置功能允许您:

  • 将消费者位点重置为消息队列中的任意位点。

  • 将消费者位点重置到特定时间点。服务器会将消费者位点调整到最接近该时间点的位点。

限制

  • 重置消费者位点后,消费者将从新的位点开始消费消息。在回溯场景中,消费者从历史消息(大部分是冷数据)开始。这被称为冷读(cold reads),可能会给您的系统带来不必要的负担。在执行此操作之前,请评估风险和收益。我们建议您对此权限实施严格的控制策略,以防止滥用和频繁重置。

  • Apache RocketMQ 仅允许您重置可见消息的消费者位点。您不能重置处于调度或重试待处理状态的消息的位点。有关更多信息,请参阅延时消息消费重试

版本兼容性

不同版本的 Apache RocketMQ 中,服务器对初始消费者位点有不同的定义。

  • 在 4.x 和 3.x 版本中,初始消费者位点被定义为队列的消息状态。

  • 在 5.x 版本中,初始消费者位点是消费者开始接收消息时队列的最大位点。

因此,如果您从早期版本升级,在启动客户端时必须注意初始消费者位点。

使用说明

严格控制重置权限

重置消费者位点会给系统带来额外负担,并可能影响消息的读写。因此,我们建议您在执行此操作之前评估风险和收益。