消息过滤
在消费者订阅主题后,Apache RocketMQ 会将主题中的所有消息传递给消费者。但是,如果您希望消费者只接收与您的业务相关的消息,可以在 Apache RocketMQ 代理上设置过滤器。本主题介绍消息过滤功能及其工作原理。本主题还介绍了消息的分类,并提供了有关如何使用不同过滤方法的示例。
场景
Apache RocketMQ 遵循发布-订阅模式。Apache RocketMQ 是一种消息中间件,广泛用于促进分布式上游和下游应用程序之间的通信。在实际场景中,应用程序可能使用不同的方法来消费消息。这些应用程序都可以订阅同一个 Apache RocketMQ 主题,并且可以设置过滤器以允许这些应用程序仅接收与它们相关的消息。
通过使用 Apache RocketMQ 的消息过滤功能,您可以有效地管理发送到不同消费者的消息。这可以防止您的系统因大量非关键任务消息而过载。
Apache RocketMQ 的消息过滤功能在主题级别生效,允许您管理分布在多个服务中的一个业务的消息。如果您想管理不同业务的消息,可以订阅不同的主题。
功能概述
定义
消息过滤功能根据消费者配置的条件过滤消息,并将满足条件的消息发送给消费者。
首先,在 Apache RocketMQ 生产者和消费者上定义消息属性和标签。然后在消费者上设置过滤条件,Apache RocketMQ 代理根据条件过滤消息,并将过滤后的消息发送给消费者。
工作机制
消息过滤涉及以下步骤
生产者:生产者在初始化消息之前将属性和标签附加到消息。这些属性和标签用于匹配消费者设置的过滤条件。
消费者:消费者在初始化和消费消息时,调用订阅注册操作,通知代理已订阅的主题和消息,或过滤条件。
代理:Apache RocketMQ 代理在收到消费者的消息请求后,根据消费者提交的过滤条件表达式动态过滤消息,并将匹配过滤条件的消息发送给消费者。
分类
Apache RocketMQ 支持基于标签的过滤和基于属性的 SQL 过滤。下表比较了这两种方法。
项目 | 基于标签的过滤 | 基于属性的 SQL 过滤 |
---|---|---|
过滤目标 | 消息标签。 | 消息属性,包括自定义属性和系统属性。消息标签是系统属性 (TAGS)。 |
过滤能力 | 精确匹配。 | 基于 SQL 语法的匹配。 |
场景 | 基于标签的简单过滤。 | 涉及标签和属性之间关系的复杂过滤。 |
有关如何使用过滤方法的更多信息,请参阅 基于标签的过滤 和 基于属性的 SQL 过滤。
订阅一致性
过滤表达式是订阅的一部分。根据 Apache RocketMQ 的发布-订阅模式,一个消费者组内的消费者订阅必须一致,包括它们的过滤表达式,以避免某些消息无法被消费的情况。有关更多信息,请参阅 订阅。
基于标签的过滤
基于标签的过滤是 Apache RocketMQ 提供的基本消息过滤功能。此功能根据生产者设置的标签过滤消息。消费者使用标签来指定要消费的消息。
场景
下图显示了电子商务交易场景中的一个示例。在这个过程中,从下订单到收到产品,会生成一系列消息,例如
订单消息
支付消息
物流消息
这些消息被发送到名为 Trade_Topic 的主题,该主题有多个系统作为其订阅者,包括
支付系统:仅订阅支付消息。
物流系统:仅订阅物流消息。
交易成功率分析系统:订阅订单和支付消息。
实时计算系统:订阅所有消息。
标签设置
生产者在发送消息之前只将一个标签附加到每个消息。
标签是字符串。建议字符串的最大长度为 128 个字符。
过滤规则
基于标签的过滤基于字符串实现精确过滤。您可以设置以下过滤规则
单标签匹配:您可以将过滤表达式设置为单个标签,以仅接收带有该标签的消息。
多标签匹配:您可以在过滤表达式中设置多个标签,以接收带有任何一个标签的消息。用两个竖线 (||) 分隔标签。例如,Tag1||Tag2||Tag3 表示带有 Tag1、Tag2 或 Tag3 的消息都会发送给消费者。
全部匹配:您可以使用星号 (*) 来匹配所有标签,这意味着主题中的所有消息都将发送给消费者。
示例
设置标签并发送消息
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that the system can use a keyword to accurately locate the message.
.setKeys("messageKey")
// Specify the message tag so that consumers can use the tag to filter the message.
// This example indicates that the tag of the message is set to "TagA".
.setTag("TagA")
// Message body.
.setBody("messageBody".getBytes())
.build();
指定标签并订阅消息
String topic = "Your Topic";
// Subscribe to messages that carry tag "TagA".
FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
指定多个标签并订阅消息
String topic = "Your Topic";
// Subscribe to messages that carry tag TagA, TagB, or TagC.
FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
订阅主题中的所有消息
String topic = "Your Topic";
// Subscribe to all messages.
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
基于属性的 SQL 过滤
基于属性的 SQL 过滤是 Apache RocketMQ 提供的一种高级消息过滤方法。它根据生产者为消息配置的属性和属性值(也称为键和值)过滤消息。生产者可以为一条消息设置多个属性。然后,消费者可以在 SQL 表达式中指定属性以接收特定消息。
由于标签是系统属性,因此基于标签的过滤是基于属性的 SQL 过滤的一种类型。在 SQL 语法中,标签属性用 TAGS 表示。
场景
下图显示了电子商务交易场景中的一个示例。在这个过程中,从下订单到收到产品,会生成一系列消息。这些消息被分类为订单消息和物流消息。物流消息配置了一个区域属性,区域属性的值为杭州和上海。
订单消息
物流消息
区域属性值为杭州的物流消息
区域属性值为上海的物流消息
这些消息被发送到名为 Trade_Topic 的主题,该主题有以下系统作为其订阅者
物流系统 1:仅订阅区域属性值为杭州的物流消息。
物流系统 2:订阅所有物流消息。
订单跟踪系统:仅订阅订单消息。
实时计算系统:订阅所有消息。
消息属性设置
生产者可以在发送消息之前为消息设置自定义属性。每个属性都是一个自定义的键值对。
可以为一条消息设置多个属性。
过滤规则
编写过滤表达式时,必须遵循 SQL92 语法。具体来说
语法 | 描述 | 示例 |
---|---|---|
IS NULL | 指定属性不存在。 | a IS NULL :属性 a 不存在。 |
IS NOT NULL | 指定属性存在。 | a IS NOT NULL :属性 a 存在。 |
> >= < <= | 比较数值。该语法不能用于比较字符串。如果用于比较字符串,则在启动消费者时会报告错误。注意 可以转换为数值的字符串也被视为数值。 | a IS NOT NULL AND a > 100 :属性 a 存在,并且属性 a 的值大于 100。 a IS NOT NULL AND a > 'abc' :错误示例。abc 是一个字符串。因此,您不能将 a 与 abc 进行比较。 |
BETWEEN xxx AND xxx | 比较数值。该语法不能用于比较字符串。如果用于比较字符串,则在启动消费者时会报告错误。该语法等效于>= xxx 并且 \<= xxx。这意味着属性的值介于两个数值之间或等于这两个数值中的任何一个。 | a IS NOT NULL AND (a BETWEEN 10 AND 100) :属性 a 存在,并且属性 a 的值大于或等于 10 且小于或等于 100。 |
NOT BETWEEN xxx AND xxx | 比较数值。该语法不能用于比较字符串。如果用于比较字符串,则在启动消费者时会报告错误。该语法等效于 \< xxx 或>xxx。这意味着属性的值小于左侧数值或大于右侧数值。 | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100) :属性 a 存在,并且属性 a 的值小于 10 或大于 100。 |
IN (xxx, xxx) | 表示属性的值包含在一个集合中。集合中的元素只能是字符串。 | a IS NOT NULL AND (a IN ('abc', 'def')) :属性 a 存在,并且属性 a 的值是 abc 或 def。 |
= <> | 等于运算符和不等于运算符。它们可以用于比较数值和字符串。 | a IS NOT NULL AND (a = 'abc' OR a<>'def') :属性 a 存在,并且属性 a 的值是 abc 或属性 a 的值不是 def。 |
AND OR | 逻辑 AND 运算符和逻辑 OR 运算符。它们可以用于组合简单的逻辑函数,每个逻辑函数必须放在括号中。 | a IS NOT NULL AND (a > 100) OR (b IS NULL) :属性 a 存在,并且属性 a 的值大于 100 或属性 b 不存在。 |
SQL 属性过滤是通过配置自定义消息属性并定义 SQL 过滤表达式来实现的。过滤表达式可能无法生成有效的结果。Apache RocketMQ 代理根据以下逻辑处理消息
异常处理:如果在评估过滤表达式时报告异常,代理默认会过滤掉接收到的消息,不会将消息传递给消费者。例如,当比较数值和非数值时会发生异常。
空值处理:如果过滤表达式的计算结果为 NULL 或值不是布尔值,代理默认会过滤掉接收到的消息,不会将消息传递给消费者。布尔值表示真值,可以是真或假。假设您没有为生产者发送的消息配置自定义属性,但此自定义属性在 SQL 表达式中用作过滤条件。在这种情况下,过滤表达式的评估结果为 NULL。
不一致数值处理:如果自定义消息属性的值是浮点数,但过滤表达式中使用的属性值是整数,代理默认会过滤掉接收到的消息,不会将消息传递给消费者。
示例
为消息设置标签和属性,并发送消息
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that the system can use a keyword to accurately locate the message.
.setKeys("messageKey")
// Specify the message tag so that consumers can use the tag to filter the message.
// This example indicates that the message tag is set to "messageTag".
.setTag("messageTag")
// You can also set custom attributes for the messages, such as environment, region, and logical branch.
// In this example, the custom attribute is region and the attribute value is Hangzhou.
.addProperty("Region", "Hangzhou")
// Message body.
.setBody("messageBody".getBytes())
.build();
根据自定义属性订阅和过滤消息
String topic = "topic";
// Subscribe only to messages whose value of the region attribute is Hangzhou.
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
根据多个自定义属性订阅和过滤消息
String topic = "topic";
// Subscribe to messages whose value of the region attribute is Hangzhou and value of the price attribute is greater than 30.
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
订阅主题中的所有消息
String topic = "topic";
// Subscribe to all the messages.
FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
使用说明
正确设置消息的主题和标签。
您可以使用主题、标签和属性来拆分消息。在拆分消息时,请注意以下事项
消息类型:不同类型的消息,例如有序消息和普通消息,必须使用不同的主题进行拆分。不要使用标签来拆分消息类型。
业务领域:不同的业务领域和部门必须使用不同的主题。例如,物流消息和支付消息的主题必须不同。物流消息可以通过使用标签进一步划分为普通消息和紧急消息。
消息数量和重要性:数量或链接重要性不同的消息必须拆分为不同的主题。