消费者
本节介绍 Apache RocketMQ 中消费者的定义、模型关系、内部属性和行为约束。本主题还提供消费者的版本兼容性信息和使用注意事项。
定义
消费者是接收和处理 Apache RocketMQ 中消息的实体。
消费者通常集成在业务系统中。它们从 Apache RocketMQ 代理获取消息,并将消息转换为业务逻辑可以感知和处理的信息。
以下项目决定消费者的行为
消费者身份:消费者必须与消费者组关联,以获取行为设置和消费状态。
消费者类型:Apache RocketMQ 提供了多种消费者类型,用于不同的开发场景,包括推送消费者、简单消费者和拉取消费者。有关更多信息,请参见 消费者类型。
消费者的本地设置:这些设置指定消费者客户端根据消费者类型运行的方式。例如,您可以配置消费者上的线程数量和并发设置,以实现不同的传输效果。
模型关系
下图显示了消费者在 Apache RocketMQ 的领域模型中的位置。
消息由生产者初始化并发送到 Apache RocketMQ 服务器。
消息按到达 Apache RocketMQ 服务器的顺序存储在主题的指定队列中。
消费者根据指定的订阅关系从 Apache RocketMQ 服务器获取和消费消息。
内部属性
消费者组名称
定义:与当前消费者关联的消费者组的名称。消费者从消费者组继承其行为。有关更多信息,请参见 消费者组。
值:消费者组是 Apache RocketMQ{#product-name} 的逻辑资源
. 您必须使用控制台或提前调用 API 操作来创建消费者组。有关此操作限制的更多信息,请参见 参数限制。
客户端 ID
定义:消费者客户端的身份。此属性用于区分不同的消费者。该值在集群中必须唯一。
值:客户端 ID 由 Apache RocketMQ SDK 自动生成。它主要用于 O\&M 目的,例如日志查看和问题定位。客户端 ID 不可修改。
通信参数
端点 (必需) : 用于连接到服务器的端点。此端点用于标识集群。
访问点必须以格式配置。我们建议您使用域名,避免使用 IP 地址,以防止节点更改导致热点迁移失败。
凭据 (可选) : 客户端用于身份验证的凭据。
传输仅在服务器上启用身份识别和身份验证时才需要。
- 请求超时 (可选) : 网络请求的超时时间。有关值范围和默认值的更多信息,请参见 参数限制。
预绑定订阅列表
定义:指定消费者的订阅列表。Apache RocketMQ 代理可以使用预绑定订阅列表在消费者初始化期间验证订阅主题的权限和有效性,而不是在应用程序启动后。
值:我们建议您在消费者初始化期间指定订阅或订阅主题列表。如果未指定订阅或订阅主题发生更改,Apache RocketMQ 会动态验证主题。
消息监听器
定义:消费者在 Apache RocketMQ 代理将消息推送到消费者后,用于调用消息消费逻辑的监听器。
值:消息监听器的值在消费者客户端上配置。
约束:当您以推送消费者的身份消费消息时,您必须在消费者客户端上配置消息监听器。有关消费者类型的更多信息,请参见 消费者类型。
行为约束
在 Apache RocketMQ 领域模型中,消费者管理通过消费者分组实现,同一组中的消费者共享消息以供消费。因此,为了确保组中消息的正常负载和消费,Apache RocketMQ 要求同一组中的所有消费者保持以下消费行为一致
传递顺序
消费重试策略
版本兼容性
如行为约束中所述,同一组中所有消费者的传递顺序和消费重试策略都需要保持一致。
Apache RocketMQ 服务器版本 5.x:前面的消费者的消费行为是从关联的消费者组获取的。因此,同一组中所有消费者的消费行为必须保持一致,客户端无需关注它。
Apache RocketMQ 服务器版本 3.x/ 4.x 历史:前面的消费逻辑由消费者客户端接口定义。因此,在设置消费者客户端时,您必须确保同一组中消费者的消费行为保持一致。
如果您使用 Apache RocketMQ 服务器版本 5.x 并且客户端使用以前的版本 SDK,则消费者的消费逻辑受消费者客户端接口设置的约束。
使用注意事项
我们建议您限制单个进程上的消费者数量。
Apache RocketMQ 的消费者在通信协议级别支持非阻塞传输模式。非阻塞传输模式具有更高的通信效率,并支持多线程并发访问。因此,在大多数情况下,单个进程中只需要为消费者组初始化一个消费者。在开发阶段,避免初始化具有相同配置的多个消费者。
我们建议您不要定期创建和销毁消费者。
Apache RocketMQ 的消费者是可重用的底层资源,就像数据库的连接池一样。您无需在每次接收消息时创建消费者,也不需要在消费消息后销毁消费者。如果您定期创建和销毁消费者,就会在代理上生成大量短连接请求。这会给您的系统带来很大的负载。
正确示例
Consumer c = ConsumerBuilder.build();
for (int i =0;i<n;i++)
{
Message m= c.receive();
//process message
}
c.shutdown();
错误示例
for (int i =0;i<n;i++)
{
Consumer c = ConsumerBuilder.build();
Message m= c.receive();
//process message
c.shutdown();
}