跳到主要内容
版本: 5.0

生产者

本节描述了 Apache RocketMQ 中生产者的概念。它还描述了生产者在消息模型中的作用、生产者属性和兼容性,以及使用生产者的一些注意事项。

定义

Apache RocketMQ 中的生产者是一个功能性消息实体,负责创建消息并将其发送到服务器。

生产者通常集成在业务系统中,用于将数据封装为 Apache RocketMQ 中的消息并发送到服务器。有关消息的更多信息,请参阅消息

生产者端定义了以下消息投递元素:

  • 传输模式:生产者可以在 API 操作中指定消息传输模式。Apache RocketMQ 支持同步传输和异步传输。

  • 批量传输:生产者可以在 API 操作中指定批量传输。例如,可以指定一次发送的消息数量或大小。

  • 事务行为:Apache RocketMQ 支持事务消息。生产者参与事务检查以确保事务的最终一致性。有关更多信息,请参阅事务消息

生产者和 Topic 具有 N 对 N 的关系。一个生产者可以将消息发送到多个 Topic,一个 Topic 可以接收来自多个生产者的消息。这种多对多关系有助于性能扩展和灾难恢复。生产者与 Topic

模型关系

下图显示了生产者在 Apache RocketMQ 消息模型中的作用。生产者

  1. 消息由生产者初始化并发送到 Apache RocketMQ 服务器。

  2. 消息按照到达 Apache RocketMQ 服务器的顺序存储在 Topic 的指定队列中。

  3. 消费者根据指定的订阅关系从 Apache RocketMQ 服务器获取并消费消息。

内部属性

客户端ID

  • 定义:生产者客户端的身份标识。此属性用于区分不同的生产者。客户端ID在集群中是全局唯一的。

  • 值:客户端ID由 Apache RocketMQ SDK 自动生成。它主要用于日志查看和问题定位等运维目的。客户端ID不能被修改。

通信参数

  • (必需):用于连接服务器的端点。此端点用于标识集群。

    访问点必须按照格式配置。我们建议您使用域名,以避免使用IP地址,防止节点变更导致热点迁移失败。

  • (可选):客户端用于认证的凭证。

    仅当服务器启用身份识别和认证时才需要传输。

  • 请求超时 (可选):网络请求的超时时间。有关值范围和默认值的更多信息,请参阅参数限制

预绑定 Topic 列表

  • 定义:Apache RocketMQ 生产者发送消息的 Topic 列表。预绑定 Topic 提供以下好处:

    • 事务消息 (必需):事务消息必须指定预绑定 Topic 列表属性。在事务消息场景中,当生产者从故障中恢复或重启时,生产者会检查事务消息 Topic 是否包含未提交的事务消息。这可以防止生产者发送新消息到 Topic 后,由于 Topic 中未提交的事务消息导致延迟。

    • 非事务消息 (可选):服务器在生产者初始化期间根据预绑定 Topic 列表检查目标 Topic 的访问权限和有效性,而不是在应用程序启动后才执行检查。我们建议您为非事务消息指定预绑定 Topic 列表属性。

      如果未为非事务消息指定预绑定 Topic 列表属性或目标 Topic 已更改,Apache RocketMQ 会动态检查并识别目标 Topic。

  • 限制:对于事务消息,必须指定预绑定 Topic 并与事务检查器一起使用。

事务检查器

  • Apache RocketMQ 使用事务消息机制,要求生产者实现事务检查器以确保事务的最终一致性。有关更多信息,请参阅事务消息

  • 当生产者发送事务消息时,必须配置事务检查器并与预绑定 Topic 一起使用。

发送重试策略

发送重试策略指定了生产者在消息投递失败后如何重试消息投递。有关更多信息,请参阅消息发送重试

版本兼容性

从 Apache RocketMQ 5.x 版本开始,生产者是匿名的,并且生产组已停用。对于 Apache RocketMQ 3.x 和 4.x 版本,现有生产组可以停用,不影响您的业务。

使用注意事项

我们建议您限制单个进程上的生产者数量。

在 Apache RocketMQ 中,生产者和 Topic 提供多对多的通信形式。单个生产者可以将消息发送到多个 Topic。我们建议您创建并初始化业务场景所需的最小数量的生产者,并尽可能地重用生产者。例如,在需要向多个 Topic 投递消息的场景中,您无需为每个 Topic 创建一个生产者。

我们建议您不要定期创建和销毁生产者。

Apache RocketMQ 的生产者是底层可重用的资源,类似于数据库的连接池。您无需每次发送消息时都创建生产者,也无需在发送消息后销毁生产者。如果您定期创建和销毁生产者,会在 Broker 上产生大量的短连接请求。这会给您的系统带来高负载。

  • 正确用法示例

      Producer p = ProducerBuilder.build();
    for (int i =0;i<n;i++)
    {
    Message m= MessageBuilder.build();
    p.send(m);
    }
    p.shutdown();
  • 错误用法示例

      for (int i =0;i<n;i++)
    {
    Producer p = ProducerBuilder.build();
    Message m= MessageBuilder.build();
    p.send(m);
    p.shutdown();
    }