Kafka Connect 提供了便捷的機制,能將資料從各種來源系統匯入 Kafka,再串流至目標系統。本文以 MySQL 到 Elasticsearch 的資料串流為例,詳細說明 Kafka Connect 的設定與應用。首先,我們會比較 Standalone 和 Distributed 兩種執行模式,並使用 FileStream Connector 示範基本的資料讀寫流程。接著,會逐步引導讀者安裝和設定 JDBC Connector,建立 MySQL 資料函式庫和表格,並組態 JDBC Source Connector 將資料匯入 Kafka。最後,我們將示範如何使用 Elasticsearch Connector 將 Kafka 中的資料寫入 Elasticsearch,並進行索引驗證和搜尋。過程中,我們也會探討 Debezium 專案提供的變更資料擷取功能,以及如何利用單一訊息轉換功能簡化資料處理流程。

Kafka Connect 實戰:從 MySQL 到 Elasticsearch 的資料串流

Kafka Connect 是 Apache Kafka 的一部分,提供了一種可擴充套件且可靠的方式來連線 Kafka 與外部系統,如資料函式庫、檔案系統和搜尋引擎等。在本篇文章中,我們將探討如何使用 Kafka Connect 將 MySQL 資料函式庫中的資料串流到 Elasticsearch 中進行索引。

瞭解 Kafka Connect 的 Standalone 和 Distributed 模式

在開始之前,我們先來瞭解一下 Kafka Connect 的兩種執行模式:Standalone 和 Distributed。Standalone 模式適用於單機環境,所有 Connector 和 Task 都執行在同一台機器上。Distributed 模式則適用於叢集環境,多台機器共同執行 Connector 和 Task,提供高用性和擴充套件性。

使用 FileStreamSource 和 FileStreamSink Connector

首先,我們使用 Kafka 自帶的 FileStreamSource 和 FileStreamSink Connector 來示範如何將檔案中的資料讀入 Kafka 主題,並將主題中的資料寫入另一個檔案。

步驟 1:啟動 Kafka Connect

bin/connect-distributed.sh config/connect-distributed.properties &

步驟 2:建立 FileStreamSource Connector

echo '{"name":"load-kafka-config", "config":{"connector.class":"FileStreamSource","file":"config/server.properties","topic":"kafka-config-topic"}}' | curl -X POST -d @- http://localhost:8083/connectors -H "Content-Type: application/json"

#### 內容解密:

此命令建立了一個名為 load-kafka-config 的 Connector,使用 FileStreamSource 類別讀取 config/server.properties 檔案,並將其內容寫入 kafka-config-topic 主題。

步驟 3:驗證資料是否正確寫入主題

bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic kafka-config-topic --from-beginning

#### 內容解密:

此命令使用 Kafka Console Consumer 從 kafka-config-topic 主題中讀取資料,並顯示在終端上。

步驟 4:建立 FileStreamSink Connector

echo '{"name":"dump-kafka-config", "config":{"connector.class":"FileStreamSink","file":"copy-of-server-properties","topics":"kafka-config-topic"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

#### 內容解密:

此命令建立了一個名為 dump-kafka-config 的 Connector,使用 FileStreamSink 類別將 kafka-config-topic 主題中的資料寫入 copy-of-server-properties 檔案。

將 MySQL 資料串流到 Elasticsearch

現在,我們來示範如何使用 Kafka Connect 將 MySQL 資料函式庫中的資料串流到 Elasticsearch 中。

步驟 1:安裝 MySQL 和 Elasticsearch

brew install mysql
brew install elasticsearch

#### 內容解密:

此命令使用 Homebrew 安裝 MySQL 和 Elasticsearch。

步驟 2:安裝必要的 Connector

您可以透過 Confluent Hub Client 或從原始碼構建來安裝必要的 Connector。

Kafka Connect 與 JDBC Connector 的安裝與設定

