Kafka 以高效能和可擴充套件性著稱,其核心在於其訊息儲存和管理機制。訊息批次處理有效提升了儲存和傳輸效率,而多版本訊息格式的支援則確保了向下相容性。Kafka 利用偏移量索引和時間戳索引實作快速訊息檢索。此外,日誌壓縮功能可以有效節省儲存空間,並保留每個 key 的最新值。副本管理機制和相關組態,例如複製因子、不乾淨的 Leader 選舉、最小同步副本數等,對於確保 Kafka 叢集的高用性和資料永續性至關重要。理解這些機制和組態,才能更好地應用 Kafka 並構建可靠的資料串流平台。

Kafka 訊息儲存與管理機制深度解析

Kafka 的訊息儲存機制是其高效能與可擴充套件性的核心基礎。本文將探討 Kafka 的訊息批次處理、訊息格式、索引機制、日誌壓縮等關鍵技術細節,並結合實際應用場景進行分析。

訊息批次處理機制

Kafka 採用批次處理方式來最佳化訊息儲存與傳輸效率。每個訊息批次包含一個批次標頭和多個訊息記錄。批次標頭中包含諸多重要後設資料,例如:

  • 生產者 ID 與紀元(epoch)
  • 起始序列號(sequence number)
  • 是否為交易訊息或控制訊息
  • 訊息集合內容

訊息記錄本身包含以下關鍵資訊:

  1. 記錄大小(位元組數)
  2. 與批次起始偏移量的差值
  3. 與批次起始時間戳的毫秒差值
  4. 使用者負載資料(key、value、headers)

程式碼範例:Kafka 訊息批次結構解析

// Kafka 訊息批次結構示範
public class KafkaBatchStructure {
    private long producerId;
    private short producerEpoch;
    private int firstSequence;
    private List<Record> records;

    // 建構子與 getter 方法省略
}

class Record {
    private int size;
    private long offsetDelta;
    private long timestampDelta;
    private byte[] key;
    private byte[] value;
    private Header[] headers;

    // 建構子與 getter 方法省略
}

內容解密:

  1. KafkaBatchStructure 類別代表一個 Kafka 訊息批次,包含生產者相關資訊和多筆記錄。
  2. 每筆 Record 記錄儲存了大小、偏移差值、時間戳差值等中繼資料,以及實際的使用者資料(key、value、headers)。
  3. 這種設計大幅減少每筆記錄的儲存開銷,使大批次處理更為高效。

訊息格式與向下相容性

Kafka 自 0.11 版本引入新的訊息格式(v2),並支援多版本共存。為確保相容性,Kafka 實作了訊息格式的向下轉換機制:

  1. 當新版本生產者傳送 v2 格式訊息至 broker
  2. 舊版本消費者讀取時,broker 自動將 v2 格式轉換為舊格式

Kafka 訊息格式轉換流程

此圖示展示了 Kafka 如何處理不同版本之間的訊息格式轉換。

圖示內容解密:

  1. 新舊生產者均可傳送訊息至 Broker。
  2. Broker 根據消費者版本決定是否進行訊息格式轉換。
  3. 舊版消費者需要 Broker 將 v2 格式轉換為舊格式才能讀取。

索引機制與高效檢索

Kafka 維護兩種索引以支援快速訊息檢索:

  1. 偏移量索引:將偏移量對映至對應的檔案位置
  2. 時間戳索引:支援按時間戳查詢訊息

這些索引同樣採用分段儲存,以便於清理舊資料。

日誌壓縮機制詳解

Kafka 提供日誌壓縮功能,允許保留每個 key 的最新值,而非單純依據保留時間刪除舊訊息。此功能適用於以下場景:

  1. 使用 Kafka 儲存客戶最新地址資訊
  2. 應用程式利用 Kafka 儲存最新狀態

日誌壓縮工作原理

日誌被視為兩部分:

  1. Clean(已清理部分):已壓縮完成的資料,僅保留每個 key 的最新值。
  2. Dirty(未清理部分):上次壓縮後新寫入的資料。

