跳至主要内容
版本:5.0

RocketMQ Connect 实战 5

Elasticsearch 源 -> RocketMQ Connect -> Elasticsearch 接收器

准备工作

启动 RocketMQ

  1. Linux/Unix/Mac
  2. 64 位 JDK 1.8+
  3. Maven 3.2.x+
  4. 启动 RocketMQ。可以使用 RocketMQ 4.xRocketMQ 5.x 5.x 版本。
  5. 使用工具测试 RocketMQ 消息发送和接收。

这里使用环境变量 NAMESRV_ADDR 通知工具客户端 RocketMQ 的 NameServer 地址为 localhost:9876。

#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

注意:RocketMQ 具有自动创建主题和组的功能。在发送或订阅消息时,如果相应的主题或组不存在,RocketMQ 会自动创建它们。因此,无需提前创建主题和组。

以下是内容的英文翻译

构建连接器运行时

克隆仓库并构建 RocketMQ Connect 项目

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

构建 Elasticsearch 连接器插件

构建 Elasticsearch RocketMQ 连接器插件

cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-elasticsearch/

mvn clean package -Dmaven.test.skip=true

将编译后的 Elasticsearch RocketMQ 连接器插件 JAR 文件复制到运行时使用的插件目录中

mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins

cp target/rocketmq-connect-elasticsearch-1.0.0-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins

以独立模式运行连接器工作器

修改 connect-standalone.conf 文件以配置 RocketMQ 连接地址和其他信息。

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

vim conf/connect-standalone.conf

示例配置信息如下

workerId=standalone-worker
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

## Http port for user to access REST API
httpPort=8082

# Rocketmq namesrvAddr
namesrvAddr=localhost:9876

# RocketMQ acl
aclEnable=false
#accessKey=rocketmq
#secretKey=12345678

clusterName="DefaultCluster"

# Plugin path for loading Source/Sink Connectors
pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins

在独立模式下,RocketMQ Connect 会将同步检查点信息持久存储在由 storePathRootDir 指定的本地文件目录中。

storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

如果要重置同步检查点,请删除持久性文件

rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*

要以独立模式启动连接器工作器,请执行以下操作

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

设置 Elasticsearch 服务

Elasticsearch 是一个开源搜索和分析引擎。

我们将使用两个独立的 Elasticsearch Docker 实例作为我们的源数据库和目标数据库

docker pull docker.elastic.co/elasticsearch/elasticsearch:7.15.1

docker run --name es1 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \
-v /Users/YourUsername/rocketmqconnect/es/es1_data:/usr/share/elasticsearch/data \
-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1

docker run --name es2 -p 9201:9200 -p 9301:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \
-v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data \
-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1

Docker 命令说明

  • --name es2:指定容器的名称,例如 es2
  • -p 9201:9200 -p 9301:9300:将 Elasticsearch 容器上的端口 9200 和 9300 映射到主机端口 9201 和 9301,以便可以通过主机访问 Elasticsearch 服务。
  • -e discovery.type=single-node:配置 Elasticsearch 在单节点上运行,无需在集群中发现其他节点,适合单服务器部署。
  • -v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data:将主机上的目录挂载到容器内的 /usr/share/elasticsearch/data,用于持久存储 Elasticsearch 数据。

这将运行一个自定义配置的 Elasticsearch 实例,该实例在容器上具有持久数据存储,可以通过主机上的端口 9200 访问,使其在本地机器上的开发或测试环境中非常有用。

查看 Elasticsearch 日志

docker logs -f es1

docker logs -f es2

验证 Elasticsearch 是否已成功启动

# Check Elasticsearch instance 1
curl -XGET https://127.0.0.1:9200

# Check Elasticsearch instance 2
curl -XGET https://127.0.0.1:9201

成功连接和正确操作将导致包含有关 Elasticsearch 及其版本号的 JSON 响应。

设置 Kibana 服务

Kibana 是一个开源数据可视化工具,允许用户交互式地探索和理解存储在 Elasticsearch 集群中的数据。它提供丰富的功能,例如图表、图形和仪表板。

为了方便起见,我们将在 Docker 中设置两个独立的 Kibana 实例,并使用以下命令将它们链接到我们之前建立的 Elasticsearch 容器

docker pull docker.elastic.co/kibana/kibana:7.15.1

docker run --name kibana1 --link es1:elasticsearch -p 5601:5601 -d docker.elastic.co/kibana/kibana:7.15.1

docker run --name kibana2 --link es2:elasticsearch -p 5602:5601 -d docker.elastic.co/kibana/kibana:7.15.1

Docker 命令说明

  • --name kibana2:为新容器分配一个名称,例如 kibana2
  • --link es2:elasticsearch:将容器链接到另一个名为 Elasticsearch 的实例(在本例中为 'es2')。这使 Kibana 和 Elasticsearch 之间能够通信。
  • -p 5602:5601:将 Kibana 的默认端口 (5601) 映射到主机上的相同端口,使其可以通过浏览器访问。
  • -d:以分离模式运行 Docker 容器。

