Version: 5.0

Run RocketMQ on Kubernetes

This section describes how to quickly deploy a single-node RocketMQ 5.x service on Kubernetes and use it to send and receive a simple message.

System requirements
  • A running Kubernetes cluster
  • Helm 3.7.0+ installed
  • 64-bit JDK 1.8+

1. Install Helm

Make sure Helm is installed on your system:

$ helm version

If Helm (3.7.0 or later) is not installed, you can install it with the following command:

$ curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash

2. Download the RocketMQ Helm chart

$ helm pull oci://registry-1.docker.io/apache/rocketmq --version 0.0.1 
$ tar -zxvf rocketmq-0.0.1.tgz

3. Deploy RocketMQ

Deploy RocketMQ using the Helm chart.

# Modify the configuration in values.yaml (located in the extracted chart directory)
$ vim rocketmq/values.yaml
## values.yaml: adjust the memory requests and limits under the broker resources to match the available memory ##
resources:
  limits:
    cpu: 2
    memory: 10Gi
  requests:
    cpu: 2
    memory: 10Gi
## values.yaml ##
$ helm install rocketmq-demo ./rocketmq
# Check the pod status
# If every pod shows READY 1/1 and STATUS Running, the deployment succeeded
$ kubectl get pods -o wide -n default
NAME                                        READY   STATUS    RESTARTS   AGE    IP               NODE         NOMINATED NODE   READINESS GATES
rocketmq-demo-broker-0                      1/1     Running   0          6h3m   192.168.58.225   k8s-node02   <none>           <none>
rocketmq-demo-nameserver-757877747b-k669k   1/1     Running   0          6h3m   192.168.58.226   k8s-node02   <none>           <none>
rocketmq-demo-proxy-6c569bd457-wcg6g        1/1     Running   0          6h3m   192.168.85.227   k8s-node01   <none>           <none>
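
The readiness check can also be scripted. The following is a minimal sketch, not part of the RocketMQ chart, that inspects the text printed by kubectl get pods and reports whether every pod is fully ready and Running. The class name PodStatusCheck and the method allPodsReady are hypothetical names chosen for illustration, and the code assumes the default column order (NAME, READY, STATUS, ...).

```java
import java.util.Arrays;
import java.util.List;

public class PodStatusCheck {
    // Returns true when every data row has READY "n/n" and STATUS "Running".
    // Assumes the default `kubectl get pods` column order: NAME READY STATUS ...
    static boolean allPodsReady(List<String> lines) {
        boolean sawPod = false;
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("NAME")) {
                continue; // skip blank lines and the header row
            }
            String[] cols = trimmed.split("\\s+");
            if (cols.length < 3) {
                return false; // unexpected row shape
            }
            String[] ready = cols[1].split("/");
            boolean fullyReady = ready.length == 2 && ready[0].equals(ready[1]);
            if (!fullyReady || !"Running".equals(cols[2])) {
                return false;
            }
            sawPod = true;
        }
        // An empty listing is treated as "not ready".
        return sawPod;
    }

    public static void main(String[] args) {
        // Sample rows; the Pending proxy row is made up to show a failing case.
        List<String> output = Arrays.asList(
            "NAME READY STATUS RESTARTS AGE",
            "rocketmq-demo-broker-0 1/1 Running 0 6h3m",
            "rocketmq-demo-proxy-6c569bd457-wcg6g 0/1 Pending 0 10s");
        System.out.println("All pods ready: " + allPodsReady(output));
    }
}
```

With the sample rows in main, the check reports false because the proxy row is not yet Running; feeding it the three Running rows from the listing above would report true.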

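4. Verify message sending and receiving

Use the Java SDK to test message sending and receiving. Because the local network and the k8s network are not on the same internal network, you need to package the project locally and run it remotely: after packaging, copy the jar file from the target directory to the target server and run java -jar followed by the jar file name. The steps are as follows.

1) Create a Java project in your IDE.

2) Add the following dependency to the pom.xml file to import the Java library: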

...
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.7</version>
</dependency>
...

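3) Log in to the pod (the management tools are required) to create the topic and the subscription group; alternatively, run the commands on the host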

# Log into the pod
$ kubectl exec -ti rocketmq-demo-broker-0 -- /bin/bash

# Create Topic using mqadmin tools
$ sh mqadmin updatetopic -t TestTopic -c DefaultCluster

# Create subscription group using mqadmin tools
$ sh mqadmin updateSubGroup -c DefaultCluster -g TestGroup

4) In the Java project you created, add a program that sends normal messages (ProducerDemo.java). Sample code:

package com.rocketmq.producer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class ProducerDemo {
    public static void main(String[] args) throws ClientException, IOException {
        // The endpoint address, which needs to be set to the address and port list of the Proxy;
        // the following is the proxy pod address in the k8s environment.
        String endpoint = "192.168.85.227:8081";
        // The target Topic name for sending messages, which needs to be created in advance.
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        // When initializing the Producer, communication configuration and pre-bound Topic need to be set.
        Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(configuration)
            .build();
        // Build a normal message.
        Message message = provider.newMessageBuilder()
            .setTopic(topic)
            // Set message index key for precise search of a specific message.
            .setKeys("messageKey")
            // Set message Tag for filtering messages based on specific tags on the consumer side.
            .setTag("messageTag")
            // Message body, encoded explicitly as UTF-8.
            .setBody("messageBody".getBytes(StandardCharsets.UTF_8))
            .build();
        try {
            // Send the message; check the result and handle failures and other exceptions.
            SendReceipt sendReceipt = producer.send(message);
            System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
        } catch (ClientException e) {
            // Report the failure instead of silently swallowing it.
            System.err.println("Failed to send message: " + e.getMessage());
        } finally {
            // Release the producer's resources once sending is finished.
            producer.close();
        }
    }
}
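
The sample's comment about handling send failures can be made concrete with a small retry wrapper. The sketch below is one possible approach under stated assumptions, not a pattern the RocketMQ SDK prescribes: SendRetry and withRetry are hypothetical names, and the Callable stands in for a call such as producer.send(message).

```java
import java.util.concurrent.Callable;

public class SendRetry {
    // Runs the action up to maxAttempts times, doubling the pause between attempts.
    // Rethrows the last failure if every attempt fails.
    static <T> T withRetry(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                System.err.println("Attempt " + attempt + " failed: " + e.getMessage());
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Stand-in for producer.send(message): fails twice, then succeeds.
        String receipt = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "receipt-" + calls[0];
        }, 5, 10);
        System.out.println(receipt);
    }
}
```

In a real producer, the lambda would wrap producer.send(message), and the caller would decide what to do (log, alert, store for later) once withRetry gives up.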

5) In the Java project you created, add a program that subscribes to normal messages (Consumer.java). Apache RocketMQ supports two consumer types, SimpleConsumer and PushConsumer; this example uses PushConsumer.


package com.rocketmq.consumer;

import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

public class Consumer {
    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // The endpoint address, which needs to be set to the address and port list of the Proxy;
        // the following is the proxy pod address in the k8s environment.
        String endpoints = "192.168.85.227:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .build();
        // The filter rule for subscribing to messages, indicating subscription to messages of all Tags.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Specify the consumer group to which the consumer belongs; the Group needs to be created in advance.
        String consumerGroup = "TestGroup";
        // Specify which target Topic needs to be subscribed to; the Topic needs to be created in advance.
        String topic = "TestTopic";
        // Initialize PushConsumer, binding the consumer group, communication parameters, and subscription relationship.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set consumer group.
            .setConsumerGroup(consumerGroup)
            // Set pre-bound subscription relationship.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            // Set consumption listener.
            .setMessageListener(messageView -> {
                // Process the message and return the consumption result.
                System.out.println("Consume message successfully, messageId=" + messageView.getMessageId());
                return ConsumeResult.SUCCESS;
            })
            .build();
        // Keep the main thread alive so the consumer keeps receiving messages.
        Thread.sleep(Long.MAX_VALUE);
        // If you don't need to use PushConsumer anymore, you can close this instance.
        // pushConsumer.close();
    }
}
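
The consumer sample keeps the process alive with Thread.sleep(Long.MAX_VALUE) and leaves pushConsumer.close() commented out, so the consumer is never closed explicitly. One alternative, offered here as a hedged sketch rather than the SDK's recommended pattern, is to close the consumer from a JVM shutdown hook. GracefulShutdown and closeHook are hypothetical names, and a plain AutoCloseable stands in for the PushConsumer (which can be closed the same way through its close() method).

```java
public class GracefulShutdown {
    // Builds a hook thread (not yet registered) that closes the resource
    // and logs any failure instead of throwing during JVM shutdown.
    static Thread closeHook(AutoCloseable resource) {
        return new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                System.err.println("Failed to close resource: " + e.getMessage());
            }
        }, "close-hook");
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the PushConsumer built in the consumer sample.
        AutoCloseable consumer = () -> System.out.println("consumer closed");
        // The hook runs on normal exit, Ctrl+C, or SIGTERM (for example when the pod is stopped).
        Runtime.getRuntime().addShutdownHook(closeHook(consumer));
        // Demo only: run briefly and exit. A real consumer would block here
        // for as long as it should keep receiving messages.
        long runMillis = args.length > 0 ? Long.parseLong(args[0]) : 100L;
        Thread.sleep(runMillis);
        System.out.println("main finished");
    }
}
```

Closing inside the hook, rather than signalling the main thread to do it, avoids a race in which the JVM exits before the main thread finishes cleaning up.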

5. Release RocketMQ resources

# Release all RocketMQ resources
$ helm uninstall rocketmq-demo