跳至主要内容
版本: 5.0

生产者

本节介绍 Apache RocketMQ 中生产者的概念。它还描述了生产者在消息模型中的作用、生产者属性和兼容性,以及使用生产者的注意事项。

定义

Apache RocketMQ 中的生产者是一个功能性消息实体,它创建消息并将其发送到服务器。

生产者通常集成在业务系统中,用于将数据封装为 Apache RocketMQ 中的消息,并将消息发送到服务器。有关消息的更多信息,请参阅 消息

以下消息传递元素在生产者端定义

  • 传输模式:生产者可以在 API 操作中指定消息传输模式。Apache RocketMQ 支持同步传输和异步传输。

  • 批量传输:生产者可以在 API 操作中指定批量传输。例如,可以指定一次发送的消息数量或大小。

  • 事务行为:Apache RocketMQ 支持事务消息。生产者参与事务检查,以确保事务的最终一致性。有关更多信息,请参阅 事务消息

生产者和主题之间存在多对多关系。一个生产者可以向多个主题发送消息,一个主题可以接收来自多个生产者的消息。这种多对多关系有助于提高性能扩展和灾难恢复能力。 生产者和主题

模型关系

下图显示了生产者在 Apache RocketMQ 消息模型中的作用。 生产者

  1. 消息由生产者初始化并发送到 Apache RocketMQ 服务器。

  2. 消息按到达 Apache RocketMQ 服务器的顺序存储在主题的指定队列中。

  3. 消费者根据指定的订阅关系从 Apache RocketMQ 服务器获取并消费消息。

内部属性

客户端 ID

  • 定义:生产者客户端的身份。此属性用于区分不同的生产者。客户端 ID 在集群中是全局唯一的。

  • 值:客户端 ID 由 Apache RocketMQ SDK 自动生成。它主要用于 O\&M 目的,例如查看日志和定位问题。客户端 ID 不可修改。

通信参数

  • (必需) : 用于连接到服务器的端点。此端点用于标识集群。

    访问点必须以格式配置。我们建议您使用域名,避免使用 IP 地址,以防止节点更改导致热点迁移失败。

  • (可选) : 客户端用于身份验证的凭据。

    仅当服务器上启用了身份识别和身份验证时才需要传输。

  • 请求超时 (可选) : 网络请求的超时时间。有关值范围和默认值的更多信息,请参阅 参数限制

预绑定主题列表

  • 定义:Apache RocketMQ 生产者发送消息的主题列表。预绑定主题提供以下好处

    • 事务消息 (必需): 事务消息必须指定预绑定主题列表属性。在事务消息场景中,当生产者从故障中恢复或重新启动时,生产者会检查事务消息主题是否包含未提交的事务消息。这可以防止生产者向主题发送新消息后,主题中未提交的事务消息导致延迟。

    • 非事务消息 (可选): 服务器在生产者初始化期间根据预绑定主题列表检查目标主题的访问权限和有效性,而不是在应用程序启动后执行检查。我们建议您为非事务消息指定预绑定主题列表属性。

      如果未为非事务消息指定预绑定主题列表属性或更改了目标主题,Apache RocketMQ 会动态检查和识别目标主题。

  • 限制:对于事务消息,必须指定预绑定主题,并与事务检查器一起使用。

事务检查器

  • Apache RocketMQ 使用事务消息机制,要求生产者实现事务检查器,以确保事务的最终一致性。有关更多信息,请参阅 事务消息

  • 当生产者发送事务消息时,必须配置事务检查器,并与预绑定主题一起使用。

发送重试策略

发送重试策略指定生产者在消息传递尝试失败时如何重试消息传递。有关更多信息,请参阅 消息发送重试

版本兼容性

从 Apache RocketMQ 5.x 版本开始,生产者是匿名的,生产者组已停用。对于 Apache RocketMQ 3.x 和 4.x 版本,可以停用现有的生产者组,而不会影响您的业务。

使用注意事项

我们建议您限制单个进程上的生产者数量。

在 Apache RocketMQ 中,生产者和主题提供了一种多对多的通信形式。单个生产者可以向多个主题发送消息。我们建议您创建和初始化业务场景所需的最小数量的生产者,并尽可能地重用生产者。例如,在需要向多个主题传递消息的场景中,您不需要为每个主题创建一个生产者。

我们建议您不要定期创建和销毁生产者。

Apache RocketMQ 的生产者是底层资源,可以像数据库的连接池一样重用。您不需要在每次发送消息时创建生产者,也不需要在发送消息后销毁生产者。如果您定期创建和销毁生产者,会在代理上生成大量短连接请求。这会给您的系统带来很大的负载。

  • 正确使用示例

      Producer p = ProducerBuilder.build();
    for (int i =0;i<n;i++)
    {
    Message m= MessageBuilder.build();
    p.send(m);
    }
    p.shutdown();
  • 错误使用示例

      for (int i =0;i<n;i++)
    {
    Producer p = ProducerBuilder.build();
    Message m= MessageBuilder.build();
    p.send(m);
    p.shutdown();
    }