跳转至主要内容
版本: 5.0

生产者 (Producer)

本节介绍 Apache RocketMQ 中生产者的概念。同时还描述了生产者在消息模型中的角色、生产者属性与兼容性,以及使用生产者时的一些注意事项。

定义

Apache RocketMQ 中的生产者是一种功能性消息实体,用于创建消息并将其发送到服务器。

生产者通常集成在业务系统中,负责将数据封装为 Apache RocketMQ 消息并发送到服务器。有关消息的更多信息,请参阅 消息

生产者端定义了以下消息投递要素:

  • 传输模式:生产者可以在 API 操作中指定消息传输模式。Apache RocketMQ 支持同步传输和异步传输。

  • 批量传输:生产者可以在 API 操作中指定批量传输。例如,可以指定一次发送的消息数量或大小。

  • 事务行为:Apache RocketMQ 支持事务消息。生产者参与事务检查以确保事务的最终一致性。有关详细信息,请参阅 事务消息

生产者与主题之间存在多对多关系。一个生产者可以向多个主题发送消息,一个主题也可以接收来自多个生产者的消息。这种多对多关系有助于实现性能扩展和灾难恢复。Producers and topics

模型关系

下图展示了生产者在 Apache RocketMQ 消息模型中的角色。Producer

  1. 消息由生产者初始化并发送到 Apache RocketMQ 服务端。

  2. 消息按到达 Apache RocketMQ 服务端的顺序存储在主题的指定队列中。

  3. 消费者根据指定的订阅关系从 Apache RocketMQ 服务端获取并消费消息。

内部属性

客户端 ID

  • 定义:生产者客户端的标识。此属性用于区分不同的生产者。客户端 ID 在集群内是全局唯一的。

  • 值:客户端 ID 由 Apache RocketMQ SDK 自动生成。它主要用于运维目的,如查看日志和定位问题。客户端 ID 不可修改。

通信参数

  • (必需):用于连接服务器的端点。此端点用于标识集群。

    接入点必须按格式配置。我们建议您使用域名,以避免使用 IP 地址,从而防止因节点变更导致无法进行热点迁移。

  • (可选):客户端用于身份验证的凭证。

    仅当服务器启用了身份识别和认证时,才需要传输此项。

  • 请求超时时间 (可选):网络请求的超时周期。有关值范围和默认值的详细信息,请参阅 参数限制

预绑定主题列表

  • 定义:Apache RocketMQ 生产者发送消息的目标主题列表。预绑定主题提供以下优势:

    • 事务消息 (必需):事务消息必须指定预绑定主题列表属性。在事务消息场景中,当生产者从故障中恢复或重启时,生产者会检查事务消息主题是否包含未提交的事务消息。这可以防止生产者向主题发送新消息后,主题中残留的未提交事务消息导致延迟。

    • 非事务消息 (可选):服务器会在生产者初始化期间根据预绑定主题列表检查目标主题的访问权限和有效性,而不是在应用程序启动后再进行检查。我们建议您为非事务消息也指定预绑定主题列表属性。

      如果未为非事务消息指定预绑定主题列表属性,或者目标主题发生变更,Apache RocketMQ 会动态检查并识别目标主题。

  • 限制:对于事务消息,必须指定预绑定主题并与事务检查器配合使用。

事务检查器

  • Apache RocketMQ 使用一种事务消息机制,要求生产者实现事务检查器以确保事务的最终一致性。有关详细信息,请参阅 事务消息

  • 当生产者发送事务消息时,必须配置事务检查器并将其与预绑定主题结合使用。

发送重试策略

发送重试策略指定了生产者在消息发送失败时如何重试投递。有关详细信息,请参阅 消息发送重试

版本兼容性

从 Apache RocketMQ 5.x 版本开始,生产者是匿名的,且已停止使用生产者组(Producer Group)。对于 Apache RocketMQ 3.x 和 4.x 版本,现有的生产者组可以停止使用,且不会影响您的业务。

使用说明

我们建议您限制单个进程中的生产者数量。

在 Apache RocketMQ 中,生产者和主题提供多对多的通信形式。单个生产者可以向多个主题发送消息。我们建议您创建并初始化业务场景所需的最少数量的生产者,并尽可能复用生产者。例如,在需要向多个主题发送消息的场景中,您无需为每个主题创建一个生产者。

我们建议您不要定期创建和销毁生产者。

Apache RocketMQ 的生产者是底层资源,可以像数据库连接池一样复用。您无需在每次发送消息时创建生产者,也无需在发送消息后销毁它们。如果您定期创建和销毁生产者,会在 Broker 上产生大量短连接请求,这会给您的系统带来沉重的负担。

  • 正确用法示例

      Producer p = ProducerBuilder.build();
    for (int i =0;i<n;i++)
    {
    Message m= MessageBuilder.build();
    p.send(m);
    }
    p.shutdown();
  • 错误用法示例

      for (int i =0;i<n;i++)
    {
    Producer p = ProducerBuilder.build();
    Message m= MessageBuilder.build();
    p.send(m);
    p.shutdown();
    }