Kafka Streams 應用程式開發的第一步是定義資料模型。考量到每個生命徵象測量值都帶有時間戳,建立一個介面讓所有資料類別實作,確保時間戳的提取方式一致。此舉有助於後續時間戳提取器的實作。事件時間是事件發生的真實時間,攝入時間是事件進入 Kafka 的時間,而處理時間則是 Kafka Streams 處理事件的時間。選擇正確的時間語義對於根據時間的操作至關重要,例如視窗化連線和聚合。設定 Kafka 引數 log.message.timestamp.typemessage.timestamp.type 時,需注意避免意外使用攝入時間。事件時間的優勢在於時間戳更具意義,且根據時間的操作具有確定性,即使重新處理資料,視窗化行為也保持一致。

資料模型與時間語義在Kafka Streams中的應用

在開發Kafka Streams應用程式時,定義資料模型是首要步驟。由於每個生命徵象測量值都與時間戳相關聯,首先建立一個簡單的介面供各資料類別實作。這個介面使得從給定記錄中以一致的方式提取時間戳成為可能,並且在後續章節中實作時間戳提取器時會派上用場。

資料模型定義

public interface Vital {
    public String getTimestamp();
}

public class Pulse implements Vital {
    private String timestamp;
}

public class BodyTemp implements Vital {
    private String timestamp;
    private Double temperature;
    private String unit;
}

內容解密:

  • Vital介面定義了getTimestamp()方法,用於取得時間戳。
  • PulseBodyTemp類別實作了Vital介面,分別代表脈搏和體溫的測量資料。
  • BodyTemp類別額外包含了temperatureunit屬性,用於記錄體溫值和單位。

時間語義的重要性

Kafka Streams中有多種時間概念,選擇正確的時間語義對於執行根據時間的操作(如視窗化連線和聚合)至關重要。本文將探討Kafka Streams中的不同時間概念。

事件時間(Event Time)

事件發生的時間,可以嵌入事件的負載中,或使用Kafka生產者客戶端直接設定。

攝入時間(Ingestion Time)

事件被追加到Kafka主題的時間,總是在事件時間之後。

處理時間(Processing Time)

事件被Kafka Streams應用程式處理的時間,總是在事件時間和攝入時間之後。重新處理相同資料會導致新的處理時間戳,因此具有非確定性。

圖示:Kafka Streams中的不同時間語義

此圖示展示了心跳感測器資料流中的不同時間語義。

graph LR
    A[事件產生] -->|事件時間|> B[事件傳輸]
    B -->|攝入時間|> C[Kafka主題]
    C -->|處理時間|> D[Kafka Streams處理]

內容解密:

  • 圖表展示了事件從產生到被處理的流程。
  • 事件時間代表事件實際發生的時間。
  • 攝入時間是事件被寫入Kafka主題的時間。
  • 處理時間是事件被Kafka Streams應用程式處理的時間。

事件時間的組態與優勢

為了實作事件時間語義,可以在事件負載中嵌入時間戳,或使用Kafka生產者客戶端設定預設的時間戳。然而,需要注意Kafka的組態,以避免意外地使用攝入時間語義。

相關組態包括:

  • log.message.timestamp.type(代理級別)
  • message.timestamp.type(主題級別)

若主題組態為LogAppendTime,則生產者附加到訊息的時間戳將被代理的本地系統時間覆寫。因此,即使原本打算使用事件時間語義,也會變成攝入時間語義。

事件時間語義的優勢

  • 時間戳更具意義,與事件本身相關。
  • 允許根據時間的操作具有確定性,如重新處理資料時的視窗化行為。

時間戳提取器在 Kafka Streams 中的應用

在 Kafka Streams 中,時間戳提取器(Timestamp Extractors)負責將給定的記錄與時間戳關聯起來,這些時間戳用於依賴時間的操作,如視窗連線和視窗聚合。每個時間戳提取器實作必須遵循以下介面:

public interface TimestampExtractor {
    long extract(
        ConsumerRecord<Object, Object> record,
        long partitionTime
    );
}

內建的時間戳提取器

