普通消息
普通消息是指在 Apache RocketMQ 中没有特殊功能的消息。它们不同于具有特殊功能的消息,例如 FIFO 消息、延迟消息和事务消息。本主题描述了普通消息的场景、工作机制、用法和使用注意事项。
场景
普通消息通常用于微服务解耦、数据集成和事件驱动场景。大多数这些场景对消息处理的时序或顺序要求较低或没有要求,除了可靠的传输通道。
场景 1:微服务的异步解耦
上图展示了一个在线电子商务交易场景。在这个场景中,上游订单系统将订单创建和支付封装为独立的普通消息,并将其发送到 Apache RocketMQ 代理。然后,下游系统按需从代理订阅消息,并根据本地消费逻辑处理任务。消息之间相互独立,不需要关联。
场景 2:数据集成传输
上图以离线日志收集为例。使用仪器组件从前端应用程序收集操作日志,并将日志转发到 Apache RocketMQ。每条消息都是一段日志数据,不需要 Apache RocketMQ 进行处理。Apache RocketMQ 只需要将日志数据发送到下游存储和分析系统。后端应用程序负责后续处理任务。
工作机制
普通消息的定义
普通消息是 Apache RocketMQ 中具有基本功能的消息。普通消息支持生产者和消费者之间的异步解耦和通信。
普通消息的生命周期
初始化:消息由生产者构建和初始化,并准备发送到代理。
就绪:消息被发送到代理,对消费者可见,可供消费。
飞行中:消息被消费者获取,并根据消费者的本地业务逻辑进行处理。
在这个过程中,代理等待消费者完成消费并提交消费结果。如果在一定时间内没有收到消费者的响应,Apache RocketMQ 会重试消息。有关更多信息,请参见 消费重试。
已确认:消费者完成消费并向代理提交消费结果。代理标记当前消息是否成功消费。
默认情况下,Apache RocketMQ 会保留所有消息。当提交消费结果时,消息数据在逻辑上被标记为已消费,而不是立即删除。因此,消费者可以在消息因保留期过期或存储空间不足而被删除之前,回溯消息以重新消费。
已删除:当消息的保留期过期或存储空间不足时,Apache RocketMQ 会以滚动方式从物理文件中删除保存的最早消息。有关更多信息,请参见 消息存储和清理。
使用限制
普通消息仅支持 MessageType 为 Normal 的主题。
示例
创建主题
对于在 Apache RocketMQ 5.0 中创建主题,建议使用 mqadmin 工具。但是,值得注意的是,消息类型需要作为属性参数添加。以下是一个示例
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=NORMAL
发送消息
您可以设置索引键和过滤标签来过滤或搜索普通消息。以下示例代码展示了如何在 Java 中发送和接收普通消息
// Send a normal message.
MessageBuilder messageBuilder = new MessageBuilder();
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that you can accurately search for the message by using a keyword.
.setKeys("messageKey")
// Specify the message tag so that the consumer can filter the message based on the specified tag.
.setTag("messageTag")
// Message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the message. You need to pay attention to the sending result and capture exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Consumption example 1: When you consume a normal message as a push consumer, you need only to process the message in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: When you consume a normal message as a simple consumer, you must obtain and consume the message, and submit the consumption result.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, you must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}
使用注意事项
设置全局唯一的索引键以方便排查问题
您可以在 Apache RocketMQ 中设置自定义索引键,即消息键。当您查询和跟踪消息时,索引键可以帮助您高效准确地找到这些消息。
因此,在发送消息时,建议您使用服务的唯一信息,例如订单 ID 和用户 ID,作为索引。这有助于您将来快速找到消息。