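Run RocketMQ on Kubernetes
This section describes how to quickly deploy a single-node RocketMQ 5.x service on Kubernetes and then send and receive a simple message.
System requirements
- A running Kubernetes cluster
- Helm 3.7.0 or later
- 64-bit JDK 1.8 or later
1. Install Helm
Make sure Helm is installed on your system: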
$ helm version
If Helm (3.7.0 or later) is not installed, you can install it with the following command:
$ curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
2. Download the RocketMQ Helm chart
$ helm pull oci://registry-1.docker.io/apache/rocketmq --version 0.0.1
$ tar -zxvf rocketmq-0.0.1.tgz
3. Deploy RocketMQ
Deploy RocketMQ using the Helm chart.
# Modify the configuration in values.yaml (located in the extracted chart directory)
$ vim rocketmq/values.yaml
## values.yaml: adjust the memory requests and limits in the broker resources according to the available memory ##
resources:
  limits:
    cpu: 2
    memory: 10Gi
  requests:
    cpu: 2
    memory: 10Gi
## values.yaml ##
$ helm install rocketmq-demo ./rocketmq
# Check pod status
# If every pod shows STATUS Running and READY 1/1, the deployment succeeded
$ kubectl get pods -o wide -n default
NAME                                        READY   STATUS    RESTARTS   AGE    IP               NODE         NOMINATED NODE   READINESS GATES
rocketmq-demo-broker-0                      1/1     Running   0          6h3m   192.168.58.225   k8s-node02   <none>           <none>
rocketmq-demo-nameserver-757877747b-k669k   1/1     Running   0          6h3m   192.168.58.226   k8s-node02   <none>           <none>
rocketmq-demo-proxy-6c569bd457-wcg6g        1/1     Running   0          6h3m   192.168.85.227   k8s-node01   <none>           <none>
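4. Verify message sending and receiving
Use the Java SDK to test sending and receiving messages. Because the local network and the Kubernetes network are not on the same internal network, the project has to be packaged locally and run remotely: after packaging, copy the jar file from the target directory to the target server and run java -jar <jar file name>. The steps are as follows:
1) Create a Java project in your IDE.
2) Add the following dependency to the pom.xml file to import the Java library: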
......
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.7</version>
</dependency>
.....
3) Log in to the Pod (the admin tools are required), or run the commands on the host instead:
# Log into the pod
$ kubectl exec -ti rocketmq-demo-broker-0 -- /bin/bash
# Create Topic using mqadmin tools
$ sh mqadmin updatetopic -t TestTopic -c DefaultCluster
# Create subscription group using mqadmin tools
$ sh mqadmin updateSubGroup -c DefaultCluster -g TestGroup
4) In the Java project, create a program that sends normal messages (ProducerDemo.java). Sample code:
package com.rocketmq.producer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class ProducerDemo {
    public static void main(String[] args) throws ClientException, IOException {
        // The endpoint address, which needs to be set to the address and port list of the Proxy;
        // the following is the proxy address in the k8s environment.
        String endpoint = "192.168.85.227:8081";
        // The target Topic name for sending messages, which needs to be created in advance.
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        // When initializing the Producer, communication configuration and pre-bound Topic need to be set.
        Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(configuration)
            .build();
        // Build a normal message.
        Message message = provider.newMessageBuilder()
            .setTopic(topic)
            // Set message index key for precise search of a specific message.
            .setKeys("messageKey")
            // Set message Tag for filtering messages based on specific tags on the consumer side.
            .setTag("messageTag")
            // Message body, encoded explicitly as UTF-8.
            .setBody("messageBody".getBytes(StandardCharsets.UTF_8))
            .build();
        try {
            // Send the message and check the result; failures must be handled, not ignored.
            SendReceipt sendReceipt = producer.send(message);
            System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
        } catch (ClientException e) {
            // Report the failure instead of silently swallowing it.
            System.err.println("Failed to send message: " + e.getMessage());
            e.printStackTrace();
        } finally {
            // Close the producer once it is no longer needed.
            producer.close();
        }
    }
}
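The comment in the producer asks you to handle send failures. One common approach is to retry a failed call a few times before giving up. The following is a minimal sketch of that idea, not the SDK's own mechanism: the class SendRetry and the method callWithRetry are hypothetical names introduced here for illustration, and the fixed delay between attempts is an assumption. The SDK may also retry internally, so treat this only as an application-level illustration.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper (not part of the RocketMQ SDK): retries an action with a fixed delay. */
final class SendRetry {
    private SendRetry() {
    }

    /**
     * Runs the action up to maxAttempts times and returns the first successful result.
     * If every attempt fails, the exception from the last attempt is rethrown.
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                System.err.println("Attempt " + attempt + " of " + maxAttempts + " failed: " + e.getMessage());
                // Wait before the next attempt, but not after the final one.
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }
}
```

In ProducerDemo, the send call could then be wrapped as SendRetry.callWithRetry(() -> producer.send(message), 3, 1000L). Keep in mind that a retried send can produce duplicate messages if an earlier attempt actually reached the broker, so consumers should tolerate duplicates.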
5) In the Java project, create a program that subscribes to normal messages (Consumer.java). Apache RocketMQ supports both SimpleConsumer and PushConsumer; PushConsumer is used here.
package com.rocketmq.consumer;

import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

public class Consumer {
    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // The endpoint address, which needs to be set to the address and port list of the Proxy;
        // the following is the proxy address in the k8s environment.
        String endpoints = "192.168.85.227:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .build();
        // The filter rule for subscribing to messages, indicating subscription to messages of all Tags.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Specify the consumer group to which the consumer belongs; the Group needs to be created in advance.
        String consumerGroup = "TestGroup";
        // Specify which target Topic needs to be subscribed to; the Topic needs to be created in advance.
        String topic = "TestTopic";
        // Initialize PushConsumer, binding to the consumer group, communication parameters, and subscription relationship.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set consumer group.
            .setConsumerGroup(consumerGroup)
            // Set pre-bound subscription relationship.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            // Set consumption listener.
            .setMessageListener(messageView -> {
                // Process the message and return the consumption result.
                System.out.println("Consume message successfully, messageId=" + messageView.getMessageId());
                return ConsumeResult.SUCCESS;
            })
            .build();
        // Keep the main thread alive so the consumer can keep receiving messages.
        Thread.sleep(Long.MAX_VALUE);
        // If you don't need to use PushConsumer anymore, you can close this instance.
        // pushConsumer.close();
    }
}
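Both programs above hard-code the Proxy address, yet the jar is built locally and run on a different server, where the address may not be the same. The following is a minimal sketch of choosing the endpoint at run time so the jar does not need to be rebuilt. The class EndpointResolver and the environment variable name ROCKETMQ_ENDPOINTS are hypothetical names chosen for this illustration; neither is part of the RocketMQ SDK.

```java
import java.util.Map;

/** Hypothetical helper (not part of the RocketMQ SDK): picks the Proxy endpoint at run time. */
final class EndpointResolver {
    // Assumed environment variable name; use whatever fits your setup.
    static final String ENV_NAME = "ROCKETMQ_ENDPOINTS";
    // Fallback taken from the examples above (Proxy pod IP and port).
    static final String DEFAULT_ENDPOINTS = "192.168.85.227:8081";

    private EndpointResolver() {
    }

    /** Precedence: first command-line argument, then the environment variable, then the default. */
    static String resolve(String[] args, Map<String, String> env) {
        if (args != null && args.length > 0 && !args[0].trim().isEmpty()) {
            return args[0].trim();
        }
        String fromEnv = env.get(ENV_NAME);
        if (fromEnv != null && !fromEnv.trim().isEmpty()) {
            return fromEnv.trim();
        }
        return DEFAULT_ENDPOINTS;
    }

    public static void main(String[] args) {
        // Prints the endpoint that would be passed to setEndpoints(...).
        System.out.println("Using endpoints: " + resolve(args, System.getenv()));
    }
}
```

In the producer or consumer, the hard-coded string could then be replaced with EndpointResolver.resolve(args, System.getenv()), and the address passed on the command line as java -jar <jar file name> 192.168.85.227:8081.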
5. Release RocketMQ resources
# Release all RocketMQ resources
$ helm uninstall rocketmq-demo