Kafka Streams 提供了多種內建的時間戳提取器,包括:

  1. FailOnInvalidTimestamp:預設的時間戳提取器,從消費者記錄中提取時間戳。如果時間戳無效,則丟擲 StreamsException
  2. LogAndSkipOnInvalidTimestamp:類別似於 FailOnInvalidTimestamp,但當遇到無效時間戳時,只記錄警告並跳過該記錄。
  3. WallclockTimestampExtractor:傳回本地系統時間,用於處理時間語義。

WallclockTimestampExtractor 示例

public class WallclockTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(
        final ConsumerRecord<Object, Object> record,
        final long partitionTime
    ) {
        return System.currentTimeMillis();
    }
}

自訂預設時間戳提取器

可以透過設定 StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG 屬性來覆寫預設的時間戳提取器。

Properties props = new Properties();
props.put(
    StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    WallclockTimestampExtractor.class
);
// ... 其他組態
KafkaStreams streams = new KafkaStreams(builder.build(), props);

自訂時間戳提取器

當事件時間嵌入在記錄的有效載荷中時,需要建立自訂的時間戳提取器。以下是一個自訂的時間戳提取器示例,用於提取患者生命體徵測量的時間戳。

public class VitalTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Vital measurement = (Vital) record.value();
        if (measurement != null && measurement.getTimestamp() != null) {
            String timestamp = measurement.getTimestamp();
            return Instant.parse(timestamp).toEpochMilli();
        }
        return partitionTime;
    }
}

內容解密:

  1. 將記錄值轉換為 Vital 物件,利用介面的一致性提取時間戳。
  2. 在解析時間戳之前,檢查記錄是否包含時間戳。
  3. 從記錄中提取時間戳,並將其轉換為毫秒級的 Epoch 時間。

選擇適當的時間語義

在使用視窗操作時,選擇適當的時間語義非常重要。處理時間語義可能會導致非預期的結果,因為視窗邊界代表的是 Kafka Streams 應用程式觀察到事件的時間,而不是事件發生的實際時間。

重要注意事項:

  • 當使用視窗聚合時,使用處理時間語義可能會導致事件落入錯誤的視窗中。
  • 當事件時間嵌入在記錄的有效載荷中時,應使用自訂的時間戳提取器來實作事件時間語義。

時間戳提取器與串流註冊

在 Kafka Streams 中,正確處理事件時間是至關重要的。TimestampExtractor 介面允許開發者自定義如何從 Kafka 記錄中提取時間戳。本文將探討如何實作自定義的時間戳提取器,並將其應用於輸入串流的註冊。

自定義時間戳提取器

在許多情況下,預設的時間戳提取器可能無法滿足特定需求。例如,當記錄中的時間戳不是以毫秒為單位時,需要進行轉換。以下是一個自定義的時間戳提取器的範例:

public class VitalTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        // 假設 Pulse 物件包含時間戳屬性
        Pulse pulse = (Pulse) record.value();
        // 將時間戳轉換為毫秒
        long timestampInMillis = pulse.getTimestamp() * 1000;
        return timestampInMillis;
    }
}

內容解密:

  1. extract 方法負責從 ConsumerRecord 中提取時間戳。
  2. record.value() 轉型為 Pulse 物件,以存取其時間戳屬性。
  3. 將時間戳從秒轉換為毫秒,以滿足 Kafka Streams 的要求。
  4. 如果無法提取時間戳,則可以回退到使用 partitionTime

註冊帶有自定義時間戳提取器的串流

註冊輸入串流時,可以透過 Consumed 物件指定自定義的時間戳提取器。以下是範例程式碼:

StreamsBuilder builder = new StreamsBuilder();
Consumed<String, Pulse> pulseConsumerOptions =
    Consumed.with(Serdes.String(), JsonSerdes.Pulse())
            .withTimestampExtractor(new VitalTimestampExtractor());
KStream<String, Pulse> pulseEvents =
    builder.stream("pulse-events", pulseConsumerOptions);

內容解密:

  1. 使用 StreamsBuilder 構建處理器拓撲。
  2. 透過 Consumed.withTimestampExtractor 指定自定義的時間戳提取器。
  3. 使用自定義的時間戳提取器註冊 pulse-events 主題的串流。

視窗化串流

在患者監測系統中,我們對計算每分鐘的心跳次數(bpm)感興趣。這需要使用視窗聚合操作。Kafka Streams 支援多種視窗型別,包括:

翻滾視窗(Tumbling Windows)

