Kafka 的消費者偏移管理對於確保資料正確處理至關重要。消費者組利用偏移量追蹤消費進度,Kafka 則保留已提交的偏移量以便重新分配或重啟。然而,若消費者組失效,Kafka 只會在設定時間內保留偏移量,逾期將被刪除,導致消費者組遺忘先前的消費記錄。偏移量提交的機制是透過消費者向 Kafka 傳送訊息,更新 __consumer_offsets 主題中的每個分割槽的已提交偏移量。提交的偏移量直接影響資料處理,偏移量過小會導致訊息重複處理,過大則會導致訊息遺漏。自動提交偏移雖然方便,但可能因提交延遲導致訊息重複。手動提交偏移能更精確地控制,但需要開發者自行管理。
理解 Kafka 消費者偏移管理的機制對於構建可靠的資料串流應用至關重要。不論是自動提交還是手動提交,都必須仔細權衡其優缺點,並根據實際應用場景選擇合適的策略。對於需要精確控制訊息處理的應用,手動提交偏移量是更好的選擇,可以避免訊息重複或遺漏。而對於容錯率要求較高的應用,則可以考慮結契約步和非同步提交,確保偏移量在各種情況下都能被正確提交。
Kafka消費者偏移管理的重要性及其運作機制
Kafka消費者組(Consumer Group)的偏移管理是確保資料正確處理的關鍵因素之一。當消費者組中有活躍成員持續傳送心跳訊號時,Kafka會保留該組最後提交的每個分割槽偏移量,以便在重新分配或重啟時能夠檢索。然而,一旦消費者組變為空,Kafka只會在設定的時間內(預設為7天)保留已提交的偏移量。若偏移量被刪除,當消費者組再次啟動時,將表現得像全新的消費者組,忘記之前的所有消費記錄。
提交偏移的機制
每次呼叫poll()方法時,它會傳回Kafka中尚未被消費者組讀取的記錄。這意味著Kafka允許消費者使用其自身的機制來跟蹤在每個分割槽中的位置(偏移量)。這種更新分割槽當前位置的行為稱為偏移提交。與傳統的訊息佇列不同,Kafka不會逐一提交記錄,而是由消費者提交最後一條成功處理的訊息偏移量,並隱含地認為該訊息之前的所有訊息都已成功處理。
消費者如何提交偏移?它會向Kafka傳送一條訊息,更新一個特殊的__consumer_offsets主題,其中包含每個分割槽的已提交偏移量。只要所有消費者保持執行,這不會有任何影響。但是,如果消費者當機或新消費者加入消費者組,將觸發重新平衡。重新平衡後,每個消費者可能會被分配到新的分割槽集合,為了知道從哪裡開始工作,消費者將讀取每個分割槽的最新已提交偏移量並從那裡繼續。
偏移管理的影響
如果已提交的偏移量小於客戶端最後處理的訊息偏移量,則最後處理的偏移量和已提交偏移量之間的訊息將被處理兩次。反之,如果已提交的偏移量大於客戶端實際處理的最後訊息偏移量,則最後處理的偏移量和已提交偏移量之間的訊息將被消費者組忽略。
自動提交偏移
最簡單的提交偏移方式是讓消費者自動完成。如果設定enable.auto.commit=true,則每隔五秒(預設間隔,由auto.commit.interval.ms控制),消費者將提交客戶端從poll()接收到的最新偏移量。與消費者中的其他操作一樣,自動提交也是由輪詢迴圈驅動的。每次輪詢時,消費者都會檢查是否到了提交時間,如果是,則會提交上一次輪詢傳回的偏移量。
然而,在使用這一便捷選項之前,瞭解其後果非常重要。考慮到預設情況下自動提交每五秒發生一次,假設在最近一次提交後的三秒鐘後消費者當機了。重新平衡後,倖存的消費者將開始消費之前由當機的代理擁有的分割槽,但它們將從最後提交的偏移量開始。在這種情況下,偏移量已經過時三秒,因此這三秒內到達的所有事件將被處理兩次。
程式碼範例:自動提交組態
enable.auto.commit=true
auto.commit.interval.ms=5000
內容解密:
enable.auto.commit=true:啟用自動提交功能,讓消費者定期自動提交已處理的偏移量。auto.commit.interval.ms=5000:設定自動提交的時間間隔為5000毫秒(5秒),即每5秒進行一次自動提交。
手動控制偏移提交
雖然自動提交很方便,但開發者需要更多的控制權來避免重複訊息的出現。手動控制偏移提交可以提供更精確的控制,但也需要開發者自行管理偏移量的提交。
// 示例手動提交程式碼
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync(); // 同步提交偏移量
}
內容解密:
- 建立
KafkaConsumer例項,並訂閱主題列表。 - 進入無限迴圈,不斷呼叫
poll()取得記錄。 - 遍歷記錄並印出相關資訊。
- 使用
commitSync()方法同步提交當前批次的偏移量,以確保資料處理的安全性。
Kafka 消費者偏移管理最佳實踐
在 Kafka 中,消費者偏移管理是確保資料處理正確性和可靠性的關鍵。預設情況下,Kafka 消費者會自動提交偏移,但大多數開發者會選擇手動控制偏移提交,以避免遺失訊息或減少重新平衡期間的訊息重複處理。
手動提交偏移
要啟用手動提交偏移,需要設定 enable.auto.commit=false。這樣,偏移只會在應用程式明確選擇提交時才會被提交。最簡單可靠的提交 API 是 commitSync(),它會提交 poll() 傳回的最新偏移,並在偏移提交成功後傳回,如果提交失敗則會丟擲例外。
commitSync() 使用範例
Duration timeout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e);
}
}
內容解密:
- 迴圈提取訊息:使用
while (true)迴圈不斷提取 Kafka 中的訊息。 - 處理訊息:在
for迴圈中處理每條訊息,這裡僅列印訊息的詳細資訊。 - 同步提交偏移:呼叫
commitSync()提交當前批次的最後一個偏移,確保訊息處理的可靠性。 - 錯誤處理:捕捉
CommitFailedException例外並記錄錯誤日誌。
非同步提交偏移
手動提交的一個缺點是應用程式會被阻塞,直到代理程式回應提交請求。這會限制應用程式的吞吐量。非同步提交 API 可以改善吞吐量,但需要小心處理提交順序。
commitAsync() 使用範例
Duration timeout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
內容解密:
- 非同步提交:呼叫
commitAsync()非同步提交偏移,不會阻塞應用程式。 - 吞吐量改善:非同步提交可以提高應用程式的吞吐量,但需要注意提交失敗的情況。
重試非同步提交
為了正確處理非同步提交的重試,可以使用單調遞增的序號。在每次提交時增加序號,並在回呼中檢查序號是否與當前的序號一致,以決定是否重試。
commitAsync() 重試範例
Duration timeout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});
}
內容解密:
- 回呼處理:使用
OffsetCommitCallback處理提交結果,記錄提交失敗的偏移和錯誤資訊。 - 錯誤日誌記錄:在回呼中記錄提交失敗的詳細資訊,以便於除錯。
結契約步和非同步提交
通常,非同步提交足夠應對大多數情況,但在某些關鍵時刻(如關閉消費者或重新平衡前),可以結合使用同步提交以確保偏移被正確提交。
Kafka 消費者偏移管理與再平衡監聽器
在 Kafka 中,消費者組(Consumer Group)透過提交偏移量(Offset)來記錄其消費進度。正確管理偏移量對於確保資料不丟失和不重複消費至關重要。本篇文章將探討如何使用 commitAsync() 和 commitSync() 方法提交偏移量,以及如何透過 ConsumerRebalanceListener 介面處理再平衡事件。
非同步與同步提交偏移量
Kafka 提供了兩種提交偏移量的方法:commitAsync() 和 commitSync()。非同步提交 commitAsync() 不會阻塞,適合在正常執行時使用,因為它能夠快速提交偏移量,並且如果一次提交失敗,下一次提交可以作為重試。但是,在關閉消費者之前,應該使用同步提交 commitSync(),因為它會重試直到成功或發生不可還原的錯誤。
Duration timeout = Duration.ofMillis(100);
try {
while (!closing) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
consumer.commitSync();
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
consumer.close();
}
內容解密:
Duration timeout = Duration.ofMillis(100);:設定輪詢超時時間為 100 毫秒。while (!closing):迴圈檢查是否正在關閉消費者。consumer.poll(timeout):從 Kafka 提取訊息,超時時間由timeout控制。consumer.commitAsync();:非同步提交偏移量,不阻塞主執行緒。consumer.commitSync();:在關閉前同步提交偏移量,確保偏移量被正確記錄。
提交指定偏移量
有時需要在處理一批訊息的中途提交偏移量,以避免在再平衡發生時重複處理所有訊息。Kafka 允許傳遞一個包含分割槽和偏移量的對映(Map)給 commitAsync() 或 commitSync() 方法。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
Duration timeout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, "no metadata"));
if (count % 1000 == 0)
consumer.commitAsync(currentOffsets, null);
count++;
}
}
內容解密:
currentOffsetsMap:用於跟蹤每個分割槽的當前偏移量。record.offset() + 1:提交下一個預期處理的訊息的偏移量。if (count % 1000 == 0):每處理 1000 條訊息提交一次偏移量,可以根據實際需求調整。
再平衡監聽器
當消費者組發生再平衡時,消費者需要執行一些清理工作,如提交最後處理的偏移量、關閉檔案控制程式碼或資料函式庫連線等。ConsumerRebalanceListener 介面提供了三個方法來處理這些事件:onPartitionsAssigned()、onPartitionsRevoked() 和 onPartitionsLost()。
onPartitionsAssigned():在分割槽被分配給消費者後呼叫,用於準備或載入與分割槽相關的狀態。onPartitionsRevoked():在消費者需要放棄之前擁有的分割槽時呼叫,通常用於提交偏移量。onPartitionsLost():僅在使用協同再平衡演算法時呼叫,用於清理與丟失分割槽相關的資源。
透過實作 ConsumerRebalanceListener,開發者可以在再平衡發生時執行自定義邏輯,確保資料的一致性和系統的健壯性。