Kafka Connect 提供了便捷的機制,能將資料從各種來源系統匯入 Kafka,再串流至目標系統。本文以 MySQL 到 Elasticsearch 的資料串流為例,詳細說明 Kafka Connect 的設定與應用。首先,我們會比較 Standalone 和 Distributed 兩種執行模式,並使用 FileStream Connector 示範基本的資料讀寫流程。接著,會逐步引導讀者安裝和設定 JDBC Connector,建立 MySQL 資料函式庫和表格,並組態 JDBC Source Connector 將資料匯入 Kafka。最後,我們將示範如何使用 Elasticsearch Connector 將 Kafka 中的資料寫入 Elasticsearch,並進行索引驗證和搜尋。過程中,我們也會探討 Debezium 專案提供的變更資料擷取功能,以及如何利用單一訊息轉換功能簡化資料處理流程。
Kafka Connect 實戰:從 MySQL 到 Elasticsearch 的資料串流
Kafka Connect 是 Apache Kafka 的一部分,提供了一種可擴充套件且可靠的方式來連線 Kafka 與外部系統,如資料函式庫、檔案系統和搜尋引擎等。在本篇文章中,我們將探討如何使用 Kafka Connect 將 MySQL 資料函式庫中的資料串流到 Elasticsearch 中進行索引。
瞭解 Kafka Connect 的 Standalone 和 Distributed 模式
在開始之前,我們先來瞭解一下 Kafka Connect 的兩種執行模式:Standalone 和 Distributed。Standalone 模式適用於單機環境,所有 Connector 和 Task 都執行在同一台機器上。Distributed 模式則適用於叢集環境,多台機器共同執行 Connector 和 Task,提供高用性和擴充套件性。
使用 FileStreamSource 和 FileStreamSink Connector
首先,我們使用 Kafka 自帶的 FileStreamSource 和 FileStreamSink Connector 來示範如何將檔案中的資料讀入 Kafka 主題,並將主題中的資料寫入另一個檔案。
步驟 1:啟動 Kafka Connect
bin/connect-distributed.sh config/connect-distributed.properties &
步驟 2:建立 FileStreamSource Connector
echo '{"name":"load-kafka-config", "config":{"connector.class":"FileStreamSource","file":"config/server.properties","topic":"kafka-config-topic"}}' | curl -X POST -d @- http://localhost:8083/connectors -H "Content-Type: application/json"
#### 內容解密:
此命令建立了一個名為 load-kafka-config 的 Connector,使用 FileStreamSource 類別讀取 config/server.properties 檔案,並將其內容寫入 kafka-config-topic 主題。
步驟 3:驗證資料是否正確寫入主題
bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic kafka-config-topic --from-beginning
#### 內容解密:
此命令使用 Kafka Console Consumer 從 kafka-config-topic 主題中讀取資料,並顯示在終端上。
步驟 4:建立 FileStreamSink Connector
echo '{"name":"dump-kafka-config", "config":{"connector.class":"FileStreamSink","file":"copy-of-server-properties","topics":"kafka-config-topic"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
#### 內容解密:
此命令建立了一個名為 dump-kafka-config 的 Connector,使用 FileStreamSink 類別將 kafka-config-topic 主題中的資料寫入 copy-of-server-properties 檔案。
將 MySQL 資料串流到 Elasticsearch
現在,我們來示範如何使用 Kafka Connect 將 MySQL 資料函式庫中的資料串流到 Elasticsearch 中。
步驟 1:安裝 MySQL 和 Elasticsearch
brew install mysql
brew install elasticsearch
#### 內容解密:
此命令使用 Homebrew 安裝 MySQL 和 Elasticsearch。
步驟 2:安裝必要的 Connector
您可以透過 Confluent Hub Client 或從原始碼構建來安裝必要的 Connector。
Kafka Connect 與 JDBC Connector 的安裝與設定
Kafka Connect 是一個用於在 Kafka 與外部系統(如資料函式庫、檔案系統等)之間進行資料整合的工具。本文將介紹如何安裝和設定 Kafka Connect 以及 JDBC Connector,以實作 MySQL 資料函式庫與 Kafka 之間的資料串流。
下載和建置 Kafka Connect JDBC Connector
首先,我們需要下載並建置 Kafka Connect JDBC Connector。以下是步驟:
- 使用 Git 將 Confluent 提供的 Kafka Connect JDBC Connector 的原始碼下載到本地端:
git clone https://github.com/confluentinc/kafka-connect-jdbc
- 進入專案目錄並使用 Maven 建置專案:
cd kafka-connect-jdbc
mvn install -DskipTests
- 同樣地,下載並建置 Kafka Connect Elasticsearch Connector(本文雖提及,但主要聚焦於 JDBC Connector)。
設定 Kafka Connect
- 建立一個目錄來存放 Connector 的 JAR 檔案,例如
/opt/connectors。 - 修改
config/connect-distributed.properties檔案,將plugin.path設定為/opt/connectors。 - 將建置好的 JDBC Connector JAR 檔案及其依賴項複製到
/opt/connectors/jdbc目錄下。 - 由於 JDBC Connector 需要 MySQL JDBC Driver,因此需從 MySQL 官方網站下載該驅動程式,並將其放置於
/opt/connectors/jdbc目錄下。
程式碼片段:建立目錄並複製 JAR 檔案
mkdir /opt/connectors/jdbc
mkdir /opt/connectors/elastic
cp .../kafka-connect-jdbc/target/kafka-connect-jdbc-10.3.x-SNAPSHOT.jar /opt/connectors/jdbc
cp ../kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-11.1.0-SNAPSHOT.jar /opt/connectors/elastic
cp ../kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-11.1.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/* /opt/connectors/elastic
內容解密:
- 上述步驟首先建立了兩個目錄,分別用於存放 JDBC 和 Elasticsearch Connector 的檔案。
- 將建置好的 JAR 檔案複製到對應的目錄中,以供 Kafka Connect 使用。
- 特別注意,Elasticsearch Connector 的相關檔案需要被複製到指定的目錄下。
啟動 Kafka Connect 並驗證 Connector
- 重新啟動 Kafka Connect workers。
- 使用
curl命令查詢可用的 Connector plugins:
curl http://localhost:8083/connector-plugins
這將列出所有可用的 Connector,包括新安裝的 JDBC Connector。
程式碼片段:查詢 Connector plugins
[
{
"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type": "sink",
"version": "11.1.0-SNAPSHOT"
},
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "10.3.x-SNAPSHOT"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "10.3.x-SNAPSHOT"
}
]
內容解密:
- 該命令傳回一個 JSON 物件,列出了當前 Kafka Connect 環境中所有可用的 Connector plugins。
- 從傳回結果中可以看到,JDBC Source 和 Sink Connector 以及 Elasticsearch Sink Connector 都已正確安裝。
設定 MySQL 資料函式庫與 JDBC Source Connector
- 在 MySQL 中建立一個新的資料函式庫和表格,並插入一些範例資料。
- 使用
curl命令查詢 JDBC Source Connector 的組態選項。 - 建立並組態 JDBC Source Connector,將 MySQL 中的資料串流到 Kafka。
程式碼片段:建立 MySQL 資料函式庫和表格
create database test;
use test;
create table login (username varchar(30), login_time datetime);
insert into login values ('gwenshap', now());
insert into login values ('tpalino', now());
內容解密:
- 上述 SQL 陳述式建立了一個名為
test的資料函式庫,並在其中建立了一個名為login的表格。 - 向
login表格中插入了兩條範例資料。
組態 JDBC Source Connector
使用以下命令組態 JDBC Source Connector:
echo '{"name":"mysql-login-connector", "config": {"connector.class":"JdbcSourceConnector","connection.url":"jdbc:mysql://127.0.0.1:3306/test?user=root","mode":"timestamp","table.whitelist":"login","validate.non.null":false,"timestamp.column.name":"login_time","topic.prefix":"mysql."}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
內容解密:
- 該命令透過 Kafka Connect 的 REST API 建立了一個名為
mysql-login-connector的 JDBC Source Connector。 - 組態中指定了連線 MySQL 資料函式庫的 URL、使用者名稱、以及要監控的表格等資訊。
驗證資料串流
使用 Kafka Console Consumer 檢視是否能從 mysql.login 主題中讀取到資料:
bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic mysql.login --from-beginning
如果出現錯誤或無法讀取到資料,請檢查 Connect worker 的日誌以找出問題所在。
程式碼片段:檢視 Connect worker 日誌中的錯誤資訊
[2016-10-16 19:39:40,482] ERROR Error while starting connector mysql-login-connector (org.apache.kafka.connect.runtime.WorkerConnector:108)
org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Access denied for user 'root;'@'localhost' (using password: NO)
內容解密:
- 日誌中記錄了啟動
mysql-login-connector時遇到的錯誤,指出因許可權問題導致連線 MySQL 失敗。 - 需要根據具體錯誤資訊進行相應的調整和修復。
資料串流與Kafka Connect的應用
在前面的章節中,我們已經成功地使用Kafka Connect的JDBC聯結器將MySQL資料函式庫中的登入記錄串流到Kafka主題中。現在,我們將進一步探討如何將這些資料寫入Elasticsearch,以實作更豐富的資料分析和搜尋功能。
變更資料擷取與Debezium專案
目前使用的JDBC聯結器採用JDBC和SQL掃描資料函式庫表格以偵測新記錄。這種方法相對低效,有時甚至不準確。幸運的是,大多數關聯式資料函式庫都具備交易日誌(redo log、binlog或write-ahead log),許多資料函式庫允許外部系統直接從其交易日誌讀取資料,這是一種更準確、更高效的變更資料擷取(Change Data Capture)過程。Debezium專案提供了一系列高品質、開源的變更資料擷取聯結器,適用於多種資料函式庫。如果您計劃將資料從關聯式資料函式庫串流到Kafka,強烈建議使用Debezium變更資料擷取聯結器。
將MySQL資料寫入Elasticsearch
首先,啟動Elasticsearch並驗證其是否正常運作:
gwen$ elasticsearch &
gwen$ curl http://localhost:9200/
接下來,建立並啟動Elasticsearch聯結器:
echo '{"name":"elastic-login-connector", "config": {"connector.class":"ElasticsearchSinkConnector","connection.url":"http://localhost:9200","type.name":"mysql-data","topics":"mysql.login","key.ignore":true}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
設定解說:
connection.url:指定Elasticsearch伺服器的URL。topics:指定要寫入Elasticsearch的Kafka主題,本例中為mysql.login。key.ignore:由於Kafka中的事件缺乏鍵(key),因此設定為true以使用主題名稱、分割區ID和偏移量作為每個事件的鍵。
驗證Elasticsearch索引
檢查是否已建立mysql.login索引:
gwen$ curl 'localhost:9200/_cat/indices?v'
搜尋索引中的記錄:
gwen$ curl -s -X "GET" "http://localhost:9200/mysql.login/_search?pretty=true"
搜尋結果解說:
- 顯示了
mysql.login索引中的兩筆記錄,分別對應MySQL資料函式庫中的兩筆登入記錄。
自定義聯結器與單一訊息轉換
如果現有的聯結器無法滿足需求,您可以自行開發新的聯結器並貢獻給社群。Kafka Connect提供了單一訊息轉換(SMT)的功能,可以在不編寫程式碼的情況下對訊息進行轉換。常見的SMT包括:
Cast:轉換欄位資料型別。MaskField:將敏感資料替換為null。