消费者 (Consumer)
本节介绍了 Apache RocketMQ 中消费者的定义、模型关系、内部属性和行为约束。此外,还提供了消费者相关的版本兼容性信息和使用注意事项。
定义
消费者是 Apache RocketMQ 中接收并处理消息的实体。
消费者通常集成在业务系统中。它们从 Apache RocketMQ Broker 获取消息,并将消息转换为业务逻辑可感知和处理的信息。
以下因素决定了消费者的行为:
消费者身份:消费者必须关联一个消费者组,以获取行为设置和消费状态。
消费者类型:Apache RocketMQ 针对不同的开发场景提供了多种消费者类型,包括推模式消费者(Push Consumer)、简单消费者(Simple Consumer)和拉模式消费者(Pull Consumer)。更多信息,请参见 消费者类型。
消费者本地设置:这些设置指定了消费者客户端如何根据消费者类型进行运行。例如,您可以配置消费者上的线程数和并发设置,以实现不同的传输效果。
模型关系
下图展示了消费者在 Apache RocketMQ 领域模型中的位置。
消息由生产者初始化并发送到 Apache RocketMQ 服务端。
消息按到达 Apache RocketMQ 服务端的顺序存储在主题的指定队列中。
消费者根据指定的订阅关系从 Apache RocketMQ 服务端获取并消费消息。
内部属性
消费者组名称
定义:当前消费者关联的消费者组名称。消费者从消费者组继承其行为。更多信息,请参见 消费者组。
取值:消费者组是 Apache RocketMQ{#product-name}
的逻辑资源。您必须提前使用控制台或调用 API 操作来创建消费者组。有关此操作限制的更多信息,请参见 参数限制。
客户端 ID (Client ID)
定义:消费者客户端的标识。此属性用于区分不同的消费者。该值在集群内必须是唯一的。
取值:客户端 ID 由 Apache RocketMQ SDK 自动生成。它主要用于日志查看和问题定位等运维目的。客户端 ID 不可修改。
通信参数
接入点 (Endpoints) (必选):用于连接服务器的接入点。此接入点用于标识集群。
接入点必须按格式进行配置。建议使用域名,避免使用 IP 地址,以防节点变更时无法进行热点迁移。
凭证 (Credential) (可选):客户端用于身份验证的凭证。
仅当服务器启用了身份识别和认证时,才需要传输此项。
- 请求超时时间 (Request Timeout) (可选):网络请求的超时时间。有关取值范围和默认值的更多信息,请参见 参数限制。
预绑定订阅列表
定义:指定消费者的订阅列表。Apache RocketMQ Broker 可以利用预绑定订阅列表在消费者初始化时(而非应用程序启动后)验证所订阅 Topic 的权限和有效性。
取值:建议在消费者初始化时指定订阅或已订阅 Topic 的列表。如果未指定订阅或变更了已订阅的 Topic,Apache RocketMQ 会动态验证这些 Topic。
消息监听器 (Message Listener)
定义:消费者在 Apache RocketMQ Broker 将消息推送给消费者后,用于调用消息消费逻辑的监听器。
取值:消息监听器的值在消费者客户端上进行配置。
约束:当您以推模式消费者身份消费消息时,必须在消费者客户端配置消息监听器。有关消费者类型的更多信息,请参见 消费者类型。
行为约束
在 Apache RocketMQ 领域模型中,消费者管理通过消费者分组实现,同一组内的消费者共享消息进行消费。因此,为确保组内消息的正常负载和消费,Apache RocketMQ 要求同一组内的所有消费者保持以下消费行为一致:
投递顺序
消费重试策略
版本兼容性
如“行为约束”所述,同一组内所有消费者的投递顺序和消费重试策略需要保持一致。
Apache RocketMQ 服务端 5.x 版本:上述消费行为均从关联的消费者组获取。因此,同一组内所有消费者的消费行为必须保持一致,客户端无需特别关注。
Apache RocketMQ 服务端 3.x/4.x 旧版本:上述消费逻辑由消费者客户端接口定义。因此,在设置消费者客户端时,必须确保同一组内消费者的消费行为一致。
如果您使用的是 Apache RocketMQ 服务端 5.x 版本,但客户端使用的是旧版本 SDK,则消费者的消费逻辑需遵循消费者客户端接口的设置。
使用说明
建议限制单个进程中的消费者数量。
Apache RocketMQ 的消费者在通信协议层面支持非阻塞传输模式。该模式具有更高的通信效率,并支持多线程并发访问。因此,在大多数场景下,单个进程中仅需为一个消费者组初始化一个消费者即可。在开发阶段,请避免使用相同的配置初始化多个消费者。
建议不要频繁创建和销毁消费者。
Apache RocketMQ 的消费者是底层资源,可以像数据库连接池一样重复使用。您无需在每次接收消息时创建消费者,也无需在消费消息后销毁它们。如果频繁创建和销毁消费者,Broker 上会产生大量的短连接请求,这将给您的系统带来沉重的负载。
正确示例
Consumer c = ConsumerBuilder.build();
for (int i =0;i<n;i++)
{
Message m= c.receive();
//process message
}
c.shutdown();
错误示例
for (int i =0;i<n;i++)
{
Consumer c = ConsumerBuilder.build();
Message m= c.receive();
//process message
c.shutdown();
}