Kafka Streams 提供了強大的狀態儲存功能,允許開發者在串流處理過程中儲存和查詢狀態資訊。除了內建的 RocksDB 狀態儲存外,Kafka Streams 也支援自訂狀態儲存,讓開發者能根據需求彈性運用不同的儲存機制。然而,自訂狀態儲存的實作相對複雜,需要仔細考量效能影響。對於大多數應用場景,使用內建的 RocksDB 狀態儲存已能滿足需求。處理器 API 作為 Kafka Streams 中較低層級的 API,賦予開發者更細粒度的控制能力,例如存取記錄元資料、排程定期函式和更精確地控制狀態儲存。雖然程式碼較為冗長,但能實作 DSL 難以達成的功能。Kafka Streams 也允許在同一個應用程式中結合使用 DSL 和處理器 API,兼顧程式碼簡潔性和功能彈性。
自訂狀態儲存與處理器 API 概覽
在探討 Kafka Streams 的進階功能時,我們關注了狀態儲存的管理以及如何確保具狀態的應用程式能夠平穩執行。同時,我們也觸及了自訂狀態儲存的實作以及處理器 API 的使用。
自訂狀態儲存
Kafka Streams 允許開發者實作自己的狀態儲存機制。這需要實作 StateStore 介面,或者使用更高層級的介面,如 KeyValueStore、WindowStore 或 SessionStore。此外,還需要實作 StoreSupplier 介面,以定義如何建立自訂狀態儲存的新例項。
雖然內建的根據 RocksDB 的狀態儲存效能出色,但在某些情況下,開發者可能仍需要實作自訂狀態儲存。然而,這是一個繁瑣且容易出錯的任務,因此除非絕對必要,否則不建議這樣做。
自訂狀態儲存實作重點
- 實作
StateStore介面或其更高層級的變體 - 建立
StoreSupplier介面以管理自訂狀態儲存的例項化 - 注意效能影響,特別是在涉及網路呼叫時
處理器 API
處理器 API 是 Kafka Streams 中的一個較低層級的 API,它提供了比高層級 DSL 更細粒度的控制。雖然它需要更冗長的程式碼,但它允許開發者對資料流經拓撲的方式、處理器之間的關係以及狀態的建立和維護進行精確控制。
何時使用處理器 API
開發者可能需要在以下情況下使用處理器 API:
- 需要存取記錄元資料(主題、分割槽、偏移資訊、記錄標頭等)
- 需要排程定期函式
- 需要對何時將記錄轉發到下游處理器進行更細粒度的控制
- 需要更細粒度的狀態儲存存取許可權
- 需要規避 DSL 中的某些限制
處理器 API 的優缺點
- 優點:提供更細粒度的控制,允許實作 DSL 中不可用的功能
- 缺點:程式碼更冗長,可讀性較低,且更容易出錯
結合 DSL 和處理器 API
Kafka Streams 允許在同一個應用程式中結合使用 DSL 和處理器 API。這使得開發者能夠利用 DSL 的簡潔性和處理器 API 的靈活性,從而在保持程式碼可讀性的同時實作複雜的功能。
Processor API 程式碼範例與解析
// 定義一個簡單的處理器拓撲
Topology topology = new Topology();
// 新增一個源處理器
topology.addSource("source", "input-topic");
// 新增一個處理器
topology.addProcessor("processor", MyProcessor::new, "source");
// 新增一個接收器
topology.addSink("sink", "output-topic", "processor");
#### 程式碼解析:
1. **建立拓撲物件**:首先,我們建立了一個 `Topology` 物件,用於定義我們的處理器拓撲。
2. **新增源處理器**:接著,我們使用 `addSource` 方法新增了一個名為 "source" 的源處理器,它訂閱了名為 "input-topic" 的 Kafka 主題。
3. **新增處理器**:然後,我們使用 `addProcessor` 方法新增了一個名為 "processor" 的處理器,它由 `MyProcessor` 類別例項化,並與 "source" 處理器相連。
4. **新增接收器**:最後,我們使用 `addSink` 方法新增了一個名為 "sink" 的接收器,它將資料寫入名為 "output-topic" 的 Kafka 主題,並與 "processor" 處理器相連。
物聯網數位孿生服務:以 Kafka Streams 建置
在工業物聯網(IIoT)與物聯網(IoT)領域中,數位孿生(Digital Twin)技術正逐漸嶄露頭角。所謂的數位孿生,是指將實體裝置的狀態在數位世界中進行對映,使我們能夠更有效率地監控和管理裝置。本章節將透過 Kafka Streams 的 Processor API,建置一個離岸風場的數位孿生服務,以展示如何利用 Kafka Streams 進行高容量感測器資料的處理,並實作裝置狀態的即時更新與查詢。
數位孿生的概念
數位孿生技術的核心在於將實體裝置的狀態對映到數位副本中。以風場中的風力渦輪機為例,每當渦輪機回報其當前狀態(如風速、溫度、電源狀態等),我們會將這些資訊儲存在鍵值儲存中。以下是一個典型的狀態紀錄範例:
{
"timestamp": "2020-11-23T09:02:00+08:00",
"wind_speed_mph": 40,
"temperature_fahrenheit": 60,
"power": "ON"
}
內容解密:
timestamp:表示該狀態紀錄的時間戳記。wind_speed_mph:表示風速,單位為英里/小時。temperature_fahrenheit:表示溫度,單位為華氏度。power:表示電源狀態(例如開啟或關閉)。
值得注意的是,裝置 ID 是透過紀錄鍵值來傳達的,這使得我們能夠區分不同裝置的狀態事件。
與數位孿生互動
在 IoT 場景中,裝置可能會頻繁離線,因此直接與實體裝置互動可能會導致錯誤。為了提高用性,我們轉而與裝置的數位副本(孿生)互動。例如,若要將某個風力渦輪機的電源狀態從開啟變更為關閉,我們會先更新數位副本中的期望狀態。當實體渦輪機重新上線時,它會同步其狀態並根據期望狀態進行調整。
因此,一個數位孿生紀錄會包含報告狀態和期望狀態。以下是 Kafka Streams Processor API 建立和公開的數位孿生紀錄範例:
{
"desired": {
"timestamp": "2020-11-23T09:02:01+08:00",
"power": "OFF"
},
"reported": {
"timestamp": "2020-11-23T09:00:01+08:00",
"windSpeedMph": 68,
"power": "ON"
}
}
內容解密:
desired:表示期望狀態,即我們希望裝置處於的狀態。reported:表示報告狀態,即裝置當前回報的狀態。
建置 IoT 數位孿生服務
我們的應用程式需要從一組風力渦輪機中擷取感測器資料串流,對資料進行一些簡單處理,並將每個渦輪機的最新狀態儲存在持久化的鍵值狀態儲存中。然後,我們將透過 Kafka Streams 的互動式查詢功能公開這些資料。
本章節將建置如圖 7-1 所示的拓撲結構。我們的 Kafka 叢集包含兩個主題,分別是 reported-state-events 和 desired-state-events。
@startuml
skinparam backgroundColor #FEFEFE
skinparam sequenceArrowThickness 2
title Kafka Streams 狀態儲存與處理器API
actor "客戶端" as client
participant "API Gateway" as gateway
participant "認證服務" as auth
participant "業務服務" as service
database "資料庫" as db
queue "訊息佇列" as mq
client -> gateway : HTTP 請求
gateway -> auth : 驗證 Token
auth --> gateway : 認證結果
alt 認證成功
gateway -> service : 轉發請求
service -> db : 查詢/更新資料
db --> service : 回傳結果
service -> mq : 發送事件
service --> gateway : 回應資料
gateway --> client : HTTP 200 OK
else 認證失敗
gateway --> client : HTTP 401 Unauthorized
end
@enduml此圖示展示了資料從來源主題流入,並經過處理後儲存於狀態儲存,最終公開為數位孿生紀錄的流程。
圖示內容解密:
reported-state-events主題接收來自風力渦輪機的環境感測器資料。- Processor 負責檢查風速是否超過安全閾值,如果超過則自動生成關閉訊號。
- 將最新狀態儲存在鍵值儲存中,並結合
desired-state-events主題的資料。 - 最終公開完整的數位孿生紀錄。
數位孿生應用程式開發:處理風力渦輪機狀態
在上一章中,我們探討了使用Kafka Streams和KSQLDB構建即時資料處理應用程式的基本概念。本章將探討如何使用Kafka Streams的Processor API開發一個數位孿生應用程式,用於處理風力渦輪機的狀態。
專案概述
數位孿生是一種虛擬的實體,它模擬了現實世界中的一個物件或系統的狀態和行為。在本例中,我們將為風力渦輪機建立一個數位孿生,以即時反映其狀態和行為。
我們的目標是建立一個Kafka Streams應用程式,該程式將:
- 從兩個輸入主題(reported-state-events和desired-state-events)讀取資料。
- 將這兩種事件合併成一個數位孿生記錄。
- 將數位孿生記錄寫入一個持久化的鍵值儲存(digital-twin-store)。
- 定期清理過期的數位孿生記錄。
- 將數位孿生記錄輸出到一個主題(digital-twins)供分析使用。
專案設定
首先,我們需要設定專案。您可以從GitHub倉函式庫克隆本章的程式碼:
$ git clone git@github.com:mitch-seymour/mastering-kafka-streams-and-ksqldb.git
$ cd mastering-kafka-streams-and-ksqldb/chapter-07/digital-twin
您可以使用以下命令構建專案:
$ ./gradlew build --info
資料模型
在開始建立拓撲之前,我們需要定義資料模型。輸入主題中的資料以JSON格式表示,並使用TurbineState類別來表示。
TurbineState類別定義
public class TurbineState {
private String timestamp;
private Double windSpeedMph;
public enum Power { ON, OFF }
public enum Type { DESIRED, REPORTED }
private Power power;
private Type type;
}
我們還需要定義一個DigitalTwin類別來表示合併後的數位孿生記錄。
DigitalTwin類別定義
public class DigitalTwin {
private TurbineState desired;
private TurbineState reported;
// getters and setters omitted for brevity
}
序列化/反序列化
在Processor API中,我們需要使用序列化器和反序列化器來將原始記錄位元組轉換為資料類別。我們將使用JsonSerdes類別來提供DigitalTwin和TurbineState的序列化器和反序列化器。
JsonSerdes類別定義
public class JsonSerdes {
public static Serde<DigitalTwin> DigitalTwin() {
JsonSerializer<DigitalTwin> serializer = new JsonSerializer<>();
JsonDeserializer<DigitalTwin> deserializer = new JsonDeserializer<>(DigitalTwin.class);
return Serdes.serdeFrom(serializer, deserializer);
}
public static Serde<TurbineState> TurbineState() {
JsonSerializer<TurbineState> serializer = new JsonSerializer<>();
JsonDeserializer<TurbineState> deserializer = new JsonDeserializer<>(TurbineState.class);
return Serdes.serdeFrom(serializer, deserializer);
}
}
內容解密:
此段程式碼定義了兩個靜態方法,分別用於建立 DigitalTwin 和 TurbineState 的 Serde 物件。 Serde 是 Kafka 中用於序列化和反序列化物件的介面。
JsonSerializer用於將物件序列化為 JSON 字串。JsonDeserializer用於將 JSON 字串反序列化為指定型別的物件。Serdes.serdeFrom(serializer, deserializer)方法用於建立一個包含指定序列化器和反序列化器的Serde物件。
透過使用這些 Serde 物件,我們可以方便地在 Kafka Streams 應用程式中處理 DigitalTwin 和 TurbineState 物件。
在 Kafka Streams 中使用 Processor API 建立處理拓撲
在前面的教程中,我們實作了 Serde 介面(參見第 74 頁的「建立 Tweet Serdes」)。本文將展示另一種方法,即使用 Kafka Streams 中的 Serdes.serdeFrom 方法從序列化器和反序列化器例項建構 Serdes。
取得 DigitalTwin Serdes 和 TurbineState Serdes 的方法
以下是一個用於檢索 TurbineState Serdes 的範例程式碼:
new JsonDeserializer<>(TurbineState.class);
return Serdes.serdeFrom(serializer, deserializer);
內容解密:
JsonDeserializer建構函式的使用:這裡使用JsonDeserializer的建構函式來建立一個新的反序列化器例項,指定TurbineState.class作為反序列化的目標類別。這使得 JSON 資料可以被轉換成TurbineState物件。Serdes.serdeFrom方法的使用:透過傳入序列化器和反序列化器例項,Serdes.serdeFrom方法用於建立一個Serdes物件。這種方法提供了一種靈活的方式來建立自定義的 Serdes。
新增來源處理器
現在我們已經定義了資料類別,是時候實作處理拓撲的第一步(參見圖 7-1)。這涉及新增兩個來源處理器,使我們能夠將資料從輸入主題串流到 Kafka Streams 應用程式中。
範例程式碼:新增來源處理器的初始拓撲
Topology builder = new Topology();
builder.addSource(
"Desired State Events",
Serdes.String().deserializer(),
JsonSerdes.TurbineState().deserializer(),
"desired-state-events");
builder.addSource(
"Reported State Events",
Serdes.String().deserializer(),
JsonSerdes.TurbineState().deserializer(),
"reported-state-events");
內容解密:
- 建立
Topology例項:直接例項化一個Topology物件,用於新增和連線來源、接收器和串流處理器。 addSource方法的使用:使用addSource方法建立來源處理器。有多個過載版本可供選擇,支援偏移重置策略、主題模式等。- 來源處理器的名稱:每個處理器必須具有唯一的名稱,因為 Kafka Streams 在內部使用一個拓撲排序的對映來儲存這些名稱。
- 鍵和值的反序列化器:分別使用內建的
String反序列化器和自定義的TurbineState反序列化器。
新增無狀態串流處理器
下一步需要在風力渦輪機報告危險風速時自動生成關閉訊號。這需要學習如何使用 Processor API 新增串流處理器。
範例程式碼:新增無狀態串流處理器的範例
builder.addProcessor(
"High Winds Flatmap Processor",
HighWindsFlatmapProcessor::new,
"Reported State Events");
內容解密:
addProcessor方法的使用:使用addProcessor方法新增串流處理器。第二個引數需要提供一個ProcessorSupplier,它是一個傳回Processor例項的函式介面。- 串流處理器的名稱和父處理器:指定串流處理器的名稱和父處理器的名稱。在此範例中,父處理器是「Reported State Events」。
Processor API 與 DSL 的比較
- Processor API 提供了更底層的操作,能夠更精細地控制處理拓撲的每個環節,但需要手動管理處理器之間的連線。
- DSL(Domain-Specific Language) 提供了更高層次的抽象,簡化了串流處理的實作,但犧牲了一定的靈活性。
透過結合 Processor API 和 DSL 的優點,可以實作更高效、更靈活的串流處理應用。接下來的章節將進一步探討如何實作 Processor 介面以及如何在 Kafka Streams 中使用狀態儲存。