消费者类型
Apache RocketMQ 支持以下消费者类型:PushConsumer、SimpleConsumer 和 PullConsumer。本文介绍这三种消费者类型的使用方法、工作原理、重试机制以及适用场景。
背景信息
Apache RocketMQ 提供了 PushConsumer、SimpleConsumer 和 PullConsumer 三种消费者类型。这三种消费者类型具有不同的集成和控制方式,可用于满足不同业务场景下的消息传递需求。以下因素可以帮助您为业务场景选择合适的消费者类型:
并发消费:消费者如何利用多线程技术实现并发消息消费,以提高消息处理效率?
同步或异步消息处理:对于不同的集成场景,消费者可能需要将收到的消息异步分发给业务逻辑系统进行处理。如何实现异步消息处理?
可靠消息处理:消费者在处理消息时如何返回响应结果?当消息出现错误时,如何实现消息重试以确保可靠的消息处理?
关于上述问题的答案,请参见PushConsumer 和 SimpleConsumer。
功能概览
上图显示,Apache RocketMQ 中消费者的消息消费涉及以下阶段:接收消息、处理消息和提交消费状态。
这三种消费者类型通过提供不同的实现方法和 API 操作,适用于各种消息消费场景。下表描述了三种消费者类型之间的差异。
PullConsumer 仅建议在流处理框架中集成。PushConsumer 和 SimpleConsumer 可以满足大多数场景。
您可以根据业务场景在 PushConsumer 和 SimpleConsumer 之间进行切换。当您切换到不同的消费者类型时,Apache RocketMQ 中现有资源的利用和现有业务处理任务不受影响。
严禁在同一消费组中混合使用 PullConsumer 和其他消费者类型。
项目 | PushConsumer | SimpleConsumer | PullConsumer |
---|---|---|---|
API 操作调用 | 通过消息监听器调用回调操作返回消费结果。消费者只能在消息监听器的范围内处理消费逻辑。 | 业务应用程序实现消息处理,并调用相应的操作返回消费结果。 | 业务应用程序实现消息拉取和处理,并调用相应的操作返回消费结果。 |
消费并发管理 | Apache RocketMQ SDK 用于管理消息消费的并发线程数。 | 用于消息消费的并发线程数取决于单个业务应用程序的消费逻辑。 | 用于消息消费的并发线程数取决于单个业务应用程序的消费逻辑。 |
负载均衡机制 | 5.0 版本基于消息的负载均衡,早期版本基于队列的负载均衡。 | 基于消息的负载均衡。 | 基于队列的负载均衡。 |
API 灵活性 | API 操作封装度高,灵活性差。 | 原子操作提供极高的灵活性。 | 原子操作提供极高的灵活性。 |
适用场景 | 此消费者类型适用于不需要自定义流程的开发场景。 | 此消费者类型适用于需要自定义流程的开发场景。 | 仅建议在流处理框架场景中集成 |
PushConsumer
PushConsumer 是一种提供高度封装的消费者类型。消息消费和消费结果提交仅通过消息监听器进行处理。消息获取、消费状态提交和消费重试都通过 Apache RocketMQ 客户端 SDK 完成。
使用方法
PushConsumer 以固定方式使用。在消费者初始化时,向 PushConsumer 消费者注册消息监听器,并在消息监听器中实现消息处理逻辑。消息获取、监听器调用触发和消息重试均通过 Apache RocketMQ SDK 进行处理。
示例代码
// Message consumption example: Use a PushConsumer consumer to consume messages.
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// Configure consumer group.
.setConsumerGroup("YourConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Set the message listener.
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
// Consume the messages and return the consumption result.
return ConsumeResult.SUCCESS;
}
})
.build();
PushConsumer 消费者的消息监听器返回以下结果之一:
消费成功:例如,当您使用 Apache RocketMQ SDK for Java 并且消息被消费时,返回
ConsumeResult.SUCCESS
。服务器根据消费结果更新消费进度。消费失败:例如,当您使用 Apache RocketMQ SDK for Java 并且消息消费失败时,返回
ConsumeResult.FAILURE
。Apache RocketMQ 是否重试消费消息取决于消费重试逻辑。意外失败:例如,如果抛出意外异常,则消息消费失败。Apache RocketMQ 是否重试消费消息取决于消费重试逻辑。
如果消息处理逻辑中的意外错误导致 PushConsumer 消费者无法持续消费消息,SDK 会认为消费已超时并强制提交消费失败结果。然后,根据消费重试逻辑处理消息。有关消费超时的更多信息,请参见Push 消费者重试策略。
当消费超时发生时,SDK 会提交消费失败结果。但是,当前消费线程可能无法响应结果并继续处理消息。::
工作机制
对于 PushConsumer,实时消息处理基于 SDK 典型的 Reactor 线程模型。SDK 内置一个长轮询线程,该线程拉取消息并将消息存储到队列中。然后,消息从队列传递到各个消息消费线程。消息监听器根据消息消费逻辑执行。下图显示了 PushConsumer 消费者的消息消费过程。
可靠性重试
对于 PushConsumer,客户端 SDK 与消费逻辑单元之间的通信仅通过消息监听器实现。客户端 SDK 根据消息监听器返回的结果检查消息是否被消费,并根据消费重试逻辑执行重试以确保消息可靠性。所有消息都必须以同步方式消费。消费结果在监听器操作调用结束时返回。不允许异步分发。有关消息重试的更多信息,请参见Push 消费者重试策略。
为确保消息可靠性,Apache RocketMQ 禁止 PushConsumer 消费者在消息消费中出现以下行为:
在消息消费完成前返回消费结果。例如,对于稍后消费失败的消息,提前返回消费成功结果。在这种情况下,Apache RocketMQ 无法检查实际消费结果,并且不会重试消息消费。
将消息从消息监听器分发到其他自定义线程并提前返回消费结果。如果消息消费失败但提前返回了消费成功结果,Apache RocketMQ 无法检查实际消费结果,并且不会重试消息消费。
确保消息顺序
对于 Apache RocketMQ 中的FIFO 消息,如果为消费者组配置了顺序消息消费,PushConsumer 消费者会按照消费顺序消费消息。当 PushConsumer 消费者消费消息时,消息顺序得到保证,无需单个业务应用程序在业务逻辑中定义消费顺序。
在 Apache RocketMQ 中,同步提交是顺序消息处理的先决条件。如果在业务逻辑中定义了异步分发,Apache RocketMQ 将无法保证消息的顺序。::
适用场景
PushConsumer 将消息处理限制为同步处理,并限制每条消息的处理超时。PushConsumer 适用于以下场景:
可预测的消息处理持续时间:如果消息处理持续时间没有限制,对于需要长时间处理的消息,消息重试会持续触发以确保消息可靠性。这会导致大量重复消息。
无异步和无自定义流程:PushConsumer 将消费逻辑的线程模型限制为 Reactor 线程模型。客户端 SDK 根据最大吞吐量处理消息。该模型易于开发,但不允许异步或自定义流程。
SimpleConsumer
SimpleConsumer 是一种支持消息处理原子操作的消费者类型。此类消费者根据业务逻辑调用操作来获取消息、提交消费状态并执行消息重试。
使用方法
SimpleConsumer 涉及多个 API 操作。根据需要调用相应的操作以获取消息并分发给业务线程进行处理。然后,调用 commit 操作提交消息处理结果。示例代码
// Consumption example: When a SimpleConsumer consumer consumes normal messages, the consumer obtain messages and commit message consumption results.
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// Configure consumer group.
.setConsumerGroup("YourConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Specify the max await time when receive messages from the server.
.setAwaitDuration(Duration.ofSeconds(1))
.build();
try {
// A SimpleConsumer consumer must obtain and process messages.
List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
logger.error("Failed to receive message", e);
}
下表描述了为 SimpleConsumer 提供的 API 操作。
操作 | 描述 | 可修改参数 |
---|---|---|
ReceiveMessage | 消费者可以调用此操作从服务器获取消息。注意 由于服务器使用分布式存储,即使请求的消息实际存在于服务器上,服务器也可能返回空结果。您可以再次调用 ReceiveMessage 操作或增加 ReceiveMessage 操作中的并发值。 | 批量拉取大小:一次获取的消息数量。SimpleConsumer 消费者可以获取多条消息进行批量消费。 消息不可见持续时间:消息的最大处理持续时间。如果消费失败,此参数控制消息重试间隔。更多信息,请参见 SimpleConsumer 重试策略。调用 ReceiveMessage 操作时此参数是必需的。 |
AckMessage | 消费者消费消息后,调用此操作向服务器返回消费成功结果。 | 无 |
ChangeInvisibleDuration | 在消费重试场景中,消费者可以调用此操作更改消息处理持续时间以控制消息重试间隔。 | 消息不可见持续时间:消息的最大处理时间。您可以调用此操作更改 ReceiveMessage 操作中指定的消息不可见持续时间。在大多数情况下,此操作用于您希望增加消息处理持续时间的场景。 |
可靠性重试
当 SimpleConsumer 消费者消费消息时,客户端 SDK 与 Apache RocketMQ 服务器之间的通信通过 ReceiveMessage
和 AckMessage
操作实现。当客户端 SDK 成功处理消息时,调用 AckMessage
操作。当消息处理失败时,在指定的消息不可见持续时间到期后,不会返回 ack 消息以触发消息重试机制。更多信息,请参见Simple 消费者重试策略。
确保消息顺序
在 Apache RocketMQ 中,SimpleConsumer 消费者按照消息存储的顺序获取FIFO 消息。如果一组有序消息中的某条消息未完全处理,则无法获取该组有序消息中的下一条消息。
适用场景
SimpleConsumer 提供原子 API 操作来获取消息并提交消费结果。与 PushConsumer 相比,SimpleConsumer 提供了更好的灵活性。SimpleConsumer 适用于以下场景:
不可控的消息处理持续时间:如果消息处理持续时间无法估量,我们建议您使用 SimpleConsumer 以防止消息处理时间过长。您可以在消息消费期间指定估计的消息处理持续时间。如果现有处理持续时间不适合您的业务场景,您可以调用相应的 API 操作来更改消息处理持续时间。
异步处理和批量消费:SimpleConsumer 在 SDK 中不涉及复杂的线程封装。业务应用程序可以使用自定义设置。这样,SimpleConsumer 消费者可以实现异步分发、批量消费以及其他自定义场景。
自定义消息消费速率:使用 SimpleConsumer 时,业务应用程序调用 ReceiveMessage 操作来获取消息。您可以调整获取消息的频率以控制消息消费速率。
PullConsumer
待续。
使用注意事项
为 PushConsumer 指定合适的消费时长限制
建议您限制 PushConsumer 消费者的消息消费时长,以防止消息长时间处理。消息长时间处理可能导致消息处理超时而产生重复消息,并使下一条消息持续等待消费。如果消息频繁处理时间过长,我们建议您使用 SimpleConsumer 并根据业务需求指定合适的消息不可见持续时间。