跳到主要内容
版本: 5.0

消息

本节介绍 Apache RocketMQ 中消息的定义、模型关系、内部属性和行为约束,并提供消息的使用注意事项。

定义

消息是 Apache RocketMQ 中最小的数据传输单元。生产者将业务数据的负载和扩展属性封装成消息,并将消息发送到 Apache RocketMQ 代理服务器。然后,代理服务器根据相关语义将消息投递给消费者。

Apache RocketMQ 消息模型的特点是:

  • 不可变性:消息是已生成的事件。消息生成后,其内容不会更改。即使消息通过传输通道,其内容也保持不变。消费者获取的消息是只读消息。
  • 持久性:默认情况下,Apache RocketMQ 持久化消息。收到的消息存储在 Apache RocketMQ 代理服务器的存储文件中,以确保在系统发生故障时可以跟踪和恢复消息。

模型关系

下图显示了消息在 Apache RocketMQ 领域模型中的位置。消息

  1. 消息由生产者初始化并发送到 Apache RocketMQ 代理服务器。

  2. 消息按照在 Apache RocketMQ 代理服务器上接收的顺序存储在队列中。

  3. 消费者根据指定的订阅从 Apache RocketMQ 代理服务器获取和消费消息。

内部属性

系统保留属性

主题名称

  • 定义:消息所属主题的名称。主题名称在集群中是全局唯一的。更多信息,请参见主题

  • 值:从客户端 SDK 获取。

消息类型

  • 定义:消息的类型。

  • 值:从客户端 SDK 获取。Apache RocketMQ 支持以下消息类型:

    • 普通消息:普通消息。普通消息不需要特殊语义,也不与其他普通消息关联。

    • 顺序消息:顺序消息。Apache RocketMQ 使用消息组来确定一组指定消息的顺序。消息按照发送顺序投递。

    • 延时消息:延时消息。您可以指定一个延时,使消息在延时结束后才可供消费者消费,而不是在生成时立即投递。

    • 事务消息:事务消息。Apache RocketMQ 支持分布式事务消息,并确保数据库更新和消息调用的事务一致性。

消息队列

  • 定义:消息所属的队列。更多信息,请参见消息队列

  • 值:由代理服务器指定和填充。

消息偏移量

  • 定义:当前消息在队列中的存储位置。更多信息,请参见工作机制

  • 值:由代理服务器指定和填充。有效值:0 到 Long.Max。

消息 ID

  • 定义:消息的唯一标识符。每个消息的 ID 在集群中是全局唯一的。

  • 值:由生产者客户端自动生成。消息 ID 是一个由数字和大写字母组成的32个字符的字符串。

(可选)消息键

  • 定义:消息的索引键列表。您可以配置不同的键来区分消息并快速查找消息。

  • 值:由生产者客户端定义。

(可选)消息标签

  • 定义:用于过滤消息的标签。消费者可以通过标签过滤消息,只接收包含指定标签的消息。

  • 值:由生产者客户端定义。

  • 约束:每条消息只能指定一个标签。

(可选)预定时间

  • 定义:消息在预定时间场景中触发延时投递时使用的毫秒级时间戳。更多信息,请参见延时消息

  • 值:由消息生产者定义。

  • 约束:最大持续时间为40天。

消息发送时间

  • 定义:消息发送时生产者客户端的本地毫秒级时间戳。

  • 值:由生产者客户端填充。

  • 注意:客户端时间可能与代理服务器时间不同。在这种情况下,消息发送时间以客户端时间为准。

消息存储时间戳

  • 定义:消息存储时 Apache RocketMQ 代理服务器的本地毫秒级时间戳。

    对于延时消息和事务消息,消息生效时,消息保留时间为消费者看到的代理服务器时间。

  • 值:由代理服务器填充。

  • 注意:客户端时间可能与代理服务器时间不同。在这种情况下,消息保留时间以代理服务器时间为准。

重试次数

  • 定义:消息消费失败后,Apache RocketMQ 代理服务器重新投递消息的次数。每次重试后,最大重试次数增加一次。更多信息,请参见消费重试

  • 值:由代理服务器标记。首次消费消息时,重试次数为零。首次消费消息失败时,重试次数为一。

消息自定义属性

自定义属性

  • 定义:生产者可以指定的扩展信息。

  • 值:由生产者根据字符串的键值对指定。

消息负载

  • 定义:服务消息的实际消息数据。

  • 值:由生产者序列化并以二进制字节传输。

  • 约束:参见参数限制

行为约束

消息大小不能超过上限。如果消息大小超过相应的上限,则消息发送失败。

以下描述了消息的默认限制:

  • 消息最大大小:4 MB

使用注意事项

不建议单个消息进行超大负载传输。

Apache RocketMQ 是一种用于传输业务事件数据的消息中间件。如果消息大小过大,可能会导致网络传输层过载。这会影响错误重试和流量控制。建议您限制单个消息事件的数据大小。

如果生产环境需要超大负载传输,建议您根据固定大小拆分消息或使用文件存储方式。

消息的不可变性

在 Apache RocketMQ 代理服务器 5.x 版本中,消息不可修改,消费者获取的消息是只读消息。在 3.x 和 4.x 版本中,对不可变性没有强制约束。如果您想传输消息,建议您重新初始化消息。

  • 正确示例

    Message m = Consumer.receive();
    Message m2= MessageBuilder.buildFrom(m);
    Producer.send(m2);
  • 错误示例:

    Message m = Consumer.receive();
    m.update();
    Producer.send(m);