跳至主要内容
版本: 5.0

消费者类型

Apache RocketMQ 支持以下类型的消费者:PushConsumer、SimpleConsumer 和 PullConsumer。本主题描述了三种消费者类型的用法、工作原理、重试机制和使用场景。

背景信息

Apache RocketMQ 提供 PushConsumer、SimpleConsumer 和 PullConsumer 三种消费者类型。这三种消费者类型具有不同的集成和控制方法,您可以使用它们来满足不同业务场景下的消息需求。以下因素可以帮助您为您的业务场景选择合适的消费者类型

  • 并发消费:消费者如何使用多线程技术实现并发消息消费以提高消息处理效率?

  • 同步或异步消息处理:对于不同的集成场景,消费者可能需要异步地将接收到的消息分发到业务逻辑系统进行处理。如何实现异步消息处理?

  • 可靠消息处理:消费者如何在处理消息时返回响应结果?如何在发生消息错误时实现消息重试以确保可靠的消息处理?

有关上述问题的答案,请参见 PushConsumerSimpleConsumer

功能概述

消息消费流程

上图显示了 Apache RocketMQ 中消费者消费消息涉及的以下阶段:接收消息、处理消息和提交消费状态。

这三种类型的消费者通过提供不同的实现方法和 API 操作,适用于各种消息消费场景。下表描述了三种消费者类型的区别。

info

PullConsumer 仅推荐用于流处理框架中的集成。PushConsumer 和 SimpleConsumer 可以满足大多数场景。

您可以根据您的业务场景在 PushConsumer 和 SimpleConsumer 之间切换。当您切换到不同的消费者类型时,Apache RocketMQ 中现有资源和现有业务处理任务的使用不受影响。

danger

在同一个消费者组中混合使用 PullConsumer 和其他消费者类型是严格禁止的。

项目PushConsumerSimpleConsumerPullConsumer
API 操作调用使用消息监听器调用回调操作以返回消费结果。消费者只能在消息监听器的范围内处理消费逻辑。业务应用程序实现消息处理并调用相应的操作以返回消费结果。业务应用程序实现消息拉取和处理,并调用相应的操作以返回消费结果。
消费并发管理Apache RocketMQ SDK 用于管理消息消费的并发线程数。用于消息消费的并发线程数基于各个业务应用程序的消费逻辑。用于消息消费的并发线程数基于各个业务应用程序的消费逻辑。
负载均衡机制5.0 版本中的基于消息的负载均衡,早期版本中的基于队列的负载均衡。基于消息的负载均衡。基于队列的负载均衡。
API 灵活性API 操作被封装,灵活性较差。原子操作提供了极大的灵活性。原子操作提供了极大的灵活性。
场景此消费者类型适用于不需要自定义流程的开发场景。此消费者类型适用于需要自定义流程的开发场景。建议仅在流处理框架场景中进行集成

PushConsumer

PushConsumer 是一种高度封装的消费者类型。消息消费和消费结果提交仅通过消息监听器进行处理。消息获取、消费状态提交和消费重试由 Apache RocketMQ 客户端 SDK 完成。

用法

PushConsumer 以固定方式使用。在初始化消费者时,将消息监听器注册到 PushConsumer 消费者,并在消息监听器中实现消息处理逻辑。消息获取、监听器调用触发和消息重试由 Apache RocketMQ SDK 处理。

示例代码

// Message consumption example: Use a PushConsumer consumer to consume messages. 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// Configure consumer group.
.setConsumerGroup("YourConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Set the message listener.
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
// Consume the messages and return the consumption result.
return ConsumeResult.SUCCESS;
}
})
.build();

PushConsumer 消费者的消息监听器返回以下结果之一

  • 消费成功:例如,当您使用 Apache RocketMQ SDK for Java 并且消息被消费时,将返回 ConsumeResult.SUCCESS。服务器根据消费结果更新消费进度。

  • 消费失败:例如,当您使用 Apache RocketMQ SDK for Java 并且消息无法被消费时,将返回 ConsumeResult.FAILURE。Apache RocketMQ 是否重试消费消息取决于消费重试逻辑。

  • 意外失败:例如,如果抛出意外异常,则消息无法被消费。Apache RocketMQ 是否重试消费消息取决于消费重试逻辑。

如果消息处理逻辑中的意外错误不断阻止 PushConsumer 消费者消费消息,SDK 会认为消费已超时并强制提交消费失败结果。然后,根据消费重试逻辑处理消息。有关消费超时的更多信息,请参见 推送消费者的重试策略

info

当发生消费超时时,SDK 会提交消费失败结果。但是,当前消费线程可能无法响应结果,并继续处理消息。::

工作机制

对于 PushConsumer,实时消息处理基于 SDK 的典型 Reactor 线程模型。SDK 具有内置的长轮询线程,该线程拉取消息并将消息存储到队列中。然后,消息从队列传递到各个消息消费线程。消息监听器根据消息消费逻辑进行操作。下图显示了 PushConsumer 消费者的消息消费过程。 PushConsumer原理

重试以确保可靠性

对于 PushConsumer,客户端 SDK 与消费逻辑单元之间的通信仅通过消息监听器实现。客户端 SDK 根据消息监听器返回的结果检查消息是否被消费,并根据消费重试逻辑执行重试以确保消息可靠性。所有消息必须以同步方式消费。消费结果在监听器操作调用结束时返回。不允许异步分发。有关消息重试的更多信息,请参见 推送消费者的重试策略

