Kafka Streams 提供兩種 API 建構串流應用程式:高階 DSL 和低階 Processor API。DSL 簡化開發流程,Processor API 則提供更精細的控制。本文首先以一個簡單的應用程式示範如何使用 DSL 從 “users” 主題讀取資料並印出問候訊息,涵蓋 StreamsBuilderKStreamforeach 等核心概念,同時也提供完整的程式碼範例與執行步驟。接著,引導讀者使用 Processor API 建構相同功能的拓撲,包含 TopologyaddSourceaddProcessor 等方法以及自定義 Processor 的實作。文章進一步闡述 Kafka Streams 中 Stream 和 Table 的概念,說明兩者差異與應用場景,並以 SSH 登入日誌為例,解釋如何選擇合適的資料模型。最後,文章討論無狀態處理的應用,列舉常用的無狀態運算子,並以 Twitter 資料處理為例,示範如何運用這些運算子進行過濾、轉換等操作,並簡述加密貨幣市場情緒分析的實戰演練。

使用DSL建立簡單的Kafka Streams應用程式

建立拓撲

首先,使用StreamsBuilder建立處理器拓撲:

StreamsBuilder builder = new StreamsBuilder();

內容解密:

  • StreamsBuilder:用於建立Kafka Streams處理器拓撲的主要類別。

新增源處理器

新增源處理器以從Kafka主題讀取資料:

KStream<Void, String> stream = builder.stream("users");

內容解密:

  1. builder.stream("users"):從名為users的主題讀取資料,並將其建模為流。
  2. KStream<Void, String>:表示流的鍵和值型別。在此例中,鍵為空(Void),值為字串型別。

新增流處理器

使用foreach運算元列印簡單的問候訊息:

stream.foreach(
    (key, value) -> {
        System.out.println("(DSL) Hello, " + value);
    });

內容解密:

  • foreach運算元:用於對流中的每個記錄執行指定的動作。

建置拓撲並啟動應用程式

建置拓撲並啟動Kafka Streams應用程式:

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

內容解密:

  1. builder.build():建置處理器拓撲。
  2. KafkaStreams:代表Kafka Streams應用程式的主要類別。
  3. streams.start():啟動Kafka Streams應用程式。

完整程式碼範例

完整的DSL範例程式碼如下(Example 2-1):

class DslExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<Void, String> stream = builder.stream("users");
        stream.foreach(
            (key, value) -> {
                System.out.println("(DSL) Hello, " + value);
            });
        
        Properties config = ...; // 省略組態細節
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
        
        // 當JVM關閉時關閉Kafka Streams(例如:SIGTERM)
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

內容解密:

  1. 建置拓撲:使用StreamsBuilder建立拓撲。
  2. 新增源處理器:從users主題讀取資料。
  3. 使用DSL的foreach運算元:列印簡單訊息。
  4. 建置拓撲並啟動:啟動Kafka Streams應用程式。
  5. 關閉Kafka Streams:在JVM關閉時優雅地關閉。

執行應用程式

執行以下命令啟動DSL版本的應用程式:

./gradlew runDSL --info

接著,使用kafka-console-producer生產資料到users主題:

docker-compose exec kafka bash
kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic users

輸入使用者名稱並按下Enter鍵,即可看到Kafka Streams應用程式輸出的問候訊息。

Kafka Streams 拓撲結構與 Processor API 實作

在前面的章節中,我們已經驗證了應用程式的正常運作。接下來,我們將探討如何使用低階的 Processor API 來建立相同的 Kafka Streams 拓撲結構。

Processor API 簡介

Processor API 相較於高階的 DSL(Domain Specific Language),缺乏一些抽象化的操作,但其語法更直接地反映了處理器拓撲的建立過程。我們需要使用 Topology.addSourceTopology.addProcessorTopology.addSink 等方法來定義拓撲結構。

建立 Topology 例項

首先,我們需要建立一個新的 Topology 例項:

Topology topology = new Topology();

新增 Source Processor

接下來,我們新增一個 source processor 來讀取 users 主題的資料,並建立一個 stream processor 來列印簡單的問候訊息。stream processor 會參考一個名為 SayHelloProcessor 的類別:

