订阅
本节介绍 Apache RocketMQ 中订阅的定义、模型关系、内部属性和使用注意事项。
定义
订阅是消费者在 Apache RocketMQ 中获取和处理消息的规则和状态设置。
订阅由消费组动态注册到 Broker。消息随后会根据订阅定义的过滤规则进行匹配和消费。
通过配置订阅,您可以控制以下消息行为:
消息过滤规则:这些规则用于定义消费者消费主题中的哪些消息。通过配置消息过滤规则,消费者可以有效地获取他们想要的消息,并根据不同的业务场景指定消息接收范围。更多信息,请参见 消息过滤。
消费状态:默认情况下,Apache RocketMQ Broker 提供持久化订阅。换句话说,消费组订阅 Broker 后,组内消费者在重新连接后可以从上次中断的地方继续消费消息。
订阅确定规则
Apache RocketMQ 的订阅是基于消费组和主题设计的。因此,订阅指的是指定消费组对某个主题的订阅。以下描述了订阅的确定规则:
一主题多订阅:下图展示了两个消费组(消费组 A 和消费组 B)订阅了主题 A。这两个订阅相互独立,可以单独定义。
一订阅者多主题:下图展示了一个消费组(消费组 A)订阅了两个主题:主题 A 和主题 B。消费组 A 中的消费者对主题 A 和主题 B 有两个独立的订阅。这两个订阅相互独立,可以单独定义。
模型关系
下图展示了订阅在 Apache RocketMQ 领域模型中的位置。
消息由生产者初始化并发送到 Apache RocketMQ 服务器。
消息按照到达 Apache RocketMQ 服务器的顺序存储在主题的指定队列中。
消费者根据指定的订阅关系从 Apache RocketMQ 服务器获取并消费消息。
内部属性
过滤类型
定义:消息过滤规则的类型。为订阅设置消息过滤规则后,系统会根据过滤规则匹配主题中的消息。只有符合条件的消息才会传递给消费者。此功能有助于您根据要求对发送给消费者的消息进行分类。
取值
标签过滤:根据标签字符串过滤和匹配全文。
SQL92 过滤:根据 SQL 语法过滤和匹配消息属性。
过滤表达式
定义:自定义过滤规则的表达式。
取值:更多信息,请参见 过滤表达式语法。
行为约束
订阅一致性
Apache RocketMQ 基于消费组管理订阅。因此,同一消费组中的消费者必须保持相同的消费逻辑。否则,会发生消费冲突,进而导致某些消息被错误消费。
正确示例
//Consumer c1
Consumer c1 = ConsumerBuilder.build(groupA);
c1.subscribe(topicA,"TagA");
//Consumer c2
Consumer c2 = ConsumerBuilder.build(groupA);
c2.subscribe(topicA,"TagA");
错误示例
//Consumer c1
Consumer c1 = ConsumerBuilder.build(groupA);
c1.subscribe(topicA,"TagA");
//Consumer c2Consumer
c2 = ConsumerBuilder.build(groupA);
c2.subscribe(topicA,"TagB");
使用注意事项
请勿频繁修改订阅。
在 Apache RocketMQ 中,订阅与元数据和配置相关联,例如过滤规则和消费进度。系统还必须确保消费组中所有消费者的消费行为、消费逻辑和负载策略一致。这些因素导致需要管理复杂的关系网。因此,我们建议您不要定期修改订阅以改变生产环境中的业务逻辑。否则,客户端需要不断调整其负载分配,从而导致消息接收问题。