顺序消息
顺序消息是 Apache RocketMQ 中具有高级功能的消息类型。本主题介绍顺序消息的场景、工作机制、限制、使用示例和使用注意事项。
场景
异构系统在诸如有序事件处理、交易撮合和实时增量数据同步等场景中使用状态同步来维护强一致性。在事件发生变化时,上述场景需要将消息从上游应用程序按顺序传递到下游应用程序。Apache RocketMQ 提供顺序消息来帮助您实现有序消息传输。
场景 1:交易撮合
例如,在证券和股票交易场景中,如果多个竞价者对一个竞价单出价相同的价格,则最先出价的竞价者胜出。因此,下游订单处理系统必须设计为按出价价格的顺序处理订单。
场景 2:实时增量数据同步
普通消息 FIFO 消息
例如,您希望对与数据库修改相关的數據进行增量同步。您可以使用 Apache RocketMQ 提供的顺序消息将消息从上游源数据库传输到下游查询系统。消息可以是添加、删除和修改操作的二进制日志。下游系统按消息发送的顺序检索消息,以使数据库状态按相同顺序更新。顺序消息可以帮助您确保上游系统中的操作与下游系统中的状态数据之间的一致性。如果您在此场景中使用普通消息,则可能会发生状态不一致。
工作机制
顺序消息的定义
顺序消息是 Apache RocketMQ 中的一种高级消息类型。顺序消息按消息发送的顺序传递给消费者。这种消息类型允许您在业务场景中实现有序处理。
顺序消息的定义特征是消息发送、存储和传递的顺序。
Apache RocketMQ 使用消息组来确定顺序消息的顺序。您必须为顺序消息配置消息组。消息组中的消息按先进先出 (FIFO) 顺序处理。消息排序不适用于不同的消息组或不在消息组中的消息。
基于消息组的消息排序允许您根据您的业务逻辑指定细粒度的消息排序。这有助于您在业务系统中实现部分消息排序,并提高业务系统的并发度和吞吐量。
消息排序
Apache RocketMQ 中应用两种类型的消息顺序:生产顺序和消费顺序。
生产顺序 : Apache RocketMQ 使用生产者和服务器之间建立的协议来确保消息从生产者到服务器的串行发送,以及消息按发送顺序存储和持久化。
要确保消息的生产顺序,请确保满足以下条件
单个生产者:消息的生产顺序适用于单个生产者。即使您为消息配置了相同的消息组,Apache RocketMQ 也无法确定来自不同生产者的消息的顺序。
串行传输:Apache RocketMQ 中的生产者支持使用多个线程进行安全访问。如果生产者使用多个线程并发发送消息,Apache RocketMQ 无法确定来自不同线程的消息的顺序。
如果满足上述条件的生产者向 Apache RocketMQ 发送消息,则属于同一消息组的消息将按发送顺序存储在同一队列中。下图描述了 Apache RocketMQ 的顺序存储逻辑。
在上图中,来自 MessageGroup 1 和 MessageGroup 4 的消息存储在同一个队列 (MessageQueue 1) 中。Apache RocketMQ 确保来自 MessageGroup 1 的消息 G1-M1、G1-M2 和 G1-M3 按发送顺序存储在队列中。来自 MessageGroup 4 的消息 G4-M1 和 G4-M2 也按发送顺序存储。但是,来自 MessageGroup 1 和 MessageGroup 4 的消息按无特定顺序存储。
消费顺序 :
Apache RocketMQ 使用消费者和服务器之间建立的协议来确保消息按存储顺序消费。
要确保消息的消费顺序,请确保满足以下条件
传递顺序:Apache RocketMQ 使用客户端 SDK 和服务器端的通信协议来确保消息按服务器上的消息存储顺序传递。当消费者应用程序消费消息时,应用程序必须遵循接收-处理-回复路径,以防止由异步处理导致的乱序消息。
注意- 当 PushConsumer 消费者消费消息时,Apache RocketMQ 确保消息按存储顺序逐个传递给消费者。
- 当 SimpleConsumer 消费者消费消息时,消费者可能会一次拉取多条消息,业务应用程序必须有解决方案来实现消息消费顺序。有关消费者类型的更多信息,请参阅 消费者类型。
有限重试:Apache RocketMQ 限制顺序消息的传递重试次数。如果消息达到最大传递重试次数,Apache RocketMQ 将停止重试消息的传递以供消费。这可以防止队列中的其他消息不断等待传递。
在消费顺序至关重要的场景中,我们建议您指定适当的重试次数,以防止乱序消息处理。
生产顺序和消费顺序的组合
如果您希望消息按 FIFO 原则进行处理,则需要生产顺序和消费顺序。在大多数业务场景中,生产者可能会映射到多个消费者,并非所有消费者都需要按顺序消费消息。您可以将生产顺序和消费顺序的设置组合起来,以满足不同业务场景中的需求。例如,您可以发送顺序消息,并使用非顺序并发消费来提高吞吐量。下表描述了生产顺序和消费顺序设置的不同组合。
生产顺序 | 消费顺序 | 效果 |
---|---|---|
配置消息组以实现消息的有序传递。 | 按顺序消费消息 | 在消息组级别确保消息顺序。同一消息组中的消息按相同顺序发送和消费。 |
配置消息组以实现消息的有序传递。 | 并发消费 | 消息按时间顺序并发消费。 |
配置无消息组以实现消息的无序传递。 | 按顺序消费消息 | 在队列级别确保消息顺序。消息消费基于队列的属性。Apache RocketMQ 确保消费顺序与队列中的存储顺序相同,但不一定与消息发送顺序相同。 |
配置无消息组以实现消息的无序传递。 | 并发消费 | 消息按时间顺序并发消费。 |
顺序消息的生命周期
初始化:消息由生产者构建和初始化,并准备发送到代理。
就绪:消息发送到代理,对消费者可见,可供消费。
在传输中:消息由消费者获取,并根据消费者的本地业务逻辑进行处理。
在此过程中,代理等待消费者完成消费并提交消费结果。如果在一定时间内没有收到消费者的响应,Apache RocketMQ 将重试消息。有关更多信息,请参阅 消费重试。
已确认:消费者完成消费并向代理提交消费结果。代理标记当前消息是否已成功消费。
默认情况下,Apache RocketMQ 会保留所有消息。当提交消费结果时,消息数据在逻辑上被标记为已消费,而不是立即删除。因此,消费者可以在消息因保留期到期或存储空间不足而被删除之前回溯消息以重新消费。
已删除:当消息的保留期到期或存储空间不足时,Apache RocketMQ 会以滚动方式从物理文件中删除保存的最早消息。有关更多信息,请参阅 消息存储和清理。
消息消费失败或超时会触发服务器的重试逻辑。如果为消息触发了消费重试,则该消息将到达其生命周期的末尾。原始消息被视为具有新消息 ID 的新消息。
如果为顺序消息触发了消费重试,则只有在消费了顺序消息之后才能处理顺序消息后面的消息。
使用限制
顺序消息仅支持 MessageType 为 FIFO 的主题。
示例
创建主题
对于在 Apache RocketMQ 5.0 中创建主题,建议使用 mqadmin 工具。但是,值得注意的是,消息类型需要作为属性参数添加。以下是一个示例
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=FIFO
发送消息
与普通消息相比,顺序消息必须配置消息组。我们建议您根据业务需求在细粒度级别配置消息组,以允许工作负载解耦和并发扩展。
创建 FIFO 主题
./bin/mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 127.0.0.1:9876 -a +message.type=FIFO
- -c 集群名称
- -t 主题名称
- -n 命名服务器地址
- -o 创建有序主题的标志
以下示例代码提供了一个如何在 Java 中发送和接收有序消息的示例
// Send ordered messages.
MessageBuilder messageBuilder = null;
Message message = messageBuilder.setTopic("topic")
// Specify the message index key. The system uses the key to locate the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter the message.
.setTag("messageTag")
// Configure a message group for the ordered messages. We recommend that you do not include a large number of messages in the group.
.setMessageGroup("fifoGroup001")
// Configure the message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the messages. Focus on the result of message sending and exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Make sure that ordered delivery is applied to the consumer group. Otherwise, the messages are delivered concurrently and in no particular order.
// Consumption example 1: If the consumer type is PushConsumer, the consumer needs to only process the message in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: If the consumer type is SimpleConsumer, the consumer must actively obtain the message for consumption and submit the consumption result.
// If the consumption of a message in the message group has not finished, the next message in the message group cannot be retrieved if you call the Receive function.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
e.printStackTrace();
}
使用说明
使用串行消费来防止消息乱序处理。
我们建议您使用串行消息消费而不是批量消费。同时消费多条消息可能会导致消息乱序处理。
例如,消息 1、2、3 和 4 按 1-2-3-4 的顺序发送,批量消费的顺序为 1-[2, 3](批量处理但失败)-[2, 3](重试)-4。如果消息 3 处理失败,系统可能会重复处理消息 2。因此,会导致消息乱序消费。
避免在消息组中包含大量消息。
Apache RocketMQ 确保同一消息组中的消息存储在同一个队列中。包含大量消息的消息组会导致相应的队列过载。这会影响消息性能并阻碍可扩展性。配置消息组时,可以使用订单 ID 和用户 ID 作为消息排序条件。这确保了同一用户的消息顺序。
我们建议您在业务应用程序中按消息组拆分消息。例如,您可以使用订单 ID 和用户 ID 作为消息组关键字来实现同一用户的消息有序处理。您不需要确保不同用户的消息顺序。