在串流處理應用中,訊息重複可能導致資料不一致和錯誤的計算結果。Kafka 的精確一次語意,結合冪等生產者和事務機制,有效解決了這個問題。冪等生產者透過生產者 ID 和序列號機制,避免了因生產者重試導致的訊息重複。而事務機制則確保了多分割槽原子寫入,即使在應用程式當機的情況下,也能維持資料的一致性。然而,精確一次語意並非萬能,對於涉及外部系統的操作,仍需額外機制來確保資料的完整性和一致性。理解這些限制,並結合實際應用場景選擇合適的策略,才能最大程度地發揮 Kafka 的效能和可靠性。

8. Exactly-Once語意

在第7章中,我們討論了允許Kafka使用者控制其可靠性保證的組態引數和最佳實踐。我們專注於至少一次傳遞(at-least-once delivery)——Kafka不會丟失已確認為已提交的訊息的保證。這仍然留下了重複訊息的可能性。

在簡單的系統中,訊息被生產然後被各種應用程式消費,重複訊息是一種相當容易處理的煩惱。大多數現實世界的應用程式都包含唯一的識別符號,消費應用程式可以使用這些識別符號來對訊息進行重複資料刪除。

當我們檢視聚合事件的流處理應用程式時,事情變得更加複雜。當檢查一個消費事件、計算平均值並產生結果的應用程式時,通常無法檢測到平均值不正確,因為在計算平均值時某個事件被處理了兩次。在這些情況下,提供更強的保證——精確一次處理語意(exactly-once processing semantics)非常重要。

在本章中,我們將討論如何使用Kafka實作精確一次語意、推薦的使用案例和限制。與至少一次保證一樣,我們將探討並提供一些關於如何實作此保證的見解和直覺。在首次閱讀本章時,可以跳過這些細節,但在使用該功能之前瞭解這些細節將非常有用——它將有助於闡明不同組態和API的含義以及如何最佳地使用它們。

Kafka中的精確一次語意是兩個關鍵功能的組合:冪等生產者(idempotent producers),幫助避免由於生產者重試引起的重複;以及事務語意(transactional semantics),保證流處理應用程式中的精確一次處理。我們將討論這兩者,從更簡單、更通用的冪等生產者開始。

冪等生產者

如果執行相同的操作多次與執行一次具有相同的結果,則該服務被稱為冪等。在資料函式庫中,這通常表現為UPDATE t SET x=x+1 where y=5UPDATE t SET x=18 where y=5之間的差異。第一個例子不是冪等的;如果我們呼叫它三次,我們將得到與呼叫一次非常不同的結果。第二個例子是冪等的——無論我們執行該陳述式多少次,x都將等於18。

這與Kafka生產者有什麼關係?如果我們組態生產者具有至少一次語意而不是冪等語意,則意味著在不確定的情況下,生產者將重試傳送訊息,以便它至少到達一次。這些重試可能導致重複。

經典的例子是當一個分割槽長官者從生產者接收到一條記錄,將其成功複製到追隨者,然後在其上的代理當機之前,它無法向生產者傳送回應。生產者在一段時間內沒有收到回應,將重發訊息。該訊息將到達新的長官者,該長官者已經從之前的嘗試中擁有了該訊息的副本——從而導致重複。

在某些應用程式中,重複訊息並不重要,但在其他應用程式中,它們可能導致庫存清點錯誤、財務報表錯誤或向某人傳送兩把雨傘而不是一把。

Kafka的冪等生產者透過自動檢測和解決這些重複問題來解決這個問題。

冪等生產者如何工作?

當我們啟用冪等生產者時,每條訊息都將包含一個唯一的生產者ID(PID)和序列號。這些與目標主題和分割槽一起,唯一標識每條訊息。代理使用這些唯一識別符號來跟蹤每個代理上每個分割槽產生的最後五條訊息。為了限制每個分割槽必須跟蹤的前一個序列號的數量,我們還要求生產者使用max.inflight.requests=5或更低(預設值為5)。

