Kafka Streams 採用任務分割機制,讓資料流分散至多個任務處理,實作水平擴充套件。隨著資料量增長,只需增加執行緒或應用程式例項即可提升處理能力。Kafka 的高用性設計,讓 Kafka Streams 能夠從上次提交的偏移量還原處理,確保容錯性。消費者協調機制允許任務在可用執行緒上重新啟動,狀態還原則透過重新讀取內部主題來完成。縮短還原時間的關鍵在於最佳化 Kafka Streams 主題組態,例如降低 min.compaction.lag.ms 和調整段大小,並組態備用副本以利容錯移轉。

串流處理的高用性與擴充套件性

在前面的章節中,我們探討瞭如何使用 Kafka Streams 進行串流處理。現在,我們將深入瞭解 Kafka Streams 如何實作高用性和擴充套件性,以滿足大規模資料處理的需求。

擴充套件性

Kafka Streams 的設計使其能夠水平擴充套件,以處理大量的資料。這是透過將資料流分成多個任務(tasks)來實作的,每個任務負責處理資料流的一部分。當資料量增加時,可以簡單地新增更多的執行緒或應使用案例項來處理新增的任務。

圖 14-13:兩組任務處理具有主題的事件,以便在它們之間重新分割槽事件

在上圖中,我們可以看到兩個任務集合,它們共同處理一個主題的事件。當需要擴充套件時,可以新增更多的任務或執行緒來參與事件處理,從而提高整體的處理能力。

容錯機制

Kafka Streams 的擴充套件模型也使其具有良好的容錯能力。當應用程式失敗時,可以從 Kafka 中還原其最後提交的偏移量(offset),並繼續處理。這是因為 Kafka 提供了高可用性的資料儲存。

高用性的實作

  1. Kafka 的高用性:Kafka 的設計使其成為一個高可用性的系統。因此,儲存在 Kafka 中的資料也是高可用性的。
  2. 消費者協調:Kafka Streams 利用 Kafka 的消費者協調機制來提供任務的高用性。如果一個任務失敗,但仍有其他執行緒或應使用案例項在執行,該任務將在其中一個可用的執行緒上重新啟動。
  3. 狀態還原:當一個執行緒需要啟動一個原本在失敗執行緒上執行的任務時,它首先需要還原其儲存的狀態(例如,聚合視窗)。通常,這是透過從 Kafka 重新讀取內部主題來完成的,以便預熱 Kafka Streams 的狀態儲存。

縮短還原時間

為了減少還原時間,通常需要最佳化 Kafka Streams 主題的組態,例如設定較低的 min.compaction.lag.ms 和將段大小組態為 100 MB 而不是預設的 1 GB。此外,還可以組態備用副本(standby replica),以便在容錯移轉時能夠快速接管任務的處理。

串流處理的應用場景

串流處理適用於需要快速處理事件的場景,而不是等待數小時直到下一次批次處理。以下是一些常見的串流處理應用場景:

客戶服務

假設我們在一家大型連鎖酒店預訂了一個房間,並期望收到電子郵件確認和收據。如果在預訂後幾分鐘內仍未收到確認,客戶服務中心應該能夠立即查詢到我們的預訂資訊。利用串流處理,可以使所有相關系統在預訂完成後幾秒鐘或幾分鐘內收到更新,從而改善客戶體驗。

物聯網(IoT)

串流處理在 IoT 領域有多種應用,例如預測裝置何時需要預防性維護。透過大規模處理來自裝置的事件,可以識別出需要維護的裝置模式,從而減少停機時間。

詐欺偵測

詐欺偵測是串流處理的另一個重要應用場景。透過分析大量的交易資料,可以即時識別出異常模式,從而防止詐欺行為的發生。

總之,Kafka Streams 提供了一個強大且可擴充套件的串流處理平台,能夠滿足各種大規模資料處理的需求。無論是在客戶服務、IoT 還是詐欺偵測領域,串流處理都能夠提供即時的洞察和決策支援。