日誌壓縮示意圖

此圖示展示了 Kafka 日誌的壓縮狀態劃分。

圖示內容解密:

  1. 日誌被分為 Clean 和 Dirty 兩部分。
  2. Clean 部分已完成壓縮,儲存每個 key 的最新值。
  3. Dirty 部分包含最近寫入的資料,等待下次壓縮處理。

Kafka 日誌壓縮機制詳解

Kafka 的日誌壓縮(Log Compaction)是一種最佳化儲存空間的機制,確保主題中只保留每個鍵(key)的最新值。本章將探討 Kafka 日誌壓縮的工作原理、組態方法及其在實際應用中的重要性。

日誌壓縮的工作原理

當 Kafka 組態了日誌壓縮功能後,每個 broker 會啟動一個壓縮管理器執行緒和多個壓縮執行緒。這些執行緒負責執行壓縮任務,選擇髒訊息(dirty messages)比例最高的分割槽進行清理。

構建偏移量對映(Offset Map)

壓縮執行緒首先讀取分割槽的髒部分,並建立一個記憶體中的對映表。每個對映條目包含一個 16 位元組的訊息鍵雜湊值和前一條具有相同鍵的訊息的 8 位元組偏移量。這意味著每個對映條目僅使用 24 位元組。假設一個 1 GB 的段(segment)包含一百萬條訊息,每條訊息佔用 1 KB,那麼只需要約 24 MB 的記憶體來儲存對映表。這種設計非常高效!

組態記憶體使用

管理員可以組態用於偏移量對映表的總記憶體大小。即使每個執行緒有自己的對映表,組態也是針對所有執行緒的總記憶體。如果組態了 1 GB 的記憶體,而有 5 個壓縮執行緒,那麼每個執行緒將獲得 200 MB 的記憶體用於其偏移量對映表。Kafka 不要求整個髒部分都適合分配的記憶體大小,但至少需要一個完整的段能夠容納在內。如果無法滿足,Kafka 將記錄錯誤,管理員需要增加分配給偏移量對映表的記憶體或減少壓縮執行緒的數量。

壓縮過程

一旦壓縮執行緒構建了偏移量對映表,它將開始讀取最舊的段,並檢查其內容是否與偏移量對映表中的鍵相符。對於每個訊息,如果其鍵不存在於對映表中,則該訊息仍是最新的,將被複製到替換段中。如果鍵存在於對映表中,則該訊息被省略,因為後續有相同鍵但更新值的訊息。完成後,替換段將與原始段交換,執行緒繼續處理下一個段。最終,分割槽中只保留每個鍵的最新值。

刪除事件處理

當需要完全刪除某個鍵的所有訊息時(例如,使用者離開服務後需要刪除其所有資料),應用程式必須生成一個包含該鍵和 null 值的訊息。壓縮執行緒在遇到這種訊息(稱為墓碑訊息,tombstone)時,會先進行正常的壓縮,只保留具有 null 值的訊息,並在一段可組態的時間內保留該墓碑訊息。在此期間,消費者可以看到該墓碑訊息並據此刪除相應資料。過了這段時間後,壓縮執行緒將刪除墓碑訊息,該鍵將從 Kafka 的分割槽中徹底消失。

值得注意的是,Kafka 的管理客戶端提供了 deleteRecords 方法,可以刪除指定偏移量之前的所有記錄,並使用完全不同的機制。當呼叫此方法時,Kafka 將移動分割槽的低水位標記(low-water mark),防止消費者消費到被刪除的記錄,直到這些記錄被清理執行緒實際刪除。

日誌壓縮的時機

與刪除策略類別似,日誌壓縮策略永遠不會壓縮當前活躍的段。只有非活躍段中的訊息才有資格被壓縮。預設情況下,當主題中 50% 的內容是髒記錄時,Kafka 將開始進行壓縮。這是一種在壓縮頻率和磁碟空間浪費之間的權衡,可以由管理員進行調整。