翻滾視窗是固定大小、不重疊的視窗。它們由視窗大小(以毫秒為單位)定義,並且與 epoch 對齊。

TimeWindows tumblingWindow = TimeWindows.of(Duration.ofSeconds(5));
內容解密:
  1. TimeWindows.of 方法用於建立翻滾視窗。
  2. 指定視窗大小為 5 秒。
  3. 翻滾視窗不會重疊,每個記錄只會被計入一個視窗。

Kafka Streams 中的視窗型別與應用

在 Kafka Streams 中,視窗(Windows)是一種用於處理無限資料流的重要機制,可以將無限的資料流分割成有限的區塊進行處理。Kafka Streams 提供了多種視窗型別,包括滾動視窗(Tumbling Windows)、跳躍視窗(Hopping Windows)、會話視窗(Session Windows)、滑動連線視窗(Sliding Join Windows)以及滑動聚合視窗(Sliding Aggregation Windows)。

滾動視窗(Tumbling Windows)

滾動視窗是一種固定大小且不重疊的視窗。它們根據設定的大小將資料流分割成連續的區塊。例如,如果我們設定一個 60 秒的滾動視窗,則每 60 秒會產生一個新的視窗,且每個記錄只會出現在一個視窗中。

程式碼範例

TimeWindows tumblingWindow = TimeWindows.of(Duration.ofSeconds(60));

內容解密:

  • TimeWindows.of(Duration.ofSeconds(60)) 用於建立一個大小為 60 秒的滾動視窗。
  • 該視窗會根據 epoch 對齊,確保視窗邊界固定且可預測。
  • 每個記錄的 timestamp 會決定它屬於哪個視窗,且每個記錄只會出現在一個視窗中。

跳躍視窗(Hopping Windows)

跳躍視窗是固定大小但可能重疊的視窗。它們需要設定視窗大小和前進間隔。當前進間隔小於視窗大小時,視窗之間會產生重疊,使得某些記錄可能出現在多個視窗中。

程式碼範例

TimeWindows hoppingWindow = TimeWindows
    .of(Duration.ofSeconds(5))
    .advanceBy(Duration.ofSeconds(4));

內容解密:

  • .of(Duration.ofSeconds(5)) 設定視窗大小為 5 秒。
  • .advanceBy(Duration.ofSeconds(4)) 設定前進間隔為 4 秒,導致視窗之間有 1 秒的重疊。
  • 該設定使得某些記錄可能會出現在多個視窗中。

會話視窗(Session Windows)

會話視窗的大小是可變的,取決於活動期間和非活動間隙。設定一個非活動間隙(inactivity gap),如果新記錄的時間戳與前一記錄的時間戳之間的差距在該間隙範圍內,則它們會被合併到同一個視窗中。

程式碼範例

SessionWindows sessionWindow = SessionWindows.with(Duration.ofSeconds(5));

內容解密:

  • SessionWindows.with(Duration.ofSeconds(5)) 設定非活動間隙為 5 秒。
  • 如果新記錄的時間戳與前一記錄的時間戳差距在 5 秒內,則它們屬於同一個會話視窗。
  • 該型別的視窗邊界是包含上下邊界的。

滑動連線視窗(Sliding Join Windows)

滑動連線視窗用於連線操作,根據兩個記錄的時間戳差異是否在設定的視窗大小範圍內來決定是否屬於同一個視窗。

程式碼範例

JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));

內容解密:

  • JoinWindows.of(Duration.ofSeconds(5)) 設定連線視窗大小為 5 秒。
  • 如果兩個記錄的時間戳差異小於或等於 5 秒,則它們屬於同一個連線視窗。
  • 這種視窗型別主要用於 join 操作。

滑動聚合視窗(Sliding Aggregation Windows)

滑動聚合視窗與滑動連線視窗類別似,但用於聚合操作。記錄的時間戳決定了它們是否屬於同一個視窗。

程式碼範例

SlidingWindows slidingWindow = SlidingWindows.withTimeDifferenceAndGrace(
    Duration.ofSeconds(5),
    Duration.ofSeconds(0));

內容解密:

  • SlidingWindows.withTimeDifferenceAndGrace 用於建立滑動聚合視窗。
  • 第一個引數 Duration.ofSeconds(5) 設定視窗大小為 5 秒。
  • 第二個引數 Duration.ofSeconds(0) 設定 grace period 為 0 秒,用於處理遲到的記錄。