串流處理的應用與框架選擇

串流處理是一種能夠即時處理和分析大量連續資料流的技術,廣泛應用於金融欺詐檢測、網路安全威脅偵測等領域。這些應用都需要快速回應事件,以便在問題發生時立即採取行動。

串流處理的實際應用

  1. 欺詐檢測
    欺詐檢測是串流處理的重要應用之一,包括信用卡欺詐、股票交易欺詐、視訊遊戲作弊和網路安全風險檢測等。這些應用需要近乎即時的系統,能夠在事件發生時快速回應,例如在不良交易被批准之前阻止它。

  2. 網路安全
    在網路安全領域,有一種稱為「信標」(beaconing)的技術。當駭客在組織內植入惡意軟體時,它可能會定期向外部傳送訊息以接收指令。這種活動很難被檢測,因為它可以在任何時間和頻率發生。透過處理大量的網路連線事件串流,並識別異常的通訊模式,可以在造成更大損害之前提醒安全組織。

如何選擇串流處理框架

選擇串流處理框架時,必須考慮所開發的應用程式型別。不同的應用場景需要不同的串流處理解決方案:

  1. 資料匯入(Ingest)
    當目標是將資料從一個系統傳輸到另一個系統,並進行必要的資料轉換時,需要選擇具備良好聯結器支援的串流處理系統。

  2. 低毫秒級操作
    對於需要即時回應的應用,如某些欺詐檢測案例,需要選擇支援事件驅動且低延遲的串流處理系統。

  3. 非同步微服務
    這些微服務代表更大的業務流程執行簡單的操作,如更新商店庫存。需要能夠維護本地狀態快取事件以提高效能的串流處理系統。

  4. 近即時資料分析
    這些串流應用執行複雜的聚合和連線操作,以生成有價值的業務洞察。需要具備強大本地儲存支援的串流處理系統,以支援高階聚合、視窗操作和連線。

選擇串流處理框架的考量因素

除了特定應用場景的需求,還有幾個全域性考量因素:

  1. 系統可操作性
    系統是否易於佈署、監控和故障排除?是否能與現有基礎設施良好整合?

  2. API 的易用性和除錯便捷性
    開發時間和上市時間非常重要,因此需要選擇能提高開發效率的系統。

  3. 是否讓困難的事情變得簡單
    是否提供乾淨的 API 和抽象,處理擴充套件性和還原等細節?

  4. 社群支援
    大多數串流處理應用是開源的,活躍的社群意味著可以獲得新功能、良好的品質、快速修復錯誤和及時的使用者支援。

內容解密:
  1. 這段程式碼使用 Kafka Streams 建立了一個資料處理流程,首先從名為 “topic” 的主題讀取資料。
  2. flatMapValues 操作將輸入的值轉換為小寫並按非單字字元進行分割,生成單詞串列。
  3. groupBy 操作根據單詞進行分組,然後使用 count 方法統計每個單詞的出現次數,並將結果物化到名為 “counts” 的狀態儲存中。
  4. 最後,將計數結果轉換為串流並輸出到系統控制檯。
  5. 這種處理方式適用於即時分析單詞頻率的場景,例如監控特定關鍵字的出現頻率。
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Kafka Streams 高可用性與擴充套件性架構

package "Kafka Cluster" {
    queue "Input Topic\n(Partitions 0-3)" as input
    queue "Internal Topics\n(State Changelog)" as internal
    queue "Output Topic" as output
}

package "Kafka Streams Application" {
    package "Instance 1" {
        component "Task 0" as t0
        component "Task 1" as t1
        database "State Store\n(RocksDB)" as ss1
    }

    package "Instance 2" {
        component "Task 2" as t2
        component "Task 3" as t3
        database "State Store\n(RocksDB)" as ss2
    }

    package "Standby Replica" {
        component "Standby\nTask 0-1" as standby
        database "Standby\nState Store" as ss_standby
    }
}

