跳至主要内容
版本: 5.0

消息

本节介绍 Apache RocketMQ 中消息的定义、模型关系、内部属性和行为约束。本主题还提供有关消息的使用说明。

定义

消息是 Apache RocketMQ 中数据传输的最小单位。生产者将业务数据的负载和扩展属性封装成消息,并将消息发送到 Apache RocketMQ 代理。然后,代理根据相关语义将消息传递给消费者。

Apache RocketMQ 中消息模型的特征是

  • 不可变性:消息是生成的事件。消息生成后,消息内容不会改变。即使消息通过传输通道,消息内容也保持不变。消费者获取的消息是只读消息。
  • 持久性:默认情况下,Apache RocketMQ 会持久化消息。接收到的消息存储在 Apache RocketMQ 代理的存储文件中,以确保在发生系统故障时可以跟踪和恢复消息。

模型关系

下图显示了消息在 Apache RocketMQ 领域模型中的位置。消息

  1. 消息由生产者初始化,并发送到 Apache RocketMQ 代理。

  2. 消息按接收消息的顺序存储在 Apache RocketMQ 代理上的队列中。

  3. 消费者根据指定的订阅从 Apache RocketMQ 代理获取和消费消息。

内部属性

系统保留属性

主题名称

  • 定义:消息所属主题的名称。主题名称在集群中是全局唯一的。有关更多信息,请参阅 主题

  • 值:从客户端的 SDK 获取。

消息类型

  • 定义:消息的类型。

  • 值:从客户端的 SDK 获取。Apache RocketMQ 支持以下消息类型

    • 普通:普通消息。普通消息不需要特殊语义,也不与其他普通消息相关联。

    • FIFO:FIFO 消息。Apache RocketMQ 使用消息组来确定一组指定消息的顺序。消息按发送顺序传递。

    • 延迟:延迟消息。您可以指定延迟,使消息仅在延迟时间过去后才对消费者可用,而不是在生产时立即传递消息。

    • 事务:事务消息。Apache RocketMQ 支持分布式事务消息,并确保数据库更新和消息调用的事务一致性。

消息队列

  • 定义:消息所属的队列。有关更多信息,请参阅 消息队列

  • 值:由代理指定和填充。

消息偏移量

  • 定义:当前消息在队列中存储的位置。有关更多信息,请参阅 工作机制

  • 值:由代理指定和填充。有效值:0 到 Long.Max。

消息 ID

  • 定义:消息的唯一标识符。每个消息的 ID 在集群中是全局唯一的。

  • 值:由生产者客户端自动生成。消息 ID 是一个由数字和大写字母组成的 32 个字符的字符串。

(可选) 消息键

  • 定义:消息的索引键列表。您可以配置不同的键来区分消息并快速查找消息。

  • 值:由生产者客户端定义。

(可选) 消息标签

  • 定义:用于过滤消息的标签。消费者可以通过标签过滤消息,只接收包含指定标签的消息。

  • 值:由生产者客户端定义。

  • 约束:每条消息只能指定一个标签。

(可选) 计划时间

  • 定义:消息在计划时间场景中触发延迟传递时使用的毫秒级时间戳。有关更多信息,请参阅 延迟消息

  • 值:由消息生产者定义。

  • 约束:最长持续时间为 40 天。

消息发送时间

  • 定义:生产者客户端发送消息时的本地毫秒级时间戳。

  • 值:由生产者客户端填充。

  • 注意:客户端时间可能与代理时间不同。在这种情况下,消息发送时间基于客户端时间。

消息存储时间戳

  • 定义:Apache RocketMQ 代理存储消息时的本地毫秒级时间戳。

    对于延迟消息和事务消息,消息保留时间是消息生效时显示给消费者的代理时间。

  • 值:由代理填充。

  • 注意:客户端时间可能与代理时间不同。在这种情况下,消息保留时间基于代理时间。

重试次数

  • 定义:消息无法消费后,Apache RocketMQ 代理重新传递消息的次数。每次重试后,最大重试次数都会增加 1。有关更多信息,请参阅 消费重试

  • 值:由代理标记。消息第一次被消费时,重试次数为零。消息第一次无法消费时,重试次数为 1。

消息的自定义属性

自定义属性

  • 定义:生产者可以指定的扩展信息。

  • 值:由生产者根据字符串中的键值对指定。

消息负载

  • 定义:服务消息的实际消息数据。

  • 值:由生产者序列化并以二进制字节形式传输。

  • 约束:请参阅 参数限制

行为约束

消息的大小不能超过上限。如果消息的大小超过了相应的上限,则消息无法发送。

以下是消息的默认限制

  • 消息最大大小:4 MB

使用说明

不建议对单个消息进行过载传输。

Apache RocketMQ 是一种消息中间件,用于传输业务事件的数据。如果消息的大小很大,网络传输层可能会过载。这会影响错误重试和节流。我们建议您限制单个消息事件的数据大小。

如果生产环境需要过载传输,我们建议您根据固定大小拆分消息,或使用文件存储方法。

消息的不可变性

在 Apache RocketMQ 代理版本 5.x 中,消息无法修改,消费者获取的消息是只读消息。版本 3.x 和 4.x 对不可变性没有强约束。如果您想传输消息,我们建议您重新初始化消息。

  • 正确示例

    Message m = Consumer.receive();
    Message m2= MessageBuilder.buildFrom(m);
    Producer.send(m2);
  • 错误示例:

    Message m = Consumer.receive();
    m.update();
    Producer.send(m);