Kafka Streams 提供強大的狀態處理能力,允許開發者構建複雜的事件驅動應用程式。透過分組操作,可以確保相關記錄由相同的 Kafka Streams 任務處理,而聚合操作則可以對分組後的資料進行更深入的分析,例如計算總和、平均值或計數。理解 KStream 與 GlobalKTable 的連線方式對於根據記錄值屬性進行連線至關重要。此外,Kafka Streams 利用內部主題(如重新分割槽主題和變更日誌主題)來支援狀態儲存和容錯機制。在實際應用中,可以利用聚合運算元(如 aggregatereduce)來合併多個輸入值,例如計算每個遊戲的最高分數。aggregate 運算元允許指定不同的輸出記錄型別,提供更大的靈活性。為了實作更複雜的聚合邏輯,可以建立自定義聚合類別,並搭配初始化器和加法器函式使用。

Kafka Streams 中的狀態處理:分組與聚合

在 Kafka Streams 中,狀態處理是構建複雜事件驅動應用程式的關鍵部分。其中,分組和聚合是兩個重要的操作,允許開發者對資料流進行更深入的處理和分析。

分組記錄

在進行任何流或表聚合之前,必須先對 KStream 或 KTable 進行分組。分組的目的是確保相關記錄由相同的觀察者或 Kafka Streams 任務處理。

分組流

有兩種運算子可用於分組 KStream:groupBygroupByKey

  • groupBy 是一個更改鍵的運算子,會導致 Kafka Streams 標記流需要重新分割槽。如果下游運算子讀取新的鍵,Kafka Streams 將自動建立一個重新分割槽主題並將資料路由回 Kafka 以完成重新鍵控的過程。
KGroupedStream<String, Enriched> grouped =
    withProducts.groupBy(
        (key, value) -> value.getProductId().toString(),
        Grouped.with(Serdes.String(), JsonSerdes.Enriched()));
  • groupByKey 不會標記流需要重新分割槽,因此效能更好,因為它避免了將資料發送回 Kafka 進行重新分割槽的額外網路呼叫。
KGroupedStream<String, Enriched> grouped =
    withProducts.groupByKey(
        Grouped.with(Serdes.String(), JsonSerdes.Enriched()));

分組表

對於 KTable,分組操作只有 groupBy 可用,並且傳回一個不同的中間表示:KGroupedTable。

KGroupedTable<String, Player> groupedPlayers =
    players.groupBy(
        (key, value) -> KeyValue.pair(key, value),
        Grouped.with(Serdes.String(), JsonSerdes.Player()));

聚合

完成分組後,就可以進行聚合操作。KGroupedStream 和 KGroupedTable 都允許開發者執行各種聚合操作,如計算總和、平均值或計數等。

KStream 到 GlobalKTable 的連線

在進行聚合之前,瞭解如何在 Kafka Streams 中進行連線操作是非常重要的。特別是 KStream 到 GlobalKTable 的連線,它允許開發者根據記錄值中的某些屬性進行連線,而不是依賴於鍵的分割槽。

KeyValueMapper<String, ScoreWithPlayer, String> keyMapper =
    (leftKey, scoreWithPlayer) -> {
        return String.valueOf(scoreWithPlayer.getScoreEvent().getProductId());
    };

KStream<String, Enriched> withProducts =
    withPlayers.join(products, keyMapper, productJoiner);

內部主題

Kafka Streams 在內部建立了一些主題,如重新分割槽主題和變更日誌主題,用於支援狀態儲存和容錯。

$ kafka-topics --bootstrap-server kafka:9092 --list
players
products
score-events
dev-KSTREAM-KEY-SELECT-0000000001-repartition
dev-players-STATE-STORE-0000000002-changelog

Kafka Streams 中的聚合運算

在建立排行榜拓撲的最後一步中,我們需要計算每個遊戲的最高分數。Kafka Streams 提供了一組運算元,使得執行這類別聚合運算變得非常容易。

聚合運算簡介

聚合運算是將多個輸入值合併成一個輸出值的方法。雖然我們通常將聚合運算視為數學運算,但它們不一定是數學運算。count 是一種計算每個鍵的事件數量的數學運算,而 aggregatereduce 運算元則更為通用,可以使用您指定的任何組合邏輯來合併值。

  • aggregatereduce 的主要差異在於傳回型別。reduce 運算元要求聚合的輸出型別與輸入型別相同,而 aggregate 運算元可以指定不同的輸出記錄型別。

對流進行聚合

在本文中,我們將學習如何對記錄流進行聚合運算。這涉及到建立一個用於初始化新聚合值的函式(稱為初始化器)和一個用於在給定鍵的新記錄到來時執行後續聚合的函式(稱為加法器函式)。

初始化器

當 Kafka Streams 拓撲遇到新的鍵時,我們需要某種方式來初始化聚合。幫助我們完成這項工作的介面是 Initializer。與 Kafka Streams API 中的許多類別一樣,Initializer 是一個功能介面(即包含單一方法),因此可以定義為 lambda 表示式。

Initializer<Long> countInitializer = () -> 0L;

