RocketMQ Connect 实践 4
SFTP 服务器 (文件数据) -> RocketMQ Connect -> SFTP 服务器 (文件)
准备工作
启动 RocketMQ
- Linux/Unix/Mac
- 64位 JDK 1.8+;
- Maven 3.2.x+;
- 启动 RocketMQ。可以使用 RocketMQ 4.x 或 RocketMQ 5.x 5.x 版本;
- 使用工具测试 RocketMQ 消息发送和接收。
这里,使用环境变量 NAMESRV_ADDR 告知工具客户端 RocketMQ 的 NameServer 地址为 localhost:9876。
#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
注意: RocketMQ 具有自动创建 Topic 和 Group 的功能。当发送或订阅消息时,如果对应的 Topic 或 Group 不存在,RocketMQ 将自动创建它们。因此,无需提前创建 Topic 和 Group。
构建连接器运行时
git clone https://github.com/apache/rocketmq-connect.git
cd rocketmq-connect
export RMQ_CONNECT_HOME=`pwd`
mvn -Prelease-connect -Dmaven.test.skip=true clean install -U
构建 SFTP 连接器插件
cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-sftp/
mvn clean package -Dmaven.test.skip=true
将 SFTP RocketMQ 连接器编译后的 jar 包放入 Plugin 目录,以便运行时加载。
mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins
cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins
以独立模式运行连接器 Worker
修改 connect-standalone.conf
文件,配置 RocketMQ 连接地址及其他信息。
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
vim conf/connect-standalone.conf
示例配置信息如下
workerId=standalone-worker
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
## Http port for user to access REST API
httpPort=8082
# Rocketmq namesrvAddr
namesrvAddr=localhost:9876
# RocketMQ acl
aclEnable=false
#accessKey=rocketmq
#secretKey=12345678
clusterName="DefaultCluster"
# Plugin path for loading Source/Sink Connectors
pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins
在独立模式下,RocketMQ Connect 将同步检查点信息持久化存储在 storePathRootDir 指定的本地文件目录中。
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
如果需要重置同步检查点,则需要删除持久化的检查点信息文件。
rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*
以独立模式启动连接器 Worker
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
搭建 SFTP 服务器
SFTP(SSH 文件传输协议)是一种用于计算机之间安全文件传输的文件传输协议。SFTP 基于 SSH(安全外壳协议)构建,并利用加密和身份验证。
我们将使用 macOS 内置的 SFTP 服务(通过启用“远程登录”访问)。有关详细说明,请参阅 允许远程电脑访问您的 Mac 文档。
创建源测试文件
创建名为 source.txt
的测试文件并向其中写入一些测试数据
mkdir -p /Users/YourUsername/rocketmqconnect/sftp-test/
cd /Users/YourUsername/rocketmqconnect/sftp-test/
touch source.txt
echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt
登录 SFTP 服务,验证是否可以正常访问。输入以下命令,然后输入您的密码
# sftp -P port YourUsername@hostname
sftp -P 22 YourUsername@127.0.0.1
注意: 由于这是您本地 MAC OS 提供的 SFTP 服务,因此地址是 127.0.0.1
端口是默认的 22。
sftp> cd /Users/YourUsername/rocketmqconnect/sftp-test/
sftp> ls source.txt
sftp> bye
启动连接器
启动 SFTP Source 连接器
运行以下命令启动 SFTP source 连接器。此连接器将连接到 SFTP 服务以读取 source.txt
文件。对于文件中的每一行文本,连接器将解析并打包内容到通用 ConnectRecord 对象中,然后将其发送到 RocketMQ topic 以供 sink 连接器消费。
curl -X POST --location "http://localhost:8082/connectors/SftpSourceConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSourceConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/source.txt",
"connect.topicname": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'
如果 curl 请求返回状态:200,则表示连接器创建成功。示例响应如下
{"status":200,"body":{"connector.class":"...
要确认文件源连接器已成功启动,请运行以下命令
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
启动连接器 SftpSourceConnector 并设置目标状态 STARTED 成功!!
启动 SFTP Sink 连接器
运行以下命令启动 SFTP sink 连接器。此连接器将订阅 RocketMQ topic 以消费消息,并将每条消息转换为单行文本,然后使用 SFTP 协议写入目标文件 sink.txt
。
curl -X POST --location "http://localhost:8082/connectors/SftpSinkConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/sink.txt",
"connect.topicnames": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'
如果 curl 请求返回状态:200,则表示连接器创建成功。示例响应如下
{"status":200,"body":{"connector.class":"...
检查日志以确认 SFTP sink 连接器成功启动
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
启动连接器 SftpSinkConnector 并设置目标状态 STARTED 成功!!
运行以下命令确认数据已写入目标文件
cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt
如果 sink.txt
文件已生成且其内容与 source.txt
文件匹配,则整个过程正常工作。
向 source.txt
文件写入更多测试数据以继续测试
cd /Users/YourUsername/rocketmqconnect/sftp-test/
echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt
# Wait a few seconds to give the connector time to replicate data to the sink file.
sleep 10
cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt
注意: 文件内容的顺序可能有所不同,因为 rocketmq-connect-sftp
在向 RocketMQ topic 发送和接收消息时使用 普通消息
。这与 顺序消息
不同,消费 普通消息
不保证顺序。