快速入门
本节将介绍快速部署单节点 RocketMQ 集群的步骤;还包括用于向其发送和接收消息的命令,作为工作证明。
- 64 位操作系统,推荐使用 Linux/Unix/macOS
- 64 位 JDK 1.8+
1. 获取 Apache RocketMQ
以下说明以 Linux 环境下 RocketMQ 5.2.0 源代码包的应用为例,介绍 RocketMQ 的安装过程。
解压缩 RocketMQ 5.2.0 的源代码包,然后编译并构建二进制可执行文件
$ unzip rocketmq-all-5.2.0-source-release.zip
$ cd rocketmq-all-5.2.0-source-release/
$ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
$ cd distribution/target/rocketmq-5.2.0/rocketmq-5.2.0
2. 启动 NameServer
安装 RocketMQ 后,启动 NameServer
### start namesrv
$ nohup sh bin/mqnamesrv &
### verify namesrv
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
当我们在 namesrv.log 中看到 'The Name Server boot success..' 时,表示 NameServer 已成功启动。
3. 启动 Broker 和 Proxy
NameServer 启动后,我们需要启动 Broker 和 Proxy。我们推荐使用本地部署模式,在这种模式下,Broker 和 Proxy 部署在同一个进程中。我们也支持集群部署模式。了解更多 部署介绍。
### start broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
### verify broker
$ tail -f ~/logs/rocketmqlogs/proxy.log
The broker[broker-a,192.169.1.2:10911] boot success...
当我们在 proxy.log 中看到 “The broker[brokerName,ip:port]boot success..” 时,表示 Broker 已成功启动。
到目前为止,我们已经部署了一个单主 RocketMQ 集群,并且能够通过脚本发送和接收简单消息。
4. 使用工具发送和接收消息
在使用工具进行测试之前,我们需要将 nameserver 地址设置到系统中,例如系统环境变量 NAMESRV_ADDR
。
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
5. 使用 SDK 发送和接收消息
我们也可以尝试使用客户端 SDK 发送和接收消息,您可以在 rocketmq-clients 中查看更多详细信息。
创建一个 Java 项目。
将 SDK 依赖项添加到 pom.xml 中,请记住将
rocketmq-client-java-version
替换为 最新版本。<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>${rocketmq-client-java-version}</version>
</dependency>使用 mqadmin cli 工具创建主题。
$ sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
在您创建的 Java 项目中,创建一个发送消息的程序,并使用以下代码运行它
import java.io.IOException;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerExample {
private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
public static void main(String[] args) throws ClientException, IOException {
String endpoint = "localhost:8081";
String topic = "TestTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
Message message = provider.newMessageBuilder()
.setTopic(topic)
.setKeys("messageKey")
.setTag("messageTag")
.setBody("messageBody".getBytes())
.build();
try {
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
// producer.close();
}
}在您创建的 Java 项目中,创建一个消费者演示程序并运行它。Apache RocketMQ 支持 简单消费者 和 推送消费者。
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PushConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String endpoints = "localhost:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
String consumerGroup = "YourConsumerGroup";
String topic = "TestTopic";
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// pushConsumer.close();
}
}
6. 关闭服务器
完成练习后,我们可以使用以下命令关闭服务。
$ sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
$ sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK