跳到主要内容
版本:5.0

消费者

本节介绍 Apache RocketMQ 中消费者的定义、模型关系、内部属性和行为约束。本主题还提供消费者的版本兼容性信息和使用注意事项。

定义

消费者是 Apache RocketMQ 中接收和处理消息的实体。

消费者通常集成在业务系统中。它们从 Apache RocketMQ 代理获取消息,并将消息转换为业务逻辑可以感知和处理的信息。

以下项决定了消费者行为:

  • 消费者身份:消费者必须与一个消费组关联,以获取行为设置和消费状态。

  • 消费者类型:Apache RocketMQ 为不同的开发场景提供了多种消费者类型,包括推模式消费者、简单消费者和拉模式消费者。更多信息,请参见消费者类型

  • 消费者本地设置:这些设置根据消费者类型指定消费者客户端的运行方式。例如,您可以在消费者上配置线程数和并发设置,以实现不同的传输效果。

模型关系

下图展示了消费者在 Apache RocketMQ 领域模型中的位置。消费者

  1. 消息由生产者初始化并发送到 Apache RocketMQ 服务器。

  2. 消息按照到达 Apache RocketMQ 服务器的顺序存储在 Topic 的指定队列中。

  3. 消费者根据指定的订阅关系从 Apache RocketMQ 服务器获取并消费消息。

内部属性

消费组名称

  • 定义:与当前消费者关联的消费组名称。消费者从消费组继承其行为。更多信息,请参见消费组

  • 值:消费组是 Apache RocketMQ{#product-name} 的逻辑资源。

    您必须通过控制台或提前调用 API 操作来创建消费组。有关此操作限制的更多信息,请参见参数限制

客户端 ID

  • 定义:消费者客户端的身份。此属性用于区分不同的消费者。该值在集群内必须唯一。

  • 值:客户端 ID 由 Apache RocketMQ SDK 自动生成。它主要用于日志查看、问题定位等运维目的。客户端 ID 不能修改。

通信参数

  • Endpoint (必填) :用于连接服务器的端点。此端点用于标识集群。

    访问点必须以某种格式配置。我们建议您使用域名而不是 IP 地址,以防止节点更改导致热点迁移失败。

  • Credential (可选) :客户端用于认证的凭据。

    只有在服务器上启用身份识别和认证时,才需要传输。

  • 请求超时 (可选) :网络请求的超时时间。有关值范围和默认值的更多信息,请参见参数限制

预绑定订阅列表

  • 定义:指定消费者的订阅列表。Apache RocketMQ 代理可以在消费者初始化期间(而不是应用程序启动后)使用预绑定订阅列表来验证所订阅 Topic 的权限和有效性。

  • 值:我们建议您在消费者初始化期间指定订阅或订阅 Topic 的列表。如果未指定订阅或订阅 Topic 发生更改,Apache RocketMQ 将动态验证 Topic。

消息监听器

  • 定义:Apache RocketMQ 代理将消息推送到消费者后,消费者用于调用消息消费逻辑的监听器。

  • 值:消息监听器的值在消费者客户端上配置。

  • 约束:当您作为推模式消费者消费消息时,必须在消费者客户端配置消息监听器。有关消费者类型的更多信息,请参见消费者类型

行为约束

在 Apache RocketMQ 领域模型中,消费者管理通过消费组实现,同一组中的消费者共享消息进行消费。因此,为了确保组内消息的正常负载和消费,Apache RocketMQ 要求同一组中的所有消费者保持以下消费行为一致:

  • 投递顺序

  • 消费重试策略

版本兼容性

如行为约束中所述,同一组中所有消费者的投递顺序和消费重试策略需要保持一致。

  • Apache RocketMQ 服务器 5.x 版本:前述消费者的消费行为从关联的消费组中获取。因此,同一组中所有消费者的消费行为必须一致,客户端无需关注。

  • Apache RocketMQ 服务器 3.x/ 4.x 历史版本:前述消费逻辑由消费者客户端接口定义。因此,在设置消费者客户端时,您必须确保同一组中消费者的消费行为一致。

如果您使用 Apache RocketMQ 服务器 5.x 版本,且客户端使用以前版本的 SDK,则消费者的消费逻辑将受消费者客户端接口设置的约束。

使用注意事项

我们建议您限制单个进程上的消费者数量。

Apache RocketMQ 的消费者在通信协议层面支持非阻塞传输模式。非阻塞传输模式具有更高的通信效率,并支持多线程并发访问。因此,在大多数场景下,单个进程中一个消费组只需初始化一个消费者。在开发阶段,请避免初始化多个具有相同配置的消费者。

我们建议您不要定期创建和销毁消费者。

Apache RocketMQ 的消费者是可复用的底层资源,类似于数据库连接池。您无需在每次接收消息时创建消费者,也无需在消费消息后销毁消费者。如果您定期创建和销毁消费者,代理上会生成大量短连接请求。这将对您的系统造成高负载。

  • 正确示例

    Consumer c = ConsumerBuilder.build();
    for (int i =0;i<n;i++)
    {
    Message m= c.receive();
    //process message
    }
    c.shutdown();
  • 错误示例

    for (int i =0;i<n;i++)
    {
    Consumer c = ConsumerBuilder.build();
    Message m= c.receive();
    //process message
    c.shutdown();
    }