消息 (Message)
本节介绍 Apache RocketMQ 中消息的定义、模型关系、内部属性和行为约束。本主题还提供了有关消息的使用说明。
定义
消息是 Apache RocketMQ 中数据传输的最小单元。生产者将业务数据的负载和扩展属性封装成消息,并将消息发送到 Apache RocketMQ Broker。随后,Broker 会根据相关语义将消息分发给消费者。
Apache RocketMQ 中消息模型的特征为:
- 不可变性:消息一旦产生即成为一个事件。在消息生成后,其内容不会发生改变。即使消息经过传输通道,其内容也保持不变。消费者获取的消息均为只读消息。
- 持久性:默认情况下,Apache RocketMQ 会对消息进行持久化处理。接收到的消息存储在 Apache RocketMQ Broker 的存储文件中,以确保在系统故障时可以对消息进行追踪和恢复。
模型关系
下图显示了消息在 Apache RocketMQ 领域模型中的位置。
消息由生产者初始化,并发送至 Apache RocketMQ Broker。
消息在 Apache RocketMQ Broker 上按照接收顺序存储在队列中。
消费者根据指定的订阅关系从 Apache RocketMQ Broker 获取并消费消息。
内部属性
系统保留属性
主题名称
定义:消息所属的主题名称。主题名称在集群中全局唯一。更多信息,请参见 主题。
取值:从客户端 SDK 获取。
消息类型
定义:消息的类型。
取值:从客户端 SDK 获取。Apache RocketMQ 支持以下消息类型:
消息队列
定义:消息所属的队列。更多信息,请参见 消息队列。
取值:由 Broker 指定并填充。
消息位点 (Message Offset)
定义:当前消息在队列中的存储位置。更多信息,请参见 工作机制。
取值:由 Broker 指定并填充。有效值:0 到 Long.Max。
消息 ID
定义:消息的唯一标识符。每条消息的 ID 在集群中全局唯一。
取值:由生产者客户端自动生成。消息 ID 是一个由 32 个字符组成的字符串,包含数字和大写字母。
(可选)消息 Key
定义:消息的索引键列表。您可以配置不同的 Key 来区分消息并快速查找消息。
取值:由生产者客户端定义。
(可选)消息 Tag
定义:用于过滤消息的标签。消费者可以通过标签过滤消息,仅接收包含指定标签的消息。
取值:由生产者客户端定义。
约束:每条消息只能指定一个标签。
(可选)定时时间
定义:在定时场景中,消息触发延迟投递时所使用的毫秒级时间戳。更多信息,请参见 延迟消息。
取值:由消息生产者定义。
约束:最长持续时间为 40 天。
消息发送时间
定义:消息发送时,生产者客户端的本地毫秒级时间戳。
取值:由生产者客户端填充。
注意:客户端时间可能与 Broker 时间不同。在这种情况下,消息发送时间以客户端时间为准。
消息存储时间
定义:消息存储时,Apache RocketMQ Broker 的本地毫秒级时间戳。
对于延迟消息和事务消息,消息保留时间是指消息生效时展示给消费者的 Broker 时间。
取值:由 Broker 填充。
注意:客户端时间可能与 Broker 时间不同。在这种情况下,消息保留时间以 Broker 时间为准。
重试次数
定义:消息消费失败后,Apache RocketMQ Broker 重新投递消息的次数。每次重试后,最大重试次数加一。更多信息,请参见 消费重试。
取值:由 Broker 标记。第一次消费消息时,重试次数为零。第一次消费失败时,重试次数为一。
消息的自定义属性
自定义属性
定义:生产者可以指定的扩展信息。
取值:由生产者基于字符串键值对进行指定。
消息负载 (Message Payload)
定义:业务消息的实际数据。
取值:由生产者序列化并以二进制字节流进行传输。
约束:请参见 参数限制。
行为约束
消息大小不能超过上限。如果消息大小超过相应的上限,消息将发送失败。
以下描述了消息的默认限制:
- 最大消息大小:4 MB
使用说明
不建议单条消息进行超大负载传输。
Apache RocketMQ 是一种用于传输业务事件数据的消息中间件。如果消息过大,网络传输层可能会过载。这会影响错误重试和限流机制。建议限制单条消息事件的数据大小。
如果生产环境确实需要传输大数据,建议根据固定大小对消息进行拆分,或采用文件存储方式。
消息的不可变性
在 Apache RocketMQ Broker 5.x 版本中,消息无法被修改,消费者获取的消息均为只读消息。3.x 和 4.x 版本未实施有关不可变性的强制约束。建议若需传输消息,请重新初始化消息。
正确示例
Message m = Consumer.receive();
Message m2= MessageBuilder.buildFrom(m);
Producer.send(m2);
错误示例:
Message m = Consumer.receive();
m.update();
Producer.send(m);