Kafka Streams 允許開發者將狀態儲存嵌入應用程式中,方便進行資料累積、計數和聚合等操作。狀態儲存支援多種查詢方式,例如根據鍵值進行點查詢、指定範圍的範圍掃描以及遍歷所有條目的迭代器。這些查詢操作可直接在應用程式內部執行,實作高效的資料處理。然而,在分散式環境下,應用程式的狀態儲存可能分散在不同的例項上。此時,需要區分本地查詢和遠端查詢。本地查詢直接存取應用程式例項自身的狀態儲存,而遠端查詢則需要與其他持有狀態資料的例項進行通訊。Kafka Streams 提供了 queryMetadataForKey 和 allMetadataForStore 等方法,用於定位特定鍵值或狀態儲存所在的例項,方便實作遠端查詢功能。
互動式查詢:探討 Kafka Streams 中的狀態儲存查詢
在前面的章節中,我們已經瞭解了 Kafka Streams 中狀態儲存的基本概念。現在,讓我們進一步探討如何對狀態儲存執行各種查詢操作。首先,我們將介紹簡單鍵值儲存所支援的查詢型別。
簡單鍵值儲存的查詢型別
簡單鍵值儲存提供了多種查詢型別,這些查詢型別在 ReadOnlyKeyValueStore 介面中定義:
public interface ReadOnlyKeyValueStore<K, V> {
V get(K key);
KeyValueIterator<K, V> range(K from, K to);
KeyValueIterator<K, V> all();
long approximateNumEntries();
}
點查詢(get() 方法)
點查詢是最常見的查詢型別之一,涉及根據特定的鍵查詢狀態儲存中的值。例如:
HighScores highScores = stateStore.get(key);
此操作將傳回與給定鍵相關聯的值,如果鍵不存在,則傳回 null。
範圍掃描(range() 方法)
範圍掃描允許查詢一個鍵範圍內的所有鍵值對。以下是一個範例:
KeyValueIterator<String, HighScores> range = stateStore.range(1, 7);
try {
while (range.hasNext()) {
KeyValue<String, HighScores> next = range.next();
String key = next.key;
HighScores highScores = next.value;
// 處理 highScores 物件
}
} finally {
range.close(); // 關閉迭代器以避免記憶體洩漏
}
所有條目(all() 方法)
all() 方法傳回狀態儲存中的所有鍵值對,類別似於無篩選條件的 SELECT * 查詢。與範圍掃描類別似,使用完畢後必須關閉迭代器以避免記憶體洩漏。
條目數量(approximateNumEntries() 方法)
此方法傳回狀態儲存中條目的近似數量。對於使用 RocksDB 的持久化儲存,由於計算精確數量可能很昂貴且具有挑戰性,因此傳回的是近似值。
long approxNumEntries = stateStore.approximateNumEntries();
本地查詢與遠端查詢
每個 Kafka Streams 應用程式例項都可以查詢其本地狀態。然而,除非正在物化一個 GlobalKTable 或執行單一例項,否則本地狀態僅代表應用程式狀態的一部分。
遠端查詢
為了查詢應用程式的完整狀態,需要:
- 發現哪些例項包含應用程式狀態的各個片段。
- 新增遠端程式呼叫(RPC)或 REST 服務,以將本地狀態暴露給其他執行中的應用程式例項。
- 新增 RPC 或 REST 使用者端,以從執行中的應用程式例項查詢遠端狀態儲存。
在這個教程中,我們將使用 Javalin 實作 REST 服務,並使用 OkHttp 作為 REST 使用者端。首先,在 build.gradle 檔案中新增以下依賴項:
dependencies {
// 互動式查詢(伺服器)所需的依賴項
implementation 'io.javalin:javalin:4.3.0'
// REST 使用者端
implementation 'com.squareup.okhttp3:okhttp:4.10.0'
}
分散式狀態儲存與互動式查詢的實作
在建構Kafka Streams應用程式時,狀態儲存的分散式管理和互動式查詢是至關重要的環節。本文將探討如何使用Kafka Streams實作分散式狀態儲存,並透過RESTful API進行互動式查詢。
設定應用程式伺服器組態
首先,我們需要在Kafka Streams組態中設定APPLICATION_SERVER_CONFIG引數,以指定應用程式例項的主機和埠號。
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "myapp:8080");
// 其他Kafka Streams屬性設定省略
KafkaStreams streams = new KafkaStreams(builder.build(), props);
內容解密:
StreamsConfig.APPLICATION_SERVER_CONFIG用於指定應用程式例項的通訊位址。- 該組態將被傳遞給其他執行中的應用程式例項,以便進行例項發現和遠端查詢。
建立排行榜服務
接下來,我們將建立一個排行榜服務,用於處理互動式查詢。該服務將使用Javalin框架建立RESTful API。
class LeaderboardService {
private final HostInfo hostInfo;
private final KafkaStreams streams;
LeaderboardService(HostInfo hostInfo, KafkaStreams streams) {
this.hostInfo = hostInfo;
this.streams = streams;
}
// 取得只讀狀態儲存
ReadOnlyKeyValueStore<String, HighScores> getStore() {
return streams.store(
StoreQueryParameters.fromNameAndType(
"leader-boards",
QueryableStoreTypes.keyValueStore()));
}
// 啟動Javalin服務
void start() {
Javalin app = Javalin.create().start(hostInfo.port());
app.get("/leaderboard/:key", this::getKey);
app.get("/leaderboard/count", this::getCount);
}
}
內容解密:
LeaderboardService類別封裝了與排行榜相關的操作,包括取得狀態儲存和啟動RESTful服務。getStore()方法用於取得只讀的狀態儲存,用於進行互動式查詢。start()方法啟動Javalin服務,並定義了兩個API端點:/leaderboard/:key和/leaderboard/count。
實作互動式查詢
現在,讓我們來實作/leaderboard/:key端點,用於根據指定的鍵值查詢高分紀錄。
void getKey(Context ctx) {
String productId = ctx.pathParam("key");
KeyQueryMetadata metadata =
streams.queryMetadataForKey(
"leader-boards", productId, Serdes.String().serializer());
if (hostInfo.equals(metadata.activeHost())) {
HighScores highScores = getStore().get(productId);
if (highScores == null) {
ctx.status(404);
return;
}
ctx.json(highScores.toList());
return;
}
// 遠端查詢
String remoteHost = metadata.activeHost().host();
int remotePort = metadata.activeHost().port();
String url = String.format("http://%s:%d/leaderboard/%s", remoteHost, remotePort, productId);
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(url).build();
try (Response response = client.newCall(request).execute()) {
ctx.result(response.body().string());
} catch (Exception e) {
ctx.status(500);
}
}
內容解密:
queryMetadataForKey()方法用於查詢指定鍵值所在的例項。- 如果鍵值位於本地例項,直接查詢本地狀態儲存;否則,進行遠端查詢。
- 遠端查詢時,建構請求URL並使用OkHttpClient傳送請求。
統計所有例項的高分紀錄數量
最後,我們來實作/leaderboard/count端點,用於統計所有執行中的應用程式例項的高分紀錄數量。
void getCount(Context ctx) {
// 使用allMetadataForStore()方法取得所有例項的狀態儲存資訊
// 省略具體實作細節
}
內容解密:
allMetadataForStore()方法傳回所有分享相同應用程式ID且具有至少一個活動分割區的Kafka Streams應用程式例項的端點資訊。- 我們可以利用該方法統計所有例項的高分紀錄數量。
實作互動式查詢的計數方法
在前面的程式碼中,我們參照了 getCount 方法,現在我們將實作這個方法。該方法利用 allMetadataForStore 方法來取得每個遠端狀態儲存中的記錄總數。
void getCount(Context ctx) {
long count = getStore().approximateNumEntries();
for (StreamsMetadata metadata : streams.allMetadataForStore("leader-boards")) {
if (!hostInfo.equals(metadata.hostInfo())) {
count += fetchCountFromRemoteInstance(
metadata.hostInfo().host(),
metadata.hostInfo().port());
}
}
ctx.json(count);
}
內容解密:
- 初始化
count變數為本地狀態儲存中的條目數量。 - 使用
allMetadataForStore方法檢索包含所需狀態片段的每個 Kafka Streams 例項的主機/埠對。 - 如果後設資料對應於目前主機,則跳過,因為已經從本地狀態儲存中提取了條目計數。
- 如果後設資料不屬於本地例項,則從遠端例項檢索計數。
執行應用程式並查詢排行榜服務
完成排行榜拓撲的最後一步後,我們現在可以執行應用程式、產生一些虛擬資料並查詢排行榜服務。
虛擬資料範例
# players
1|{"id": 1, "name": "Elyse"}
2|{"id": 2, "name": "Mitch"}
3|{"id": 3, "name": "Isabelle"}
4|{"id": 4, "name": "Sammy"}
# products
1|{"id": 1, "name": "Super Smash Bros"}
6|{"id": 6, "name": "Mario Kart"}
# score-events
{"score": 1000, "product_id": 1, "player_id": 1}
{"score": 2000, "product_id": 1, "player_id": 2}
{"score": 4000, "product_id": 1, "player_id": 3}
{"score": 500, "product_id": 1, "player_id": 4}
{"score": 800, "product_id": 6, "player_id": 1}
{"score": 2500, "product_id": 6, "player_id": 2}
{"score": 9000.0, "product_id": 6, "player_id": 3}
{"score": 1200.0, "product_id": 6, "player_id": 4}
將虛擬資料產生到相應的主題後,查詢排行榜服務,我們將看到 Kafka Streams 應用程式不僅處理了高分,還公開了狀態操作的結果。
示例回應
[
{
"playerId": 3,
"productId": 1,
"playerName": "Isabelle",
"gameName": "Super Smash Bros",
"score": 4000
},
{
"playerId": 2,
"productId": 1,
"playerName": "Mitch",
"gameName": "Super Smash Bros",
"score": 2000
},
{
"playerId": 1,
"productId": 1,
"playerName": "Elyse",
"gameName": "Super Smash Bros",
"score": 1000
}
]
第五章:視窗與時間
時間是一個非常重要的概念,我們透過時間的流逝來衡量生命。每一年,都有幾個人圍在我身邊唱著生日快樂歌,當最後一個平淡的音符從空氣中消失時,一個蛋糕被獻給這個神秘的力量。我喜歡認為這個蛋糕是為我準備的,但其實它是獻給時間的。
時間不僅與物理世界緊密相連,也滲透到我們的事件流中。為了釋放 Kafka Streams 的全部潛力,我們必須瞭解事件與時間之間的關係。本章將詳細探討這種關係,並提供視窗操作的實踐經驗。視窗允許我們將事件分組到明確的時間桶中,可以用於建立更進階的連線和聚合(我們在前一章中首次探討了這些內容)。
在本章結束時,您將瞭解以下內容:
- 事件時間、攝入時間和處理時間之間的差異
- 如何建立自定義的時間戳提取器,以將事件與特定的時間戳和時間語義相關聯
- 時間如何控制資料在 Kafka Streams 中的流動
- Kafka Streams 中支援哪些型別的視窗
- 如何執行視窗連線
- 如何執行視窗聚合
- 有哪些策略可用於處理延遲和亂序事件
- 如何使用
suppress運算子處理視窗的最終結果 - 如何查詢視窗化的鍵值儲存
具有時間感知能力的串流處理:以病人監測系統為例
在醫療領域中,某些病況的治療需要即時處理大量的生理資料。Children’s Healthcare of Atlanta 利用 Kafka Streams 和 ksqlDB 進行即時預測,以判斷患有頭部創傷的兒童是否需要緊急手術。本章節將以此為靈感,透過建立一個監測病人生命徵象的應用程式來示範多個具有時間感知能力的串流處理概念。
病人監測應用程式介紹
系統性炎症反應症候群(Systemic Inflammatory Response Syndrome, SIRS)是一種需要立即就醫的病況。根據 Medical University of South Carolina 的物理治療師 Bridgette Kadri 表示,多個生命徵象,包括體溫、心率和血壓,可以作為 SIRS 的指標。本範例將著重於體溫和心率兩個指標。當兩個指標都超過預設閾值(心率 >= 100 次/分鐘,體溫 >= 100.4°F)時,系統會將警示訊息傳送至指定的主題,以通知相關醫療人員。
應用程式架構
我們的病人監測應用程式將包含以下步驟:
- 資料來源:兩個主題,分別是
pulse-events和body-temp-events,用於捕捉病人的心率和體溫資料。pulse-events主題由心跳感測器填充,每次感測到心跳時都會新增一筆記錄,記錄以病人 ID 為鍵值。body-temp-events主題由無線體溫感測器填充,每次測量到體溫時都會新增一筆記錄,同樣以病人 ID 為鍵值。
圖示:病人監測應用程式拓撲結構
@startuml
skinparam backgroundColor #FEFEFE
skinparam sequenceArrowThickness 2
title KafkaStreams 互動式狀態儲存查詢
actor "客戶端" as client
participant "API Gateway" as gateway
participant "認證服務" as auth
participant "業務服務" as service
database "資料庫" as db
queue "訊息佇列" as mq
client -> gateway : HTTP 請求
gateway -> auth : 驗證 Token
auth --> gateway : 認證結果
alt 認證成功
gateway -> service : 轉發請求
service -> db : 查詢/更新資料
db --> service : 回傳結果
service -> mq : 發送事件
service --> gateway : 回應資料
gateway --> client : HTTP 200 OK
else 認證失敗
gateway --> client : HTTP 401 Unauthorized
end
@enduml此圖示展示了資料從來源主題到最終警示輸出的流程。
實作步驟
轉換原始脈搏事件為心率:使用視窗化聚合(windowed aggregation)將脈搏事件轉換為心率(單位:次/分鐘)。由於測量單位是每分鐘的心跳次數,因此視窗大小設定為 60 秒。使用抑制運算元(suppress operator)僅發出最終的心率計算結果。
篩選超出閾值的生命徵象資料:篩選出心率和體溫超出預設閾值的資料。
重新鍵值:由於視窗化聚合改變了記錄鍵值,因此需要重新鍵值心率記錄,以滿足合併記錄的共分割需求。
視窗化 Join:合併兩個生命徵象串流,由於在合併前已經篩選出高心率和體溫資料,因此每個合併後的記錄都代表 SIRS 的警示條件。
輸出結果:透過互動式查詢公開心率聚合結果,並將合併串流的輸出寫入
alerts主題。
專案設定
本章節的程式碼位於 https://github.com/mitch-seymour/mastering-kafka-streams-and-ksqldb.git。您可以透過以下命令下載並切換到本章節的專案目錄:
$ git clone git@github.com:mitch-seymour/mastering-kafka-streams-and-ksqldb.git
$ cd mastering-kafka-streams-and-ksqldb/chapter-05/patient-monitoring
執行以下命令以建置專案:
$ ./gradlew build --info
內容解密:
本章節透過一個病人監測系統的範例,展示了具有時間感知能力的串流處理技術。利用 Kafka Streams,我們可以對即時的生理資料進行分析,並在必要時發出警示。這不僅展現了串流處理在醫療領域的應用,也對相關技術(如視窗化聚合、抑制運算元和視窗化 Join)進行了詳細介紹。透過這樣的範例,讀者能夠更深入地理解串流處理的概念及其在實際場景中的應用。