topology.addSource("UserSource", "users");
topology.addProcessor("SayHello", SayHelloProcessor::new, "UserSource");

在上述程式碼中,addSource 方法的第一個引數是這個 stream processor 的任意名稱(此例中為 “UserSource”)。第二個引數是這個 source processor 應該讀取的主題名稱(此例中為 “users”)。

建立 SayHelloProcessor 類別

在 Processor API 中,我們需要實作 Processor 介面來建立自訂的 stream processor。Processor 介面定義了三個方法:initprocessclose。以下是一個簡單的 SayHelloProcessor 實作範例:

public class SayHelloProcessor implements Processor<Void, String, Void, Void> {
    @Override
    public void init(ProcessorContext<Void, Void> context) {}

    @Override
    public void process(Record<Void, String> record) {
        System.out.println("(Processor API) Hello, " + record.value());
    }

    @Override
    public void close() {}
}

內容解密:

  1. 泛型引數說明:在 Processor 介面中,前兩個泛型引數(此例中為 VoidString)代表輸入鍵和值的型別。由於我們的鍵是 null,而值是使用者名稱(字串),因此選擇 VoidString 是合適的。後兩個泛型引數(此例中為 VoidVoid)代表輸出鍵和值的型別。在此例中,由於我們沒有將任何輸出鍵或值轉發到下游,因此使用 Void 是合適的。
  2. process 方法:處理邏輯位於 process 方法中。在此例中,我們列印了一個簡單的問候訊息。注意,Record 介面中的泛型引數指的是輸入記錄的鍵和值型別。
  3. initclose 方法:在此例中,由於不需要特殊的初始化和清理工作,因此這兩個方法的實作都是空的。

執行程式碼

我們可以使用與 DSL 範例相同的命令來執行程式碼:

./gradlew runProcessorAPI --info

你應該會看到以下輸出,表明你的 Kafka Streams 應用程式正在正常運作:

(Processor API) Hello, angie
(Processor API) Hello, guy
(Processor API) Hello, kate
(Processor API) Hello, mark

Streams 和 Tables

在 Kafka Streams 中,我們可以將資料視為 stream 或 table。stream 代表一連串的記錄,而 table 則代表 changelog stream,即記錄的變更歷程。

Stream 和 Table 的比較

假設我們有一個主題包含 SSH 日誌記錄,每個記錄都以使用者 ID 為鍵,如下表所示:

使用者 ID登入時間IP 位址
user12023-03-01 12:00:00192.168.1.100
user22023-03-01 12:05:00192.168.1.200
user12023-03-01 12:10:00192.168.1.150

在 stream 的視角下,我們會看到三個獨立的記錄。然而,在 table 的視角下,我們會看到每個使用者 ID 對應的最新登入記錄。

何時使用 Stream 或 Table

選擇使用 stream 或 table 取決於具體的使用場景。如果我們需要處理一連串的事件,那麼 stream 是合適的選擇。如果我們需要維護某個實體的最新狀態,那麼 table 是更好的選擇。

在 Kafka Streams 中,我們可以使用 DSL 運算子(如 streamtable)來將主題中的資料視為 stream 或 table。根據具體的需求,選擇適合的資料模型,可以幫助我們更有效地處理和分析資料。

資料流與表格的對偶性

在處理 Kafka Streams 中的資料時,我們需要決定使用哪種抽象化概念:資料流(stream)或表格(table)。這個決策取決於是否要追蹤某個鍵(key)的最新狀態或全部歷史訊息。

資料流(Streams)

資料流可以被視為資料函式庫中的插入(insert)操作。每個不同的記錄都保留在這種日誌檢視中。我們的 topic 的資料流表示如表 2-3 所示。

表 2-3. SSH 日誌的資料流檢視

鍵(Key)值(Value)偏移量(Offset)
mitch{“action”: “login”}0
mitch{“action”: “logout”}1
elyse{“action”: “login”}2
isabelle{“action”: “login”}3

表格(Tables)

表格可以被視為資料函式庫中的更新(update)操作。在這種日誌檢視中,只保留每個鍵的最新狀態。表格通常由壓縮後的 topic 建立。我們的 topic 的表格表示如表 2-4 所示。

表 2-4. SSH 日誌的表格檢視