Kafka Connect 是一個用於在 Kafka 與外部系統(如資料函式庫、檔案系統等)之間進行資料整合的工具。本文將介紹如何安裝和設定 Kafka Connect 以及 JDBC Connector,以實作 MySQL 資料函式庫與 Kafka 之間的資料串流。

下載和建置 Kafka Connect JDBC Connector

首先,我們需要下載並建置 Kafka Connect JDBC Connector。以下是步驟:

  1. 使用 Git 將 Confluent 提供的 Kafka Connect JDBC Connector 的原始碼下載到本地端:
git clone https://github.com/confluentinc/kafka-connect-jdbc
  1. 進入專案目錄並使用 Maven 建置專案:
cd kafka-connect-jdbc
mvn install -DskipTests
  1. 同樣地,下載並建置 Kafka Connect Elasticsearch Connector(本文雖提及,但主要聚焦於 JDBC Connector)。

設定 Kafka Connect

  1. 建立一個目錄來存放 Connector 的 JAR 檔案,例如 /opt/connectors
  2. 修改 config/connect-distributed.properties 檔案,將 plugin.path 設定為 /opt/connectors
  3. 將建置好的 JDBC Connector JAR 檔案及其依賴項複製到 /opt/connectors/jdbc 目錄下。
  4. 由於 JDBC Connector 需要 MySQL JDBC Driver,因此需從 MySQL 官方網站下載該驅動程式,並將其放置於 /opt/connectors/jdbc 目錄下。

程式碼片段:建立目錄並複製 JAR 檔案