選擇適合的視窗型別

在 Kafka Streams 中選擇適合的視窗型別取決於具體的應用需求。例如,若需要將原始脈搏事件轉換為心率,可以使用滾動視窗,因為它能提供固定大小且不重疊的視窗,便於計算固定時間段內的心率。

Kafka Streams 視窗計算與延遲資料處理

在 Kafka Streams 中,視窗計算是一種常見的流處理模式,尤其是在處理時間序列資料時。視窗計算允許我們對一定時間範圍內的資料進行聚合和分析。然而,在實際應用中,資料可能會出現延遲或亂序的情況,這就需要我們對視窗計算的行為進行調整,以滿足不同的應用需求。

多維度鍵的轉換

在 Kafka Streams 中,當我們使用 windowedBy 運算子時,KTable 的鍵會從原始的 String 型別轉換為 Windowed<String> 型別。這是因為視窗計算需要將原始的記錄鍵與時間範圍結合起來,形成一個多維度的鍵。這種轉換使得我們可以對同一鍵的不同時間範圍內的資料進行獨立的聚合計算。

舉例來說,如果我們有以下記錄:

1|{"timestamp": "2020-11-12T09:02:00+08:00"}
1|{"timestamp": "2020-11-12T09:02:00.500Z"}
1|{"timestamp": "2020-11-12T09:02:01+08:00"}

那麼在經過 windowedBy 運算子處理後,這些記錄的鍵會被轉換為以下格式:

[1@1605171720000/1605171780000]

其中,1 是原始的記錄鍵,16051717200001605171780000 分別代表視窗的起始和結束時間。

程式碼範例

KTable<Windowed<String>, Long> windowedCounts = pulseEvents
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .count();

內容解密:

  1. groupByKey():根據記錄的鍵進行分組。
  2. windowedBy(TimeWindows.of(Duration.ofMinutes(1))):定義一個時間視窗,大小為 1 分鐘。
  3. count():對每個視窗內的記錄進行計數。

視窗結果的傳送

在 Kafka Streams 中,視窗結果的傳送時機是一個複雜的問題。由於事件流可能是無界的,且事件的時間戳可能不按順序到達,因此很難確定何時傳送視窗的最終結果。

Kafka Streams 預設採用連續細化(continuous refinement)的方法,即每當有新的事件加入視窗時,就立即傳送更新後的計算結果。這種方法最佳化的目標是降低延遲,但可能會導致下游運算子收到不完整的視窗結果。

圖表說明

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Kafka Streams 資料模型與時間語義應用

package "機器學習流程" {
    package "資料處理" {
        component [資料收集] as collect
        component [資料清洗] as clean
        component [特徵工程] as feature
    }

    package "模型訓練" {
        component [模型選擇] as select
        component [超參數調優] as tune
        component [交叉驗證] as cv
    }

    package "評估部署" {
        component [模型評估] as eval
        component [模型部署] as deploy
        component [監控維護] as monitor
    }
}

collect --> clean : 原始資料
clean --> feature : 乾淨資料
feature --> select : 特徵向量
select --> tune : 基礎模型
tune --> cv : 最佳參數
cv --> eval : 訓練模型
eval --> deploy : 驗證模型
deploy --> monitor : 生產模型

note right of feature
  特徵工程包含:
  - 特徵選擇
  - 特徵轉換
  - 降維處理
end note

note right of eval
  評估指標:
  - 準確率/召回率
  - F1 Score
  - AUC-ROC
end note

@enduml

此圖示展示了 Kafka Streams 處理事件的基本流程。

處理延遲資料

為了處理延遲資料,我們需要在完整性和延遲之間進行權衡。Kafka Streams 提供了 suppress 運算子,可以用來抑制中間的視窗計算結果,直到視窗關閉後才傳送最終結果。

程式碼範例

KTable<Windowed<String>, Long> suppressedCounts = windowedCounts
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));

內容解密:

  1. suppress():用於抑制中間的視窗計算結果。
  2. Suppressed.untilWindowCloses(BufferConfig.unbounded()):表示直到視窗關閉後才傳送最終結果,並且使用無界緩衝區來儲存事件。