跳到主要内容
版本:5.0

消费者类型

Apache RocketMQ 支持以下消费者类型:PushConsumer、SimpleConsumer 和 PullConsumer。本文介绍这三种消费者类型的使用方法、工作原理、重试机制以及适用场景。

背景信息

Apache RocketMQ 提供了 PushConsumer、SimpleConsumer 和 PullConsumer 三种消费者类型。这三种消费者类型具有不同的集成和控制方式,可用于满足不同业务场景下的消息传递需求。以下因素可以帮助您为业务场景选择合适的消费者类型:

  • 并发消费:消费者如何利用多线程技术实现并发消息消费,以提高消息处理效率?

  • 同步或异步消息处理:对于不同的集成场景,消费者可能需要将收到的消息异步分发给业务逻辑系统进行处理。如何实现异步消息处理?

  • 可靠消息处理:消费者在处理消息时如何返回响应结果?当消息出现错误时,如何实现消息重试以确保可靠的消息处理?

关于上述问题的答案,请参见PushConsumerSimpleConsumer

功能概览

消息消费流程

上图显示,Apache RocketMQ 中消费者的消息消费涉及以下阶段:接收消息、处理消息和提交消费状态。

这三种消费者类型通过提供不同的实现方法和 API 操作,适用于各种消息消费场景。下表描述了三种消费者类型之间的差异。

信息

PullConsumer 仅建议在流处理框架中集成。PushConsumer 和 SimpleConsumer 可以满足大多数场景。

您可以根据业务场景在 PushConsumer 和 SimpleConsumer 之间进行切换。当您切换到不同的消费者类型时,Apache RocketMQ 中现有资源的利用和现有业务处理任务不受影响。

注意

严禁在同一消费组中混合使用 PullConsumer 和其他消费者类型。

项目PushConsumerSimpleConsumerPullConsumer
API 操作调用通过消息监听器调用回调操作返回消费结果。消费者只能在消息监听器的范围内处理消费逻辑。业务应用程序实现消息处理,并调用相应的操作返回消费结果。业务应用程序实现消息拉取和处理,并调用相应的操作返回消费结果。
消费并发管理Apache RocketMQ SDK 用于管理消息消费的并发线程数。用于消息消费的并发线程数取决于单个业务应用程序的消费逻辑。用于消息消费的并发线程数取决于单个业务应用程序的消费逻辑。
负载均衡机制5.0 版本基于消息的负载均衡,早期版本基于队列的负载均衡。基于消息的负载均衡。基于队列的负载均衡。
API 灵活性API 操作封装度高,灵活性差。原子操作提供极高的灵活性。原子操作提供极高的灵活性。
适用场景此消费者类型适用于不需要自定义流程的开发场景。此消费者类型适用于需要自定义流程的开发场景。仅建议在流处理框架场景中集成

PushConsumer

PushConsumer 是一种提供高度封装的消费者类型。消息消费和消费结果提交仅通过消息监听器进行处理。消息获取、消费状态提交和消费重试都通过 Apache RocketMQ 客户端 SDK 完成。

使用方法

PushConsumer 以固定方式使用。在消费者初始化时,向 PushConsumer 消费者注册消息监听器,并在消息监听器中实现消息处理逻辑。消息获取、监听器调用触发和消息重试均通过 Apache RocketMQ SDK 进行处理。

示例代码

// Message consumption example: Use a PushConsumer consumer to consume messages. 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// Configure consumer group.
.setConsumerGroup("YourConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Set the message listener.
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
// Consume the messages and return the consumption result.
return ConsumeResult.SUCCESS;
}
})
.build();

PushConsumer 消费者的消息监听器返回以下结果之一:

  • 消费成功:例如,当您使用 Apache RocketMQ SDK for Java 并且消息被消费时,返回 ConsumeResult.SUCCESS。服务器根据消费结果更新消费进度。

  • 消费失败:例如,当您使用 Apache RocketMQ SDK for Java 并且消息消费失败时,返回 ConsumeResult.FAILURE。Apache RocketMQ 是否重试消费消息取决于消费重试逻辑。

  • 意外失败:例如,如果抛出意外异常,则消息消费失败。Apache RocketMQ 是否重试消费消息取决于消费重试逻辑。

如果消息处理逻辑中的意外错误导致 PushConsumer 消费者无法持续消费消息,SDK 会认为消费已超时并强制提交消费失败结果。然后,根据消费重试逻辑处理消息。有关消费超时的更多信息,请参见Push 消费者重试策略

信息

当消费超时发生时,SDK 会提交消费失败结果。但是,当前消费线程可能无法响应结果并继续处理消息。::

工作机制

对于 PushConsumer,实时消息处理基于 SDK 典型的 Reactor 线程模型。SDK 内置一个长轮询线程,该线程拉取消息并将消息存储到队列中。然后,消息从队列传递到各个消息消费线程。消息监听器根据消息消费逻辑执行。下图显示了 PushConsumer 消费者的消息消费过程。PushConsumer原理

可靠性重试

对于 PushConsumer,客户端 SDK 与消费逻辑单元之间的通信仅通过消息监听器实现。客户端 SDK 根据消息监听器返回的结果检查消息是否被消费,并根据消费重试逻辑执行重试以确保消息可靠性。所有消息都必须以同步方式消费。消费结果在监听器操作调用结束时返回。不允许异步分发。有关消息重试的更多信息,请参见Push 消费者重试策略

