跳至主要内容
版本: 5.0

消费重试

如果消息无法被消费,Apache RocketMQ 会根据消费重试策略重新投递消息。这有助于消除一些故障。本主题介绍消费重试功能的工作机制、版本兼容性和使用注意事项。

场景

Apache RocketMQ 的消费重试功能确保了可能因业务处理逻辑失败而受到影响的消费完整性。此功能是针对业务逻辑失败的保护措施。它不能用于控制业务流程。

此功能适用于以下场景

  • 由于消息内容导致业务失败。例如,交易状态未返回,预计业务将在特定时间段内恢复。

  • 消费失败的原因不会影响业务连续性。失败发生的可能性很小,后续消息很有可能按预期被投递和消费。在这种情况下,您可以使用重试机制重新投递消息,以避免阻塞流程。

请勿在以下场景中使用此功能

  • 消费失败被用作处理逻辑中消息流转的条件。处理逻辑假设许多消息将无法被消费。

  • 消费失败被用于限制消息处理速率。应使用限流机制将过多的消息暂时堆积在队列中以供稍后处理,而不是让消息进入重试链路。

目的

异步解耦中消息中间件的常见问题是,如果下游服务无法处理消息,如何确保整个调用链路的完整性。作为金融级可靠消息中间件服务,Apache RocketMQ 采用精心设计的确认和重试机制,确保每条消息都按业务预期进行处理。

了解 Apache RocketMQ 的消息确认和重试机制有助于解决以下问题

  • 如何确保每条消息都被处理:您可以根据消费者的逻辑确保每条消息都被处理,并且业务状态保持一致。

  • 如何确保发生异常时正在处理的消息状态正确:您可以在发生异常(如断电)时确保消息状态正确。

策略概述

启用消费重试功能后,Apache RocketMQ 代理会在消息无法被消费时重新发送消息。如果消息即使经过指定次数的重试也无法被消费,代理会将消息发送到死信队列。

触发条件

  • 消息无法被消费。在这种情况下,消费者会返回失败状态或系统会抛出异常。

  • 发生超时错误或消息在推送消费者队列中停留时间过长。

行为

  • 重试流程状态机:控制重试流程中消息的状态和变更逻辑。

  • 重试间隔:从消费失败或超时发生到消息被重试之间的时间间隔。

  • 最大重试次数:消息可以被重试消费的最大次数。

策略差异

消息重试策略根据消费者类型使用不同的重试机制和配置方法。下表描述了策略之间的差异。

消费者类型重试流程状态机重试间隔最大重试次数
PushConsumer 就绪 飞行中 等待重试 提交 * 死信队列在创建消费者组时在元数据中指定。 无序消息:递增 顺序消息:固定在创建消费者组时在元数据中指定。
SimpleConsumer 就绪 飞行中 提交 死信队列在 API 中的 InvisibleDuration 参数中指定。在创建消费者组时在元数据中指定。

有关重试策略的更多信息,请参见 推送消费者的重试策略简单消费者的重试策略

推送消费者的重试策略

重试流程状态机

推送消费者消费消息时,消息可以处于以下状态之一:推送消费者的状态机

  • 就绪 消息正在等待被 Apache RocketMQ 代理消费。
  • 飞行中 消息已被获取,正在被消费者消费。但是,消费结果尚未返回。
  • 等待重试 此状态专属于推送消费者。消息无法被消费或代理等待消费者返回消费状态时发生超时错误。在这种情况下,会触发消费重试逻辑。如果未达到最大重试次数,消息会在重试间隔时间过后返回到就绪状态。处于就绪状态的消息可以再次被消费。您可以增加重试间隔时间,以防止频繁重试。
  • 提交 消息已被消费。消费者返回成功响应后,状态机可以终止。
  • 死信队列 消费逻辑的预防措施。如果消息即使经过最大重试次数也无法被消费,消息将不再被重试,而是被发送到死信队列。您可以消费死信队列中的消息以恢复您的业务。

重试消息时,其状态会从就绪变为飞行中,然后变为等待重试。两次消费之间的间隔时间是实际消费时间和重试间隔时间的总和。最大消费间隔时间由代理上的系统参数指定,不可超过。 重试间隔

最大重试次数

