顺序消息
顺序消息是 Apache RocketMQ 中一种高级消息类型。本主题描述了顺序消息的场景、工作机制、限制、使用示例和使用注意事项。
场景
异构系统在有序事件处理、交易撮合、实时增量数据同步等场景下使用状态同步来保持强一致性。上述场景要求当事件发生变化时,消息必须按序从上游应用传递到下游应用。Apache RocketMQ 提供了顺序消息来帮助您实现有序消息传输。
场景1:交易撮合
例如,在证券和股票交易场景中,如果多个竞价者对同一笔订单报出相同的价格,则以最先报出价格的竞价者获胜。因此,下游订单处理系统必须设计为按照报价顺序处理订单。
场景2:实时增量数据同步
普通消息 顺序消息
例如,您想要对数据库修改相关数据进行增量同步。您可以使用 Apache RocketMQ 提供的顺序消息将消息从上游源数据库传输到下游查询系统。消息可以是添加、删除和修改操作的二进制日志。下游系统按照消息发送的顺序检索消息,以使数据库状态以相同的顺序更新。顺序消息可帮助您确保上游系统中的操作与下游系统中的状态数据之间的一致性。如果在此场景中使用普通消息,可能会导致状态不一致。
工作机制
顺序消息的定义
顺序消息是 Apache RocketMQ 中的一种高级消息类型。顺序消息按照消息的发送顺序传递给消费者。此消息类型允许您在业务场景中实现有序处理。
顺序消息的定义特征是消息发送、存储和传递的顺序。
Apache RocketMQ 使用消息组来确定顺序消息的顺序。您必须为顺序消息配置消息组。消息组中的消息按照先进先出 (FIFO) 的顺序处理。消息排序不适用于不同的消息组或不在消息组中的消息。
基于消息组的消息排序允许您根据业务逻辑指定细粒度的消息排序。这有助于您在业务系统中实现部分消息排序,并提高业务系统的并发度和吞吐量。
消息顺序
Apache RocketMQ 中有两种消息顺序类型:生产顺序和消费顺序。
生产顺序:Apache RocketMQ 使用生产者与服务器之间建立的协议,确保消息从生产者串行发送到服务器,并按照消息的发送顺序存储和持久化。
要确保消息的生产顺序,请确保满足以下条件:
单个生产者:消息的生产顺序适用于单个生产者。Apache RocketMQ 无法确定来自不同系统中不同生产者的消息的顺序,即使您为这些消息配置了相同的消息组。
串行传输:Apache RocketMQ 中的生产者支持使用多线程进行安全访问。如果生产者使用多线程并发发送消息,Apache RocketMQ 无法确定来自不同线程的消息的顺序。
如果满足上述条件的生产者向 Apache RocketMQ 发送消息,则属于同一消息组的消息将按照发送顺序存储在同一队列中。下图描述了 Apache RocketMQ 的顺序存储逻辑。
在上图中,来自消息组 1 和消息组 4 的消息存储在同一队列(消息队列 1)中。Apache RocketMQ 确保来自消息组 1 的消息 G1-M1、G1-M2 和 G1-M3 按照发送顺序存储在队列中。来自消息组 4 的消息 G4-M1 和 G4-M2 也按照发送顺序存储。但是,来自消息组 1 和消息组 4 的消息没有特定的存储顺序。
消费顺序:
Apache RocketMQ 使用消费者与服务器之间建立的协议,确保消息按照存储顺序消费。
要确保消息的消费顺序,请确保满足以下条件:
投递顺序:Apache RocketMQ 通过客户端 SDK 和服务器端通信协议,确保消息按照服务器上的消息存储顺序投递。当消费者应用消费消息时,应用必须遵循接收-处理-回复路径,以防止异步处理导致消息乱序。
注意- 当 PushConsumer 消费者消费消息时,Apache RocketMQ 确保消息按照存储顺序逐一投递给消费者。
- 当 SimpleConsumer 消费者消费消息时,消费者可能会一次拉取多条消息,业务应用必须有解决方案来实现消息消费顺序。有关消费者类型的更多信息,请参阅消费者类型。
有限重试:Apache RocketMQ 限制顺序消息的投递重试次数。如果消息达到最大投递重试次数,Apache RocketMQ 将停止重试该消息的投递,以防止队列中其他消息持续等待投递。
在消费顺序至关重要的场景中,我们建议您指定适当的重试次数,以防止消息乱序处理。
生产顺序与消费顺序的组合
如果您希望消息按照 FIFO 原则进行处理,则需要生产顺序和消费顺序。在大多数业务场景中,一个生产者可能对应多个消费者,并且并非所有消费者都需要有序消费消息。您可以结合生产顺序和消费顺序的设置,以满足不同业务场景的需求。例如,您可以发送顺序消息并使用非顺序并发消费来提高吞吐量。下表描述了生产顺序和消费顺序设置的不同组合。
生产顺序 | 消费顺序 | 效果 |
---|---|---|
配置消息组以实现消息的有序投递。 | 消息的有序消费 | 消息的顺序在消息组级别得到保证。同一消息组中的消息以相同的顺序发送和消费。 |
配置消息组以实现消息的有序投递。 | 并发消费 | 消息并发按时间顺序消费。 |
不配置消息组以实现消息的无序投递。 | 消息的有序消费 | 消息的顺序在队列级别得到保证。消息消费基于队列的属性。Apache RocketMQ 确保消费顺序与队列中的存储顺序相同,但不一定与消息发送顺序相同。 |
不配置消息组以实现消息的无序投递。 | 并发消费 | 消息并发按时间顺序消费。 |
顺序消息的生命周期
已初始化:消息由生产者构建和初始化,并准备发送到 Broker。
就绪:消息已发送到 Broker,对消费者可见并可供消费。
处理中:消息由消费者获取,并根据消费者的本地业务逻辑进行处理。
在此过程中,Broker 等待消费者完成消费并提交消费结果。如果在一定时间内未收到消费者的响应,Apache RocketMQ 将重试消息。更多信息,请参阅消费重试。
已确认:消费者完成消费并向 Broker 提交消费结果。Broker 标记当前消息是否成功消费。
默认情况下,Apache RocketMQ 保留所有消息。当提交消费结果时,消息数据被逻辑标记为已消费,而不是立即删除。因此,消费者可以在消息因保留期过期或存储空间不足而被删除之前回溯消息进行重新消费。
已删除:当消息的保留期过期或存储空间不足时,Apache RocketMQ 以滚动方式从物理文件中删除最早保存的消息。更多信息,请参阅消息存储与清理。
消息消费失败或超时会触发服务器的重试逻辑。如果消息触发了消费重试,则该消息的生命周期结束。原始消息被视为一条具有新消息 ID 的新消息。
如果顺序消息触发了消费重试,则只有在该顺序消息被消费后,其后面的消息才能被处理。
使用限制
顺序消息仅支持 MessageType 为 FIFO 的主题。
示例
创建主题
在 Apache RocketMQ 5.0 中创建主题,建议使用 mqadmin 工具。但值得注意的是,消息类型需要作为属性参数添加。以下是一个例子:
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=FIFO
创建订阅组
在 Apache RocketMQ 5.0 中创建订阅组,建议使用 mqadmin 工具。但值得注意的是,-o
选项需要设置为 true。以下是一个例子:
sh mqadmin updateSubGroup -c <cluster_name> -g <consumer_group_name> -n <nameserver_address> -o true
发送消息
与普通消息相比,顺序消息必须为其配置消息组。我们建议您根据业务需求细粒度地配置消息组,以实现工作负载解耦和并发扩展。
创建 FIFO 主题
./bin/mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 127.0.0.1:9876 -a +message.type=FIFO
- -c 集群名称
- -t 主题名称
- -n NameServer 地址
- -o 创建有序主题的标志
创建 FIFO 订阅组
./bin/mqadmin updateSubGroup -c DefaultCluster -g FIFOGroup -n 127.0.0.1:9876 -o true
- -c 集群名称
- -g 订阅组名称
- -n NameServer 地址
- -o 创建有序订阅组的标志
以下示例代码提供了如何在 Java 中发送和接收顺序消息的示例:
// Send ordered messages.
MessageBuilder messageBuilder = null;
Message message = messageBuilder.setTopic("topic")
// Specify the message index key. The system uses the key to locate the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter the message.
.setTag("messageTag")
// Configure a message group for the ordered messages. We recommend that you do not include a large number of messages in the group.
.setMessageGroup("fifoGroup001")
// Configure the message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the messages. Focus on the result of message sending and exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Make sure that ordered delivery is applied to the consumer group. Otherwise, the messages are delivered concurrently and in no particular order.
// Consumption example 1: If the consumer type is PushConsumer, the consumer needs to only process the message in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: If the consumer type is SimpleConsumer, the consumer must actively obtain the message for consumption and submit the consumption result.
// If the consumption of a message in the message group has not finished, the next message in the message group cannot be retrieved if you call the Receive function.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
e.printStackTrace();
}
使用注意事项
使用串行消费以防止消息乱序处理。
我们建议您使用串行消息消费而不是批量消费。同时消费多条消息可能会导致消息乱序处理。
例如,消息 1、2、3、4 按 1-2-3-4 的顺序发送,批量消费的顺序是 1-[2, 3](批量处理但失败)-[2, 3](重试)-4。如果消息 3 处理失败,系统可能会重复处理消息 2。结果导致消息乱序消费。
避免在消息组中包含大量消息。
Apache RocketMQ 确保同一消息组中的消息存储在同一队列中。包含大量消息的消息组会导致相应队列过载。这会影响消息传递性能并阻碍可伸缩性。配置消息组时,可以使用订单 ID 和用户 ID 作为消息排序条件。这可确保同一用户的消息顺序。
我们建议您在业务应用中按消息组拆分消息。例如,您可以使用订单 ID 和用户 ID 作为消息组关键字来实现同一用户消息的有序处理。您无需确保不同用户消息的顺序。