input --> t0 : Partition 0
input --> t1 : Partition 1
input --> t2 : Partition 2
input --> t3 : Partition 3

t0 --> ss1
t1 --> ss1
t2 --> ss2
t3 --> ss2

ss1 --> internal : 狀態變更記錄
ss2 --> internal : 狀態變更記錄
internal --> ss_standby : 狀態同步

t0 --> output
t1 --> output
t2 --> output
t3 --> output

note right of standby
  容錯機制:
  當 Instance 失敗時
  Standby 可快速接管
  減少狀態還原時間
end note

note bottom of input
  擴充套件性:
  增加 Partition 數量
  新增 Instance 即可
  水平擴充套件處理能力
end note

@enduml

此圖展示 Kafka Streams 的高可用性與擴充套件性架構:多個應用程式實例分別處理不同分區的任務,透過本地狀態儲存和內部主題實現狀態管理,並利用備用副本實現快速容錯移轉。

在其他作業系統上安裝 Kafka

Apache Kafka 主要是一個 Java 應用程式,因此應該能夠在任何能夠安裝 JRE 的系統上執行。不過,它已經針對 Linux 作業系統進行了最佳化,因此在 Linux 上會有最佳的效能。在其他作業系統上執行可能會出現特定於該作業系統的錯誤。因此,當在常見的桌面作業系統上使用 Kafka 進行開發或測試時,考慮在虛擬機器中執行以比對最終的生產環境是一個好主意。

在 Windows 上安裝

截至 Microsoft Windows 10,有兩種方式可以執行 Kafka。傳統方式是使用原生 Java 安裝。Windows 10 使用者也可以選擇使用 Windows Subsystem for Linux。後者是更好的選擇,因為它提供了一個更簡單的設定,更接近典型的生產環境。

使用 Windows Subsystem for Linux

如果您正在執行 Windows 10,您可以使用 Windows Subsystem for Linux(WSL)安裝原生 Ubuntu 支援。在出版時,Microsoft 仍然認為 WSL 是一個實驗性功能。雖然它的運作方式類別似於虛擬機器,但它不需要完整的 VM 資源,並且提供了與 Windows 作業系統更豐富的整合。

要安裝 WSL,請按照 Microsoft Developer Network 的「什麼是 Windows Subsystem for Linux?」頁面上的指示進行。完成後,您需要使用 apt 安裝 JDK(假設您已經為 WSL 安裝了 Ubuntu 系統套件):

$ sudo apt install openjdk-16-jre-headless
[sudo] password for username:
Reading package lists... Done
Building dependency tree
Reading state information... Done
[...]
done.
$

安裝 JDK 後,您可以按照第 2 章的指示安裝 Apache Kafka。

使用原生 Java

對於較舊版本的 Windows,或如果您不想使用 WSL 環境,您可以使用 Windows 的 Java 環境原生執行 Kafka。但請注意,這可能會引入特定於 Windows 環境的錯誤。這些錯誤可能不會像 Linux 上的類別似問題那樣受到 Apache Kafka 開發社群的關注。

在安裝 ZooKeeper 和 Kafka 之前,您必須設定 Java 環境。您應該安裝最新版本的 Oracle Java 16,可以在 Oracle Java SE 下載頁面上找到。下載完整的 JDK 套件,以便擁有所有可用的 Java 工具,並按照指示進行安裝。

注意路徑

在安裝 Java 和 Kafka 時,強烈建議您堅持使用不包含空格的安裝路徑。雖然 Windows 允許路徑中使用空格,但為 Unix 環境設計的應用程式並非如此設定,因此指定路徑將很困難。在安裝 Java 時,請務必牢記這一點來設定安裝路徑。例如,如果安裝 JDK 16.0.1,一個好的選擇是使用路徑 C:\Java\jdk-16.0.1

