跳过主内容
版本:5.0

消费重试

如果消息消费失败,Apache RocketMQ 会根据消费重试策略重新投递消息。这有助于消除一些故障。本主题介绍消费重试功能的工作机制、版本兼容性和使用注意事项。

场景

Apache RocketMQ 的消费重试功能可确保因业务处理逻辑失败而可能受影响的消费完整性。此功能是针对业务逻辑失败的一种保护措施,不能用于控制业务流程。

该功能适用于以下场景:

  • 业务因消息内容失败。例如,事务状态未返回,且业务预期在特定时间内恢复。

  • 消费失败的原因不影响业务连续性。失败发生的可能性很小,后续消息很可能按预期投递和消费。在这种情况下,您可以使用重试机制重新投递消息,以避免阻塞流程。

请勿在以下场景中使用此功能:

  • 消费失败被用作处理逻辑中消息流转的条件。处理逻辑假设许多消息将消费失败。

  • 消费失败被用于限制消息处理速率。限流应被用于在队列中临时堆积过多的消息以供后续处理,而不是让消息进入重试链路。

目的

消息中间件在异步解耦中面临的一个常见问题是,当下游服务未能处理消息时,如何确保整个调用链路的完整性。作为金融级的可靠消息中间件服务,Apache RocketMQ 使用精心设计的消息确认和重试机制,以确保每条消息都按照业务预期进行处理。

理解 Apache RocketMQ 的消息确认和重试机制有助于解决以下问题:

  • 如何确保每条消息都被处理:您可以确保每条消息都根据其消费者逻辑进行处理,并且业务状态保持一致。

  • 如何确保正在处理的消息在发生异常时状态正确:您可以确保在发生异常(例如断电)时消息状态正确。

策略概述

启用消费重试功能后,当消息消费失败时,Apache RocketMQ Broker 会重新发送消息。如果消息在指定次数的重试后仍未能消费,Broker 会将消息发送到死信队列。

触发条件

  • 消息消费失败。在这种情况下,消费者返回失败状态或系统抛出异常。

  • 发生超时错误或消息在 Push 消费者队列中停留时间过长。

行为

  • 重试过程状态机:控制重试过程中消息的状态和变化逻辑。

  • 重试间隔:从消费失败或超时发生到消息被重试之间的时间。

  • 最大重试次数:消息可以被重试消费的最大次数。

策略差异

消息重试策略根据消费者类型使用不同的重试机制和配置方法。下表描述了策略之间的差异。

消费者类型重试过程状态机重试间隔最大重试次数
PushConsumerReady → Inflight → WaitingRetry → Commit → DLQ在创建消费者组时在元数据中指定。 无序消息:递增 顺序消息:固定在创建消费者组时在元数据中指定。
SimpleConsumerReady → Inflight → Commit → DLQ在 API 的 InvisibleDuration 参数中指定。在创建消费者组时在元数据中指定。

有关重试策略的更多信息,请参见Push 消费者重试策略Simple 消费者重试策略

PushConsumer 重试策略

重试过程状态机

当 Push 消费者消费消息时,消息可以处于以下状态之一:Push 消费者状态机

  • Ready:消息正在 Apache RocketMQ Broker 上等待被消费。
  • Inflight:消息已被消费者获取并正在消费中,但消费结果尚未返回。
  • WaitingRetry:此状态是 Push 消费者独有的。当 Broker 等待消费者返回消费状态时,消息消费失败或发生超时错误。在这种情况下,会触发消费重试逻辑。如果未达到最大重试次数,消息在重试间隔结束后会回到 Ready 状态。处于 Ready 状态的消息可以再次被消费。您可以增加重试间隔以防止频繁重试。
  • Commit:消息已被消费。在消费者返回成功响应后,状态机可以终止。
  • DLQ:消费逻辑的一种预防措施。如果消息在达到最大重试次数后仍然消费失败,则不再重试,并将其发送到死信队列。您可以消费死信队列中的消息来恢复您的业务。

当消息被重试时,其状态从 Ready 变为 Inflight,然后变为 WaitingRetry。两次消费之间的间隔是实际消费时间和重试间隔的总和。最大消费间隔由 Broker 上的系统参数指定,不能超过该值。 重试间隔

最大重试次数

