Kafka Streams 提供強大的狀態處理能力,允許開發者構建複雜的事件驅動應用程式。透過分組操作,可以確保相關記錄由相同的 Kafka Streams 任務處理,而聚合操作則可以對分組後的資料進行更深入的分析,例如計算總和、平均值或計數。理解 KStream 與 GlobalKTable 的連線方式對於根據記錄值屬性進行連線至關重要。此外,Kafka Streams 利用內部主題(如重新分割槽主題和變更日誌主題)來支援狀態儲存和容錯機制。在實際應用中,可以利用聚合運算元(如 aggregate 和 reduce)來合併多個輸入值,例如計算每個遊戲的最高分數。aggregate 運算元允許指定不同的輸出記錄型別,提供更大的靈活性。為了實作更複雜的聚合邏輯,可以建立自定義聚合類別,並搭配初始化器和加法器函式使用。
Kafka Streams 中的狀態處理:分組與聚合
在 Kafka Streams 中,狀態處理是構建複雜事件驅動應用程式的關鍵部分。其中,分組和聚合是兩個重要的操作,允許開發者對資料流進行更深入的處理和分析。
分組記錄
在進行任何流或表聚合之前,必須先對 KStream 或 KTable 進行分組。分組的目的是確保相關記錄由相同的觀察者或 Kafka Streams 任務處理。
分組流
有兩種運算子可用於分組 KStream:groupBy 和 groupByKey。
groupBy是一個更改鍵的運算子,會導致 Kafka Streams 標記流需要重新分割槽。如果下游運算子讀取新的鍵,Kafka Streams 將自動建立一個重新分割槽主題並將資料路由回 Kafka 以完成重新鍵控的過程。
KGroupedStream<String, Enriched> grouped =
withProducts.groupBy(
(key, value) -> value.getProductId().toString(),
Grouped.with(Serdes.String(), JsonSerdes.Enriched()));
groupByKey不會標記流需要重新分割槽,因此效能更好,因為它避免了將資料發送回 Kafka 進行重新分割槽的額外網路呼叫。
KGroupedStream<String, Enriched> grouped =
withProducts.groupByKey(
Grouped.with(Serdes.String(), JsonSerdes.Enriched()));
分組表
對於 KTable,分組操作只有 groupBy 可用,並且傳回一個不同的中間表示:KGroupedTable。
KGroupedTable<String, Player> groupedPlayers =
players.groupBy(
(key, value) -> KeyValue.pair(key, value),
Grouped.with(Serdes.String(), JsonSerdes.Player()));
聚合
完成分組後,就可以進行聚合操作。KGroupedStream 和 KGroupedTable 都允許開發者執行各種聚合操作,如計算總和、平均值或計數等。
KStream 到 GlobalKTable 的連線
在進行聚合之前,瞭解如何在 Kafka Streams 中進行連線操作是非常重要的。特別是 KStream 到 GlobalKTable 的連線,它允許開發者根據記錄值中的某些屬性進行連線,而不是依賴於鍵的分割槽。
KeyValueMapper<String, ScoreWithPlayer, String> keyMapper =
(leftKey, scoreWithPlayer) -> {
return String.valueOf(scoreWithPlayer.getScoreEvent().getProductId());
};
KStream<String, Enriched> withProducts =
withPlayers.join(products, keyMapper, productJoiner);
內部主題
Kafka Streams 在內部建立了一些主題,如重新分割槽主題和變更日誌主題,用於支援狀態儲存和容錯。
$ kafka-topics --bootstrap-server kafka:9092 --list
players
products
score-events
dev-KSTREAM-KEY-SELECT-0000000001-repartition
dev-players-STATE-STORE-0000000002-changelog
Kafka Streams 中的聚合運算
在建立排行榜拓撲的最後一步中,我們需要計算每個遊戲的最高分數。Kafka Streams 提供了一組運算元,使得執行這類別聚合運算變得非常容易。
聚合運算簡介
聚合運算是將多個輸入值合併成一個輸出值的方法。雖然我們通常將聚合運算視為數學運算,但它們不一定是數學運算。count 是一種計算每個鍵的事件數量的數學運算,而 aggregate 和 reduce 運算元則更為通用,可以使用您指定的任何組合邏輯來合併值。
aggregate和reduce的主要差異在於傳回型別。reduce運算元要求聚合的輸出型別與輸入型別相同,而aggregate運算元可以指定不同的輸出記錄型別。
對流進行聚合
在本文中,我們將學習如何對記錄流進行聚合運算。這涉及到建立一個用於初始化新聚合值的函式(稱為初始化器)和一個用於在給定鍵的新記錄到來時執行後續聚合的函式(稱為加法器函式)。
初始化器
當 Kafka Streams 拓撲遇到新的鍵時,我們需要某種方式來初始化聚合。幫助我們完成這項工作的介面是 Initializer。與 Kafka Streams API 中的許多類別一樣,Initializer 是一個功能介面(即包含單一方法),因此可以定義為 lambda 表示式。
Initializer<Long> countInitializer = () -> 0L;
對於更複雜的聚合運算,您可以提供自己的自定義初始化器。例如,要實作視訊遊戲排行榜,我們需要某種方式來計算給定遊戲的前三名最高分數。
自定義聚合類別:HighScores
我們將建立一個名為 HighScores 的自定義類別來作為我們的聚合類別。該類別需要一些底層資料結構來儲存給定視訊遊戲的前三名分數。
public class HighScores {
private final TreeSet<Enriched> highScores = new TreeSet<>();
public HighScores add(final Enriched enriched) {
highScores.add(enriched);
if (highScores.size() > 3) {
highScores.remove(highScores.last());
}
return this;
}
}
初始化 HighScores
Initializer<HighScores> highScoresInitializer = HighScores::new;
加法器
接下來,我們需要定義用於合併兩個聚合的邏輯。這是透過 Aggregator 介面完成的,該介面與 Initializer 一樣,是一個功能介面,可以使用 lambda 表示式來實作。
Aggregator<String, Enriched, HighScores> highScoresAdder =
(key, value, aggregate) -> aggregate.add(value);
在 HighScores 類別中實作 add 方法,以將新的最高分數新增到內部的 TreeSet 中,並在分數超過三個時移除最低分數。
為了使 TreeSet 能夠知道如何排序 Enriched 物件(並因此能夠識別最低分數的 Enriched 記錄以在最高分數聚合超過三個值時移除),我們將實作 Comparable 介面。
內容解密:
- 初始化器(Initializer):用於在遇到新鍵時初始化聚合值。可以是簡單的 lambda 表示式,如
countInitializer,也可以是自定義的類別例項化,如HighScores::new。 - 加法器(Adder):用於將新記錄新增到現有的聚合值中。在
HighScores的例子中,加法器邏輯被封裝在add方法中,該方法將新的Enriched物件新增到TreeSet中,並保持集合大小不超過三個元素。 TreeSet:用於儲存和管理前三名最高分數。由於TreeSet是有序集合,因此它能夠自動排序分數,並在需要時移除最低分數。Comparable介面:為了使TreeSet能夠正確排序Enriched物件,需要在Enriched類別中實作Comparable介面,以定義物件之間的比較邏輯。
影片遊戲排行榜應用程式的狀態處理
在開發影片遊戲排行榜應用程式時,我們需要處理大量的玩家資料和分數事件。本章節將探討如何使用Kafka Streams進行狀態處理,包括聚合、分組和連線操作。
更新Enriched類別以實作Comparable介面
為了比較不同的Enriched物件,我們需要更新Enriched類別以實作Comparable介面。這樣,我們就可以根據分數對Enriched物件進行排序。
public class Enriched implements Comparable<Enriched> {
@Override
public int compareTo(Enriched o) {
return Double.compare(o.score, score);
}
// 省略其他程式碼
}
內容解密:
Enriched類別實作了Comparable介面,以便根據分數對物件進行比較。compareTo方法使用Double.compare來比較兩個Enriched物件的分數。
使用Kafka Streams的aggregate運算元進行高分聚合
現在我們已經有了初始化器和加法器函式,我們可以使用Kafka Streams的aggregate運算元來進行高分聚合。
KTable<String, HighScores> highScores =
grouped.aggregate(highScoresInitializer, highScoresAdder);
內容解密:
grouped.aggregate方法使用初始化器和加法器函式來進行聚合操作。highScoresInitializer初始化聚合的值,而highScoresAdder則將新的Enriched物件新增到聚合中。
聚合表格
聚合表格的過程與聚合流相似,但由於表格是可變的,因此需要能夠在鍵被刪除時更新聚合值。
KGroupedTable<String, Player> groupedPlayers =
players.groupBy(
(key, value) -> KeyValue.pair(key, value),
Grouped.with(Serdes.String(), JsonSerdes.Player()));
groupedPlayers.aggregate(
() -> 0L,
(key, value, aggregate) -> aggregate + 1L,
(key, value, aggregate) -> aggregate - 1L);
內容解密:
- 初始化器函式將聚合初始化為0。
- 加法器函式在看到新的鍵時增加當前計數。
- 減法器函式在鍵被刪除時減少當前計數。
結合所有處理步驟
現在我們已經構建了排行榜拓撲的各個處理步驟,讓我們將它們結合起來。範例4-8展示了我們迄今為止建立的拓撲步驟如何結合在一起。
// 建立StreamsBuilder例項
StreamsBuilder builder = new StreamsBuilder();
// 註冊分數事件流
KStream<String, ScoreEvent> scoreEvents =
builder.stream(
"score-events",
Consumed.with(Serdes.ByteArray(), JsonSerdes.ScoreEvent()))
.selectKey((k, v) -> v.getPlayerId().toString());
// 建立分割槽玩家表格
KTable<String, Player> players =
builder.table("players", Consumed.with(Serdes.String(), JsonSerdes.Player()));
// 建立全域產品表格
GlobalKTable<String, Product> products =
builder.globalTable(
"products",
Consumed.with(Serdes.String(), JsonSerdes.Product()));
// 連線引數,用於scoreEvents和players的連線操作
Joined<String, ScoreEvent, Player> playerJoinParams =
Joined.with(Serdes.String(), JsonSerdes.ScoreEvent(), JsonSerdes.Player());
// 連線scoreEvents和players
ValueJoiner<ScoreEvent, Player, ScoreWithPlayer> scorePlayerJoiner =
(score, player) -> new ScoreWithPlayer(score, player);
KStream<String, ScoreWithPlayer> withPlayers =
scoreEvents.join(players, scorePlayerJoiner, playerJoinParams);
// 將score-with-player記錄對映到產品
KeyValueMapper<String, ScoreWithPlayer, String> keyMapper =
(leftKey, scoreWithPlayer) -> {
return String.valueOf(scoreWithPlayer.getScoreEvent().getProductId());
};
// 連線withPlayers流和產品全域KTable
ValueJoiner<ScoreWithPlayer, Product, Enriched> productJoiner =
(scoreWithPlayer, product) -> new Enriched(scoreWithPlayer, product);
KStream<String, Enriched> withProducts =
withPlayers.join(products, keyMapper, productJoiner);
// 分組豐富的產品流
KGroupedStream<String, Enriched> grouped =
withProducts.groupBy(
(key, value) -> value.getProductId().toString(),
Grouped.with(Serdes.String(), JsonSerdes.Enriched()));
// 初始化高分聚合的值
Initializer<HighScores> highScoresInitializer = HighScores::new;
// 高分聚合的邏輯實作在HighScores.add方法中
Aggregator<String, Enriched, HighScores> highScoresAdder =
(key, value, aggregate) -> aggregate.add(value);
// 執行聚合操作,並具體化底層狀態儲存以供查詢
KTable<String, HighScores> highScores =
grouped.aggregate(
highScoresInitializer,
highScoresAdder);
內容解密:
StreamsBuilder用於構建拓撲。scoreEvents流與players表格連線,生成withPlayers流。withPlayers流與products全域KTable連線,生成withProducts流。withProducts流被分組並聚合,生成高分表格。
Kafka Streams 互動式查詢與狀態管理
Kafka Streams 提供了一種強大的機制,能夠將應用程式的狀態暴露給外部客戶端,從而實作低延遲的事件驅動微服務架構。在本章中,我們將探討如何使用互動式查詢來存取 Kafka Streams 應用程式的狀態。
互動式查詢與狀態儲存
要使用互動式查詢,首先需要將狀態儲存具象化。具象化的狀態儲存與內部狀態儲存不同,它們被明確命名並且可以在處理器拓撲之外進行查詢。
具象化狀態儲存的建立
在 Kafka Streams 中,可以透過 Materialized 類別來建立具象化的狀態儲存。以下範例展示瞭如何使用 Materialized 類別來建立一個持久化的鍵值儲存:
KTable<String, HighScores> highScores = grouped.aggregate(
highScoresInitializer,
highScoresAdder,
Materialized.<String, HighScores, KeyValueStore<Bytes, byte[]>>as("leader-boards")
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerdes.HighScores()));
內容解密:
Materialized.as("leader-boards"):為狀態儲存指定名稱 “leader-boards”,使其能夠在處理器拓撲之外被查詢。.withKeySerde(Serdes.String()):指定鍵的序列化/反序列化器為String型別。.withValueSerde(JsonSerdes.HighScores()):指定值的序列化/反序列化器為HighScores型別,使用 JSON 格式進行序列化。
存取唯讀狀態儲存
要存取狀態儲存,需要兩個資訊:狀態儲存的名稱和型別。可以使用 KafkaStreams.store() 方法來取得一個可查詢的狀態儲存例項。
範例程式碼:取得唯讀鍵值儲存
ReadOnlyKeyValueStore<String, HighScores> stateStore = streams.store(
StoreQueryParameters.fromNameAndType("leader-boards", QueryableStoreTypes.keyValueStore()));
內容解密:
StoreQueryParameters.fromNameAndType("leader-boards", QueryableStoreTypes.keyValueStore()):根據名稱 “leader-boards” 和型別keyValueStore取得狀態儲存。QueryableStoreTypes.keyValueStore():指定要檢索的狀態儲存型別為鍵值儲存。
查詢非視窗鍵值儲存
不同的狀態儲存型別支援不同的查詢方式。對於簡單的鍵值儲存,可以進行點查詢、範圍掃描和計數查詢。
常見查詢操作
- 點查詢:根據特定的鍵查詢對應的值。
- 範圍掃描:查詢特定鍵範圍內的所有鍵值對。
- 計數查詢:統計特定條件下的鍵值對數量。