跳至主要内容
版本: 5.0

消费者进度管理

Apache RocketMQ 使用消费者偏移量来管理消费者的进度。本主题介绍 Apache RocketMQ 的消费者进度管理机制。

背景

在 Apache RocketMQ 中,消息可以在被消费者订阅之前或之后生成。那么消费者如何知道从哪里开始消费消息,以及如何标记已消费的消息呢?为了克服这一挑战,Apache RocketMQ 开发了消费者进度管理机制。

Apache RocketMQ 的消费者进度管理机制解决了以下问题

  • 客户端启动后从哪里开始消费消息?

  • 如何标记已消费的消息,以确保它不会被多次处理?

  • 如果发生服务异常,同一客户端可以再次消费消息吗?

工作机制

消息偏移量

在 Apache RocketMQ 中,消息按到达顺序排队到主题中,并被分配一个唯一的 Long 类型坐标。这也称为消息的偏移量。有关这些概念的单独定义的更多信息,请参阅 主题消息队列

理论上讲,消息队列可以存储无限数量的消息。因此,偏移量的值范围从 0 到 Long.MAX_VALUE。您可以根据主题、队列和偏移量定位任何消息。下图显示了这三个概念之间的关系。偏移量

在 Apache RocketMQ 中,队列中最早消息的偏移量称为最小偏移量 (MinOffset),最新消息的偏移量称为最大偏移量 (MaxOffset)。虽然消息队列理论上可以容纳无限数量的消息,但存储它们的物理机器的空间有限。因此,Apache RocketMQ 会动态地从队列中删除最早的消息,队列的 MinOffset 和 MaxOffset 值会不断增加。消费者偏移量更新

消费者偏移量

Apache RocketMQ 遵循发布-订阅模式。多个消费者组可以订阅同一个队列。在这种情况下,当一个消费者在消费消息后删除消息时,其他消费者将无法消费它。

为了防止这种情况发生,Apache RocketMQ 使用消费者偏移量来管理不同消费者的消息消费进度。Apache RocketMQ 不会在消息被消费后立即删除它。相反,Apache RocketMQ 会维护一个记录消费者组消费的最新消息的记录,这也称为消费者偏移量。

如果客户端重新启动,消费者可以根据服务器中保存的消费者偏移量继续处理消息。如果消费者偏移量过期并被删除,则服务器中保存的队列的 MinOffset 值将用作消费者偏移量。

信息

消费者偏移量保存在 Apache RocketMQ 服务器中并从其恢复,与任何特定消费者无关。因此,Apache RocketMQ 可以跨不同消费者恢复消费者进度。

下图显示了消息队列中最小偏移量、最大偏移量和消费者偏移量之间的关系。消费者进度

  • 消费者偏移量始终小于或等于最大偏移量。

    • 如果消息的生产和消费速度相同,并且队列中没有未消费的消息,则消费者偏移量与最大偏移量相同。

    • 如果消息的消费速度比生产速度慢,则队列中存在未消费的消息。因此,消费者偏移量小于最大偏移量,差值为未消费的消息数量。

  • 通常,消费者偏移量大于或等于最小偏移量。如果消费者偏移量小于最小偏移量,则消费者将无法消费消息。在这种情况下,服务器会将正确的消费者偏移量恢复到消费者。

初始消费者偏移量

初始消费者偏移量是消费者组第一次开始消费消息队列时保存在服务器中的消费者偏移量。

Apache RocketMQ 使用消息队列的最大偏移量作为初始消费者偏移量,当消费者第一次从队列中获取消息时。换句话说,消费者从队列中的最新消息开始消费。

重置消费者偏移量

如果初始或当前消费者偏移量与您的业务状态不一致,您可以重置消费者偏移量以调整您的消费者进度。

场景

  • 不正确的初始消费者偏移量:初始消费者偏移量是队列的最大偏移量,即客户端从最新消息开始消费。如果您需要消费更早的消息,您可以将消费者偏移量重置为更早消息的偏移量。

  • 消费者延迟:如果消费者无法跟上消息生成的速率,可能会累积大量消息。如果累积的消息不是关键任务,您可以将消费者偏移量调整为更大的值以跳过这些消息并减轻下游负担。

  • 业务回溯和纠正处理:如果您想重新消费由于业务错误而被错误消费的消息,您可以将消费者偏移量设置为更小的值。

消费者偏移量重置功能

Apache RocketMQ 的消费者偏移量重置功能允许您

  • 将消费者偏移量重置为消息队列中的任何偏移量。

  • 将消费者偏移量重置为特定时间点。服务器会将消费者偏移量调整为最接近该时间点的偏移量。

限制

  • 重置消费者偏移量后,消费者将从新的偏移量开始消费消息。在回溯场景中,消费者从历史消息开始,这些消息大多是冷数据。这被称为冷读取,可能会给您的系统带来不必要的负担。在执行此操作之前,请评估风险和收益。我们建议您为此权限实施严格的控制策略,以防止滥用和频繁重置。

  • Apache RocketMQ 仅允许您重置可见消息的消费者偏移量。您无法重置处于调度或重试挂起状态的消息的偏移量。有关更多信息,请参阅 延迟消息消费重试

版本兼容性

服务器在 Apache RocketMQ 的不同版本中对初始消费者偏移量的定义不同。

  • 在 4.x 和 3.x 版本中,初始消费者偏移量定义为队列的消息状态。

  • 在 5.x 版本中,初始消费者偏移量是消费者开始接收消息时队列的最大偏移量。

因此,如果您从早期版本升级,则在启动客户端时必须注意初始消费者偏移量。

使用说明

严格控制重置权限

重置消费者偏移量会给系统带来额外的负担,并可能影响消息的读取和写入。因此,我们建议您在执行此操作之前评估风险和收益。