推送消费者的最大重试次数在创建消费者组时在元数据中指定。有关更多信息,请参见 消费者组

例如,如果最大重试次数为 3,则消息可以投递 4 次:一次原始尝试和 3 次重试。

重试间隔

  • 无序消息(非顺序消息):递增。下表描述了详细信息。

    重试次数间隔重试次数间隔
    110 秒97 分钟
    230 秒108 分钟
    31 分钟119 分钟
    42 分钟1210 分钟
    53 分钟1320 分钟
    64 分钟1430 分钟
    75 分钟151 小时
    86 分钟162 小时
信息

如果重试次数超过 16 次,则每次后续重试的间隔时间为 2 小时。

  • 顺序消息:固定。有关更多信息,请参见 参数限制

示例

对于推送消费者,只有消费失败状态码才会触发消息重试。SDK 也会捕获意外异常。

        SimpleConsumer simpleConsumer = null;
// Consumption example: Consume normal messages as a push consumer and trigger a message retry by using a consumption failure.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Retry the message until the maximum number of retries is reached.
return ConsumeResult.FAILURE;
}
};

简单消费者的重试策略

重试流程状态机

简单消费者消费消息时,消息可以处于以下状态之一:推送消费者状态机

  • 就绪 消息正在等待被 Apache RocketMQ 代理消费。

  • 飞行中 消息已被获取,正在被消费者消费。但是,消费结果尚未返回。

  • 提交 消息已被消费。消费者返回成功响应后,状态机可以终止。

  • 死信队列 消费逻辑的预防措施。如果消息即使经过最大重试次数也无法被消费,消息将不再被重试,而是被发送到死信队列。您可以消费死信队列中的消息以恢复您的业务。

重试间隔是固定的,并且是预先分配的。它在消费者调用 API 时由消费者在 InvisibleDuration 参数中配置。该参数指定消息的最大处理时长。重试消息时,会重复使用该参数的值。您无需为后续重试配置间隔时间。 简单消费者的重试

由于 InvisibleDuration 值是预先分配的,它可能无法满足您的业务需求。您可以在用于调用 API 的代码中更改它。

例如,如果您将 InvisibleDuration 值设置为 20 毫秒,并且消息无法在该时长内处理,您可以将该值更改为更大的值,以避免触发重试机制。

在您可以更改 InvisibleDuration 值之前,必须满足以下条件

  • 当前消息未发生超时错误。

  • 当前消息的消费状态未返回。

如下图所示,更改会立即生效,即从调用 API 的时间点开始重新计算 InvisibleDuration 值。 修改 InvisibleDuration 值

最大重试次数

简单消费者的最大重试次数在创建消费者组时在元数据中指定。有关更多信息,请参见 消费者组

消息重试间隔

消息重试间隔 = InvisibleDuration 值 − 消息实际处理时长

因此,消费重试间隔由 InvisibleDuration 值控制。例如,如果 InvisibleDuration 值为 30 毫秒,并且在处理开始后 10 毫秒返回消费失败,则下次重试的时间为 20 毫秒,这意味着重试间隔为 20 毫秒。如果在 30 毫秒内没有返回消费结果,则会发生超时错误并触发重试。然后,重试间隔为 0 毫秒。

示例

简单消费者只需要等待消息被重试。

 // Consumption example: Consume normal messages as a simple consumer. If you want a message to be retried, do not process the message. Wait for it to time out, and the broker retries it automatically. 
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// If you want a message to be retried after it fails to be consumed, ignore the failure and wait for the message to be visible. Then try to obtain it again from the broker.
});
} catch (ClientException e) {
// If the message fails to be pulled due to throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}

使用说明

不要使用消费重试来处理消费限流

场景 中所述,消息重试适用于业务处理和消息消费失败是小概率事件的场景。消息重试不适用于持续失败的场景,例如消费限流。

  • 错误示例:当当前消费速率高于上限时,返回消费失败以触发重试。

  • 正确示例:如果当前消费速率高于限制,则稍后获取和消费消息。

设置适当的重试次数以避免无限重试

虽然 Apache RocketMQ 支持自定义消费重试次数,但我们建议您设置少量重试次数和较长的重试间隔,以减少系统负担。避免大量重试或无限重试。