顺序消息
顺序消息是 Apache RocketMQ 提供的一种高级消息类型。本文介绍了顺序消息的适用场景、工作原理、使用限制、使用示例及使用注意事项。
适用场景
在有序事件处理、交易撮合、数据实时增量同步等场景下,异构系统间需要保持强一致性,要求上游应用在事件发生变更时,能按顺序将消息投递给下游应用。Apache RocketMQ 提供的顺序消息可以帮助您实现消息的顺序传输。
场景一:交易撮合 
例如,在证券股票交易场景中,对于同一笔买入订单,如果多人出价相同,则先出价者优先成交。因此,下游订单处理系统必须按照出价的顺序来处理订单。
场景二:数据实时增量同步
普通消息
顺序消息
例如,在数据库增量数据同步场景中,利用 Apache RocketMQ 的顺序消息,将上游源数据库的增删改操作记录(Binlog)按发送顺序发送至下游查询系统,下游系统按序接收并执行,从而确保下游数据库状态与上游保持一致。若使用普通消息,则可能因乱序导致状态不一致。
工作原理
顺序消息定义
顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照消息发送的顺序获取消息,从而实现业务场景中的顺序处理。
顺序消息的核心特征是:发送顺序、存储顺序、消费顺序的一致性。
Apache RocketMQ 通过“消息组(Message Group)”来保证消息的顺序性。您必须为顺序消息配置消息组,同一消息组内的消息遵循先进先出(FIFO)原则。不同消息组之间、或者没有配置消息组的消息之间,不保证顺序性。
基于消息组的排序能力,您可以根据业务逻辑自定义排序粒度,既能实现业务系统的部分顺序性,又能最大限度地保证系统的并发度和吞吐量。
消息顺序性
Apache RocketMQ 中的消息顺序性分为:生产顺序和消费顺序。
生产顺序:Apache RocketMQ 通过生产者与服务端之间的协议,确保消息从生产者发送到服务端是串行的,且按照发送顺序进行存储和持久化。
要确保生产顺序,需满足以下条件:
单生产者:生产顺序仅适用于单个生产者。对于不同系统中的不同生产者,即使配置了相同的消息组,Apache RocketMQ 也无法确定其发送顺序。
串行发送:Apache RocketMQ 生产者支持多线程安全访问,但如果生产者使用多线程并发发送消息,Apache RocketMQ 无法确定来自不同线程的消息顺序。
在满足上述条件下,同一消息组的消息会按发送顺序存储在同一个队列中。下图展示了 Apache RocketMQ 的顺序存储逻辑。