當代理接收到一條已經被接受的訊息時,它將以適當的錯誤拒絕重複的訊息。這個錯誤由生產者記錄,並反映在其指標中,但不會引起任何異常,也不應該引起任何警示。在生產者客戶端,它將被新增到record-error-rate指標中。在代理上,它將是RequestMetrics型別的ErrorsPerSec指標的一部分,其中包括每個錯誤型別的單獨計數。

如果代理接收到的序列號出乎意料地高怎麼辦?代理預期訊息編號2後面跟著訊息編號3;如果代理接收到訊息編號27而不是會怎麼樣?在這種情況下,代理將以“亂序序列”錯誤回應,但如果我們使用冪等生產者而不使用事務,則可以忽略此錯誤。

警告

雖然生產者在遇到“亂序序列號”異常後將繼續正常執行,但此錯誤通常表明生產者和代理之間丟失了訊息——如果代理收到了訊息編號2,後面跟著訊息編號27,則訊息3到26一定發生了某些事情。在日誌中遇到此類別錯誤時,值得重新審視生產者和主題組態,並確保生產者組態了推薦的高可靠性值,並檢查是否發生了不乾淨的長官者選舉。

與分散式系統一樣,考慮冪等生產者在故障條件下的行為很有趣。考慮兩種情況:生產者重新啟動和代理故障。

生產者重新啟動

當生產者失敗時,通常會建立一個新的生產者來取代它——無論是由人工重新啟動機器,還是使用像Kubernetes這樣更複雜的框架來提供自動化的故障還原。關鍵點是,當生產者啟動時,如果啟用了冪等生產者,生產者將初始化並聯絡Kafka代理以生成生產者ID。每次初始化生產者時,都將獲得一個全新的ID(假設我們沒有啟用事務)。這意味著,如果一個生產者失敗,而取代它的生產者發送了一條先前由舊生產者傳送的訊息,代理將無法檢測到重複——這兩條訊息將具有不同的生產者ID和不同的序列號,並且將被視為不同的訊息。

// 初始化Kafka生產者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true"); // 啟用冪等生產者

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 傳送訊息
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record);

內容解密:

  1. 初始化Kafka生產者的組態屬性,包括指定Kafka叢集的位置、確認模式、鍵和值的序列化器。
  2. 透過設定enable.idempotence=true來啟用冪等生產者,以避免由於重試導致的訊息重複。
  3. 建立KafkaProducer例項,並使用它來傳送一條記錄到指定的主題。
  4. 冪等生產者的實作依賴於生產者ID和序列號,確保即使在重試的情況下,訊息也不會被重複處理。

Kafka 冪等性生產者(Idempotent Producer)解析

Kafka 的冪等性生產者是為瞭解決因生產者重試機制導致的訊息重複問題而設計的。透過使用生產者 ID 和序列號,Kafka 能夠保證訊息的唯一性和正確的順序。

冪等性生產者的運作機制

  1. 生產者 ID 和序列號:每個生產者例項都會被分配一個唯一的生產者 ID。當生產者傳送訊息時,每個訊息都會被賦予一個序列號。Broker 會根據生產者 ID 和序列號來檢查是否有重複的訊息。

  2. Broker 端的狀態維護:Broker 會為每個生產者維護一個記憶體中的狀態,記錄最近的序列號。這使得 Broker 能夠拒絕重複的訊息。

  3. 容錯移轉和還原:當 Broker 發生故障時,新的 Leader 會繼承原有的生產者狀態,確保訊息的連續性和正確性。當舊的 Leader 還原後,它會從最新的快照和日誌中還原生產者狀態。

冪等性生產者的限制

  • 無法檢測應用層面的重複:如果應用程式多次呼叫 producer.send() 傳送相同的訊息,冪等性生產者無法檢測這種重複。

  • 多個生產者例項的重複問題:如果多個生產者例項傳送相同的訊息,冪等性生產者無法檢測這種重複。

如何使用 Kafka 冪等性生產者

  1. 組態冪等性生產者:在生產者組態中設定 enable.idempotence=true

  2. 效能影響:對於已經組態 acks=all 的生產者,啟用冪等性不會有明顯的效能差異。

  3. 運作變化

    • 生產者在啟動時會額外進行一次 API 呼叫以取得生產者 ID。
    • 每個記錄批次會包含生產者 ID 和第一個訊息的序列號,這增加了 96 位元的開銷。
    • Broker 會驗證序列號以保證沒有重複訊息。
    • 保證每個分割槽的訊息順序,即使在故障情況下。

