延迟消息
延迟消息是 Apache RocketMQ 中的一种高级消息类型。本文档将介绍延迟消息的场景、工作机制、限制、使用示例和使用注意事项。
定时消息和延迟消息本质上是相同的。它们都根据消息设置的定时时间在固定时间点将消息传递给消费者。因此,在以下章节中将统一使用“延迟消息”的说法。
场景
在分布式定时调度和任务超时处理等场景中,需要精确可靠的基于时间的事件触发。Apache RocketMQ 提供延迟消息来帮助您简化定时调度任务的开发,并实现高性能、可扩展且可靠的定时触发。
场景一:分布式定时调度
分布式定时调度场景涉及需要各种时间粒度级别的任务,例如每天凌晨 5 点执行文件清理任务或每 2 分钟触发一次推送消息。传统的基于数据集的定时调度解决方案在分布式场景中复杂且效率低下。相比之下,Apache RocketMQ 中的延迟消息允许您封装多种类型的时间触发器。
场景二:任务超时处理
任务超时处理的一个典型场景是电商支付,其中未支付的订单在经过特定时间段后被取消,而不是立即取消。在这种情况下,您可以使用 Apache RocketMQ 中的延迟消息来检查和触发超时任务。
基于延迟消息的任务超时处理提供以下优势:
多种时间粒度级别和简化开发:Apache RocketMQ 中的定时消息没有固定时间增量的限制。您可以以任何时间粒度级别触发任务,并且无需去重。
高性能和可扩展性:Apache RocketMQ 中的延迟消息提供高并发和可扩展性。这优于传统的数据库扫描方法,后者实现复杂,并且由于频繁的 API 调用进行扫描而可能导致性能瓶颈。
工作机制
延迟消息的定义
延迟消息是 Apache RocketMQ 中的一种高级消息。延迟消息允许消费者在消息发送到服务器后,仅在指定时间段或指定时间点后才消费消息。您可以使用延迟消息在分布式场景中实现延迟调度和触发。
时间设置规则
Apache RocketMQ 中延迟消息的定时或延迟时间以时间戳表示,而不是时间段。
定时时间以毫秒级 Unix 时间戳格式表示。您必须将消息投递的定时时间转换为毫秒级 Unix 时间戳。您可以使用 Unix 时间戳转换器 将时间转换为毫秒级 Unix 时间戳。
定时时间必须在允许的时间范围内。如果定时时间超出范围,则定时时间不生效,消息将立即从服务端投递。
默认情况下,延迟消息的最大时间范围为 24 小时。您无法更改默认值。更多信息,请参见参数限制。
定时时间必须晚于当前时间。如果定时时间设置为早于当前时间,则定时时间不生效,消息将立即从服务端投递。
以下章节提供了两个时间设置示例:
延迟消息:如果当前时间是 2022-06-09 17:30:00,您想在 2022-06-09 19:20:00 投递消息,则定时时间的毫秒级 Unix 时间戳为 1654773600000。
延迟消息:如果当前时间是 2022-06-09 17:30:00,您想在 1 小时后投递消息,则消息投递时间为 2022-06-09 18:30:00,毫秒级 Unix 时间戳为 1654770600000。
定时消息的生命周期
初始化:消息由生产者构建并初始化,准备发送到服务器。
定时中:消息发送到服务端,在指定投递时间之前,消息存储在基于时间的存储系统中。不会立即为消息创建索引。
就绪:在指定时间,消息被写入常规存储引擎,消息对消费者可见,并等待消费者消费。
处理中:消息被消费者获取,并根据消费者的本地业务逻辑进行处理。
在此过程中,Broker 等待消费者完成消费并提交消费结果。如果在一定时间内没有收到消费者的响应,Apache RocketMQ 将重试该消息。更多信息,请参见消费重试。
确认:消费者完成消费并将消费结果提交给 Broker。Broker 标记当前消息是否成功消费。
默认情况下,Apache RocketMQ 会保留所有消息。当提交消费结果时,消息数据被逻辑标记为已消费,而不是立即删除。因此,在消息因保留期过期或存储空间不足而被删除之前,消费者可以回溯消息以重新消费。
- 删除:当消息的保留期过期或存储空间不足时,Apache RocketMQ 会以滚动方式从物理文件中删除最早保存的消息。更多信息,请参见消息存储与清理。
使用限制
消息类型一致性
延迟消息只能发送到 `MessageType` 为 `Delay` 的 Topic。
时间粒度
Apache RocketMQ 中延迟消息的时间粒度精确到毫秒。默认粒度值为 1000 毫秒。
Apache RocketMQ 中延迟消息的状态可以持久化存储。如果消息系统发生故障并重新启动,消息仍将根据指定的投递时间进行投递。但是,如果存储系统发生异常或重新启动,投递延迟消息可能会出现延迟。
示例
创建 Topic
在 Apache RocketMQ 5.0 中创建 Topic,建议使用 `mqadmin` 工具。但是,值得注意的是,消息类型需要作为属性参数添加。示例如下:
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=Delay
发送消息
与普通消息不同,延迟消息必须指定投递时间戳。
创建 DELAY Topic
/bin/mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY
- -c 集群名称
- -t Topic 名称
- -n Nameserver 地址
- -a 额外属性,我们添加一个 `message.type` 属性,值为 `DELAY`,以支持投递 DELAY 消息。
以下代码提供了延迟消息投递和消费的 Java 示例:
// Send delay messages.
MessageBuilder messageBuilder = null;
// Specify a millisecond-level Unix timestamp. In this example, the specified timestamp indicates that the message will be delivered in 10 minutes from the current time.
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
// Specify the message index key. The system uses the key to locate the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter messages.
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
// Configure the message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the messages. Focus on the result of message sending and exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Consumption example 1: If a scheduled message is consumed by a push consumer, the consumer needs to process the message only in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getDeliveryTimestamp());
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: If a scheduled message is consumed by a simple consumer, the consumer must obtain the message for consumption and submit the consumption result.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}
}
使用注意事项
我们建议您不要为大量消息安排相同的投递时间。
延迟消息在指定投递时间之前存储在基于时间的存储系统中。如果您为大量延迟消息指定相同的投递时间,系统必须在投递时间同时处理这些消息。这会使系统负载过重,并导致消息投递延迟。