Kafka 的消費者偏移管理對於確保資料正確處理至關重要。消費者組利用偏移量追蹤消費進度,Kafka 則保留已提交的偏移量以便重新分配或重啟。然而,若消費者組失效,Kafka 只會在設定時間內保留偏移量,逾期將被刪除,導致消費者組遺忘先前的消費記錄。偏移量提交的機制是透過消費者向 Kafka 傳送訊息,更新 __consumer_offsets 主題中的每個分割槽的已提交偏移量。提交的偏移量直接影響資料處理,偏移量過小會導致訊息重複處理,過大則會導致訊息遺漏。自動提交偏移雖然方便,但可能因提交延遲導致訊息重複。手動提交偏移能更精確地控制,但需要開發者自行管理。

理解 Kafka 消費者偏移管理的機制對於構建可靠的資料串流應用至關重要。不論是自動提交還是手動提交,都必須仔細權衡其優缺點,並根據實際應用場景選擇合適的策略。對於需要精確控制訊息處理的應用,手動提交偏移量是更好的選擇,可以避免訊息重複或遺漏。而對於容錯率要求較高的應用,則可以考慮結契約步和非同步提交,確保偏移量在各種情況下都能被正確提交。

Kafka消費者偏移管理的重要性及其運作機制

Kafka消費者組(Consumer Group)的偏移管理是確保資料正確處理的關鍵因素之一。當消費者組中有活躍成員持續傳送心跳訊號時,Kafka會保留該組最後提交的每個分割槽偏移量,以便在重新分配或重啟時能夠檢索。然而,一旦消費者組變為空,Kafka只會在設定的時間內(預設為7天)保留已提交的偏移量。若偏移量被刪除,當消費者組再次啟動時,將表現得像全新的消費者組,忘記之前的所有消費記錄。

提交偏移的機制

每次呼叫poll()方法時,它會傳回Kafka中尚未被消費者組讀取的記錄。這意味著Kafka允許消費者使用其自身的機制來跟蹤在每個分割槽中的位置(偏移量)。這種更新分割槽當前位置的行為稱為偏移提交。與傳統的訊息佇列不同,Kafka不會逐一提交記錄,而是由消費者提交最後一條成功處理的訊息偏移量,並隱含地認為該訊息之前的所有訊息都已成功處理。

消費者如何提交偏移?它會向Kafka傳送一條訊息,更新一個特殊的__consumer_offsets主題,其中包含每個分割槽的已提交偏移量。只要所有消費者保持執行,這不會有任何影響。但是,如果消費者當機或新消費者加入消費者組,將觸發重新平衡。重新平衡後,每個消費者可能會被分配到新的分割槽集合,為了知道從哪裡開始工作,消費者將讀取每個分割槽的最新已提交偏移量並從那裡繼續。

偏移管理的影響

如果已提交的偏移量小於客戶端最後處理的訊息偏移量,則最後處理的偏移量和已提交偏移量之間的訊息將被處理兩次。反之,如果已提交的偏移量大於客戶端實際處理的最後訊息偏移量,則最後處理的偏移量和已提交偏移量之間的訊息將被消費者組忽略。

自動提交偏移

最簡單的提交偏移方式是讓消費者自動完成。如果設定enable.auto.commit=true,則每隔五秒(預設間隔,由auto.commit.interval.ms控制),消費者將提交客戶端從poll()接收到的最新偏移量。與消費者中的其他操作一樣,自動提交也是由輪詢迴圈驅動的。每次輪詢時,消費者都會檢查是否到了提交時間,如果是,則會提交上一次輪詢傳回的偏移量。

然而,在使用這一便捷選項之前,瞭解其後果非常重要。考慮到預設情況下自動提交每五秒發生一次,假設在最近一次提交後的三秒鐘後消費者當機了。重新平衡後,倖存的消費者將開始消費之前由當機的代理擁有的分割槽,但它們將從最後提交的偏移量開始。在這種情況下,偏移量已經過時三秒,因此這三秒內到達的所有事件將被處理兩次。

程式碼範例:自動提交組態

enable.auto.commit=true
auto.commit.interval.ms=5000

內容解密:

  • enable.auto.commit=true:啟用自動提交功能,讓消費者定期自動提交已處理的偏移量。
  • auto.commit.interval.ms=5000:設定自動提交的時間間隔為5000毫秒(5秒),即每5秒進行一次自動提交。

手動控制偏移提交

雖然自動提交很方便,但開發者需要更多的控制權來避免重複訊息的出現。手動控制偏移提交可以提供更精確的控制,但也需要開發者自行管理偏移量的提交。

// 示例手動提交程式碼
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    consumer.commitSync(); // 同步提交偏移量
}

內容解密:

  • 建立KafkaConsumer例項,並訂閱主題列表。
  • 進入無限迴圈,不斷呼叫poll()取得記錄。
  • 遍歷記錄並印出相關資訊。
  • 使用commitSync()方法同步提交當前批次的偏移量,以確保資料處理的安全性。

Kafka 消費者偏移管理最佳實踐

在 Kafka 中,消費者偏移管理是確保資料處理正確性和可靠性的關鍵。預設情況下,Kafka 消費者會自動提交偏移,但大多數開發者會選擇手動控制偏移提交,以避免遺失訊息或減少重新平衡期間的訊息重複處理。

手動提交偏移