为了确保消息可靠性,Apache RocketMQ 禁止 PushConsumer 消费者在消息消费中执行以下行为。

  • 在消息消费完成之前返回消费结果。例如,对于稍后无法消费的消息,提前返回消费成功结果。在这种情况下,Apache RocketMQ 无法检查实际消费结果,也不会重试消息消费。

  • 从消息监听器将消息分发到其他自定义线程并提前返回消费结果。如果消息无法被消费,但提前返回了消费成功结果,Apache RocketMQ 无法检查实际消费结果,也不会重试消息消费。

确保消息顺序

对于 Apache RocketMQ 中的 Fifo 消息,如果为消费者组配置了有序消息消费,PushConsumer 消费者会按消费顺序消费消息。当 PushConsumer 消费者消费消息时,无需在业务逻辑中定义消费顺序,即可确保消费顺序。

在 Apache RocketMQ 中,同步提交是进行有序消息处理的先决条件。如果在业务逻辑中定义了异步分发,Apache RocketMQ 无法确保消息的顺序。::

场景

PushConsumer 将消息处理限制为同步处理,并限制每个消息的处理超时时间。PushConsumer 适用于以下场景

  • 可预测的消息处理时长:如果消息处理时长没有限制,则会不断触发需要长时间处理的消息的重试,以确保消息可靠性。这会导致大量重复消息。

  • 没有异步和自定义流程:PushConsumer 将消费逻辑的线程模型限制为 Reactor 线程模型。客户端 SDK 根据最大吞吐量处理消息。这种模型易于开发,但不允许异步或自定义流程。

SimpleConsumer

SimpleConsumer 是一种支持消息处理原子操作的消费者类型。这种类型的消费者根据业务逻辑调用操作来获取消息、提交消费状态和执行消息重试。

用法

SimpleConsumer 涉及多个 API 操作。根据需要调用相应的操作来获取消息并将消息分发到业务线程进行处理。然后,调用提交操作以提交消息处理结果。示例代码

// Consumption example: When a SimpleConsumer consumer consumes normal messages, the consumer obtain messages and commit message consumption results. 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// Configure consumer group.
.setConsumerGroup("YourConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Specify the max await time when receive messages from the server.
.setAwaitDuration(Duration.ofSeconds(1))
.build();
try {
// A SimpleConsumer consumer must obtain and process messages.
List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
logger.error("Failed to receive message", e);
}

下表描述了 SimpleConsumer 提供的 API 操作。

操作描述可修改参数
ReceiveMessage消费者可以调用此操作从服务器获取消息。注意 由于服务器使用分布式存储,因此服务器可能会返回空结果,即使请求的消息实际上存在于服务器上。您可以再次调用 ReceiveMessage 操作或增加 ReceiveMessage 操作中的并发值。 批量拉取大小:一次获取的消息数量。SimpleConsumer 消费者可以获取多条消息以进行批量消费。 消息不可见时长:消息的最大处理时长。此参数控制消费失败时的消息重试间隔。有关更多信息,请参阅 **SimpleConsumer 的重试策略**。调用 ReceiveMessage 操作时需要此参数。
AckMessage消费者消费消息后,消费者调用此操作将消费成功结果返回给服务器。
ChangeInvisibleDuration在消费重试场景中,消费者可以调用此操作更改消息处理时长以控制消息重试间隔。消息不可见时长:消息的最大处理时间。您可以调用此操作更改在 ReceiveMessage 操作中指定的不可见时长。在大多数情况下,此操作用于您想要增加消息处理时长的场景。

重试以确保可靠性

当 SimpleConsumer 消费者消费消息时,客户端 SDK 与 Apache RocketMQ 服务器之间的通信是通过 ReceiveMessageAckMessage 操作实现的。当客户端 SDK 成功处理消息时,将调用 AckMessage 操作。当消息处理失败时,不会返回任何确认消息,从而在指定的消息不可见时长过后触发消息重试机制。有关更多信息,请参阅 简单消费者的重试策略.

确保消息顺序

在 Apache RocketMQ 中,SimpleConsumer 消费者按存储顺序获取 FIFO 消息。如果一组有序消息中的消息未完全处理,则无法获取该组有序消息中的下一条消息。

场景

SimpleConsumer 提供原子 API 操作来获取消息并提交消费结果。与 PushConsumer 相比,SimpleConsumer 提供了更好的灵活性。SimpleConsumer 适用于以下场景

  • 不可控的消息处理时长:如果消息处理时长不可估量,建议您使用 SimpleConsumer 来防止消息被处理过长时间。您可以在消息消费期间指定估计的消息处理时长。如果现有处理时长不适合您的业务场景,您可以调用相应的 API 操作更改消息处理时长。

  • 异步处理和批量消费:SimpleConsumer 不涉及 SDK 中的复杂线程封装。业务应用程序可以使用自定义设置。这样,SimpleConsumer 消费者就可以实现异步分发、批量消费和其他自定义场景。

  • 自定义消息消费速率:使用 SimpleConsumer 时,业务应用程序调用 ReceiveMessage 操作获取消息。您可以调整获取消息的频率以控制消息消费速率。

PullConsumer

待续。

使用说明

为 PushConsumer 指定适当的消费时长限制

建议您限制 PushConsumer 消费者的消息消费时长,以防止消息被处理过长时间。长时间处理消息会导致由于消息处理超时而导致重复消息,并使下一条消息持续等待消费。如果消息经常被处理过长时间,建议您使用 SimpleConsumer 并根据您的业务需求指定合适的消息不可见时长。