mkdir /opt/connectors/jdbc
mkdir /opt/connectors/elastic
cp .../kafka-connect-jdbc/target/kafka-connect-jdbc-10.3.x-SNAPSHOT.jar /opt/connectors/jdbc
cp ../kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-11.1.0-SNAPSHOT.jar /opt/connectors/elastic
cp ../kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-11.1.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/* /opt/connectors/elastic

內容解密:

  • 上述步驟首先建立了兩個目錄,分別用於存放 JDBC 和 Elasticsearch Connector 的檔案。
  • 將建置好的 JAR 檔案複製到對應的目錄中,以供 Kafka Connect 使用。
  • 特別注意,Elasticsearch Connector 的相關檔案需要被複製到指定的目錄下。

啟動 Kafka Connect 並驗證 Connector

  1. 重新啟動 Kafka Connect workers。
  2. 使用 curl 命令查詢可用的 Connector plugins:
curl http://localhost:8083/connector-plugins

這將列出所有可用的 Connector,包括新安裝的 JDBC Connector。

程式碼片段:查詢 Connector plugins

[
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "11.1.0-SNAPSHOT"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "10.3.x-SNAPSHOT"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "10.3.x-SNAPSHOT"
  }
]

內容解密:

  • 該命令傳回一個 JSON 物件,列出了當前 Kafka Connect 環境中所有可用的 Connector plugins。
  • 從傳回結果中可以看到,JDBC Source 和 Sink Connector 以及 Elasticsearch Sink Connector 都已正確安裝。

設定 MySQL 資料函式庫與 JDBC Source Connector

  1. 在 MySQL 中建立一個新的資料函式庫和表格,並插入一些範例資料。
  2. 使用 curl 命令查詢 JDBC Source Connector 的組態選項。
  3. 建立並組態 JDBC Source Connector,將 MySQL 中的資料串流到 Kafka。

程式碼片段:建立 MySQL 資料函式庫和表格

create database test;
use test;
create table login (username varchar(30), login_time datetime);
insert into login values ('gwenshap', now());
insert into login values ('tpalino', now());

內容解密:

  • 上述 SQL 陳述式建立了一個名為 test 的資料函式庫,並在其中建立了一個名為 login 的表格。
  • login 表格中插入了兩條範例資料。

組態 JDBC Source Connector

使用以下命令組態 JDBC Source Connector:

echo '{"name":"mysql-login-connector", "config": {"connector.class":"JdbcSourceConnector","connection.url":"jdbc:mysql://127.0.0.1:3306/test?user=root","mode":"timestamp","table.whitelist":"login","validate.non.null":false,"timestamp.column.name":"login_time","topic.prefix":"mysql."}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

內容解密:

  • 該命令透過 Kafka Connect 的 REST API 建立了一個名為 mysql-login-connector 的 JDBC Source Connector。
  • 組態中指定了連線 MySQL 資料函式庫的 URL、使用者名稱、以及要監控的表格等資訊。

驗證資料串流

使用 Kafka Console Consumer 檢視是否能從 mysql.login 主題中讀取到資料:

bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic mysql.login --from-beginning

如果出現錯誤或無法讀取到資料,請檢查 Connect worker 的日誌以找出問題所在。

程式碼片段:檢視 Connect worker 日誌中的錯誤資訊

[2016-10-16 19:39:40,482] ERROR Error while starting connector mysql-login-connector (org.apache.kafka.connect.runtime.WorkerConnector:108)
org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Access denied for user 'root;'@'localhost' (using password: NO)

內容解密:

  • 日誌中記錄了啟動 mysql-login-connector 時遇到的錯誤,指出因許可權問題導致連線 MySQL 失敗。
  • 需要根據具體錯誤資訊進行相應的調整和修復。

資料串流與Kafka Connect的應用

在前面的章節中,我們已經成功地使用Kafka Connect的JDBC聯結器將MySQL資料函式庫中的登入記錄串流到Kafka主題中。現在,我們將進一步探討如何將這些資料寫入Elasticsearch,以實作更豐富的資料分析和搜尋功能。

變更資料擷取與Debezium專案

目前使用的JDBC聯結器採用JDBC和SQL掃描資料函式庫表格以偵測新記錄。這種方法相對低效,有時甚至不準確。幸運的是,大多數關聯式資料函式庫都具備交易日誌(redo log、binlog或write-ahead log),許多資料函式庫允許外部系統直接從其交易日誌讀取資料,這是一種更準確、更高效的變更資料擷取(Change Data Capture)過程。Debezium專案提供了一系列高品質、開源的變更資料擷取聯結器,適用於多種資料函式庫。如果您計劃將資料從關聯式資料函式庫串流到Kafka,強烈建議使用Debezium變更資料擷取聯結器。

將MySQL資料寫入Elasticsearch

首先,啟動Elasticsearch並驗證其是否正常運作:

gwen$ elasticsearch &
gwen$ curl http://localhost:9200/

接下來,建立並啟動Elasticsearch聯結器:

echo '{"name":"elastic-login-connector", "config": {"connector.class":"ElasticsearchSinkConnector","connection.url":"http://localhost:9200","type.name":"mysql-data","topics":"mysql.login","key.ignore":true}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

設定解說:

  • connection.url:指定Elasticsearch伺服器的URL。
  • topics:指定要寫入Elasticsearch的Kafka主題,本例中為mysql.login
  • key.ignore:由於Kafka中的事件缺乏鍵(key),因此設定為true以使用主題名稱、分割區ID和偏移量作為每個事件的鍵。

驗證Elasticsearch索引

檢查是否已建立mysql.login索引:

gwen$ curl 'localhost:9200/_cat/indices?v'

搜尋索引中的記錄:

gwen$ curl -s -X "GET" "http://localhost:9200/mysql.login/_search?pretty=true"

搜尋結果解說:

  • 顯示了mysql.login索引中的兩筆記錄,分別對應MySQL資料函式庫中的兩筆登入記錄。

自定義聯結器與單一訊息轉換

如果現有的聯結器無法滿足需求,您可以自行開發新的聯結器並貢獻給社群。Kafka Connect提供了單一訊息轉換(SMT)的功能,可以在不編寫程式碼的情況下對訊息進行轉換。常見的SMT包括:

  • Cast:轉換欄位資料型別。
  • MaskField:將敏感資料替換為null。