一致的订阅关系
介绍
订阅关系是 RocketMQ 领域模型中非常重要的组成部分,用于表达消费者消费消息的控制元数据。有关完整概念,请参阅 订阅关系模型.
当同一消费者组中的所有消费者实例对主题和标签具有完全相同的订阅时,订阅关系是一致的。如果订阅关系(消费者组名称-主题-标签)不一致,会导致消费消息混乱,甚至丢失消息。
1 正确订阅关系的示例
1.1 订阅的主题相同,过滤表达式一致
如下图所示,同一消费者组中的三个消费者实例 C1、C2 和 C3 都订阅了 TopicA,并且对 TopicA 的标签订阅都是 Tag1,符合订阅关系一致性原则。
正确示例代码 1
C1、C2 和 C3 的订阅关系一致,意味着 C1、C2 和 C3 订阅消息的代码必须完全相同,代码示例如下
PushConsumer consumer1 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer1.subscribe("TopicA", new FilterExpression("TagA", FilterExpressionType.TAG));
PushConsumer consumer2 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer2.subscribe("TopicA", new FilterExpression("TagA", FilterExpressionType.TAG));
PushConsumer consumer3 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer3.subscribe("TopicA", new FilterExpression("TagA", FilterExpressionType.TAG));
RocketMQ 强调订阅关系的一致性,这意味着同一消费者组中的每个消费者都应该保持一致,因为从服务器的角度来看,组中的所有消费者都应该是相同的逻辑副本。
强调订阅关系的一致性并不意味着消费者不能订阅多个主题,每个消费者仍然可以根据需要订阅多个主题,但前提是同一消费者组中的消费者必须保持一致。
2 诊断不一致的订阅关系
问题描述
使用 RocketMQ 版本的消息队列时,可能会出现不一致的订阅关系。具体问题如下
- RocketMQ 版本的消息队列控制台中显示订阅关系一致性为否。
- 消费者实例未收到订阅的消息。
请参考以下步骤进行检查
您可以在 Apache RocketMQ 控制台或 CLi 工具中检查指定组的订阅关系是否一致。如果查询结果不一致,请参考本文中的常见订阅关系不一致问题,对消费者实例的消费代码进行排查。
- 检查消费者实例中与订阅相关的配置代码,确保同一消费者组中的所有消费者实例都订阅了相同的主题和标签。
- 使用控制台或 Cli 命令 ConsumerConnection 检查有效的订阅关系是否一致。
- 测试并确认消息可以被预期的消费者实例消费。
3 常见的不一致订阅关系问题
3.1 同一消费者组中,消费者实例订阅的主题不同(适用于 3.x、4.x SDK)
在早期的 3.x/4.x 版本的 SDK 中,如下图所示,同一消费者组中的三个消费者实例 C1、C2 和 C3 分别订阅了 TopicA、TopicB 和 TopicC,它们的订阅主题不一致,不符合订阅关系一致性的原则。
5.x 版本的 SDK 现在支持同一消费者组中的消费者实例订阅不同的主题。
3.2 同一消费者组中的消费者实例订阅了相同的主题,但订阅的标签不同。
如下图所示,同一消费者组中的消费者实例 C1、C2 和 C3 都订阅了 TopicA,但 C1 订阅了 TopicA 的 Tag1,而 C2 和 C3 订阅了 TopicA 的 Tag2。同一主题的订阅标签不一致,不符合订阅关系一致性的原则。
错误示例代码 2
消费者示例 2-1:
PushConsumer consumer1 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer1.subscribe("TopicA", new FilterExpression("Tag1", FilterExpressionType.TAG));
消费者示例 2-2:
PushConsumer consumer2 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer2.subscribe("TopicA", new FilterExpression("Tag2", FilterExpressionType.TAG));消费者示例 2-3:
PushConsumer consumer3 = provider.newPushConsumerBuilder().setConsumerGroup("GroupA").build();
consumer3.subscribe("TopicA", new FilterExpression("Tag2", FilterExpressionType.TAG));