要啟用手動提交偏移,需要設定 enable.auto.commit=false。這樣,偏移只會在應用程式明確選擇提交時才會被提交。最簡單可靠的提交 API 是 commitSync(),它會提交 poll() 傳回的最新偏移,並在偏移提交成功後傳回,如果提交失敗則會丟擲例外。

commitSync() 使用範例

Duration timeout = Duration.ofMillis(100);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(timeout);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
}

內容解密:

  1. 迴圈提取訊息:使用 while (true) 迴圈不斷提取 Kafka 中的訊息。
  2. 處理訊息:在 for 迴圈中處理每條訊息,這裡僅列印訊息的詳細資訊。
  3. 同步提交偏移:呼叫 commitSync() 提交當前批次的最後一個偏移,確保訊息處理的可靠性。
  4. 錯誤處理:捕捉 CommitFailedException 例外並記錄錯誤日誌。

非同步提交偏移

手動提交的一個缺點是應用程式會被阻塞,直到代理程式回應提交請求。這會限制應用程式的吞吐量。非同步提交 API 可以改善吞吐量,但需要小心處理提交順序。

commitAsync() 使用範例

Duration timeout = Duration.ofMillis(100);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(timeout);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    consumer.commitAsync();
}

內容解密:

  1. 非同步提交:呼叫 commitAsync() 非同步提交偏移,不會阻塞應用程式。
  2. 吞吐量改善:非同步提交可以提高應用程式的吞吐量,但需要注意提交失敗的情況。

重試非同步提交

為了正確處理非同步提交的重試,可以使用單調遞增的序號。在每次提交時增加序號,並在回呼中檢查序號是否與當前的序號一致,以決定是否重試。

commitAsync() 重試範例

Duration timeout = Duration.ofMillis(100);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(timeout);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        }
    });
}

內容解密:

  1. 回呼處理:使用 OffsetCommitCallback 處理提交結果,記錄提交失敗的偏移和錯誤資訊。
  2. 錯誤日誌記錄:在回呼中記錄提交失敗的詳細資訊,以便於除錯。

結契約步和非同步提交

通常,非同步提交足夠應對大多數情況,但在某些關鍵時刻(如關閉消費者或重新平衡前),可以結合使用同步提交以確保偏移被正確提交。

Kafka 消費者偏移管理與再平衡監聽器

在 Kafka 中,消費者組(Consumer Group)透過提交偏移量(Offset)來記錄其消費進度。正確管理偏移量對於確保資料不丟失和不重複消費至關重要。本篇文章將探討如何使用 commitAsync()commitSync() 方法提交偏移量,以及如何透過 ConsumerRebalanceListener 介面處理再平衡事件。

非同步與同步提交偏移量

Kafka 提供了兩種提交偏移量的方法:commitAsync()commitSync()。非同步提交 commitAsync() 不會阻塞,適合在正常執行時使用,因為它能夠快速提交偏移量,並且如果一次提交失敗,下一次提交可以作為重試。但是,在關閉消費者之前,應該使用同步提交 commitSync(),因為它會重試直到成功或發生不可還原的錯誤。

Duration timeout = Duration.ofMillis(100);
try {
    while (!closing) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitAsync();
    }
    consumer.commitSync();
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    consumer.close();
}

內容解密:

  1. Duration timeout = Duration.ofMillis(100);:設定輪詢超時時間為 100 毫秒。
  2. while (!closing):迴圈檢查是否正在關閉消費者。
  3. consumer.poll(timeout):從 Kafka 提取訊息,超時時間由 timeout 控制。
  4. consumer.commitAsync();:非同步提交偏移量,不阻塞主執行緒。
  5. consumer.commitSync();:在關閉前同步提交偏移量,確保偏移量被正確記錄。

提交指定偏移量

有時需要在處理一批訊息的中途提交偏移量,以避免在再平衡發生時重複處理所有訊息。Kafka 允許傳遞一個包含分割槽和偏移量的對映(Map)給 commitAsync()commitSync() 方法。

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;

Duration timeout = Duration.ofMillis(100);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(timeout);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        currentOffsets.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1, "no metadata"));
        if (count % 1000 == 0)
            consumer.commitAsync(currentOffsets, null);
        count++;
    }
}

內容解密:

  1. currentOffsets Map:用於跟蹤每個分割槽的當前偏移量。
  2. record.offset() + 1:提交下一個預期處理的訊息的偏移量。
  3. if (count % 1000 == 0):每處理 1000 條訊息提交一次偏移量,可以根據實際需求調整。

再平衡監聽器

當消費者組發生再平衡時,消費者需要執行一些清理工作,如提交最後處理的偏移量、關閉檔案控制程式碼或資料函式庫連線等。ConsumerRebalanceListener 介面提供了三個方法來處理這些事件:onPartitionsAssigned()onPartitionsRevoked()onPartitionsLost()

  • onPartitionsAssigned():在分割槽被分配給消費者後呼叫,用於準備或載入與分割槽相關的狀態。
  • onPartitionsRevoked():在消費者需要放棄之前擁有的分割槽時呼叫,通常用於提交偏移量。
  • onPartitionsLost():僅在使用協同再平衡演算法時呼叫,用於清理與丟失分割槽相關的資源。

透過實作 ConsumerRebalanceListener,開發者可以在再平衡發生時執行自定義邏輯,確保資料的一致性和系統的健壯性。