延迟消息
延迟消息是 Apache RocketMQ 中具有高级功能的消息。本主题描述了延迟消息和延迟消息的场景、工作机制、限制、使用示例和使用注意事项。
定时消息和延迟消息本质上是一样的。它们都根据消息设置的定时时间,在固定时间将消息传递给消费者。因此,在以下部分中使用延迟消息。
场景
在分布式定时调度和任务超时处理等场景中,需要准确可靠的基于时间的事件触发。Apache RocketMQ 提供延迟消息,帮助您简化定时调度任务的开发,实现高性能、可扩展和可靠的定时触发。
场景 1:分布式定时调度
分布式定时调度场景涉及需要各种时间粒度级别的任务,例如,每天 5 点执行文件清理的任务,或每 2 分钟触发推送消息的任务。传统的基于数据集的定时调度解决方案在分布式场景中复杂且效率低下。相比之下,Apache RocketMQ 中的延迟消息允许您封装多种类型的定时触发器。
场景 2:任务超时处理
涉及任务超时处理的典型场景是电子商务支付,其中未支付的订单在超过特定时间段后被取消,而不是立即取消。在这种情况下,您可以使用 Apache RocketMQ 中的延迟消息来检查和触发超时任务。
基于延迟消息的任务超时处理提供了以下优势
各种时间粒度级别和简化的开发:Apache RocketMQ 中的定时消息没有固定时间增量的限制。您可以以任何时间粒度级别触发任务,而无需重复数据删除。
高性能和可扩展性:Apache RocketMQ 中的延迟消息提供高并发性和可扩展性。这优于传统的数据库扫描方法,这些方法实现起来很复杂,并且由于频繁的 API 调用以进行扫描而会导致性能瓶颈。
工作机制
延迟消息的定义
延迟消息是 Apache RocketMQ 中具有高级功能的消息。延迟消息允许消费者仅在指定时间段后或在指定时间才能消费发送到服务器的消息。您可以使用延迟消息在分布式场景中实现延迟调度和触发。
时间设置规则
Apache RocketMQ 中延迟消息的计划时间或延迟时间用时间戳表示,而不是时间段。
计划时间采用毫秒级 Unix 时间戳格式。您必须将消息传递的计划时间转换为毫秒级 Unix 时间戳。您可以使用Unix 时间戳转换器将时间转换为毫秒级 Unix 时间戳。
计划时间必须在允许的时间范围内。如果计划时间超出范围,则计划时间不生效,消息立即从服务器端传递。
默认情况下,延迟消息的最大时间范围为 24 小时。您无法更改默认值。有关更多信息,请参阅参数限制。
计划时间必须晚于当前时间。如果计划时间设置为早于当前时间的某个时间,则计划时间不生效,消息立即从服务器端传递。
以下部分提供了两个时间设置示例
延迟消息:如果当前时间为 2022-06-09 17:30:00,您想在 2022-06-09 19:20:00 传递消息,则计划时间的毫秒级 Unix 时间戳为 1654773600000。
延迟消息:如果当前时间为 2022-06-09 17:30:00,您想在 1 小时后传递消息,则消息传递时间为 2022-06-09 18:30:00,毫秒级 Unix 时间戳为 1654770600000。
定时消息的生命周期
初始化:消息由生产者构建和初始化,并准备发送到服务器。
定时:消息发送到服务器端,消息存储在基于时间的存储系统中,直到指定的传递时间。消息不会立即创建索引。
准备就绪:在指定的时间,消息被写入常规存储引擎,消息对消费者可见,并等待消费者消费。
正在传输:消息由消费者获取,并根据消费者的本地业务逻辑进行处理。
在此过程中,代理等待消费者完成消费并提交消费结果。如果在一定时间内没有收到消费者的响应,Apache RocketMQ 会重试消息。有关更多信息,请参阅消费重试。
已确认:消费者完成消费并向代理提交消费结果。代理标记当前消息是否成功消费。
默认情况下,Apache RocketMQ 会保留所有消息。当提交消费结果时,消息数据在逻辑上被标记为已消费,而不是立即删除。因此,消费者可以在消息因保留期到期或存储空间不足而被删除之前回溯消息以重新消费。
- 已删除:当消息的保留期到期或存储空间不足时,Apache RocketMQ 会以滚动方式从物理文件中删除保存的最早消息。有关更多信息,请参阅消息存储和清理。
使用限制
消息类型一致性
延迟消息只能发送到 MessageType 为 Delay 的主题。
时间粒度
Apache RocketMQ 中延迟消息的时间粒度可以细化到毫秒级。默认粒度值为 1000 毫秒。
Apache RocketMQ 中延迟消息的状态可以持久存储。如果消息系统出现故障并重新启动,消息仍会根据指定的传递时间进行传递。但是,如果存储系统出现异常或重新启动,可能会在传递延迟消息时出现延迟。
示例
创建主题
对于在 Apache RocketMQ 5.0 中创建主题,建议使用 mqadmin 工具。但是,需要注意的是,消息类型需要作为属性参数添加。以下是一个示例
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=Delay
发送消息
与普通消息不同,延迟消息必须指定传递时间戳。
创建 DELAY 主题
/bin/mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY
- -c 集群名称
- -t 主题名称
- -n 命名服务器的地址
- -a 额外的属性,我们添加了一个
message.type
属性,其值为DELAY
,以支持传递 DELAY 消息。
以下代码提供了传递和消费延迟消息的 Java 示例
// Send delay messages.
MessageBuilder messageBuilder = null;
// Specify a millisecond-level Unix timestamp. In this example, the specified timestamp indicates that the message will be delivered in 10 minutes from the current time.
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
// Specify the message index key. The system uses the key to locate the message.
.setKeys("messageKey")
// Specify the message tag. The consumer can use the tag to filter messages.
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
// Configure the message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the messages. Focus on the result of message sending and exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Consumption example 1: If a scheduled message is consumed by a push consumer, the consumer needs to process the message only in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getDeliveryTimestamp());
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: If a scheduled message is consumed by a simple consumer, the consumer must obtain the message for consumption and submit the consumption result.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}
}
使用注意事项
建议您不要为大量消息安排相同的传递时间。
延迟消息在传递到消费者之前存储在基于时间的存储系统中,直到指定的传递时间。如果您为大量延迟消息指定相同的传递时间,系统必须在传递时间同时处理这些消息。这会给系统带来沉重的负载,并导致消息传递延迟。