订阅
本节介绍 Apache RocketMQ 中订阅的定义、模型关系、内部属性和使用注意事项。
定义
订阅是消费者在 Apache RocketMQ 中获取和处理消息的规则和状态设置。
订阅由消费者组动态注册到代理。然后,根据订阅中定义的过滤规则匹配和消费消息。
通过配置订阅,您可以控制以下消息传递行为
消息过滤规则:这些规则用于定义主题中哪些消息被消费者消费。通过配置消息过滤规则,消费者可以有效地获取他们想要的消息,并根据不同的业务场景指定消息接收范围。有关更多信息,请参阅 消息过滤。
消费状态:默认情况下,Apache RocketMQ 代理提供持久订阅。换句话说,消费者组订阅代理后,组中的消费者可以在重新连接后从他们上次离开的地方继续消费消息。
确定订阅的规则
Apache RocketMQ 的订阅是基于消费者组和主题设计的。因此,订阅是指特定消费者组对主题的订阅。以下是确定订阅的规则
一个主题到多个订阅者下图显示了两个消费者组(组 A 和组 B)订阅了主题 A。这两个订阅彼此独立,可以分别定义。
一个订阅者到多个主题下图显示了一个消费者组(组 A)订阅了两个主题:主题 A 和主题 B。组 A 中的消费者对主题 A 和主题 B 有两个独立的订阅。这两个订阅彼此独立,可以分别定义。
模型关系
下图显示了订阅在 Apache RocketMQ 领域模型中的位置。
消息由生产者初始化并发送到 Apache RocketMQ 服务器。
消息按到达 Apache RocketMQ 服务器的顺序存储在主题的指定队列中。
消费者根据指定的订阅关系从 Apache RocketMQ 服务器获取和消费消息。
内部属性
过滤器类型
定义:消息过滤规则的类型。为订阅设置消息过滤规则后,系统会根据过滤规则匹配主题中的消息。只有满足条件的消息才会被传递给消费者。此功能可帮助您根据您的需求对发送给消费者的消息进行分类。
值
标签过滤器:根据标签字符串过滤和匹配全文。
SQL92 过滤器:根据 SQL 语法过滤和匹配消息属性。
过滤器表达式
定义:自定义过滤规则的表达式。
值:有关更多信息,请参阅 过滤器表达式的语法。
行为约束
订阅一致性
Apache RocketMQ 基于消费者组管理订阅。因此,同一消费者组中的消费者必须保持相同的消费逻辑。否则,会发生消费冲突,进而导致一些消息被错误地消费。
正确示例
//Consumer c1
Consumer c1 = ConsumerBuilder.build(groupA);
c1.subscribe(topicA,"TagA");
//Consumer c2
Consumer c2 = ConsumerBuilder.build(groupA);
c2.subscribe(topicA,"TagA");
错误示例
//Consumer c1
Consumer c1 = ConsumerBuilder.build(groupA);
c1.subscribe(topicA,"TagA");
//Consumer c2Consumer
c2 = ConsumerBuilder.build(groupA);
c2.subscribe(topicA,"TagB");
使用注意事项
不要频繁修改订阅。
在 Apache RocketMQ 中,订阅与元数据和配置相关联,例如过滤规则和消费进度。系统还必须确保消费者组中所有消费者的消费行为、消费逻辑和负载策略一致。这些因素导致需要管理的复杂关系网。因此,我们建议您不要定期修改订阅以更改生产环境中的业务逻辑。否则,客户端需要不断调整其负载分配,从而导致消息接收问题。