跳转至主要内容
版本: 5.0

顺序消息

顺序消息是 Apache RocketMQ 提供的一种高级消息类型。本文介绍了顺序消息的适用场景、工作原理、使用限制、使用示例及使用注意事项。

适用场景

在有序事件处理、交易撮合、数据实时增量同步等场景下,异构系统间需要保持强一致性,要求上游应用在事件发生变更时,能按顺序将消息投递给下游应用。Apache RocketMQ 提供的顺序消息可以帮助您实现消息的顺序传输。

场景一:交易撮合 交易撮合

例如,在证券股票交易场景中,对于同一笔买入订单,如果多人出价相同,则先出价者优先成交。因此,下游订单处理系统必须按照出价的顺序来处理订单。

场景二:数据实时增量同步

普通消息普通消息 顺序消息顺序消息

例如,在数据库增量数据同步场景中,利用 Apache RocketMQ 的顺序消息,将上游源数据库的增删改操作记录(Binlog)按发送顺序发送至下游查询系统,下游系统按序接收并执行,从而确保下游数据库状态与上游保持一致。若使用普通消息,则可能因乱序导致状态不一致。

工作原理

顺序消息定义

顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照消息发送的顺序获取消息,从而实现业务场景中的顺序处理。

顺序消息的核心特征是:发送顺序、存储顺序、消费顺序的一致性。

Apache RocketMQ 通过“消息组(Message Group)”来保证消息的顺序性。您必须为顺序消息配置消息组,同一消息组内的消息遵循先进先出(FIFO)原则。不同消息组之间、或者没有配置消息组的消息之间,不保证顺序性。

基于消息组的排序能力,您可以根据业务逻辑自定义排序粒度,既能实现业务系统的部分顺序性,又能最大限度地保证系统的并发度和吞吐量。

消息顺序性

Apache RocketMQ 中的消息顺序性分为:生产顺序和消费顺序。

  • 生产顺序:Apache RocketMQ 通过生产者与服务端之间的协议,确保消息从生产者发送到服务端是串行的,且按照发送顺序进行存储和持久化。

    要确保生产顺序,需满足以下条件:

    • 单生产者:生产顺序仅适用于单个生产者。对于不同系统中的不同生产者,即使配置了相同的消息组,Apache RocketMQ 也无法确定其发送顺序。

    • 串行发送:Apache RocketMQ 生产者支持多线程安全访问,但如果生产者使用多线程并发发送消息,Apache RocketMQ 无法确定来自不同线程的消息顺序。

在满足上述条件下,同一消息组的消息会按发送顺序存储在同一个队列中。下图展示了 Apache RocketMQ 的顺序存储逻辑。

顺序存储逻辑

图中,MessageGroup 1 和 MessageGroup 4 的消息被存储在同一个队列(MessageQueue 1)中。Apache RocketMQ 保证 MessageGroup 1 的 G1-M1、G1-M2、G1-M3 按发送顺序存储,MessageGroup 4 的 G4-M1、G4-M2 也按发送顺序存储。但 MessageGroup 1 和 MessageGroup 4 之间的消息顺序是不确定的。

  • 消费顺序

    Apache RocketMQ 通过消费者与服务端之间的协议,确保消息被消费的顺序与存储顺序一致。

    要确保消费顺序,需满足以下条件:

    • 投递顺序:Apache RocketMQ 通过客户端 SDK 和服务端通信协议,保证消息按服务端存储顺序进行投递。消费应用必须遵循“接收-处理-回复”的逻辑,避免因异步处理导致乱序。

      注意
      • 在使用 PushConsumer 进行消费时,Apache RocketMQ 保证消息逐条按照存储顺序投递给消费者。
      • 在使用 SimpleConsumer 进行消费时,消费者可能一次拉取多条消息,业务应用需自行保证消费顺序。更多信息,请参考消费类型
    • 重试限制:Apache RocketMQ 对顺序消息的重试次数有限制。当消息达到最大重试次数仍未成功时,将不再重试,以防止单个消息阻塞整个队列的消费。

    在对消费顺序有严格要求的场景中,建议合理设置重试次数,以防因反复重试导致乱序。

生产顺序与消费顺序的组合

若要实现端到端的 FIFO,则必须同时满足生产顺序和消费顺序。在大多数业务场景下,生产者可能对应多个消费者,并非所有消费者都需要严格的顺序消费。您可以根据需求组合配置。例如,发送顺序消息,但使用非顺序并发消费以提升吞吐量。下表展示了不同组合的效果:

生产顺序消费顺序效果
配置消息组,实现顺序投递。顺序消费消息组级别顺序。同一消息组内的消息发送和消费顺序一致。
配置消息组,实现顺序投递。并发消费消息按照发送顺序并发消费。
不配置消息组,实现非顺序投递。顺序消费队列级别顺序。消息消费顺序与队列中的存储顺序一致,但不一定与发送顺序一致。
不配置消息组,实现非顺序投递。并发消费消息按照发送顺序并发消费。

顺序消息的生命周期 生命周期

  • 初始化:生产者构建并初始化消息,准备发送至 Broker。

  • 就绪(Ready):消息已发送至 Broker,对消费者可见并可消费。

  • 处理中(Inflight):消息被消费者获取,并按照本地业务逻辑进行处理。

    在此期间,Broker 等待消费者提交结果。若超时未收到反馈,Apache RocketMQ 会对该消息进行重试。详情请参考消费重试

  • 确认(Acked):消费者完成消费并提交结果,Broker 标记消息消费状态。

    默认情况下,Apache RocketMQ 保留所有消息。消费确认后仅进行逻辑标记,不会立即物理删除。因此,在消息过期或空间不足被清理前,消费者可以回溯消费。

  • 删除:当消息过期或存储空间不足时,Apache RocketMQ 会按滚动方式删除最早的物理文件。详情请参考消息存储与清理

注意
  • 消费失败或超时将触发服务端重试逻辑。若消息触发重试,其原生命周期结束,重试后的消息会被视为一条全新的消息(具有新的消息 ID)。

  • 对于顺序消息,若某条消息触发重试,则该消息之后的其他消息必须等待该消息处理完成才能继续消费。

使用限制

顺序消息仅支持 MessageType 为 FIFO 的主题(Topic)。

示例

创建主题(Topic)

在 Apache RocketMQ 5.0 中,建议使用 mqadmin 工具创建主题。注意,需要添加消息类型作为属性参数。以下是示例:

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=FIFO

创建订阅组(subscriptionGroup)

建议使用 mqadmin 工具创建。注意,-o 选项必须设置为 true。以下是示例:

sh mqadmin updateSubGroup -c <cluster_name> -g <consumer_group_name> -n <nameserver_address> -o true

发送消息

与普通消息相比,顺序消息必须配置消息组。建议根据业务需求配置细粒度的消息组,以便实现负载解耦和并发扩展。

创建 FIFO 主题

./bin/mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 127.0.0.1:9876 -a +message.type=FIFO
  • -c 集群名称
  • -t 主题名称
  • -n nameserver 地址
  • -o 创建顺序主题的标志

创建 FIFO 订阅组

./bin/mqadmin updateSubGroup -c DefaultCluster -g FIFOGroup -n 127.0.0.1:9876 -o true
  • -c 集群名称
  • -g 订阅组名称
  • -n nameserver 地址
  • -o 创建顺序订阅组的标志

以下 Java 示例代码展示了如何发送和接收顺序消息:

        // Send ordered messages. 
MessageBuilder messageBuilder = null;
Message message = messageBuilder.setTopic("topic")
// Specify the message index key. The system uses the key to locate the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter the message.
.setTag("messageTag")
// Configure a message group for the ordered messages. We recommend that you do not include a large number of messages in the group.
.setMessageGroup("fifoGroup001")
// Configure the message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the messages. Focus on the result of message sending and exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Make sure that ordered delivery is applied to the consumer group. Otherwise, the messages are delivered concurrently and in no particular order.
// Consumption example 1: If the consumer type is PushConsumer, the consumer needs to only process the message in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: If the consumer type is SimpleConsumer, the consumer must actively obtain the message for consumption and submit the consumption result.
// If the consumption of a message in the message group has not finished, the next message in the message group cannot be retrieved if you call the Receive function.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
e.printStackTrace();
}

使用说明

使用串行消费以避免乱序。

建议采用单条串行消费,而非批量消费。批量消费可能导致处理过程中的乱序。

例如,按 1-2-3-4 顺序发送,批量消费顺序为 1-[2, 3](批量处理但失败)-[2, 3](重试)-4。如果消息 3 处理失败,系统可能会反复重试消息 2,导致乱序。

避免单个消息组包含海量消息。

Apache RocketMQ 保证同一消息组的消息存储在同一队列中。过大的消息组会导致队列负载过高,影响性能并阻碍扩展。配置消息组时,可以使用 Order ID 或 User ID 作为序列化条件。

建议在业务层面按消息组拆分消息。例如,使用订单 ID 或用户 ID 作为消息组关键字,仅确保同一用户的消息有序,而无需保证不同用户之间的顺序。