跳转至主要内容
版本: 5.0

消费者 (Consumer)

本节介绍了 Apache RocketMQ 中消费者的定义、模型关系、内部属性和行为约束。此外,还提供了消费者相关的版本兼容性信息和使用注意事项。

定义

消费者是 Apache RocketMQ 中接收并处理消息的实体。

消费者通常集成在业务系统中。它们从 Apache RocketMQ Broker 获取消息,并将消息转换为业务逻辑可感知和处理的信息。

以下因素决定了消费者的行为:

  • 消费者身份:消费者必须关联一个消费者组,以获取行为设置和消费状态。

  • 消费者类型:Apache RocketMQ 针对不同的开发场景提供了多种消费者类型,包括推模式消费者(Push Consumer)、简单消费者(Simple Consumer)和拉模式消费者(Pull Consumer)。更多信息,请参见 消费者类型

  • 消费者本地设置:这些设置指定了消费者客户端如何根据消费者类型进行运行。例如,您可以配置消费者上的线程数和并发设置,以实现不同的传输效果。

模型关系

下图展示了消费者在 Apache RocketMQ 领域模型中的位置。Consumers

  1. 消息由生产者初始化并发送到 Apache RocketMQ 服务端。

  2. 消息按到达 Apache RocketMQ 服务端的顺序存储在主题的指定队列中。

  3. 消费者根据指定的订阅关系从 Apache RocketMQ 服务端获取并消费消息。

内部属性

消费者组名称

  • 定义:当前消费者关联的消费者组名称。消费者从消费者组继承其行为。更多信息,请参见 消费者组

  • 取值:消费者组是 Apache RocketMQ{#product-name}

    的逻辑资源。您必须提前使用控制台或调用 API 操作来创建消费者组。有关此操作限制的更多信息,请参见 参数限制

客户端 ID (Client ID)

  • 定义:消费者客户端的标识。此属性用于区分不同的消费者。该值在集群内必须是唯一的。

  • 取值:客户端 ID 由 Apache RocketMQ SDK 自动生成。它主要用于日志查看和问题定位等运维目的。客户端 ID 不可修改。

通信参数

  • 接入点 (Endpoints) (必选):用于连接服务器的接入点。此接入点用于标识集群。

    接入点必须按格式进行配置。建议使用域名,避免使用 IP 地址,以防节点变更时无法进行热点迁移。

  • 凭证 (Credential) (可选):客户端用于身份验证的凭证。

    仅当服务器启用了身份识别和认证时,才需要传输此项。

  • 请求超时时间 (Request Timeout) (可选):网络请求的超时时间。有关取值范围和默认值的更多信息,请参见 参数限制

预绑定订阅列表

  • 定义:指定消费者的订阅列表。Apache RocketMQ Broker 可以利用预绑定订阅列表在消费者初始化时(而非应用程序启动后)验证所订阅 Topic 的权限和有效性。

  • 取值:建议在消费者初始化时指定订阅或已订阅 Topic 的列表。如果未指定订阅或变更了已订阅的 Topic,Apache RocketMQ 会动态验证这些 Topic。

消息监听器 (Message Listener)

  • 定义:消费者在 Apache RocketMQ Broker 将消息推送给消费者后,用于调用消息消费逻辑的监听器。

  • 取值:消息监听器的值在消费者客户端上进行配置。

  • 约束:当您以推模式消费者身份消费消息时,必须在消费者客户端配置消息监听器。有关消费者类型的更多信息,请参见 消费者类型

行为约束

在 Apache RocketMQ 领域模型中,消费者管理通过消费者分组实现,同一组内的消费者共享消息进行消费。因此,为确保组内消息的正常负载和消费,Apache RocketMQ 要求同一组内的所有消费者保持以下消费行为一致:

  • 投递顺序

  • 消费重试策略

版本兼容性

如“行为约束”所述,同一组内所有消费者的投递顺序和消费重试策略需要保持一致。

  • Apache RocketMQ 服务端 5.x 版本:上述消费行为均从关联的消费者组获取。因此,同一组内所有消费者的消费行为必须保持一致,客户端无需特别关注。

  • Apache RocketMQ 服务端 3.x/4.x 旧版本:上述消费逻辑由消费者客户端接口定义。因此,在设置消费者客户端时,必须确保同一组内消费者的消费行为一致。

如果您使用的是 Apache RocketMQ 服务端 5.x 版本,但客户端使用的是旧版本 SDK,则消费者的消费逻辑需遵循消费者客户端接口的设置。

使用说明

建议限制单个进程中的消费者数量。

Apache RocketMQ 的消费者在通信协议层面支持非阻塞传输模式。该模式具有更高的通信效率,并支持多线程并发访问。因此,在大多数场景下,单个进程中仅需为一个消费者组初始化一个消费者即可。在开发阶段,请避免使用相同的配置初始化多个消费者。

建议不要频繁创建和销毁消费者。

Apache RocketMQ 的消费者是底层资源,可以像数据库连接池一样重复使用。您无需在每次接收消息时创建消费者,也无需在消费消息后销毁它们。如果频繁创建和销毁消费者,Broker 上会产生大量的短连接请求,这将给您的系统带来沉重的负载。

  • 正确示例

    Consumer c = ConsumerBuilder.build();
    for (int i =0;i<n;i++)
    {
    Message m= c.receive();
    //process message
    }
    c.shutdown();
  • 错误示例

    for (int i =0;i<n;i++)
    {
    Consumer c = ConsumerBuilder.build();
    Message m= c.receive();
    //process message
    c.shutdown();
    }