對於更複雜的聚合運算,您可以提供自己的自定義初始化器。例如,要實作視訊遊戲排行榜,我們需要某種方式來計算給定遊戲的前三名最高分數。

自定義聚合類別:HighScores

我們將建立一個名為 HighScores 的自定義類別來作為我們的聚合類別。該類別需要一些底層資料結構來儲存給定視訊遊戲的前三名分數。

public class HighScores {
    private final TreeSet<Enriched> highScores = new TreeSet<>();
    
    public HighScores add(final Enriched enriched) {
        highScores.add(enriched);
        if (highScores.size() > 3) {
            highScores.remove(highScores.last());
        }
        return this;
    }
}

初始化 HighScores

Initializer<HighScores> highScoresInitializer = HighScores::new;

加法器

接下來,我們需要定義用於合併兩個聚合的邏輯。這是透過 Aggregator 介面完成的,該介面與 Initializer 一樣,是一個功能介面,可以使用 lambda 表示式來實作。

Aggregator<String, Enriched, HighScores> highScoresAdder =
    (key, value, aggregate) -> aggregate.add(value);

HighScores 類別中實作 add 方法,以將新的最高分數新增到內部的 TreeSet 中,並在分數超過三個時移除最低分數。

為了使 TreeSet 能夠知道如何排序 Enriched 物件(並因此能夠識別最低分數的 Enriched 記錄以在最高分數聚合超過三個值時移除),我們將實作 Comparable 介面。

內容解密:

  1. 初始化器(Initializer):用於在遇到新鍵時初始化聚合值。可以是簡單的 lambda 表示式,如 countInitializer,也可以是自定義的類別例項化,如 HighScores::new
  2. 加法器(Adder):用於將新記錄新增到現有的聚合值中。在 HighScores 的例子中,加法器邏輯被封裝在 add 方法中,該方法將新的 Enriched 物件新增到 TreeSet 中,並保持集合大小不超過三個元素。
  3. TreeSet:用於儲存和管理前三名最高分數。由於 TreeSet 是有序集合,因此它能夠自動排序分數,並在需要時移除最低分數。
  4. Comparable 介面:為了使 TreeSet 能夠正確排序 Enriched 物件,需要在 Enriched 類別中實作 Comparable 介面,以定義物件之間的比較邏輯。

影片遊戲排行榜應用程式的狀態處理

在開發影片遊戲排行榜應用程式時,我們需要處理大量的玩家資料和分數事件。本章節將探討如何使用Kafka Streams進行狀態處理,包括聚合、分組和連線操作。

更新Enriched類別以實作Comparable介面

為了比較不同的Enriched物件,我們需要更新Enriched類別以實作Comparable介面。這樣,我們就可以根據分數對Enriched物件進行排序。

public class Enriched implements Comparable<Enriched> {
    @Override
    public int compareTo(Enriched o) {
        return Double.compare(o.score, score);
    }
    // 省略其他程式碼
}

內容解密:

  • Enriched類別實作了Comparable介面,以便根據分數對物件進行比較。
  • compareTo方法使用Double.compare來比較兩個Enriched物件的分數。

使用Kafka Streams的aggregate運算元進行高分聚合

現在我們已經有了初始化器和加法器函式,我們可以使用Kafka Streams的aggregate運算元來進行高分聚合。

KTable<String, HighScores> highScores =
    grouped.aggregate(highScoresInitializer, highScoresAdder);

內容解密:

  • grouped.aggregate方法使用初始化器和加法器函式來進行聚合操作。
  • highScoresInitializer初始化聚合的值,而highScoresAdder則將新的Enriched物件新增到聚合中。

聚合表格

聚合表格的過程與聚合流相似,但由於表格是可變的,因此需要能夠在鍵被刪除時更新聚合值。

KGroupedTable<String, Player> groupedPlayers =
    players.groupBy(
        (key, value) -> KeyValue.pair(key, value),
        Grouped.with(Serdes.String(), JsonSerdes.Player()));
groupedPlayers.aggregate(
    () -> 0L,
    (key, value, aggregate) -> aggregate + 1L,
    (key, value, aggregate) -> aggregate - 1L);

內容解密:

  • 初始化器函式將聚合初始化為0。
  • 加法器函式在看到新的鍵時增加當前計數。
  • 減法器函式在鍵被刪除時減少當前計數。

結合所有處理步驟

現在我們已經構建了排行榜拓撲的各個處理步驟,讓我們將它們結合起來。範例4-8展示了我們迄今為止建立的拓撲步驟如何結合在一起。

// 建立StreamsBuilder例項
StreamsBuilder builder = new StreamsBuilder();

// 註冊分數事件流
KStream<String, ScoreEvent> scoreEvents =
    builder.stream(
        "score-events",
        Consumed.with(Serdes.ByteArray(), JsonSerdes.ScoreEvent()))
    .selectKey((k, v) -> v.getPlayerId().toString());

// 建立分割槽玩家表格
KTable<String, Player> players =
    builder.table("players", Consumed.with(Serdes.String(), JsonSerdes.Player()));