鍵(Key)值(Value)偏移量(Offset)
mitch{“action”: “logout”}1
elyse{“action”: “login”}2
isabelle{“action”: “login”}3

表格本質上是有狀態的,通常用於在 Kafka Streams 中執行聚合操作。我們也可以對表格進行數學聚合,例如計算某個鍵的滾動計數,如表 2-5 所示。

表 2-5. SSH 日誌的聚合表格檢視

鍵(Key)值(Value)偏移量(Offset)
mitch21
elyse12
isabelle13

資料流與表格的對偶性實作

Kafka Streams 使用鍵值儲存(預設為 RocksDB)來實作表格。透過消費有序的事件流並只保留每個鍵的最新記錄,我們可以在客戶端建立表格或對映式的資料表示。

以下 Java 程式碼展示瞭如何從資料流建立表格:

import java.util.Map.Entry;

var stream = List.of(
    Map.entry("a", 1),
    Map.entry("b", 1),
    Map.entry("a", 2));
var table = new HashMap<>();
stream.forEach((record) -> table.put(record.getKey(), record.getValue()));

輸出結果如下:

stream ==> [a=1, b=1, a=2]
table ==> {a=2, b=1}

程式碼解析:

  1. 初始化資料流:使用 List.of() 方法建立一個包含有序記錄的資料流。
  2. 建立表格:使用 HashMap 建立一個空的表格。
  3. 迭代資料流並更新表格:使用 forEach 方法遍歷資料流,並將每個記錄的鍵值對更新到表格中。
  4. 輸出結果:列印資料流和表格的內容,以展示兩者的差異。

KStream、KTable 和 GlobalKTable

Kafka Streams 提供了一組高階別抽象化概念,使處理資料流和表格變得更加容易。KStreamKTableGlobalKTable 是 Kafka Streams 中用於處理資料流和表格的核心抽象化概念。

這些抽象化概念使得開發人員能夠更輕鬆地處理 Kafka 中的資料,並提供了更強大的資料處理能力。接下來,我們將探討這些抽象化概念的使用方法和應用場景。

無狀態處理在 Kafka Streams 中的應用

在 Kafka Streams 中,無狀態處理是一種最簡單的流處理方式,它不需要記憶之前處理過的事件。每個事件被消費、處理後就會被遺忘。本章將探討 Kafka Streams 中的無狀態運算子,並瞭解如何使用它們來完成常見的流處理任務。

無狀態與有狀態處理的區別

在構建 Kafka Streams 應用程式時,首要考慮的是是否需要有狀態處理。無狀態應用程式中,每個事件都是獨立處理的,不需要記憶之前的事件。有狀態應用程式則需要記住之前事件的資訊,通常用於聚合、視窗或連線事件流。

無狀態處理

無狀態應用程式只使用無狀態運算子,如 filtermap 等,這些運算子只需要檢視當前記錄即可執行操作。

有狀態處理

有狀態應用程式需要使用有狀態運算子,如 countaggregate 等,這些運算子需要知道之前的事件資訊。

無狀態運算子的使用

Kafka Streams 提供了豐富的無狀態運算子,用於處理資料。這些運算子包括:

  • 過濾記錄:使用 filter 運算子過濾不需要的記錄。
  • 新增和刪除欄位:使用 map 運算子轉換記錄。
  • 重新鍵值:使用 selectKey 運算子重新設定記錄的鍵值。
  • 分支流:使用 branch 運算子根據條件將流分成多個分支。
  • 合併流:使用 merge 操作符合併多個流。
  • 轉換記錄:使用 mapflatMap 運算子轉換記錄成一個或多個輸出。
  • 獨立豐富記錄:使用 map 運算子豐富記錄。

程式碼範例:使用無狀態運算子處理 Twitter 資料

KStream<String, String> twitterStream = builder.stream("twitter_topic");

// 過濾不需要的記錄
KStream<String, String> filteredStream = twitterStream.filter(
    (key, value) -> value.contains("比特幣")
);

// 轉換記錄
KStream<String, String> transformedStream = filteredStream.map(
    (key, value) -> new KeyValue<>(key, value.toUpperCase())
);

// 輸出結果
transformedStream.to("output_topic");