容器启动后,您可以监控其日志输出

docker logs -f kibana1

docker logs -f kibana2

要访问 Kibana 控制台页面,只需在浏览器中访问以下地址

  • kibana1:https://127.0.0.1:5601
  • kibana2:https://127.0.0.1:5602

如果它们加载正确,则表示相应的 Kibana 实例已成功启动。

将测试数据写入源 Elasticsearch

Kibana 的 Dev Tools 可以帮助您直接在 Kibana 中与 Elasticsearch 交互和操作。您可以执行各种查询和操作,分析和理解返回的数据。请参阅文档 console-kibana

批量写入测试数据

通过浏览器访问 Kibana1 控制台,从左侧菜单中找到 Dev Tools,并在页面上输入以下命令以写入测试数据

POST /_bulk
{ "index" : { "_index" : "connect_es" } }
{ "id": "1", "field1": "value1", "field2": "value2" }
{ "index" : { "_index" : "connect_es" } }
{ "id": "2", "field1": "value3", "field2": "value4" }

注意:

  • connect_es:数据的索引名称。
  • id/field1/field2:这些是字段名称,1、value1、value2 代表字段的值。

注意rocketmq-connect-elasticsearch 中存在一个限制,它要求数据中有一个字段可用于 >= 比较操作(字符串或数字)。此字段将用于记录同步检查点。在上面的示例中,id 字段是一个全局唯一的递增数字字段。

查询数据

要查询索引中的数据,请使用以下命令

GET /connect_es/_search
{
"size": 100
}

如果没有任何数据可用,则响应将为

{
"error" : {
...
"type" : "index_not_found_exception",
"reason" : "no such index [connect_es]",
"resource.type" : "index_or_alias",
"resource.id" : "connect_es",
"index_uuid" : "_na_",
"index" : "connect_es"
},
"status" : 404
}

如果有数据可用,则响应将为

{
...
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "connect_es",
"_type" : "_doc",
"_id" : "_dx49osBb46Z9cN4hYCg",
"_score" : 1.0,
"_source" : {
"id" : "1",
"field1" : "value1",
"field2" : "value2"
}
},
{
"_index" : "connect_es",
"_type" : "_doc",
"_id" : "_tx49osBb46Z9cN4hYCg",
"_score" : 1.0,
"_source" : {
"id" : "2",
"field1" : "value3",
"field2" : "value4"
}
}
]
}
}

删除数据

如果由于重复测试或其他原因需要删除索引中的数据,可以使用以下命令

DELETE /connect_es

启动连接器

启动 Elasticsearch 源连接器

运行以下命令以启动 ES 源连接器。连接器将连接到 Elasticsearch 并从 connect_es 索引中读取文档数据。它将解析 Elasticsearch 文档数据并将其打包到一个通用 ConnectRecord 对象中,该对象将发送到 RocketMQ 主题,供接收器连接器使用。

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSourceConnector -d  '{
"connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSourceConnector",
"elasticsearchHost":"localhost",
"elasticsearchPort":9200,
"index":{
"connect_es": {
"primaryShards":1,
"id":1
}
},
"max.tasks":2,
"connect.topicname":"ConnectEsTopic",
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'

注意:启动命令指定源 ES 应同步 connect_es 索引,索引中的递增字段为 id。数据将从 id=1 开始获取。

如果 curl 请求返回状态:200,则表示创建成功,示例响应将为

{"status":200,"body":{"connector.class":"...

如果看到以下日志,则表示文件源连接器已成功启动。

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

启动连接器 elasticsearchSourceConnector 并将目标状态设置为 STARTED 成功!!

启动 Elasticsearch 接收器连接器

运行以下命令以启动 ES 接收器连接器。连接器将订阅 RocketMQ 主题中的数据并使用它。它将把每条消息转换为文档数据并将其写入目标 ES。

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSinkConnector -d '{
"connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSinkConnector",
"elasticsearchHost":"localhost",
"elasticsearchPort":9201,
"max.tasks":2,
"connect.topicnames":"ConnectEsTopic",
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'

注意:启动命令指定目标 ES 的地址和端口,它对应于之前在 Docker 中启动的 ES2。

如果 curl 请求返回状态:200,则表示创建成功,示例响应将为

{"status":200,"body":{"connector.class":"...

如果看到以下日志,则表示文件源连接器已成功启动

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

启动连接器 elasticsearchSinkConnector 并将目标状态设置为 STARTED 成功!!

要检查接收器连接器是否已将数据写入目标 ES 索引

  1. 在浏览器中访问 Kibana2 控制台地址:https://127.0.0.1:5602
  2. 在 Kibana2 Dev Tools 页面中,查询索引中的数据。如果它与源 ES1 中的数据匹配,则表示连接器正在正常运行。
GET /connect_es/_search
{
"size": 100
}