RocketMQ Connect 快速入门
快速入门
本教程将在独立模式下启动一个RocketMQ连接器示例项目“rocketmq-connect-sample”,以帮助您理解连接器的工作原理。该示例项目提供了一个源连接器,用于从源文件读取数据并将其发送到RocketMQ集群。它还提供了一个汇聚连接器,用于从RocketMQ集群读取消息并将其写入目标文件。
1. 准备工作:启动 RocketMQ
- Linux/Unix/Mac
- 64位 JDK 1.8+;
- Maven 3.2.x+;
- 启动RocketMQ。可以使用 RocketMQ 4.x 或 RocketMQ 5.x 版本;
- 使用工具测试RocketMQ消息收发。
这里,使用环境变量NAMESRV_ADDR告知工具客户端RocketMQ的NameServer地址为localhost:9876。
#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
注意:RocketMQ具有自动创建Topic和Group的特性。当发送或订阅消息时,如果相应的Topic或Group不存在,RocketMQ将自动创建它们。因此,无需提前创建Topic和Group。
2. 构建连接器运行时
git clone https://github.com/apache/rocketmq-connect.git
cd rocketmq-connect
export RMQ_CONNECT_HOME=`pwd`
mvn -Prelease-connect -Dmaven.test.skip=true clean install -U
注意:项目默认已包含rocketmq-connect-sample的代码,因此无需单独构建rocketmq-connect-sample插件。
3. 以独立模式运行连接器工作进程
修改配置
修改connect-standalone.conf
文件,配置RocketMQ连接地址等信息。详情请参考9. 配置文件说明。
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
vim conf/connect-standalone.conf
在独立模式下,RocketMQ Connect会将同步检查点信息持久化到本地文件目录storePathRootDir中。
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
如果想重置同步检查点,需要删除持久化的检查点文件。
rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*
以独立模式启动连接器工作进程
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
提示:您可以修改docker/connect/bin/runconnect.sh
以根据需要调整JVM启动参数。
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
查看启动日志文件
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
如果运行时成功启动,您将在日志文件中看到以下打印信息
独立工作进程启动成功。
要退出tail -f
命令的日志跟踪模式,您可以按下Ctrl + C
组合键。
4. 启动源连接器
创建源文件并写入测试数据
mkdir -p /Users/YourUsername/rocketmqconnect/
cd /Users/YourUsername/rocketmqconnect/
touch test-source-file.txt
echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
注意:文件中不应存在空行(如果遇到空行,演示程序会抛出错误)。源连接器将持续读取源文件,并将每行数据转换为消息体,发送到RocketMQ供汇聚连接器消费。
启动源连接器
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt",
"connect.topicname": "fileTopic"
}'
如果curl请求返回状态200,则表示创建成功。示例响应:
{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}}
查看日志文件
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
如果您看到以下日志,则表示文件源连接器已成功启动
启动连接器 fileSourceConnector 并设置目标状态为 STARTED 成功!!
源连接器配置说明
键 | 可空 | 默认值 | 描述 |
---|---|---|---|
connector.class | false | 实现Connector接口的类名(包含包名) | |
filename | false | 源文件的名称(建议使用绝对路径) | |
connect.topicname | false | 同步文件数据所需Topic |
5. 启动汇聚连接器
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt",
"connect.topicnames": "fileTopic"
}'
如果curl请求返回状态200,则表示创建成功。示例响应:
{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}}
查看日志文件
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
如果您看到以下日志,则表示文件汇聚连接器已成功启动
启动连接器 fileSinkConnector 并设置目标状态为 STARTED 成功!!
检查汇聚连接器是否已将数据写入目标文件
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt
如果test-sink-file.txt文件已生成且内容与test-source-file.txt相同,则表示整个流程运行正确。
继续向源文件test-source-file.txt写入测试数据
cd /Users/YourUsername/rocketmqconnect/
echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt
# Wait a few seconds, check if rocketmq-connect replicate data to sink file succeed
sleep 10
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt
注意:文件内容顺序可能会有所不同,因为rocketmq-connect-sample
在向RocketMQ主题发送和接收消息时使用普通消息
。这与顺序消息
不同,消费普通消息
不能保证顺序。
汇聚连接器配置说明
键 | 可空 | 默认值 | 描述 |
---|---|---|---|
connector.class | false | 实现Connector接口的类名(包含包名) | |
filename | false | 汇聚连接器拉取数据并将其保存到文件(建议使用绝对路径) | |
connect.topicnames | false | 汇聚连接器需要处理的数据消息的Topic |
提示:示例项目rocketmq-connect-sample
的配置文件说明仅供参考,不同的源/汇聚连接器有不同的配置,请参考具体的源/汇聚连接器。
6. 停止连接器
停止连接器的RESTful命令格式为http://(您的worker ip):(端口)/connectors/(连接器名称)/stop
要停止演示中的两个连接器,您可以使用以下命令
curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop
如果curl请求返回状态200,则表示连接器停止成功。示例响应:
{"status":200,"body":"Connector[fileSinkConnector]deleted successfully"}
如果您看到以下日志消息,则表示文件汇聚连接器已成功关闭
tail -100f ~/logs/rocketmqconnect/connect_default.log
连接器 fileSinkConnector 已成功关闭。
7. 停止Worker进程
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connectshutdown.sh
8. 日志目录
您可以使用以下命令查看日志目录
ls $HOME/logs/rocketmqconnect
ls ~/logs/rocketmqconnect
9. 配置文件说明
根据您的使用情况修改RESTful端口、storeRoot路径、Nameserver地址等信息。
以下是一个配置文件示例
#current cluster node uniquely identifies
workerId=DEFAULT_WORKER_1
# Http prot for user to access REST API
httpPort=8082
# Local file dir for config store
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
#You need to modify it to your own rocketmq nameserver endpoint.
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876
# Plugin path for loading Source/Sink Connectors
# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so no configuration is needed here.
pluginPaths=
storePathRootDir配置说明
在独立模式下,RocketMQ Connect会将同步检查点信息持久化到storePathRootDir指定的本地文件目录中。持久化文件包括:
键 | 描述 |
---|---|
connectorConfig.json | 连接器配置持久化文件 |
position.json | 源连接数据处理进度持久化文件 |
taskConfig.json | 任务配置持久化文件 |
offset.json | 汇聚连接数据消费进度持久化文件 |
connectorStatus.json | 连接器状态持久化文件 |
taskStatus.json | 任务状态持久化文件 |