ksqlDB 建立在 Kafka 和 Kafka Streams 之上,提供簡化的串流處理 SQL 語法和更易用的操作介面。本文從 ksqlDB 伺服器的啟動與 CLI 的基本操作開始,逐步引導讀者瞭解如何建立串流、插入資料,以及如何使用 auto.offset.reset 引數控制資料讀取的起始位置。接著,文章探討 Kafka Connect 的核心概念,包含聯結器、任務、工作節點、轉換器等,並解釋如何透過 Kafka Connect 將 ksqlDB 與外部資料來源和資料儲存系統整合,實作完整的串流 ETL 流程。最後,文章詳細說明瞭外部模式和嵌入模式的差異、組態方式及適用場景,並提供 Converter 的組態範例,讓讀者能根據實際需求選擇合適的整合模式和序列化格式。
開始使用 ksqlDB
安裝 ksqlDB 後,我們需要了解如何執行 ksqlDB 伺服器。在本文中,我們將學習如何啟動 ksqlDB 伺服器並使用 ksqlDB CLI 提交查詢。
執行 ksqlDB 伺服器
首先,我們需要為 ksqlDB 伺服器建立組態檔案。我們將兩個最重要的組態屬性儲存到名為 ksql-server.properties 的檔案中:
listeners=http://0.0.0.0:8088
bootstrap.servers=kafka:9092
listeners屬性指定了 ksqlDB 伺服器的 REST API 端點,這將繫結到所有 IPv4 介面。bootstrap.servers屬性指定了 Kafka 叢集中的一個或多個代理的主機和埠對,這將用於建立與 Kafka 叢集的連線。
儲存組態檔案後,我們可以使用以下命令啟動 ksqlDB 伺服器:
ksql-server-start ksql-server.properties
如果使用 Docker Compose,則可以在 docker-compose.yml 檔案中設定此命令。啟動後,您應該會在控制檯中看到許多資訊被列印預出來,包括類別似於以下的行:
[2020-11-28 00:53:11,530] INFO ksqlDB API server listening on http://0.0.0.0:8088
這表示 ksqlDB 伺服器已成功啟動並正在監聽指定的埠。
預先建立主題
在本教程中,我們將預先建立 users 主題。可以使用以下命令建立主題:
kafka-topics \
--bootstrap-server localhost:9092 \
--topic users \
--replication-factor 1 \
--partitions 4 \
--create
如果使用 Docker Compose,請在命令前加上 docker-compose exec kafka。
程式碼說明
kafka-topics --bootstrap-server localhost:9092 --topic users --replication-factor 1 --partitions 4 --create
內容解密:
kafka-topics是用於管理 Kafka 主題的命令列工具。--bootstrap-server localhost:9092指定了 Kafka 代理的地址和埠。--topic users指定了要建立的主題名稱。--replication-factor 1指定了主題的副本因子為 1。--partitions 4指定了主題的分割槽數為 4。--create指定了要建立主題。
使用 ksqlDB CLI
現在我們可以使用 ksqlDB CLI 提交查詢。首先,需要執行 ksql 命令並指定 ksqlDB 伺服器的 REST 端點:
ksql http://0.0.0.0:8088
如果使用 Docker Compose,請執行 docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088。
進入 CLI 後,可以執行各種查詢和陳述式。也可以從 CLI 調整各種 ksqlDB 組態。例如,可以執行以下 SET 陳述式以確保查詢從基礎 Kafka 主題的開頭讀取:
SET 'auto.offset.reset' = 'earliest';
程式碼說明
SET 'auto.offset.reset' = 'earliest';
內容解密:
SET陳述式用於設定 ksqlDB 的組態屬性。'auto.offset.reset'是要設定的組態屬性的名稱。'earliest'是該屬性的值,表示查詢將從基礎 Kafka 主題的開頭讀取。
建模資料
我們可以使用 CREATE STREAM DDL 陳述式在 ksqlDB 中建模 users 主題中的資料:
CREATE STREAM users (
ROWKEY INT KEY,
USERNAME VARCHAR
) WITH (
KAFKA_TOPIC='users',
VALUE_FORMAT='JSON'
);
程式碼說明
CREATE STREAM users (
ROWKEY INT KEY,
USERNAME VARCHAR
) WITH (
KAFKA_TOPIC='users',
VALUE_FORMAT='JSON'
);
內容解密:
CREATE STREAM陳述式用於在 ksqlDB 中建立流。users是流的名稱。ROWKEY INT KEY指定了流的鍵的資料型別為 INT。USERNAME VARCHAR指定了流中的一個欄位,名稱為 USERNAME,資料型別為 VARCHAR。WITH子句用於傳遞額外的屬性。KAFKA_TOPIC='users'指定了流對應的 Kafka 主題名稱。VALUE_FORMAT='JSON'指定了 Kafka 主題中記錄值的序列化格式為 JSON。
建立流後,可以使用 INSERT INTO 陳述式插入測試資料:
INSERT INTO users (username) VALUES ('izzy');
INSERT INTO users (username) VALUES ('elyse');
INSERT INTO users (username) VALUES ('mitch');
程式碼說明
INSERT INTO users (username) VALUES ('izzy');
INSERT INTO users (username) VALUES ('elyse');
INSERT INTO users (username) VALUES ('mitch');
內容解密:
INSERT INTO陳述式用於向流中插入資料。users是流的名稱。(username)指定了要插入的欄位名稱。VALUES ('izzy')、VALUES ('elyse')和VALUES ('mitch')分別指定了要插入的值。
資料整合與 ksqlDB
在建立串流處理應用程式時,第一步是考慮要處理的資料目前位於何處,以及最終會將經過豐富化/轉換的資料寫入何處。由於 ksqlDB 在底層使用了 Kafka Streams,因此所建立的應用程式的直接輸入和輸出始終是 Kafka 主題。ksqlDB 也使得與其他資料來源整合變得簡單,包括來自熱門的第三方系統,如 Elasticsearch、PostgreSQL、MySQL、Google PubSub、Amazon Kinesis、MongoDB 等等。
Kafka Connect 簡介
Kafka Connect 是 Kafka 生態系統中的五個 API 之一,用於將外部資料儲存、API 和檔案系統連線到 Kafka。一旦資料進入 Kafka,就可以使用 ksqlDB 對其進行處理、轉換和豐富化。Kafka Connect 的主要元件如下:
聯結器(Connectors)
聯結器是可以安裝在工作節點上的程式碼包,它們促進了 Kafka 與其他系統之間的資料流動,可以分為兩類別:
- 來源聯結器(Source connectors):從外部系統讀取資料到 Kafka
- 接收聯結器(Sink connectors):將資料從 Kafka 寫入外部系統
任務(Tasks)
任務是聯結器內的工作單元。可以組態任務的數量,以控制單個工作節點例項可以執行的任務量。
工作節點(Workers)
工作節點是執行聯結器的 JVM 行程。可以佈署多個工作節點來幫助平行化/分散工作,並在部分故障(例如,一個工作節點離線)的情況下實作容錯。
轉換器(Converters)
轉換器是處理 Connect 中資料序列化和反序列化的程式碼。必須在工作節點級別指定預設轉換器(例如,AvroConverter),但也可以在聯結器級別覆寫轉換器。
Connect 叢集
Connect 叢集是指一個或多個 Kafka Connect 工作節點,共同作為一個群組,將資料移入和移出 Kafka。
外部模式與嵌入模式
在 ksqlDB 中,Kafka Connect 整合可以以兩種不同的模式執行。本文描述每種模式以及何時使用它們。我們將首先檢視外部模式。
外部模式
外部模式是指 Kafka Connect 以獨立的模式執行,不受 ksqlDB 的管理。在這種模式下,需要手動組態和管理 Kafka Connect 工作節點。
嵌入模式
嵌入模式是指 Kafka Connect 被嵌入到 ksqlDB 中,由 ksqlDB 管理。在這種模式下,ksqlDB 將自動處理 Kafka Connect 的組態和管理。
使用 ksqlDB 進行資料整合
ksqlDB 提供了一種簡化的方法來整合外部資料來源和接收器,使用 Kafka Connect API。本章將介紹如何使用 ksqlDB 的高階抽象來處理 Connect API,包括:
- 組態 Kafka Connect 工作節點
- 安裝來源和接收聯結器
- 在 ksqlDB 中建立、刪除和檢查來源和接收聯結器
- 使用 Kafka Connect API 檢查來源和接收聯結器
- 在 Confluent Schema Registry 中檢視聯結器架構
透過本章的學習,您將瞭解使用 ksqlDB 執行串流 ETL(提取、轉換和載入)操作的兩個任務:
- 從外部系統提取資料到 Kafka
- 將資料從 Kafka 載入到外部系統
轉換資料的部分將在接下來的兩章中介紹,因為它與串流處理的關係更密切。
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title KsqlDB 串流處理與資料整合實戰
package "資料庫架構" {
package "應用層" {
component [連線池] as pool
component [ORM 框架] as orm
}
package "資料庫引擎" {
component [查詢解析器] as parser
component [優化器] as optimizer
component [執行引擎] as executor
}
package "儲存層" {
database [主資料庫] as master
database [讀取副本] as replica
database [快取層] as cache
}
}
pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中
master --> replica : 資料同步
note right of cache
Redis/Memcached
減少資料庫負載
end note
@enduml此圖示展示了 Kafka Connect 的主要元件及其之間的關係。
內容解密:
此 Plantuml 圖表展示了 Kafka Connect 的架構,包括其主要元件:聯結器、任務、工作節點和轉換器。圖表清晰地呈現了這些元件之間的關係,有助於讀者更好地理解 Kafka Connect 的工作原理。
- 聯結器被分為來源聯結器和接收聯結器,分別負責從外部系統讀取資料到 Kafka 和將資料從 Kafka 寫入外部系統。
- 任務是聯結器內的工作單元,其數量可以根據需要進行組態。
- 工作節點是執行聯結器的 JVM 行程,可以佈署多個工作節點以實作平行化和容錯。
- 轉換器負責處理 Connect 中的資料序列化和反序列化,可以在工作節點級別指定預設轉換器,也可以在聯結器級別覆寫轉換器。
本章介紹了使用 ksqlDB 進行資料整合的基本概念和技術,包括 Kafka Connect 的簡介、外部模式與嵌入模式的比較,以及使用 ksqlDB 進行資料整合的步驟和方法。透過本章的學習,讀者可以瞭解到如何使用 ksqlDB 將外部資料來源和接收器與 Kafka 整合,實作串流 ETL 操作。
Kafka Connect 整合模式:外部模式與嵌入模式
ksqlDB 提供兩種與 Kafka Connect 整合的模式:外部模式(External Mode)與嵌入模式(Embedded Mode)。這兩種模式適用於不同的使用場景和需求。
外部模式(External Mode)
在外部模式下,ksqlDB 連線到一個獨立執行的 Kafka Connect 叢集。這種模式允許 ksqlDB 與 Kafka Connect 叢集分開佈署和管理。要啟用外部模式,需要在 ksqlDB 的組態檔案中設定 ksql.connect.url 屬性,指向 Kafka Connect 叢集的 URL。
ksql.connect.url=http://localhost:8083
外部模式的優缺點
- 優點:
- 可以獨立擴充套件串流處理和資料輸入/輸出工作負載。
- 適合高吞吐量的來源/目標主題。
- 適合已經擁有現有 Kafka Connect 叢集的情況。
- 缺點:
- 需要單獨管理 Kafka Connect 叢集。
- 需要確保聯結器安裝在 Kafka Connect 工作節點上。
嵌入模式(Embedded Mode)
在嵌入模式下,Kafka Connect 工作節點與 ksqlDB 伺服器執行在相同的 JVM 中。這種模式下,Kafka Connect 工作節點以分散式模式執行,可以跨多個協作的工作節點分配工作。
要啟用嵌入模式,需要在 ksqlDB 伺服器的組態檔案中設定 ksql.connect.worker.config 屬性,指向 Kafka Connect 工作節點的組態檔案。
ksql.connect.worker.config=/etc/ksqldb-server/connect.properties
嵌入模式的優缺點
- 優點:
- 可以簡化資料整合的設定和管理。
- 適合低至中等吞吐量的來源/目標主題。
- 可以將串流處理和資料輸入/輸出工作負載一起擴充套件。
- 缺點:
- Kafka Connect 工作節點會與 ksqlDB 伺服器共用計算/記憶體資源。
- 當 ksqlDB 伺服器重新啟動時,Kafka Connect 工作節點也會重新啟動。
組態 Kafka Connect 工作節點
無論是外部模式還是嵌入模式,都需要組態 Kafka Connect 工作節點。以下是一個範例組態:
bootstrap.servers=localhost:9092
group.id=ksql-connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
config.storage.topic=ksql-connect-configs
offset.storage.topic=ksql-connect-offsets
status.storage.topic=ksql-connect-statuses
errors.tolerance=all
plugin.path=/opt/confluent/share/java/
組態說明
bootstrap.servers:Kafka 叢集的初始連線地址。group.id:Kafka Connect 叢集的 ID,具有相同 ID 的工作節點屬於同一個叢集。key.converter和value.converter:控制 Kafka 訊息中鍵和值的序列化格式。config.storage.topic、offset.storage.topic和status.storage.topic:用於儲存聯結器和任務組態相關資訊的主題。errors.tolerance:組態錯誤處理策略,可以設定為none或all。plugin.path:指定安裝聯結器、轉換器和轉換的外掛程式路徑。
設定錯誤處理策略
錯誤處理策略由 errors.tolerance 屬性控制。若設為 none,當發生錯誤時會立即失敗;若設為 all,則可以忽略錯誤或將錯誤路由到指定的 Kafka 主題。
組態選擇建議
- 當需要獨立擴充套件串流處理和資料輸入/輸出工作負載,或期望高吞吐量時,選擇外部模式。
- 當需要簡化資料整合設定,或期望低至中等吞吐量時,選擇嵌入模式。
詳細組態注意事項
- 確保正確設定
ksql.connect.url或ksql.connect.worker.config屬性。 - 正確組態 Kafka Connect 工作節點,包括
bootstrap.servers、group.id、key.converter、value.converter等屬性。 - 根據需求選擇合適的錯誤處理策略。
組態Kafka Connect的Converter與序列化格式
在Kafka Connect中,converter的組態扮演著至關重要的角色,尤其是在資料的序列化和反序列化方面。正確地選擇和組態converter對於確保資料能夠正確地在Kafka Connect和ksqlDB之間傳輸至關重要。
Converter與序列化格式的關係
當我們在ksqlDB中建立一個stream或table時,需要指定資料的序列化格式。例如,在下面的陳述式中,我們告訴ksqlDB users 主題中的資料是以JSON格式序列化的。
CREATE STREAM users (
ROWKEY INT KEY,
USERNAME VARCHAR
) WITH (
KAFKA_TOPIC='users',
VALUE_FORMAT='JSON'
);
然而,當資料來源於Kafka Connect時,資料的序列化格式取決於所使用的converter類別。key.converter 和 value.converter 屬性控制著Kafka Connect如何序列化記錄的鍵和值。
常見的Converter類別及其對應的ksqlDB序列化格式
下表列出了常見的converter類別及其對應的ksqlDB序列化格式:
| 型別 | Converter類別 | 需要Schema Registry | ksqlDB序列化格式 |
|---|---|---|---|
| Avro | io.confluent.connect.avro.AvroConverter | 是 | AVRO |
| Protobuf | io.confluent.connect.protobuf.ProtobufConverter | 是 | PROTOBUF |
| JSON (with Schema Registry) | io.confluent.connect.json.JsonSchemaConverter | 是 | JSON_SR |
| JSON | org.apache.kafka.connect.json.JsonConverter | 否 | JSON |
| String | org.apache.kafka.connect.storage.StringConverter | 否 | KAFKA |
內容解密:
- AvroConverter:使用Avro格式進行序列化,需要Confluent Schema Registry來儲存記錄的schema。
- ProtobufConverter:使用Protobuf格式進行序列化,同樣需要Schema Registry。
- JsonSchemaConverter:使用JSON格式並結合Schema Registry進行序列化。
- JsonConverter:使用JSON格式進行序列化,不需要Schema Registry。但可以透過組態
value.converter.schemas.enable來嵌入schema資訊。 - StringConverter:將資料以字串形式進行序列化,不需要Schema Registry。
組態Converter
為了使用特定的converter,需要在Kafka Connect的工作組態中設定相應的屬性。例如,若要使用AvroConverter,需要新增以下組態:
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
內容解密:
- 這裡指定了使用
AvroConverter來序列化記錄的值,並且設定了Schema Registry的URL。
教學:安裝和使用Connector
在本教學中,我們將使用JDBC源connector從PostgreSQL串流資料到Kafka,然後建立一個Elasticsearch接收connector將資料從Kafka寫入Elasticsearch。
安裝Connector
安裝connector主要有兩種方式:手動安裝和透過Confluent Hub自動安裝。在本文中,我們將使用Confluent提供的CLI工具confluent-hub來安裝connector。
# 使用confluent-hub安裝connector
confluent-hub install <connector-name>
內容解密:
confluent-hub是一個由Confluent開發的CLI工具,用於簡化connector的安裝過程。