此外,管理員還可以透過兩個組態引數來控制壓縮的時機:

  • min.compaction.lag.ms:保證訊息寫入後到被壓縮之間的最小時間間隔。
  • max.compaction.lag.ms:保證訊息寫入後到被認為可以壓縮之間的最大延遲。此組態常用於需要確保在特定時間內完成壓縮的業務場景,例如 GDPR 要求在特定時間內刪除某些資訊。
程式碼範例:Kafka 組態日誌壓縮
# 在 server.properties 中啟用日誌壓縮
log.cleaner.enable=true

# 組態用於日誌壓縮的記憶體大小
log.cleaner.dedupe.buffer.size=1073741824

# 組態日誌壓縮執行緒數量
log.cleaner.threads=5

# 組態日誌壓縮觸發比例
log.cleaner.min.compaction.lag.ms=60000
log.cleaner.max.compaction.lag.ms=86400000

內容解密:

  1. log.cleaner.enable=true:啟用 Kafka 的日誌壓縮功能。
  2. log.cleaner.dedupe.buffer.size=1073741824:組態用於日誌壓縮的記憶體大小為 1 GB,用於儲存偏移量對映表。
  3. log.cleaner.threads=5:設定日誌壓縮執行緒數量為 5,用於平行處理多個分割槽的日誌壓縮任務。
  4. log.cleaner.min.compaction.lag.ms=60000log.cleaner.max.compaction.lag.ms=86400000:組態日誌壓縮的最短和最長延遲時間,分別為 1 分鐘和 24 小時。

圖示說明

此圖示展示了 Kafka 日誌壓縮的基本流程,包括啟用日誌壓縮、構建偏移量對映表、遍歷段並檢查鍵、複製最新訊息到替換段、交換替換段與原始段以及刪除墓碑訊息等步驟。

可靠的資料傳遞

可靠性是系統的一種特性,而非單一元件的屬性。因此,當我們討論Apache Kafka的可靠性保證時,我們需要考慮整個系統及其使用案例。在可靠性方面,與Kafka整合的系統與Kafka本身同等重要。由於可靠性是一個系統級別的問題,因此它不能僅由一人負責。每個人——Kafka管理員、Linux管理員、網路和儲存管理員以及應用程式開發人員——都必須共同努力建立一個可靠的系統。

Apache Kafka的靈活性

Apache Kafka對於可靠的資料傳遞非常靈活。我們知道Kafka有多種使用案例,從追蹤網站點選到信用卡支付。有些使用案例需要最高的可靠性,而其他案例則優先考慮速度和簡單性而非可靠性。Kafka被設計成足夠可組態,其客戶端API足夠靈活,以允許所有型別的可靠性權衡。由於其靈活性,也很容易在使用Kafka時無意中釀成錯誤——相信我們的系統是可靠的,但事實上並非如此。在本章中,我們將首先討論不同型別的可靠性及其在Apache Kafka上下文中的含義。然後,我們將討論Kafka的複製機制及其如何為系統的可靠性做出貢獻。

可靠性保證

當我們談論可靠性時,我們通常以保證的形式來討論,這些保證是系統在不同情況下被保證保留的行為。也許最著名的可靠性保證是ACID,這是關係型資料函式庫普遍支援的標準可靠性保證。ACID代表原子性(atomicity)、一致性(consistency)、隔離性(isolation)和永續性(durability)。當供應商解釋說他們的資料函式庫是ACID相容的,這意味著資料函式庫保證了有關交易行為的某些行為。

這些保證是人們信任關係型資料函式庫處理其最關鍵應用程式的原因——他們確切地知道系統承諾什麼,以及它在不同條件下將如何表現。他們理解這些保證,並且可以透過依賴這些保證來編寫安全的應用程式。瞭解Kafka提供的保證對於那些尋求建立可靠應用程式的人來說至關重要。這種理解使系統開發人員能夠弄清楚在不同的故障條件下系統將如何表現。

Apache Kafka的保證