// 建立全域產品表格
GlobalKTable<String, Product> products =
    builder.globalTable(
        "products",
        Consumed.with(Serdes.String(), JsonSerdes.Product()));

// 連線引數,用於scoreEvents和players的連線操作
Joined<String, ScoreEvent, Player> playerJoinParams =
    Joined.with(Serdes.String(), JsonSerdes.ScoreEvent(), JsonSerdes.Player());

// 連線scoreEvents和players
ValueJoiner<ScoreEvent, Player, ScoreWithPlayer> scorePlayerJoiner =
    (score, player) -> new ScoreWithPlayer(score, player);
KStream<String, ScoreWithPlayer> withPlayers =
    scoreEvents.join(players, scorePlayerJoiner, playerJoinParams);

// 將score-with-player記錄對映到產品
KeyValueMapper<String, ScoreWithPlayer, String> keyMapper =
    (leftKey, scoreWithPlayer) -> {
        return String.valueOf(scoreWithPlayer.getScoreEvent().getProductId());
    };

// 連線withPlayers流和產品全域KTable
ValueJoiner<ScoreWithPlayer, Product, Enriched> productJoiner =
    (scoreWithPlayer, product) -> new Enriched(scoreWithPlayer, product);
KStream<String, Enriched> withProducts =
    withPlayers.join(products, keyMapper, productJoiner);

// 分組豐富的產品流
KGroupedStream<String, Enriched> grouped =
    withProducts.groupBy(
        (key, value) -> value.getProductId().toString(),
        Grouped.with(Serdes.String(), JsonSerdes.Enriched()));

// 初始化高分聚合的值
Initializer<HighScores> highScoresInitializer = HighScores::new;

// 高分聚合的邏輯實作在HighScores.add方法中
Aggregator<String, Enriched, HighScores> highScoresAdder =
    (key, value, aggregate) -> aggregate.add(value);

// 執行聚合操作,並具體化底層狀態儲存以供查詢
KTable<String, HighScores> highScores =
    grouped.aggregate(
        highScoresInitializer,
        highScoresAdder);

內容解密:

  • StreamsBuilder用於構建拓撲。
  • scoreEvents流與players表格連線,生成withPlayers流。
  • withPlayers流與products全域KTable連線,生成withProducts流。
  • withProducts流被分組並聚合,生成高分表格。

Kafka Streams 互動式查詢與狀態管理

Kafka Streams 提供了一種強大的機制,能夠將應用程式的狀態暴露給外部客戶端,從而實作低延遲的事件驅動微服務架構。在本章中,我們將探討如何使用互動式查詢來存取 Kafka Streams 應用程式的狀態。

互動式查詢與狀態儲存

要使用互動式查詢,首先需要將狀態儲存具象化。具象化的狀態儲存與內部狀態儲存不同,它們被明確命名並且可以在處理器拓撲之外進行查詢。

具象化狀態儲存的建立

在 Kafka Streams 中,可以透過 Materialized 類別來建立具象化的狀態儲存。以下範例展示瞭如何使用 Materialized 類別來建立一個持久化的鍵值儲存:

KTable<String, HighScores> highScores = grouped.aggregate(
    highScoresInitializer,
    highScoresAdder,
    Materialized.<String, HighScores, KeyValueStore<Bytes, byte[]>>as("leader-boards")
        .withKeySerde(Serdes.String())
        .withValueSerde(JsonSerdes.HighScores()));

內容解密:

  1. Materialized.as("leader-boards"):為狀態儲存指定名稱 “leader-boards”,使其能夠在處理器拓撲之外被查詢。
  2. .withKeySerde(Serdes.String()):指定鍵的序列化/反序列化器為 String 型別。
  3. .withValueSerde(JsonSerdes.HighScores()):指定值的序列化/反序列化器為 HighScores 型別,使用 JSON 格式進行序列化。

存取唯讀狀態儲存

要存取狀態儲存,需要兩個資訊:狀態儲存的名稱和型別。可以使用 KafkaStreams.store() 方法來取得一個可查詢的狀態儲存例項。

範例程式碼:取得唯讀鍵值儲存

ReadOnlyKeyValueStore<String, HighScores> stateStore = streams.store(
    StoreQueryParameters.fromNameAndType("leader-boards", QueryableStoreTypes.keyValueStore()));

內容解密:

  1. StoreQueryParameters.fromNameAndType("leader-boards", QueryableStoreTypes.keyValueStore()):根據名稱 “leader-boards” 和型別 keyValueStore 取得狀態儲存。
  2. QueryableStoreTypes.keyValueStore():指定要檢索的狀態儲存型別為鍵值儲存。

查詢非視窗鍵值儲存

不同的狀態儲存型別支援不同的查詢方式。對於簡單的鍵值儲存,可以進行點查詢、範圍掃描和計數查詢。

常見查詢操作

  • 點查詢:根據特定的鍵查詢對應的值。
  • 範圍掃描:查詢特定鍵範圍內的所有鍵值對。
  • 計數查詢:統計特定條件下的鍵值對數量。