Push 消费者的最大重试次数在创建消费者组时在元数据中指定。有关更多信息,请参见消费者组

例如,如果最大重试次数为三次,则消息可以投递四次:一次原始尝试和三次重试。

重试间隔

  • 无序消息(非顺序消息):递增。下表描述了详细信息。

    重试次数间隔重试次数间隔
    110 秒97 分钟
    230 秒108 分钟
    31 分钟119 分钟
    42 分钟1210 分钟
    53 分钟1320 分钟
    64 分钟1430 分钟
    75 分钟151 小时
    86 分钟162 小时
注意

如果重试次数超过 16 次,后续每次重试的间隔均为 2 小时。

  • 顺序消息:固定。有关更多信息,请参见参数限制

示例

对于 Push 消费者,消息重试仅由消费失败的状态码触发。意外异常也会被 SDK 捕获。

        SimpleConsumer simpleConsumer = null;
// Consumption example: Consume normal messages as a push consumer and trigger a message retry by using a consumption failure.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Retry the message until the maximum number of retries is reached.
return ConsumeResult.FAILURE;
}
};

SimpleConsumer 重试策略

重试过程状态机

当 Simple 消费者消费消息时,消息可以处于以下状态之一:Push 消费者状态机

  • Ready:消息正在 Apache RocketMQ Broker 上等待被消费。

  • Inflight:消息已被消费者获取并正在消费中,但消费结果尚未返回。

  • Commit:消息已被消费。在消费者返回成功响应后,状态机可以终止。

  • DLQ:消费逻辑的一种预防措施。如果消息在达到最大重试次数后仍然消费失败,则不再重试,并将其发送到死信队列。您可以消费死信队列中的消息来恢复您的业务。

重试间隔是固定的且预先分配的。它由消费者在调用 API 时通过 InvisibleDuration 参数配置。该参数指定消息的最大处理持续时间。当消息被重试时,该参数的值被重用。您无需为后续重试配置间隔。 Simple 消费者重试

由于 InvisibleDuration 值是预先分配的,它可能不符合您的业务要求。您可以在调用 API 的代码中更改它。

例如,如果您将 InvisibleDuration 值设置为 20 毫秒,并且消息无法在该持续时间内处理,您可以将该值更改为更大的值以避免触发重试机制。

在更改 InvisibleDuration 值之前,必须满足以下条件:

  • 当前消息未发生超时错误。

  • 当前消息的消费状态未返回。

如下图所示,更改会立即生效,即 InvisibleDuration 值从调用 API 的时间点开始重新计算。 修改 InvisibleDuration 值

最大重试次数

Simple 消费者的最大重试次数在创建消费者组时在元数据中指定。有关更多信息,请参见消费者组

消息重试间隔

消息重试间隔 = InvisibleDuration 值 − 消息实际处理持续时间

因此,消费重试间隔由 InvisibleDuration 值控制。例如,如果 InvisibleDuration 值为 30 毫秒,并且在处理开始 10 毫秒后返回消费失败,则下次重试的时间为 20 毫秒,这意味着重试间隔为 20 毫秒。如果在 30 毫秒内没有返回消费结果,则会发生超时错误并触发重试。此时,重试间隔为 0 毫秒。

示例

Simple 消费者只需等待消息被重试即可。

 // Consumption example: Consume normal messages as a simple consumer. If you want a message to be retried, do not process the message. Wait for it to time out, and the broker retries it automatically. 
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// If you want a message to be retried after it fails to be consumed, ignore the failure and wait for the message to be visible. Then try to obtain it again from the broker.
});
} catch (ClientException e) {
// If the message fails to be pulled due to throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}

使用注意事项

请勿使用消费重试来处理消费限流

场景中所述,消息重试适用于业务处理和消息消费失败是小概率事件的场景。消息重试不适用于故障持续存在的场景,例如消费限流。

  • 错误示例:当当前消费速率高于上限时,返回消费失败以触发重试。

  • 正确示例:如果当前消费速率超出限制,则稍后获取并消费消息。

设置合适的重试次数以避免无限重试

尽管 Apache RocketMQ 支持自定义消费重试次数,但我们建议您设置较少的重试次数和较长的重试间隔,以减轻系统负担。避免大量重试或无限重试。