ksqlDB 提供根據 SQL 的串流處理能力,簡化 Kafka 資料流的處理邏輯。透過 CREATE STREAM AS SELECT 陳述式,可以建立衍生串流,實作資料的過濾、轉換和聚合。文章以 Netflix 變更追蹤應用程式為例,示範如何從 production_changes 串流中提取 season_length 變更,建立 season_length_changes 衍生串流,並設定 Kafka 主題、資料格式、分割槽數和副本數等。此外,ksqlDB 提供 SHOW QUERIES、EXPLAIN 等指令,方便開發者監控和理解查詢執行狀況,並能透過 TERMINATE 指令停止查詢。文章也涵蓋進階用法,例如使用 INNER JOIN 將 season_length_changes 串流與 titles 表格關聯,豐富資料內容,並探討資料型別轉換、重新分割槽等議題,以及如何持久化關聯查詢結果。
使用 ksqlDB 進行串流處理
ksqlDB 提供了一種簡便的方法來處理 Kafka 中的資料流,透過 SQL 陳述式即可實作複雜的資料處理邏輯。本章節將探討如何使用 ksqlDB 建立衍生串流(derived streams)以及如何與底層查詢進行互動。
建立衍生串流
在 ksqlDB 中,衍生串流是透過 CREATE STREAM AS SELECT (CSAS) 陳述式建立的。這類別查詢允許我們對現有的串流或表格進行過濾、轉換和處理,以產生新的資料集。
CREATE STREAM season_length_changes
WITH (
KAFKA_TOPIC = 'season_length_changes',
VALUE_FORMAT = 'AVRO',
PARTITIONS = 4,
REPLICAS = 1
) AS
SELECT
ROWKEY,
title_id,
IFNULL(after->season_id, before->season_id) AS season_id,
before->episode_count AS old_episode_count,
after->episode_count AS new_episode_count,
created_at
FROM production_changes
WHERE change_type = 'season_length'
EMIT CHANGES;
內容解密:
CREATE STREAM season_length_changes: 定義了一個新的衍生串流season_length_changes。WITH子句: 指定了建立串流的相關組態,包括 Kafka 主題、資料格式、分割槽數和副本數。AS SELECT: 定義了用於填充衍生串流的查詢邏輯。IFNULL條件表示式: 用於處理資料完整性問題,確保在after或before物件中選擇正確的season_id。EMIT CHANGES: 表示這是一個連續查詢,會持續輸出變更到結果串流中。
與底層查詢互動
建立衍生串流後,ksqlDB 會生成一個連續查詢。我們可以透過以下陳述式與這些查詢互動:
檢視查詢
SHOW QUERIES;
此命令會列出當前活躍的查詢及其狀態。
解釋查詢
EXPLAIN CSAS_SEASON_LENGTH_CHANGES_0;
此命令提供了有關特定查詢的詳細資訊,包括欄位名稱和型別、ksqlDB 伺服器的狀態,以及用於執行查詢的 Kafka Streams拓撲結構描述。
終止查詢
在刪除一個串流或表格之前,如果有查詢正在寫入該串流或表格,則需要先終止這些查詢。
ksqlDB 的強大之處在於其能夠簡化 Kafka 資料流的處理邏輯,使得使用者可以專注於業務邏輯的實作,而無需深入瞭解底層的 Kafka Streams 實作細節。未來,隨著 ksqlDB 功能的不斷增強,我們可以預期會有更多高效、簡便的資料處理方案出現。
中級與進階串流處理使用 ksqlDB
在上一章中,我們學習瞭如何使用 ksqlDB 進行基本的資料前處理和轉換任務。我們討論的 SQL 陳述式是無狀態的,它們允許我們過濾資料、扁平化複雜或巢狀結構、使用投影來重塑資料等。在本章中,我們將透過討論一些資料豐富和聚合的使用案例來加深對 ksqlDB 的理解。我們將討論的大多數陳述式是有狀態的(例如,涉及多個記錄,這是連線和聚合所必需的),並且是根據時間的(例如,視窗操作),使它們在底層更加複雜,但也更強大。
專案設定
如果您想在逐步完成每個拓撲步驟時參考程式碼,請克隆儲存函式庫並更改為包含本章教程的目錄。以下命令將完成這項工作:
$ git clone git@github.com:mitch-seymour/mastering-kafka-streams-and-ksqldb.git
$ cd mastering-kafka-streams-and-ksqldb/chapter-11/
如前一章所述,我們將使用 Docker Compose 來啟動本教程所需的所有元件(例如,Kafka、ksqlDB 伺服器、CLI 等)。只需在克隆儲存函式庫後執行以下命令即可啟動每個元件:
docker-compose up
除非另有說明,否則本章中討論的 SQL 陳述式將在 ksqlDB CLI 中執行。您可以使用以下命令登入到 ksqlDB CLI:
docker-compose exec ksqldb-cli \
ksql http://ksqldb-server:8088 --config-file /etc/ksqldb-cli/cli.properties
到目前為止,設定與第 10 章幾乎相同。但是,本章建立在前一章的教程基礎上,因此我們實際上需要設定我們的 ksqlDB 環境,使其達到上一章結束時的狀態。這使我們進入了一個非常重要的 ksqlDB 陳述式,我們將在下一節中討論。
從 SQL 檔案引導環境
我們已經完成了部分教程來構建 Netflix 變更追蹤應用程式,因此我們有一組先前建立的查詢,需要重新執行以繼續我們離開的地方。這似乎是本教程特有的,但執行一組查詢來設定您的環境是一種常見的開發工作流程,而 ksqlDB 包含一個特殊的陳述式,使這變得容易。
此陳述式的語法如下:
RUN SCRIPT <sql_file>
SQL 檔案可以包含任意數量的要執行的查詢。
例如,我們可以將前一章的所有查詢(參見例 10-4)放在一個名為 /etc/sql/init.sql 的檔案中,然後執行以下命令來重新建立我們之前使用的集合和查詢:
ksql> RUN SCRIPT '/etc/sql/init.sql' ;
當執行 RUN SCRIPT 時,SQL 檔案中包含的每個陳述式的輸出都將傳回給客戶端。在 CLI 的情況下,輸出只是列印到控制檯。
內容解密:
RUN SCRIPT陳述式用於執行包含多個 SQL 陳述式的 SQL 檔案。- 這對於設定 ksqlDB 環境非常有用,可以重新建立所需的集合和查詢。
- 在本教程中,我們使用它來繼續上一章的工作。
使用連線來組合和豐富資料
在本文中,我們將學習如何使用連線來組合和豐富資料。連線是一種強大的操作,可以用於將多個資料流或表格合併為一個。
執行聚合操作
聚合操作用於對資料進行匯總和分析。在 ksqlDB 中,我們可以使用聚合函式來計算資料的匯總值,例如總和、平均值等。
對物化檢視執行提取查詢
提取查詢是一種特殊的查詢,用於從物化檢視中檢索資料。在 ksqlDB 中,我們可以使用提取查詢來查詢資料的最新狀態。
使用內建的 ksqlDB 函式
ksqlDB 提供了許多內建函式,可以用於執行各種任務,例如字串操作、日期和時間處理等。
建立自定義函式
除了內建函式之外,ksqlDB 還允許我們建立自定義函式,以滿足特定的需求。自定義函式可以使用 Java 語言編寫。
使用ksqlDB進行資料豐富與串流處理
在前一章節中,我們已經使用RUN SCRIPT命令執行了一系列的SQL陳述式來建立我們的Netflix變更追蹤應用程式。在本章節中,我們將繼續探討如何豐富我們的資料流(stream),並使用JOIN操作將不同的資料來源結合起來。
資料豐富(Data Enrichment)
資料豐富是指對原始資料進行改進或增強的過程。它超越了簡單的資料轉換,後者通常只改變資料的格式或結構。資料豐富涉及向資料中新增新的資訊,而JOIN操作是資料函式庫領域中最廣泛使用的豐富技術之一。
在我們的應用程式中,步驟3需要對season_length_changes資料流進行豐富,使其包含來自titles表格的資訊。
JOIN操作
JOIN操作涉及使用一個連線謂詞(join predicate),即一個布林表示式,當找到相關記錄時解析為true,否則為false,將多個資料來源中的相關記錄結合起來。JOIN在關係型和串流世界中都很常見,因為資料通常分散在多個來源中,需要被整合在一起進行處理和分析。
ksqlDB中的JOIN型別
ksqlDB支援多種JOIN型別,可以使用兩個維度來表達:
- 使用的
JOIN表示式(INNER JOIN、LEFT JOIN和FULL JOIN) - 被連線的集合型別(
STREAM、TABLE)
| SQL表示式 | 描述 |
|---|---|
INNER JOIN | 當連線雙方都有相同的鍵值時觸發內連線。 |
LEFT JOIN | 當左側收到一筆記錄時觸發左連線。如果右側沒有匹配的鍵值,則右側的值設為null。 |
FULL JOIN | 當連線任一側收到一筆記錄時觸發全連線。如果另一側沒有匹配的鍵值,則對應的值設為null。 |
ksqlDB中的JOIN組合
| 連線型別 | 支援的表示式 | 是否需要視窗(Windowed) |
|---|---|---|
| Stream-Stream | INNER JOIN、LEFT JOIN、FULL JOIN | 是 |
| Stream-Table | INNER JOIN、LEFT JOIN | 否 |
| Table-Table | INNER JOIN、LEFT JOIN、FULL JOIN | 否 |
值得注意的是,Stream-Stream連線需要視窗化,因為串流是無界的,我們必須將相關記錄的搜尋限制在使用者定義的時間範圍內。
執行JOIN查詢的前提條件
在開始寫我們的JOIN查詢之前,需要注意一些前提條件:
- 連線表示式中參照的所有欄位必須具有相同的資料型別。
- 連線雙方必須具有相同的分割區數量。
- 底層主題中的資料必須使用相同的分割策略寫入。
內容解密:
- 相同資料型別:確保用於連線的欄位具有相同的資料型別,例如都是
STRING或INT。 - 相同的分割區數量:連線雙方的分割區數量必須匹配,以避免錯誤。
- 相同的分割策略:生產者應該使用預設的分割器,根據輸入記錄的鍵建立雜湊值。
例子:豐富season_length_changes串流
我們的預處理資料串流season_length_changes包含一個名為title_id的欄位。我們希望使用這個值來查詢更多關於標題的資訊(包括標題名稱,例如《怪奇物語》或《黑鏡》),這些資訊儲存在titles表格中。
SELECT
s.title_id,
t.title,
s.season_id,
s.old_episode_count,
s.new_episode_count,
s.created_at
FROM season_length_changes s
INNER JOIN titles t
ON s.title_id = t.id
EMIT CHANGES;
內容解密:
INNER JOIN:我們使用內連線,因為只有當titles表格中有對應的記錄時,我們才希望觸發連線。title_id:用於連線titles表格的欄位。EMIT CHANGES:持續輸出變更結果。
在這個例子中,我們展示瞭如何使用ksqlDB進行資料豐富和串流處理,透過結合不同的資料來源來獲得更有價值的資訊。
資料豐富化:使用 ksqlDB 進行資料關聯
在串流處理中,資料豐富化是一個非常重要的步驟,它能夠將多個資料來源的資料進行整合,以提供更完整的資訊。ksqlDB 提供了一種簡單有效的方式來實作資料豐富化。在本章中,我們將探討如何使用 ksqlDB 進行資料關聯,以滿足 Netflix 變更追蹤應用程式的需求。
資料關聯的基本概念
在 ksqlDB 中,資料關聯是透過 JOIN 操作來實作的。JOIN 操作允許我們將兩個或多個資料來源的資料進行整合,根據指定的關聯條件,將相關的資料記錄合併成一個新的記錄。
使用 PRIMARY KEY 進行資料關聯
在進行資料關聯時,使用 PRIMARY KEY 是最有效的方式,因為記錄已經按照這個值在底層的狀態儲存中進行了鍵值對應。這也保證了不會有多於一條記錄與串流記錄相匹配,因為鍵是表中的唯一約束。
SELECT
s.title_id,
t.title,
s.season_id,
s.old_episode_count,
s.new_episode_count,
s.created_at
FROM season_length_changes s
INNER JOIN titles t
ON s.title_id = t.id
EMIT CHANGES;
資料豐富化的輸出結果
上述陳述式的輸出結果顯示了新的豐富化記錄,其中包含了標題名稱(見 TITLE 列):
|TITLE_ID |TITLE |SEASON_ID |OLD_EPISODE_COUNT |NEW_EPISODE_COUNT |CREATED_AT | |
–|
-|
|
|
|
–| |1 |Stranger Things |1 |12 |8 |2021-02-08…|
處理資料型別不匹配的問題
在某些情況下,參與關聯的資料來源可能在關聯屬性上指定了不同的資料型別。例如,如果 s.title_id 是以 VARCHAR 編碼的,而我們需要將其與 ROWKEY (以 INT 編碼)進行關聯。在這種情況下,我們可以使用內建的 CAST 表示式將 s.title_id 轉換為 INT 型別,以滿足第一個關聯先決條件。
SELECT ...
FROM season_length_changes s
INNER JOIN titles t
ON CAST(s.title_id AS INT) = t.id
EMIT CHANGES;
內容解密:
CAST(s.title_id AS INT)將s.title_id從VARCHAR轉換為INT,使其與t.id的型別相匹配。- 這種轉換使得兩個資料來源能夠根據
title_id和id進行正確的關聯。
資料重新分割槽
在某些情況下,可能需要重新分割槽資料才能執行關聯操作。例如,如果 titles 表有 8 個分割槽,而 season_length_changes 表有 4 個分割槽,就需要重新分割槽其中一個集合以匹配另一方的分割槽數量。
CREATE TABLE titles_repartition
WITH (PARTITIONS=4) AS
SELECT * FROM titles
EMIT CHANGES;
內容解密:
CREATE TABLE titles_repartition WITH (PARTITIONS=4)建立了一個新的表titles_repartition,並指定了 4 個分割槽。SELECT * FROM titles EMIT CHANGES;將titles表的資料複製到新的表titles_repartition中。
持久化關聯查詢
為了使關聯查詢的結果持久化,我們需要使用 CREATE STREAM AS SELECT (簡稱 CSAS)陳述式。
CREATE STREAM season_length_changes_enriched
WITH (
KAFKA_TOPIC = 'season_length_changes_enriched',
VALUE_FORMAT = 'AVRO',
PARTITIONS = 4,
TIMESTAMP='created_at',
TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss'
) AS
SELECT
s.title_id,
t.title,
s.season_id,
s.old_episode_count,
s.new_episode_count,
s.created_at
FROM season_length_changes s
INNER JOIN titles t
ON s.title_id = t.id
EMIT CHANGES;
內容解密:
CREATE STREAM season_length_changes_enriched ...建立了一個新的串流season_length_changes_enriched,並指定了相關的 Kafka 主題和格式。SELECT ... FROM season_length_changes s INNER JOIN titles t ...執行了與之前相同的關聯查詢,但現在結果將被寫入新的 Kafka 主題中。TIMESTAMP='created_at'指定了用於時間相關操作的時間戳欄位。PARTITIONS = 4指定了新主題的分割槽數量。
時間視窗關聯
在某些應用場景中,可能需要使用時間視窗關聯。時間視窗關聯是一種特殊的關聯操作,它根據時間屬性對記錄進行分組。
時間視窗的概念
時間視窗關聯使用滑動視窗(sliding windows)來分組記錄,這些記錄落在組態的時間邊界內。
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 時間視窗的概念
rectangle "within window" as node1
rectangle "outside window" as node2
node1 --> node2
@enduml