Kafka提供了以下保證:

  1. 訊息順序保證:在一個分割區內,訊息的順序是有保證的。如果訊息B是在訊息A之後使用相同的生產者在相同的分割區內寫入的,那麼Kafka保證訊息B的偏移量將高於訊息A,並且消費者將在訊息A之後讀取訊息B。
  2. 已提交的訊息:當訊息被寫入所有同步副本(但不一定被刷入磁碟)時,生產者可以選擇接收已傳送訊息的確認。
  3. 訊息不會丟失:只要至少有一個副本保持活躍,已提交的訊息就不會丟失。
  4. 消費者只能讀取已提交的訊息

複製機制

Kafka的複製機制是其可靠性保證的核心。每個Kafka主題都被分解成分割區,這些分割區是基本的資料建構塊。一個分割區儲存在一個單獨的磁碟上。Kafka保證了一個分割區內事件的順序,並且一個分割區可以是線上(可用)或離線(不可用)。每個分割區可以有多個副本,其中一個被指定為長官者。所有事件都被生產到長官者副本,並且通常也是從長官者副本消費。其他副本只需要與長官者保持同步並及時複製所有最近的事件。如果長官者變得不可用,其中一個同步副本將成為新的長官者。

同步副本

一個副本被視為同步,如果它是某個分割區的長官者,或者如果它是一個跟隨者,且:

  • 與ZooKeeper有活躍的工作階段(意味著它在過去6秒內向ZooKeeper發送了心跳,引數可組態)。
  • 在過去10秒內從長官者那裡取得了訊息(引數可組態)。
  • 在過去10秒內取得了長官者最新的訊息(引數可組態)。也就是說,跟隨者不僅要繼續從長官者那裡取得訊息,還必須至少在過去10秒內沒有延遲。

Kafka 副本管理與可靠性組態詳解

Kafka 的副本管理機制是其高用性和資料永續性的關鍵。在 Kafka 中,每個分割槽(partition)都可以有多個副本(replica),這些副本分佈在不同的 broker 上,以確保即使某個 broker 發生故障,資料仍然可用。

副本同步與不同步狀態

當一個副本失去與 ZooKeeper 的連線、停止取得新訊息,或落後太多無法在 10 秒內趕上時,該副本被視為不同步(out-of-sync)。一個不同步的副本可以在重新連線到 ZooKeeper 並趕上 leader 的最新訊息後還原同步。

在舊版本的 Kafka 中,常見的一個問題是副本在同步與不同步狀態之間快速切換,這通常是因為某些組態問題,例如較大的最大請求大小和較大的 JVM 堆積記憶體,導致垃圾回收暫停時間過長,從而使 broker 暫時斷開與 ZooKeeper 的連線。不過,在 Kafka 2.5.0 及更高版本中,這個問題已經很少見了。

副本同步對效能的影響

一個稍微落後的同步副本可能會減慢生產者和消費者的速度,因為他們需要等待所有同步副本都收到訊息後才會提交。一旦一個副本變為不同步狀態,我們不再等待它取得訊息,雖然它仍然落後,但不再對效能產生影響。然而,同步副本的數量減少會降低分割槽的有效複製因子,從而增加停機或資料丟失的風險。

Broker 組態引數

Kafka 的 broker 有三個組態引數可以改變其可靠訊息儲存的行為。這些引數既可以在 broker 級別設定,也可以針對特定主題進行設定。這使得同一個 Kafka 叢集可以同時託管可靠和非可靠的主題。

複製因子(Replication Factor)

主題級別的組態引數是 replication.factor,而 broker 級別的預設值是 default.replication.factor,用於自動建立的主題。複製因子決定了每個分割槽被複制到多少個 broker 上。較高的複製因子提供了更高的可用性和可靠性,但也需要更多的磁碟空間和 broker。