程式碼範例

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true"); // 啟用冪等性
props.put("acks", "all"); // 保證訊息被正確接收

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record);
producer.close();

內容解密:

  1. enable.idempotence=true:此設定啟用了 Kafka 生產者的冪等性功能,確保在重試情況下不會產生重複訊息。
  2. acks=all:此設定保證了訊息被所有同步副本確認後才視為成功寫入,進一步確保了資料的可靠性和一致性。
  3. KafkaProducer:建立了一個 Kafka 生產者例項,用於向 Kafka 主題傳送訊息。
  4. ProducerRecord:建立了一個要傳送到指定主題的記錄,包含鍵值對。
  5. producer.send(record):將記錄傳送到 Kafka 主題。

Kafka 交易機制:確保串流處理的精確度

Apache Kafka 的交易機制是為串流處理應用程式設計的,旨在確保資料處理的準確性和可靠性。交易機制可以保證在「讀取-處理-寫入」流程中,每一筆輸入記錄只會被處理一次,即使在應用程式當機或出現重複資料的情況下也能保持結果的正確性。

交易機制的應用場景

交易機制特別適用於需要高精確度的串流處理應用,例如金融應用、資料聚合和連線操作。在這些應用中,交易機制可以確保資料的正確性和一致性。

金融應用的範例

金融應用是複雜串流處理的典型例子,需要使用精確一次(exactly-once)的能力來保證聚合結果的準確性。即使在一般的應用場景中,例如聊天機器人,也可以使用 Kafka Streams 啟用精確一次的保證。

交易機制解決的問題

應用程式當機導致的重複處理

當應用程式在處理資料後當機,可能會導致結果被寫入輸出主題多次。交易機制透過原子性的「讀取-處理-寫入」操作,確保偏移量和結果要麼同時成功,要麼都不成功,避免重複處理。

殭屍應用程式導致的重複結果

當應用程式凍結或失去與 Kafka 的連線時,可能會導致多個例項同時處理同一批資料,產生重複結果。交易機制透過「殭屍防護」機制,避免殭屍例項寫入輸出流。

交易機制如何保證精確一次

Kafka 交易機制引入了原子性多分割槽寫入的概念,確保偏移量和結果的寫入是原子的。要使用交易機制,需要組態一個具有 transactional.id 的生產者,並使用 initTransactions() 初始化。Kafka 會維護 transactional.idproducer.id 的對映,以確保在重啟後仍能識別相同的生產者。

使用交易生產者

交易生產者是一種特殊的 Kafka 生產者,透過組態 transactional.id 並初始化 initTransactions() 來使用。交易生產者可以執行原子性多分割槽寫入,確保資料的一致性和正確性。

圖解:交易生產者的原子性多分割槽寫入

此圖示說明瞭一個簡單的串流處理應用程式,執行原子性多分割槽寫入到兩個分割槽,同時提交事件的偏移量。

圖表說明

圖 8-1. 交易生產者的原子性多分割槽寫入

此圖示呈現了交易生產者如何執行原子性寫入操作,確保資料的一致性和正確性。

程式碼範例:使用交易生產者

// 設定交易生產者
Properties props = new Properties();
props.put("transactional.id", "my-transactional-id");

// 初始化交易生產者
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    // 開始交易
    producer.beginTransaction();
    
    // 生產資料到輸出主題
    producer.send(new ProducerRecord<>("output-topic", "key", "value"));
    
    // 提交偏移量
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition("input-topic", 0), new OffsetAndMetadata(1));
    producer.sendOffsetsToTransaction(offsets, "consumer-group-id");
    
    // 提交交易
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // 處理 ProducerFencedException
    producer.close();
}