安裝 Java 後,設定環境變數以便能夠使用它。這是在 Windows 的控制檯中完成的,不過確切位置將取決於您的作業系統版本。在 Windows 10 中,您必須:

  1. 選擇“系統和安全”
  2. 選擇“系統”
  3. 選擇“進階系統設定”,這將開啟“系統屬性”視窗
  4. 在“進階”標籤上,按一下“環境變數”按鈕

使用此部分新增一個名為 JAVA_HOME 的新使用者變數(圖 A-1),並將其設定為您安裝 Java 的路徑。然後編輯名為 Path 的系統變數,並新增一個新的專案 %JAVA_HOME%\bin。儲存這些設定,並離開控制檯。

圖 A-1. 新增 JAVA_HOME 變數

現在您可以繼續安裝 Apache Kafka。安裝包括 ZooKeeper,因此您不必單獨安裝它。目前的 Kafka 版本可以線上下載。在出版時,該版本是 2.8.0,在 Scala 版本 2.13.0 下執行。下載的檔案將使用 tar 實用程式進行 gzip 壓縮和封裝,因此您需要使用 Windows 應用程式(如 8 Zip)來解壓縮它。與在 Linux 上安裝類別似,您必須選擇一個目錄來解壓縮 Kafka。對於本範例,我們將假設 Kafka 被解壓縮到 C:\kafka_2.13-2.8.0

在 Windows 下執行 ZooKeeper 和 Kafka 有點不同,因為您必須使用為 Windows 設計的批次檔,而不是其他平台的 shell 指令碼。這些批次檔也不支援在背景執行應用程式,因此您需要為每個應用程式分別開啟一個 shell。首先,啟動 ZooKeeper:

PS C:\> cd kafka_2.13-2.8.0
PS C:\kafka_2.13-2.8.0> bin\windows\zookeeper-server-start.bat C:\kafka_2.13-2.8.0\config\zookeeper.properties
[2021-07-18 17:37:12,917] INFO Reading configuration from: C:\kafka_2.13-2.8.0\config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[...]
[2021-07-18 17:37:13,135] INFO PrepRequestProcessor (sid:0) started, reconfigEnabled=false (org.apache.zookeeper.server.PrepRequestProcessor)
[2021-07-18 17:37:13,144] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)

內容解密:

此段落展示瞭如何在 Windows 上啟動 ZooKeeper。首先,我們導航到 Kafka 的安裝目錄,然後執行 zookeeper-server-start.bat 指令碼,並指定 zookeeper.properties 組態檔案的路徑。輸出顯示 ZooKeeper 已成功啟動,並提供了一些有關組態和處理請求的資訊。

ZooKeeper 執行後,您可以開啟另一個視窗來啟動 Kafka:

PS C:\> cd kafka_2.13-2.8.0
PS C:\kafka_2.13-2.8.0> .\bin\windows\kafka-server-start.bat C:\kafka_2.13-2.8.0\config\server.properties
[2021-07-18 17:39:46,098] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[...]
[2021-07-18 17:39:47,918] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2021-07-18 17:39:48,009] INFO [broker-0-to-controller-send-thread]: Recorded new controller, from now on will use broker 192.168.0.2:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)

內容解密:

此段落展示瞭如何在 Windows 上啟動 Kafka。首先,我們導航到 Kafka 的安裝目錄,然後執行 kafka-server-start.bat 指令碼,並指定 server.properties 組態檔案的路徑。輸出顯示 Kafka 已成功啟動,並提供了一些有關註冊 MBean 和啟動伺服器的資訊。

在 macOS 上安裝

macOS 在 Darwin 上執行,這是一個部分源自 FreeBSD 的 Unix 作業系統。這意味著許多在 Unix 作業系統上執行的預期仍然成立,並且安裝為 Unix 設計的應用程式(如 Apache Kafka)並不困難。您可以透過使用套件管理器(如 Homebrew)保持安裝簡單,也可以手動安裝 Java 和 Kafka 以獲得對版本的更大控制權。