图中,MessageGroup 1 和 MessageGroup 4 的消息被存储在同一个队列(MessageQueue 1)中。Apache RocketMQ 保证 MessageGroup 1 的 G1-M1、G1-M2、G1-M3 按发送顺序存储,MessageGroup 4 的 G4-M1、G4-M2 也按发送顺序存储。但 MessageGroup 1 和 MessageGroup 4 之间的消息顺序是不确定的。
消费顺序:
Apache RocketMQ 通过消费者与服务端之间的协议,确保消息被消费的顺序与存储顺序一致。
要确保消费顺序,需满足以下条件:
投递顺序:Apache RocketMQ 通过客户端 SDK 和服务端通信协议,保证消息按服务端存储顺序进行投递。消费应用必须遵循“接收-处理-回复”的逻辑,避免因异步处理导致乱序。
注意- 在使用 PushConsumer 进行消费时,Apache RocketMQ 保证消息逐条按照存储顺序投递给消费者。
- 在使用 SimpleConsumer 进行消费时,消费者可能一次拉取多条消息,业务应用需自行保证消费顺序。更多信息,请参考消费类型。
重试限制:Apache RocketMQ 对顺序消息的重试次数有限制。当消息达到最大重试次数仍未成功时,将不再重试,以防止单个消息阻塞整个队列的消费。
在对消费顺序有严格要求的场景中,建议合理设置重试次数,以防因反复重试导致乱序。
生产顺序与消费顺序的组合
若要实现端到端的 FIFO,则必须同时满足生产顺序和消费顺序。在大多数业务场景下,生产者可能对应多个消费者,并非所有消费者都需要严格的顺序消费。您可以根据需求组合配置。例如,发送顺序消息,但使用非顺序并发消费以提升吞吐量。下表展示了不同组合的效果:
| 生产顺序 | 消费顺序 | 效果 |
|---|---|---|
| 配置消息组,实现顺序投递。 | 顺序消费 | 消息组级别顺序。同一消息组内的消息发送和消费顺序一致。 |
| 配置消息组,实现顺序投递。 | 并发消费 | 消息按照发送顺序并发消费。 |
| 不配置消息组,实现非顺序投递。 | 顺序消费 | 队列级别顺序。消息消费顺序与队列中的存储顺序一致,但不一定与发送顺序一致。 |
| 不配置消息组,实现非顺序投递。 | 并发消费 | 消息按照发送顺序并发消费。 |
顺序消息的生命周期 
初始化:生产者构建并初始化消息,准备发送至 Broker。
就绪(Ready):消息已发送至 Broker,对消费者可见并可消费。
处理中(Inflight):消息被消费者获取,并按照本地业务逻辑进行处理。
在此期间,Broker 等待消费者提交结果。若超时未收到反馈,Apache RocketMQ 会对该消息进行重试。详情请参考消费重试。
确认(Acked):消费者完成消费并提交结果,Broker 标记消息消费状态。
默认情况下,Apache RocketMQ 保留所有消息。消费确认后仅进行逻辑标记,不会立即物理删除。因此,在消息过期或空间不足被清理前,消费者可以回溯消费。
删除:当消息过期或存储空间不足时,Apache RocketMQ 会按滚动方式删除最早的物理文件。详情请参考消息存储与清理。
消费失败或超时将触发服务端重试逻辑。若消息触发重试,其原生命周期结束,重试后的消息会被视为一条全新的消息(具有新的消息 ID)。
对于顺序消息,若某条消息触发重试,则该消息之后的其他消息必须等待该消息处理完成才能继续消费。
使用限制
顺序消息仅支持 MessageType 为 FIFO 的主题(Topic)。
示例
创建主题(Topic)
在 Apache RocketMQ 5.0 中,建议使用 mqadmin 工具创建主题。注意,需要添加消息类型作为属性参数。以下是示例:
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=FIFO
创建订阅组(subscriptionGroup)
建议使用 mqadmin 工具创建。注意,-o 选项必须设置为 true。以下是示例:
sh mqadmin updateSubGroup -c <cluster_name> -g <consumer_group_name> -n <nameserver_address> -o true
发送消息
与普通消息相比,顺序消息必须配置消息组。建议根据业务需求配置细粒度的消息组,以便实现负载解耦和并发扩展。
创建 FIFO 主题
./bin/mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 127.0.0.1:9876 -a +message.type=FIFO
- -c 集群名称
- -t 主题名称
- -n nameserver 地址
- -o 创建顺序主题的标志
创建 FIFO 订阅组
./bin/mqadmin updateSubGroup -c DefaultCluster -g FIFOGroup -n 127.0.0.1:9876 -o true
- -c 集群名称
- -g 订阅组名称
- -n nameserver 地址
- -o 创建顺序订阅组的标志
以下 Java 示例代码展示了如何发送和接收顺序消息:
// Send ordered messages.
MessageBuilder messageBuilder = null;
Message message = messageBuilder.setTopic("topic")
// Specify the message index key. The system uses the key to locate the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter the message.
.setTag("messageTag")
// Configure a message group for the ordered messages. We recommend that you do not include a large number of messages in the group.
.setMessageGroup("fifoGroup001")
// Configure the message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the messages. Focus on the result of message sending and exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Make sure that ordered delivery is applied to the consumer group. Otherwise, the messages are delivered concurrently and in no particular order.
// Consumption example 1: If the consumer type is PushConsumer, the consumer needs to only process the message in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: If the consumer type is SimpleConsumer, the consumer must actively obtain the message for consumption and submit the consumption result.
// If the consumption of a message in the message group has not finished, the next message in the message group cannot be retrieved if you call the Receive function.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
e.printStackTrace();
}
使用说明
使用串行消费以避免乱序。
建议采用单条串行消费,而非批量消费。批量消费可能导致处理过程中的乱序。
例如,按 1-2-3-4 顺序发送,批量消费顺序为 1-[2, 3](批量处理但失败)-[2, 3](重试)-4。如果消息 3 处理失败,系统可能会反复重试消息 2,导致乱序。
避免单个消息组包含海量消息。
Apache RocketMQ 保证同一消息组的消息存储在同一队列中。过大的消息组会导致队列负载过高,影响性能并阻碍扩展。配置消息组时,可以使用 Order ID 或 User ID 作为序列化条件。
建议在业务层面按消息组拆分消息。例如,使用订单 ID 或用户 ID 作为消息组关键字,仅确保同一用户的消息有序,而无需保证不同用户之间的顺序。