跳过主内容
版本: 5.0

RocketMQ Connect 快速入门

快速入门

本教程将在独立模式下启动一个RocketMQ连接器示例项目“rocketmq-connect-sample”,以帮助您理解连接器的工作原理。该示例项目提供了一个源连接器,用于从源文件读取数据并将其发送到RocketMQ集群。它还提供了一个汇聚连接器,用于从RocketMQ集群读取消息并将其写入目标文件。

1. 准备工作:启动 RocketMQ

  1. Linux/Unix/Mac
  2. 64位 JDK 1.8+;
  3. Maven 3.2.x+;
  4. 启动RocketMQ。可以使用 RocketMQ 4.xRocketMQ 5.x 版本;
  5. 使用工具测试RocketMQ消息收发。

这里,使用环境变量NAMESRV_ADDR告知工具客户端RocketMQ的NameServer地址为localhost:9876。

#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

注意:RocketMQ具有自动创建Topic和Group的特性。当发送或订阅消息时,如果相应的Topic或Group不存在,RocketMQ将自动创建它们。因此,无需提前创建Topic和Group。

2. 构建连接器运行时

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

注意:项目默认已包含rocketmq-connect-sample的代码,因此无需单独构建rocketmq-connect-sample插件。

3. 以独立模式运行连接器工作进程

修改配置

修改connect-standalone.conf文件,配置RocketMQ连接地址等信息。详情请参考9. 配置文件说明

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

vim conf/connect-standalone.conf

在独立模式下,RocketMQ Connect会将同步检查点信息持久化到本地文件目录storePathRootDir中。

storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

如果想重置同步检查点,需要删除持久化的检查点文件。

rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*

以独立模式启动连接器工作进程

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

提示:您可以修改docker/connect/bin/runconnect.sh以根据需要调整JVM启动参数。

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"

查看启动日志文件

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

如果运行时成功启动,您将在日志文件中看到以下打印信息

独立工作进程启动成功。

要退出tail -f命令的日志跟踪模式,您可以按下Ctrl + C组合键。

4. 启动源连接器

创建源文件并写入测试数据

mkdir -p /Users/YourUsername/rocketmqconnect/
cd /Users/YourUsername/rocketmqconnect/
touch test-source-file.txt

echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt

注意:文件中不应存在空行(如果遇到空行,演示程序会抛出错误)。源连接器将持续读取源文件,并将每行数据转换为消息体,发送到RocketMQ供汇聚连接器消费。

启动源连接器

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt",
"connect.topicname": "fileTopic"
}'

如果curl请求返回状态200,则表示创建成功。示例响应:

{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}}

查看日志文件

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

如果您看到以下日志,则表示文件源连接器已成功启动

启动连接器 fileSourceConnector 并设置目标状态为 STARTED 成功!!

源连接器配置说明

可空默认值描述
connector.classfalse实现Connector接口的类名(包含包名)
filenamefalse源文件的名称(建议使用绝对路径)
connect.topicnamefalse同步文件数据所需Topic

5. 启动汇聚连接器

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt",
"connect.topicnames": "fileTopic"
}'

如果curl请求返回状态200,则表示创建成功。示例响应:

{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}}

查看日志文件

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

如果您看到以下日志,则表示文件汇聚连接器已成功启动

启动连接器 fileSinkConnector 并设置目标状态为 STARTED 成功!!

检查汇聚连接器是否已将数据写入目标文件

cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt

如果test-sink-file.txt文件已生成且内容与test-source-file.txt相同,则表示整个流程运行正确。

继续向源文件test-source-file.txt写入测试数据

cd /Users/YourUsername/rocketmqconnect/

echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt

# Wait a few seconds, check if rocketmq-connect replicate data to sink file succeed
sleep 10
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt

注意:文件内容顺序可能会有所不同,因为rocketmq-connect-sample在向RocketMQ主题发送和接收消息时使用普通消息。这与顺序消息不同,消费普通消息不能保证顺序。

汇聚连接器配置说明

可空默认值描述
connector.classfalse实现Connector接口的类名(包含包名)
filenamefalse汇聚连接器拉取数据并将其保存到文件(建议使用绝对路径)
connect.topicnamesfalse汇聚连接器需要处理的数据消息的Topic

提示:示例项目rocketmq-connect-sample的配置文件说明仅供参考,不同的源/汇聚连接器有不同的配置,请参考具体的源/汇聚连接器。

6. 停止连接器

停止连接器的RESTful命令格式为http://(您的worker ip):(端口)/connectors/(连接器名称)/stop

要停止演示中的两个连接器,您可以使用以下命令

curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop

如果curl请求返回状态200,则表示连接器停止成功。示例响应:

{"status":200,"body":"Connector[fileSinkConnector]deleted successfully"}

如果您看到以下日志消息,则表示文件汇聚连接器已成功关闭

tail -100f ~/logs/rocketmqconnect/connect_default.log

连接器 fileSinkConnector 已成功关闭。

7. 停止Worker进程

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connectshutdown.sh

8. 日志目录

您可以使用以下命令查看日志目录

ls $HOME/logs/rocketmqconnect
ls ~/logs/rocketmqconnect

9. 配置文件说明

根据您的使用情况修改RESTful端口、storeRoot路径、Nameserver地址等信息。

以下是一个配置文件示例

#current cluster node uniquely identifies
workerId=DEFAULT_WORKER_1

# Http prot for user to access REST API
httpPort=8082

# Local file dir for config store
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

#You need to modify it to your own rocketmq nameserver endpoint.
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876

# Plugin path for loading Source/Sink Connectors
# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so no configuration is needed here.
pluginPaths=

storePathRootDir配置说明

在独立模式下,RocketMQ Connect会将同步检查点信息持久化到storePathRootDir指定的本地文件目录中。持久化文件包括:

描述
connectorConfig.json连接器配置持久化文件
position.json源连接数据处理进度持久化文件
taskConfig.json任务配置持久化文件
offset.json汇聚连接数据消费进度持久化文件
connectorStatus.json连接器状态持久化文件
taskStatus.json任务状态持久化文件