在 macOS 上安裝 Kafka

在 macOS 上安裝 Kafka 有兩種主要方法:使用 Homebrew 或手動安裝。以下將詳細介紹這兩種方法。

使用 Homebrew 安裝 Kafka

對於已經安裝了 Homebrew 的 macOS 使用者,可以透過它一步到位地安裝 Kafka。這不僅會安裝 Kafka,還會確保 Java 首先被安裝。截至本文撰寫時,Homebrew 將安裝 Apache Kafka 2.8.0 版本。

  1. 安裝 Homebrew:如果尚未安裝 Homebrew,請依照官方安裝頁面的指示進行安裝。

  2. 安裝 Kafka:使用 Homebrew 安裝 Kafka,它將自動處理依賴項,包括 Java。

    $ brew install kafka
    ==> Installing dependencies for kafka: openjdk, openssl@1.1 and zookeeper
    ==> Installing kafka dependency: openjdk
    ==> Pouring openjdk--16.0.1.big_sur.bottle.tar.gz
    [...]
    ==> Summary
    /usr/local/Cellar/kafka/2.8.0: 200 files, 68.2MB
    $
    

    內容解密:

    • brew install kafka 命令會安裝 Kafka 及其依賴項,包括 openjdkopenssl@1.1zookeeper
    • 安裝完成後,Kafka 的檔案將被連結到 /usr/local/bin/usr/local/etc/kafka/usr/local/var/lib/kafka-logs 等目錄。
  3. 啟動 ZooKeeper 和 Kafka

    $ /usr/local/bin/zkServer start
    ZooKeeper JMX enabled by default
    Using config: /usr/local/etc/zookeeper/zoo.cfg
    Starting zookeeper ... STARTED
    $ /usr/local/bin/kafka-server-start /usr/local/etc/kafka/server.properties
    [2021-07-18 17:52:15,688] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
    [...]
    [2021-07-18 17:52:18,187] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
    [2021-07-18 17:52:18,232] INFO [broker-0-to-controller-send-thread]: Recorded new controller, from now on will use broker 192.168.0.2:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
    

    內容解密:

    • 首先啟動 ZooKeeper 服務,使用 /usr/local/bin/zkServer start 命令。
    • 然後啟動 Kafka 服務,使用 /usr/local/bin/kafka-server-start 命令並指定 server.properties 組態檔案。

手動安裝 Kafka

手動安裝 Kafka 需要先安裝 JDK,然後下載並組態 Kafka。

  1. 安裝 JDK:從 Oracle Java SE 下載頁面取得適當的 JDK 版本並進行安裝。

  2. 下載並組態 Kafka:下載 Apache Kafka,並將其解壓縮到指定目錄,如 /usr/local/kafka_2.13-2.8.0

  3. 設定 JAVA_HOME 環境變數

    $ export JAVA_HOME=`/usr/libexec/java_home -v 16.0.1`
    $ echo $JAVA_HOME
    /Library/Java/JavaVirtualMachines/jdk-16.0.1.jdk/Contents/Home
    

    內容解密:

    • 使用 /usr/libexec/java_home -v 16.0.1 命令來設定 JAVA_HOME 環境變數,指向已安裝的 JDK 目錄。
  4. 啟動 ZooKeeper 和 Kafka

    $ /usr/local/kafka_2.13-2.8.0/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.13-2.8.0/config/zookeeper.properties
    $ /usr/local/kafka_2.13-2.8.0/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.0/config/server.properties
    [2021-07-18 18:02:34,724] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
    [...]
    [2021-07-18 18:02:36,873] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
    [2021-07-18 18:02:36,915] INFO [broker-0-to-controller-send-thread]: Recorded new controller, from now on will use broker 192.168.0.2:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
    

    內容解密:

    • 使用 Kafka 提供的指令碼來啟動 ZooKeeper 和 Kafka 服務,分別執行 zookeeper-server-start.shkafka-server-start.sh 指令碼。