跳到主要内容
版本: 5.0

普通消息

普通消息是 Apache RocketMQ 中不具有特殊功能的消息。它们不同于具有特定功能的消息,例如 FIFO 消息、延时消息和事务消息。本文介绍普通消息的场景、工作机制、使用方式和使用注意事项。

场景

普通消息通常应用于微服务解耦、数据集成以及事件驱动等场景。这些场景大多对消息处理的时效性或顺序性没有要求,只需具备可靠的传输通道即可。

场景一:微服务异步解耦 在线消息处理

上图展示了一个电商在线交易场景。在该场景中,上游订单系统将下单、支付等行为封装为独立的普通消息发送至 Apache RocketMQ 服务端。下游系统根据业务按需向服务端订阅消息,并根据本地消费逻辑处理任务。消息之间相互独立,无需关联。

场景二:数据集成传输 数据传输

上图以离线日志采集为例。前端应用的操作日志由采集组件进行收集并转发到 Apache RocketMQ。每条消息均为一条日志数据,无需 Apache RocketMQ 进行处理。Apache RocketMQ 仅需将日志数据投递到下游的存储分析系统即可,后续处理任务由后端应用自行负责。

工作机制

普通消息的定义

普通消息是 Apache RocketMQ 中具备基础功能的消息。普通消息支持生产者和消费者之间进行异步解耦通信。 生命周期

普通消息的生命周期

  • 初始化(Initialized):消息由生产者构建、初始化,准备发送到服务端。

  • 就绪(Ready):消息成功发送到服务端,对消费者可见并可被消费。

  • 处理中(Inflight):消息被消费者获取,根据消费者本地业务逻辑进行处理。

    在此过程中,服务端会等待消费者完成消费并提交消费结果。如果服务端在一定时间内未收到消费者响应,Apache RocketMQ 会对该消息进行重试。具体信息,请参见消费重试

  • 已确认(Acked):消费者完成消费,并向服务端提交消费结果。服务端标记当前消息是否消费成功。

    Apache RocketMQ 默认保存所有消息。消费结果提交时,消息数据只是被逻辑标记为已消费,并不会立即删除。因此,消费者可以在消息因保存时间过期或存储空间不足被删除前,进行消息回溯重新消费。

  • 已删除(Deleted):当消息保存时间到期或存储空间不足时,Apache RocketMQ 会按照滚动删除的方式将最早保存的消息从物理文件中删除。具体信息,请参见消息存储与清理

使用限制

普通消息仅支持 MessageType 为 Normal 的 Topic。

示例

创建 Topic

在 Apache RocketMQ 5.0 中创建 Topic,推荐使用 mqadmin 工具,但需要注意将消息类型作为属性参数添加。示例如下:

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=NORMAL

发送消息

您可以设置索引键和过滤标签,对普通消息进行过滤或查询。以下为 Java 语言发送和接收普通消息的示例代码:

// Send a normal message. 
MessageBuilder messageBuilder = new MessageBuilder();
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that you can accurately search for the message by using a keyword.
.setKeys("messageKey")
// Specify the message tag so that the consumer can filter the message based on the specified tag.
.setTag("messageTag")
// Message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the message. You need to pay attention to the sending result and capture exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
// Consumption example 1: When you consume a normal message as a push consumer, you need only to process the message in the message listener.
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
// Return the status based on the consumption result.
return ConsumeResult.SUCCESS;
}
};
// Consumption example 2: When you consume a normal message as a simple consumer, you must obtain and consume the message, and submit the consumption result.
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, you must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}

使用注意事项

设置全局唯一索引键,方便问题排查

Apache RocketMQ 支持设置自定义索引键,即消息的 Key。在消息查询和消息轨迹查询中,索引键可以帮助您高效、精准地查找消息。

因此,发送消息时,建议您使用业务的唯一信息作为索引,例如订单 ID、用户 ID 等,以便后续快速查询消息。