消费者负载均衡
当消费者组中的消费者从 Apache RocketMQ 主题中拉取消息时,会使用负载均衡策略来确定如何将消息分配给消费者。负载均衡策略可以提高服务并发性和应用程序可扩展性。本主题介绍 Apache RocketMQ 为消费者提供的负载均衡策略。
背景信息
熟悉 Apache RocketMQ 提供的负载均衡策略可以帮助您在遇到以下情况时确定采取的适当措施
灾难恢复:您可以确定在本地节点发生故障时如何重试和切换消息。
消息排序:您可以更好地了解 Apache RocketMQ 如何确保严格的先进先出消息排序。
水平分区:您可以根据消息的分配方式来规划流量迁移和水平扩展操作。
广播消费和集群消费
Apache RocketMQ 允许多个消费者组订阅相同的消息,并允许每个消费者组初始化多个消费者。消费者组和消费者可以在以下情况下配置为消费消息:
跨消费者组的广播消费:此场景在前面图的左侧进行了说明。每个消费者组初始化自己的消费者,该消费者消费所有消息。消息以一对多的关系从主题传递到多个订阅者。
此模式通常用于诸如网关推送和配置推送之类的场景。
消费者组内的集群消费:此场景在前面图的右侧进行了说明。每个消费者组初始化多个消费者,消息被发送到组中的所有消费者。当您想要在组内实现水平流量分区和负载均衡时,这很有用。
此模式适用于微服务解耦。
消费者负载均衡策略简介
在使用广播消费的场景中,不需要负载均衡,因为每个消费者组只包含一个消费者。
但是,在使用集群消费的场景中,每个消费者组包含多个消费者。负载均衡策略可以帮助确定如何分配消息。
根据消费者类型,负载均衡策略可以分为以下两种类型
基于消息的负载均衡
使用范围
基于消息的负载均衡是推送消费者和简单消费者的唯一且默认策略。
工作机制
基于消息的负载均衡将主题中的消息均匀分配给消费者组中的多个消费者。
如上图所示,消费者组 A 包含三个消费者:A1、A2 和 A3。这三个消费者消费主题中 Queue1 的消息。
基于消息的负载均衡确保队列中的消息可以被多个消费者并发处理。但是,消息是随机发送给消费者的,这意味着您无法指定如何将消息分配给消费者。
基于消息的负载均衡基于主题中单个消息的确认语义。消费者收到消息后,代理会锁定消息,以确保在消息被消费或超时之前,其他消费者无法看到该消息。这可以防止同一队列的消息被不同的消费者多次消费。
顺序消息的负载策略
在顺序消息中,消息的顺序是指消息组中多个消息的顺序。这些消息必须按照它们在代理上存储的顺序进行处理。因此,基于消息的负载均衡需要确保消息组中的消息按照它们在服务器上存储的顺序进行消费。当不同的消费者处理同一组中的消息时,系统会严格按照消息顺序锁定消息,以确保消息按顺序消费。
在上图中,Queue1 的消息组 G1 中有四个顺序消息。它们的保存顺序由 M1 到 M4 表示。在消费过程中,当消费者 A1 处理消息 M1 和 M2 时,如果 M1 和 M2 的消费状态没有提交,消费者 A2 无法并行消费消息 M3 和 M4。消费者只有在前面消息的消费状态提交后才能消费消息。
特点
与基于队列的负载均衡相比,基于消息的负载均衡具有以下特点
- 更均衡的消费分配。在传统的基于队列的负载均衡中,队列数量和消费者数量可能无法得到很好的平衡。这会导致系统中一些消费者处于空闲状态,而另一些消费者则不堪重负。相比之下,基于消息的负载均衡确保消费者之间负载均衡,而无需您管理队列数量和消费者数量。
- 对网络容量差异的容忍度更高。在在线生产环境中,由于实际网络状况或网络硬件规格不一致,消费者的处理能力可能会有所不同。如果消息是根据队列分配的,则可能出现一些消费者累积了消息,而另一些消费者处于空闲状态的情况。相反,基于消息的负载均衡按需分配消息,以实现消费者之间更均衡的负载分配。
- 更易于进行队列分配的 O&M。在使用传统的基于队列的负载均衡的场景中,您必须确保队列数量大于或等于消费者数量,以确保没有消费者处于空闲状态。基于消息的负载均衡不存在这个问题。
场景
由于队列中的消息是离散地分配给消费者的,因此基于消息的负载均衡适用于大多数在线事件处理场景。在这些场景中,消费者只需要基本的处理能力,而不是消息的批量聚合。对于流处理和聚合计算等需要聚合和批量处理消息的场景,基于队列的负载均衡是更好的选择。示例
消费者不需要为基于消息的负载均衡进行额外的配置。默认情况下,此策略对推送消费者和简单消费者启用。
SimpleConsumer simpleConsumer = null;
// Consumption example 1: When push consumers consume normal messages, they need only to process messages on a message listener and do not need to consider load balancing.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: When simple consumers consume normal messages, they obtain and submit messages. The consumers obtain messages based on the subscribed topic and do not need to consider load balancing.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, consumers must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, consumers must re-initiate the request to obtain the message.
e.printStackTrace();
}
基于队列的负载均衡
使用范围
对于代理版本 4.x 和 3.x 的消费者,包括 PullConsumer、DefaultPushConsumer、DefaultPullConsumer 和 DefaultLitePullConsumer,只能使用基于队列的负载均衡。
工作机制
在基于队列的负载均衡策略中,同一消费者组中的消费者消费分配给它们的队列中的消息。每个队列由一个消费者消费。
如上图所示,主题中的三个队列(Queue1、Queue2 和 Queue3)被分配给消费者组中的两个消费者。由于每个队列只能分配给一个消费者,因此消费者 A2 被分配了两个队列。如果队列数量少于消费者数量,则一些消费者将没有分配给它们的队列。
基于队列的负载均衡根据操作数据(如队列数量和消费者数量)分配消息。每个队列绑定到一个特定的消费者。然后,每个消费者根据获取消息的消费语义处理消息>提交偏移量>持久化偏移量。消费者获取消息时,不会将消费状态返回到队列。因此,为了避免多个消费者重复消费消息,每个队列只能由一个消费者消费。
基于队列的负载均衡保证一个队列只由一个消费者处理。但是,此策略的实现依赖于消费者和代理之间的信息协商机制。
Apache RocketMQ 不保证队列中的消息只由一个消费者处理。因此,当消费者数量和队列数量发生变化时,可能会出现队列分配的临时不一致,导致少量消息被多次处理。
特点
与基于消息的负载均衡相比,基于队列的负载均衡粒度更大,灵活性更低。但是,基于队列的负载均衡非常适合流处理场景。它确保队列中的消息由一个消费者处理。因此,基于队列的负载均衡更适合于需要处理聚合消息或批量消息的场景。
场景
基于队列的负载均衡适用于需要处理聚合消息或批量消息的场景。这些是流计算和数据聚合应用程序中的常见场景。
示例
消费者不需要为基于队列的负载均衡执行额外的配置。默认情况下,此策略已为 4.x 和 3.x 版本的 Broker 的拉取消费者启用。
有关示例代码的更多信息,请访问 Apache RocketMQ 代码库。
版本兼容性
基于消息的负载均衡策略从 Apache RocketMQ 的 5.0 版本 Broker 开始可用。对于 4.x 和 3.x 版本的 Broker,仅提供基于队列的负载均衡策略。
Apache RocketMQ 5.x 版本的 Broker 支持基于消息和基于队列的两种负载均衡策略。哪种策略生效取决于客户端版本和消费者类型。
使用注意事项
为消费逻辑实现消息幂等性。
基于消息和基于队列的负载均衡策略都会在添加消费者、删除消费者和 Broker 扩展等场景中触发临时重新平衡。这可能会导致暂时的负载不一致,并导致少量消息被消费多次。因此,需要执行去重以确保消息消费的幂等性。