为确保消息可靠性,Apache RocketMQ 禁止 PushConsumer 消费者在消息消费中出现以下行为:

  • 在消息消费完成前返回消费结果。例如,对于稍后消费失败的消息,提前返回消费成功结果。在这种情况下,Apache RocketMQ 无法检查实际消费结果,并且不会重试消息消费。

  • 将消息从消息监听器分发到其他自定义线程并提前返回消费结果。如果消息消费失败但提前返回了消费成功结果,Apache RocketMQ 无法检查实际消费结果,并且不会重试消息消费。

确保消息顺序

对于 Apache RocketMQ 中的FIFO 消息,如果为消费者组配置了顺序消息消费,PushConsumer 消费者会按照消费顺序消费消息。当 PushConsumer 消费者消费消息时,消息顺序得到保证,无需单个业务应用程序在业务逻辑中定义消费顺序。

在 Apache RocketMQ 中,同步提交是顺序消息处理的先决条件。如果在业务逻辑中定义了异步分发,Apache RocketMQ 将无法保证消息的顺序。::

适用场景

PushConsumer 将消息处理限制为同步处理,并限制每条消息的处理超时。PushConsumer 适用于以下场景:

  • 可预测的消息处理持续时间:如果消息处理持续时间没有限制,对于需要长时间处理的消息,消息重试会持续触发以确保消息可靠性。这会导致大量重复消息。

  • 无异步和无自定义流程:PushConsumer 将消费逻辑的线程模型限制为 Reactor 线程模型。客户端 SDK 根据最大吞吐量处理消息。该模型易于开发,但不允许异步或自定义流程。

SimpleConsumer

SimpleConsumer 是一种支持消息处理原子操作的消费者类型。此类消费者根据业务逻辑调用操作来获取消息、提交消费状态并执行消息重试。

使用方法

SimpleConsumer 涉及多个 API 操作。根据需要调用相应的操作以获取消息并分发给业务线程进行处理。然后,调用 commit 操作提交消息处理结果。示例代码

// Consumption example: When a SimpleConsumer consumer consumes normal messages, the consumer obtain messages and commit message consumption results. 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// Configure consumer group.
.setConsumerGroup("YourConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Specify the max await time when receive messages from the server.
.setAwaitDuration(Duration.ofSeconds(1))
.build();
try {
// A SimpleConsumer consumer must obtain and process messages.
List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
logger.error("Failed to receive message", e);
}

下表描述了为 SimpleConsumer 提供的 API 操作。

操作描述可修改参数
ReceiveMessage消费者可以调用此操作从服务器获取消息。注意 由于服务器使用分布式存储,即使请求的消息实际存在于服务器上,服务器也可能返回空结果。您可以再次调用 ReceiveMessage 操作或增加 ReceiveMessage 操作中的并发值。 批量拉取大小:一次获取的消息数量。SimpleConsumer 消费者可以获取多条消息进行批量消费。 消息不可见持续时间:消息的最大处理持续时间。如果消费失败,此参数控制消息重试间隔。更多信息,请参见 SimpleConsumer 重试策略。调用 ReceiveMessage 操作时此参数是必需的。
AckMessage消费者消费消息后,调用此操作向服务器返回消费成功结果。
ChangeInvisibleDuration在消费重试场景中,消费者可以调用此操作更改消息处理持续时间以控制消息重试间隔。消息不可见持续时间:消息的最大处理时间。您可以调用此操作更改 ReceiveMessage 操作中指定的消息不可见持续时间。在大多数情况下,此操作用于您希望增加消息处理持续时间的场景。

可靠性重试

当 SimpleConsumer 消费者消费消息时,客户端 SDK 与 Apache RocketMQ 服务器之间的通信通过 ReceiveMessageAckMessage 操作实现。当客户端 SDK 成功处理消息时,调用 AckMessage 操作。当消息处理失败时,在指定的消息不可见持续时间到期后,不会返回 ack 消息以触发消息重试机制。更多信息,请参见Simple 消费者重试策略

确保消息顺序

在 Apache RocketMQ 中,SimpleConsumer 消费者按照消息存储的顺序获取FIFO 消息。如果一组有序消息中的某条消息未完全处理,则无法获取该组有序消息中的下一条消息。

适用场景

SimpleConsumer 提供原子 API 操作来获取消息并提交消费结果。与 PushConsumer 相比,SimpleConsumer 提供了更好的灵活性。SimpleConsumer 适用于以下场景:

  • 不可控的消息处理持续时间:如果消息处理持续时间无法估量,我们建议您使用 SimpleConsumer 以防止消息处理时间过长。您可以在消息消费期间指定估计的消息处理持续时间。如果现有处理持续时间不适合您的业务场景,您可以调用相应的 API 操作来更改消息处理持续时间。

  • 异步处理和批量消费:SimpleConsumer 在 SDK 中不涉及复杂的线程封装。业务应用程序可以使用自定义设置。这样,SimpleConsumer 消费者可以实现异步分发、批量消费以及其他自定义场景。

  • 自定义消息消费速率:使用 SimpleConsumer 时,业务应用程序调用 ReceiveMessage 操作来获取消息。您可以调整获取消息的频率以控制消息消费速率。

PullConsumer

待续。

使用注意事项

为 PushConsumer 指定合适的消费时长限制

建议您限制 PushConsumer 消费者的消息消费时长,以防止消息长时间处理。消息长时间处理可能导致消息处理超时而产生重复消息,并使下一条消息持续等待消费。如果消息频繁处理时间过长,我们建议您使用 SimpleConsumer 并根据业务需求指定合适的消息不可见持续时间。