消费重试
如果消息消费失败,Apache RocketMQ 会根据消费重试策略重新投递该消息,以帮助消除某些故障。本主题介绍了消费重试功能的工作机制、版本兼容性以及使用注意事项。
适用场景
Apache RocketMQ 的消费重试功能可确保因业务处理逻辑失败而可能受损的消费完整性。此功能是针对业务逻辑失败的一种保障措施,不能用于控制业务流程。
该功能适用于以下场景:
因消息内容导致的业务失败。例如,未返回交易状态,且业务预期在特定时间内能够恢复。
消费失败的原因不会影响业务连续性。该失败发生的可能性很小,且后续消息极有可能按预期投递和消费。在此类情况下,您可以使用重试机制重新投递消息,以避免阻塞流程。
请勿在以下场景中使用该功能:
将消费失败作为处理逻辑中分流消息的条件。处理逻辑假设会有大量消息消费失败。
将消费失败用于限制消息处理速率。应使用限流机制将过多的消息暂时堆积在队列中以供稍后处理,而不是让消息进入重试链路。
目的
异步解耦中消息中间件的一个常见问题是:如果下游服务处理消息失败,如何确保整个调用链路的完整性。作为金融级可靠的消息中间件服务,Apache RocketMQ 使用设计良好的消息确认和重试机制,确保每条消息都按照业务预期进行处理。
了解 Apache RocketMQ 的消息确认和重试机制有助于解决以下问题:
如何确保每条消息都被处理:您可以基于消费逻辑确保每条消息都得到处理,并保持业务状态一致。
如何确保在发生异常时处理中的消息状态正确:您可以确保在断电等异常发生时,消息状态依然正确。
策略概述
启用消费重试功能后,Apache RocketMQ Broker 在消息消费失败时会重新发送消息。如果消息在指定的重试次数后仍然消费失败,Broker 会将消息发送到死信队列(Dead-Letter Queue)。
触发条件
消息消费失败。此时消费者返回失败状态或系统抛出异常。
发生超时错误或消息在 Push Consumer 队列中停留的时间过长。
行为
重试过程状态机:控制消息在重试过程中的状态及其变更逻辑。
重试间隔:从消费失败或超时发生到消息再次被重试之间经过的时间。
最大重试次数:消息可进行消费重试的最大次数。
策略差异
根据消费者类型的不同,消息重试策略使用不同的重试机制和配置方法。下表描述了这些策略之间的差异。
| 消费者类型 | 重试过程状态机 | 重试间隔 | 最大重试次数 |
|---|---|---|---|
| PushConsumer | * Ready * Inflight * WaitingRetry * Commit * DLQ | 创建消费组时在元数据中指定。* 无序消息:递增 * 有序消息:固定 | 创建消费组时在元数据中指定。 |
| SimpleConsumer | * Ready * Inflight * Commit * DLQ | 在 API 中的 InvisibleDuration 参数中指定。 | 创建消费组时在元数据中指定。 |
有关重试策略的更多信息,请参阅Push Consumer 的重试策略和Simple Consumer 的重试策略。
PushConsumer 的重试策略
重试过程状态机
当 Push Consumer 消费消息时,消息可能处于以下状态之一:
- Ready:消息正在 Apache RocketMQ Broker 上等待被消费。
- Inflight:消息已被获取并正在由消费者处理。但尚未返回消费结果。
- WaitingRetry:此状态仅适用于 Push Consumer。当消息消费失败或 Broker 等待消费者返回消费状态时发生超时,会触发消费重试逻辑。如果未达到最大重试次数,重试间隔结束后消息将回到 Ready 状态。处于 Ready 状态的消息可以再次被消费。您可以增加重试间隔以防止频繁重试。
- Commit:消息已完成消费。消费者返回成功响应后,状态机终止。
- DLQ:消费逻辑的预防性措施。如果达到最大重试次数后消息仍消费失败,消息将不再被重试,并被发送到死信队列。您可以消费死信队列中的消息以恢复业务。
当消息被重试时,其状态从 Ready 变为 Inflight,然后变为 WaitingRetry。两次消费之间的间隔是实际消费耗时与重试间隔之和。最大消费间隔由 Broker 上的系统参数指定,不可超过该值。
最大重试次数
Push Consumer 的最大重试次数是在创建消费组时在元数据中指定的。更多信息,请参阅消费组。
例如,如果最大重试次数为 3,则消息总共可投递 4 次:1 次原始尝试和 3 次重试。
重试间隔
无序消息(非顺序消息):递增。下表描述了详细信息。
重试次数 间隔 重试次数 间隔 1 10 秒 9 7 分钟 2 30 秒 10 8 分钟 3 1 分钟 11 9 分钟 4 2 分钟 12 10 分钟 5 3 分钟 13 20 分钟 6 4 分钟 14 30 分钟 7 5 分钟 15 1 小时 8 6 分钟 16 2 小时
如果重试次数超过 16 次,后续每次重试的间隔均为 2 小时。
- 有序消息:固定。更多信息,请参阅参数限制。
示例
对于 Push Consumer,消息重试仅由消费失败的状态码触发。意外异常也会被 SDK 捕获。
SimpleConsumer simpleConsumer = null;
// Consumption example: Consume normal messages as a push consumer and trigger a message retry by using a consumption failure.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Retry the message until the maximum number of retries is reached.
return ConsumeResult.FAILURE;
}
};
SimpleConsumer 的重试策略
重试过程状态机
当 Simple Consumer 消费消息时,消息可能处于以下状态之一:
Ready:消息正在 Apache RocketMQ Broker 上等待被消费。
Inflight:消息已被获取并正在由消费者处理。但尚未返回消费结果。
Commit:消息已完成消费。消费者返回成功响应后,状态机终止。
DLQ:消费逻辑的预防性措施。如果达到最大重试次数后消息仍消费失败,消息将不再被重试,并被发送到死信队列。您可以消费死信队列中的消息以恢复业务。
重试间隔是固定且预分配的。它由消费者在调用 API 时通过 InvisibleDuration 参数配置。该参数指定了消息的最大处理时长。当消息被重试时,该参数的值会被复用。您无需为后续重试配置间隔。
由于 InvisibleDuration 的值是预分配的,它可能无法满足您的业务需求。您可以在调用 API 的代码中更改它。
例如,如果您将 InvisibleDuration 设置为 20 毫秒,但消息无法在该时长内处理完毕,您可以将其更改为更大的值以避免触发重试机制。
在更改 InvisibleDuration 值之前,必须满足以下条件:
当前消息未发生超时错误。
当前消息的消费状态未返回。
如下图所示,更改立即生效,即 InvisibleDuration 值从调用 API 的那一刻起重新计算。
最大重试次数
Simple Consumer 的最大重试次数是在创建消费组时在元数据中指定的。更多信息,请参阅消费组。
消息重试间隔
消息重试间隔 = InvisibleDuration 值 − 实际消息处理耗时
因此,消费重试间隔由 InvisibleDuration 值控制。例如,如果 InvisibleDuration 为 30 毫秒,且在处理开始 10 毫秒后返回了消费失败,则下一次重试的时间为 20 毫秒,这意味着重试间隔为 20 毫秒。如果在 30 毫秒内未返回消费结果,则会发生超时错误并触发重试。此时,重试间隔为 0 毫秒。
示例
Simple Consumer 只需等待消息重试即可。
// Consumption example: Consume normal messages as a simple consumer. If you want a message to be retried, do not process the message. Wait for it to time out, and the broker retries it automatically.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// If you want a message to be retried after it fails to be consumed, ignore the failure and wait for the message to be visible. Then try to obtain it again from the broker.
});
} catch (ClientException e) {
// If the message fails to be pulled due to throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}
使用说明
不要使用消费重试来处理消费限流
如场景中所述,消息重试适用于业务处理和消息消费失败属于小概率事件的场景。消息重试不适用于故障持续存在的场景,例如消费限流。
错误示例:在当前消费速率高于上限时返回消费失败以触发重试。
正确示例:如果当前消费速率高于限制,请稍后再获取并消费消息。
设置合理的重试次数以避免无限重试
尽管 Apache RocketMQ 支持自定义消费重试次数,但我们建议您设置较小的重试次数和较长的重试间隔,以减轻系统负担。避免进行大量重试或无限重试。