跳至主要内容
版本: 5.0

RocketMQ Connect 实战 4

SFTP 服务器 (文件数据) -> RocketMQ Connect -> SFTP 服务器 (文件)

准备

启动 RocketMQ

  1. Linux/Unix/Mac
  2. 64 位 JDK 1.8+
  3. Maven 3.2.x+
  4. 启动 RocketMQ。可以使用 RocketMQ 4.xRocketMQ 5.x 5.x 版本
  5. 使用工具测试 RocketMQ 消息发送和接收。

这里使用环境变量 NAMESRV_ADDR 通知工具客户端 RocketMQ 的 NameServer 地址为 localhost:9876。

#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

注意: RocketMQ 具有自动创建主题和组的功能。当发送或订阅消息时,如果相应的主题或组不存在,RocketMQ 将自动创建它们。因此,无需提前创建主题和组。

构建连接器运行时

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

构建 SFTP 连接器插件

cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-sftp/

mvn clean package -Dmaven.test.skip=true

将编译后的 SFTP RocketMQ 连接器 jar 包放入插件目录以供运行时加载。

mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins
cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins

在独立模式下运行连接器工作器

修改 connect-standalone.conf 文件以配置 RocketMQ 连接地址和其他信息。

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

vim conf/connect-standalone.conf

示例配置信息如下

workerId=standalone-worker
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

## Http port for user to access REST API
httpPort=8082

# Rocketmq namesrvAddr
namesrvAddr=localhost:9876

# RocketMQ acl
aclEnable=false
#accessKey=rocketmq
#secretKey=12345678

clusterName="DefaultCluster"

# Plugin path for loading Source/Sink Connectors
pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins

在独立模式下,RocketMQ Connect 将同步检查点信息持久存储在由 storePathRootDir 指定的本地文件目录中。

storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

如果要重置同步检查点,则需要删除持久化的检查点信息文件。

rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*

要以独立模式启动连接器工作器

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

设置 SFTP 服务器

SFTP (SSH 文件传输协议) 是一种文件传输协议,用于在计算机之间进行安全的文件传输。SFTP 建立在 SSH (安全外壳) 协议之上,并利用加密和身份验证。

我们将使用 macOS 中内置的 SFTP 服务 (通过启用“远程登录”访问)。有关详细说明,请参阅 允许远程计算机访问您的 Mac 文档。

创建源测试文件

创建一个名为 source.txt 的测试文件,并在其中写入一些测试数据

mkdir -p /Users/YourUsername/rocketmqconnect/sftp-test/

cd /Users/YourUsername/rocketmqconnect/sftp-test/

touch source.txt

echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt

登录 SFTP 服务以验证您是否可以正常访问它。输入以下命令,然后输入您的密码

# sftp -P port YourUsername@hostname
sftp -P 22 [email protected]

注意: 由于这是您本地 MAC OS 提供的 SFTP 服务,因此地址为 127.0.0.1,端口为默认的 22。

sftp> cd /Users/YourUsername/rocketmqconnect/sftp-test/
sftp> ls source.txt
sftp> bye

启动连接器

启动 SFTP 源连接器

运行以下命令以启动 SFTP 源连接器。此连接器将连接到 SFTP 服务以从 source.txt 文件中读取。对于文件中的每一行文本,连接器将解析并将内容打包到一个通用的 ConnectRecord 对象中,然后将其发送到 RocketMQ 主题,供接收连接器使用。

curl -X POST --location "http://localhost:8082/connectors/SftpSourceConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSourceConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/source.txt",
"connect.topicname": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'

如果 curl 请求返回状态: 200,则表示连接器已成功创建。示例响应如下所示

{"status":200,"body":{"connector.class":"...

要确认文件源连接器已成功启动,请运行以下命令

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

启动连接器 SftpSourceConnector 并将目标状态设置为 STARTED 成功!!

启动 SFTP 接收连接器

运行以下命令以启动 SFTP 接收连接器。此连接器将订阅 RocketMQ 主题以使用消息,并将每个消息转换为单行文本,然后使用 SFTP 协议将其写入目标文件 sink.txt

curl -X POST --location "http://localhost:8082/connectors/SftpSinkConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/sink.txt",
"connect.topicnames": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'

如果 curl 请求返回状态: 200,则表示连接器已成功创建。示例响应如下所示

{"status":200,"body":{"connector.class":"...

检查日志以确认 SFTP 接收连接器已成功启动

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

启动连接器 SftpSinkConnector 并将目标状态设置为 STARTED 成功!!

通过运行以下命令确认数据已写入目标文件

cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt

如果 sink.txt 文件已生成,并且其内容与 source.txt 文件的内容匹配,则整个过程正常工作。

source.txt 文件写入更多测试数据以继续测试

cd /Users/YourUsername/rocketmqconnect/sftp-test/

echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt

# Wait a few seconds to give the connector time to replicate data to the sink file.
sleep 10

cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt

注意: 文件内容的顺序可能会有所不同,因为 rocketmq-connect-sftp 在向 RocketMQ 主题发送和接收消息时使用 普通消息。这与 有序消息 不同,消费 普通消息 并不保证顺序。