內容解密:

  1. 設定交易生產者:首先,我們需要設定一個具有 transactional.id 的 Kafka 生產者。這是為了確保生產者在重啟後仍能被識別。
  2. 初始化交易生產者:使用 initTransactions() 初始化交易生產者,以確保其能夠執行交易操作。
  3. 開始交易:使用 beginTransaction() 開始一個新的交易。
  4. 生產資料:使用 send() 方法將資料生產到輸出主題。
  5. 提交偏移量:使用 sendOffsetsToTransaction() 提交輸入主題的偏移量,以確保資料的一致性。
  6. 提交交易:使用 commitTransaction() 提交交易,以確保所有操作都成功執行。
  7. 錯誤處理:如果出現 ProducerFencedException,表示該生產者已被隔離,需要關閉並重新建立新的生產者。

透過使用 Kafka 的交易機制,可以確保串流處理應用程式的資料正確性和一致性,避免重複處理和資料遺失的問題。

Kafka 交易機制詳解與應用限制

Apache Kafka 的交易機制(Transactions)是為了提供多分割槽原子寫入(但不包括讀取)以及在串流處理應用中隔離僵屍生產者的功能。交易機制的設計目標是確保在消費-處理-生產(consume-process-produce)鏈中的串流處理任務能夠實作精確一次(exactly-once)的處理保證。

交易機制的工作原理

交易機制主要涉及生產者(Producer)的操作。當一個生產者被組態為交易模式時,它可以開始一個交易、寫入多個分割槽、提交偏移量以標記已經處理的記錄,並最終提交或中止交易。消費者(Consumer)需要被適當組態以配合交易的隔離級別,否則無法保證精確一次的處理語義。

消費者透過設定 isolation.level 組態來控制是否讀取交易寫入的訊息。如果設定為 read_committed,消費者將只讀取成功提交的交易中的訊息或非交易寫入的訊息,而不會讀取中止的交易或仍在進行中的交易中的訊息。

交易機制的限制

雖然交易機制為 Kafka 帶來了多分割槽原子寫入和隔離僵屍生產者的能力,但在某些場景下,它並不能直接提供精確一次的處理保證。以下是一些交易機制無法解決的問題:

  1. 串流處理中的副作用:如果串流處理應用在處理記錄的過程中涉及傳送電子郵件等外部操作,啟用精確一次語義並不能保證這些外部操作只執行一次。精確一次的保證僅適用於寫入 Kafka 的記錄。

  2. 從 Kafka 主題讀取並寫入外部資料函式庫:在這種情況下,應用程式將記錄寫入外部資料函式庫,而不是 Kafka。由於沒有機制能夠在單一交易中同時將結果寫入外部資料函式庫並提交偏移量到 Kafka,因此需要額外的機制來管理偏移量,例如將偏移量儲存在資料函式庫中,並利用資料函式庫的交易保證。

實作精確一次語義的挑戰

要實作精確一次的語義,需要仔細考慮應用的設計和實作。除了利用 Kafka 的交易機制外,還需要對外部操作進行特殊處理,以避免重複執行。

程式碼範例:組態生產者和消費者以支援交易

// 生產者組態
Properties producerProps = new Properties();
producerProps.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(producerProps);

// 開始交易
producer.initTransactions();
producer.beginTransaction();

try {
    // 生產訊息
    producer.send(new ProducerRecord<>("my-topic", "key", "value"));
    // 提交偏移量
    producer.sendOffsetsToTransaction(offsets, groupId);
    // 提交交易
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // 處理 ProducerFencedException
    producer.close();
}

#### 內容解密
1. `transactional.id` 組態確保了生產者的唯一性避免了多個生產者例項之間的衝突
2. `initTransactions()` 初始化交易功能,`beginTransaction()` 開始一個新的交易
3. 在 `try` 區塊內生產者傳送訊息到 Kafka 主題並提交偏移量以標記已經處理的記錄
4. 如果所有操作成功,`commitTransaction()` 提交交易否則將丟擲異常需要適當處理

// 消費者組態
Properties consumerProps = new Properties();
consumerProps.put("isolation.level", "read_committed");
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

#### 內容解密
1. `isolation.level` 設定為 `read_committed`,確保消費者只讀取已提交的交易中的訊息
2. 這種組態避免了讀取中止或進行中的交易中的訊息提高了資料的一致性和可靠性