決定一個主題的適當複製因子需要考慮幾個因素:

  • 可用性:單個副本的分割槽在單個 broker 重啟時將不可用。更多的副本意味著更高的可用性。
  • 永續性:每個副本都是分割槽資料的一個複製。更多的副本,尤其是分佈在不同的儲存裝置上,可以降低資料丟失的風險。
  • 吞吐量:每增加一個副本,就會增加 broker 之間的流量。因此,需要根據叢集大小和容量進行規劃。
  • 端對端延遲:生產的記錄需要被複制到所有同步副本後才對消費者可用。理論上,更多的副本可能會增加延遲,因為慢速的副本會拖慢整個系統。
  • 成本:更多的副本意味著更高的儲存和網路成本。對於非關鍵資料,使用較低的複製因子(如 2)可以降低成本,但也會降低可用性。

此外,副本的放置位置也很重要。Kafka 會確保每個分割槽的副本分佈在不同的 broker 上,但在某些情況下,這可能還不夠。如果所有副本都在同一個機架上的 broker 上,而該機架的交換機發生故障,那麼該分割槽仍然會不可用。

Kafka 的可靠性和可用性組態

Kafka 的設計旨在提供高用性和可靠性,但這需要正確的組態和維護。以下是一些關鍵的組態和技術,用於確保 Kafka 叢集的可靠性和可用性。

複製因子和機架感知

為了保護 Kafka 叢集免受單點故障的影響,建議使用多個副本(replication factor)並將 broker 分佈在不同的機架(rack)上。透過設定 broker.rack 引數,可以指定每個 broker 所在的機架。Kafka 將確保分割槽(partition)的副本分散在多個機架上,以提高用性。

# 設定 broker 所在的機架
broker.rack=rack1

內容解密:

  • broker.rack 引數用於指定 broker 所在的機架,有助於 Kafka 將分割槽副本分散到不同的機架,從而提高系統的容錯能力。
  • 在雲端環境中,通常將可用區域(availability zone)視為不同的機架。

不乾淨的 Leader 選舉

unclean.leader.election.enable 引數控制是否允許不同步的副本(out-of-sync replica)成為新的 Leader。預設情況下,此引數設為 false,以確保資料的一致性和可靠性。

# 不允許不同步的副本成為新的 Leader
unclean.leader.election.enable=false

內容解密:

  • 當 Leader 不可用時,Kafka 需要選擇一個新的 Leader。如果允許不同步的副本成為新的 Leader,可能會導致資料丟失或不一致。
  • 設定為 false 可以避免資料丟失,但可能導致某些分割槽在極端情況下不可用,直到原來的 Leader 還原。

最小同步副本數

min.insync.replicas 引數用於設定寫入資料所需的最小同步副本數。這可以確保資料被寫入多個副本,提高資料的可靠性。

# 設定最小同步副本數為 2
min.insync.replicas=2

內容解密:

  • min.insync.replicas 設定為 2 時,生產者只能在至少兩個副本同步的情況下寫入資料。
  • 如果只有一個副本可用,生產者將無法寫入資料,確保了資料不會在單一副本上被寫入而導致資料丟失。

保持副本同步

為了保持副本的同步,需要避免副本失去連線或落後於 Leader。透過監控和維護,可以減少不同步副本的出現,提高系統的可靠性。

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Kafka 訊息儲存與管理機制

package "系統架構" {
    package "前端層" {
        component [使用者介面] as ui
        component [API 客戶端] as client
    }

    package "後端層" {
        component [API 服務] as api
        component [業務邏輯] as logic
        component [資料存取] as dao
    }

    package "資料層" {
        database [主資料庫] as db
        database [快取] as cache
    }
}

ui --> client : 使用者操作
client --> api : HTTP 請求
api --> logic : 處理邏輯
logic --> dao : 資料操作
dao --> db : 持久化
dao --> cache : 快取

note right of api
  RESTful API
  或 GraphQL
end note

@enduml

此圖示展示了 Kafka 中 Leader 和副本之間的資料複製過程。

內容解密:

  • Leader 負責接收生產者的寫入請求,並將資料複製到其他副本。
  • 為了保持系統的可靠性,需要確保所有副本保持同步。