內容解密:

  1. 過濾不需要的記錄:在這個範例中,我們使用 filter 運算子過濾出包含「比特幣」的推文。這樣可以減少不必要的資料處理。
  2. 轉換記錄:使用 map 運算子將推文內容轉換成大寫。這是無狀態處理的一個例子,因為它不需要記憶之前的事件。
  3. 輸出結果:最後,將處理後的資料輸出到指定的 Kafka 主題中。

無狀態處理的實戰演練:加密貨幣市場情緒分析

在本章中,我們將探討如何使用 Kafka Streams 建立一個無狀態的流處理應用程式,以分析加密貨幣市場的情緒。我們的目標是建立一個能夠處理 Twitter 資料流的應用程式,計算市場對不同加密貨幣的情緒指數,並將結果輸出到一個 Kafka 主題中,供下游的交易演算法使用。

介紹本章的實戰演練:處理 Twitter 資料流

本章的實戰演練將圍繞一個虛擬的交易軟體展開,該軟體需要評估市場對不同加密貨幣(如比特幣、以太坊、瑞波幣等)的情緒,以做出投資或撤資的決定。由於數百萬人使用 Twitter 分享他們對加密貨幣和其他話題的看法,因此我們將使用 Twitter 作為我們的資料來源。

處理流程設計

在開始之前,讓我們先看看建立我們的流處理應用程式所需的步驟。這些需求將幫助我們設計一個處理器拓撲結構,作為建立無狀態 Kafka Streams 應用程式的。以下是關鍵步驟:

  1. 消費 tweets 資料:從名為 tweets 的來源主題中消費提到特定數位貨幣(例如 #bitcoin、#ethereum)的推文。

    • 由於每條記錄都是 JSON 編碼的,我們需要找出如何正確地將這些記錄反序列化為更高層級的資料類別。
    • 在反序列化過程中,應移除不需要的欄位以簡化程式碼。這種只選擇部分欄位進行處理的做法稱為投影,是流處理中最常見的任務之一。
  2. 排除轉發推文:排除轉發的推文,這需要進行某種形式的資料過濾。

  3. 分支非英文推文:將非英文推文分支到一個單獨的資料流中進行翻譯。

  4. 翻譯非英文推文:將非英文推文翻譯成英文,這涉及將一個輸入值(非英文推文)對映到一個新的輸出值(英文翻譯推文)。

  5. 合併翻譯後的推文:將翻譯後的推文與英文推文資料流合併,建立一個統一的資料流。

  6. 計算情緒指數:為每條推文新增情緒指數,表示 Twitter 使用者在討論特定數位貨幣時的情緒是正面還是負面。由於一條推文可能提到多種加密貨幣,我們將展示如何使用 flatMap 運算子將每個輸入(推文)轉換為可變數量的輸出。

  7. 輸出結果:使用 Avro 序列化增強後的推文,並將其寫入名為 crypto-sentiment 的輸出主題中。我們的虛擬交易演算法將從這個主題中讀取資料,並根據所看到的訊號做出投資決策。

專案設定

本章的程式碼位於 https://github.com/mitch-seymour/mastering-kafka-streams-and-ksqldb.git。如果您想在我們逐步完成每個拓撲步驟時參考程式碼,請克隆該儲存函式庫並切換到包含本章教程的目錄。

$ git clone git@github.com:mitch-seymour/mastering-kafka-streams-and-ksqldb.git
$ cd mastering-kafka-streams-and-ksqldb/chapter-03/crypto-sentiment

您可以隨時透過執行以下命令來構建專案:

$ ./gradlew build --info

新增 KStream 源處理器

所有 Kafka Streams 應用程式都有一個共同點:它們從一個或多個來源主題中消費資料。在本教程中,我們只有一個來源主題:tweets。該主題由 Twitter 源聯結器填充,該聯結器從 Twitter 的流式 API 中串流推文,並將 JSON 編碼的推文記錄寫入 Kafka。

程式碼範例

// 建立 KStream 源處理器
KStream<String, String> tweets = builder.stream("tweets");

內容解密:

此段程式碼建立了一個 KStream 物件 tweets,用於從名為 “tweets” 的 Kafka 主題中讀取資料。由於我們的資料是 JSON 編碼的,我們需要在稍後的步驟中對其進行反序列化處理。