Kafka 生產者負責將訊息傳送至 Kafka 叢集,其效能和可靠性至關重要。訊息傳送方式主要有三種:fire-and-forget、同步傳送和非同步傳送,各有其適用場景。理解 acks、retries、linger.ms 等關鍵組態引數,才能有效控制訊息的可靠性、吞吐量和資源使用。此外,正確的錯誤處理策略,例如使用回呼函式和區分可重試錯誤與不可重試錯誤,對於確保系統穩定性也相當重要。

Kafka 生產者實作詳解:傳送訊息的方法與錯誤處理

在建構 Kafka 生產者時,首先需要設定相關的屬性。以下是一個簡單的範例,展示如何初始化一個 Kafka 生產者,並使用不同的方式傳送訊息至 Kafka 叢集。

初始化 Kafka 生產者

kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);

內容解密:

  1. kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");:此行程式碼設定了 Kafka 生產者的 key.serializer 屬性,指定使用 StringSerializer 來序列化訊息的鍵。
  2. kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");:此行設定了 value.serializer 屬性,使用 StringSerializer 來序列化訊息的值。
  3. producer = new KafkaProducer<String, String>(kafkaProps);:建立一個新的 KafkaProducer 例項,指定鍵和值的型別均為 String,並傳入之前設定的屬性物件。

傳送訊息至 Kafka 的方法

Kafka 生產者提供了三種主要的訊息傳送方式:Fire-and-forget、Synchronous send 和 Asynchronous send。

Fire-and-forget

此方法直接傳送訊息至 Kafka 叢集,不等待任何確認回應。

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

內容解密:

  1. ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");:建立一個 ProducerRecord 物件,指定主題名稱、鍵和值。
  2. producer.send(record);:使用 send() 方法傳送訊息,不等待回應。
  3. catch (Exception e):捕捉可能發生的異常,如序列化錯誤或緩衝區已滿等。

Synchronous send

此方法會等待 Kafka 的回應,以確認訊息是否成功傳送。

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

內容解密:

  1. producer.send(record).get();:使用 send() 方法傳送訊息,並呼叫 get() 方法等待回應,以確認傳送是否成功。
  2. catch (Exception e):捕捉任何在傳送過程中發生的錯誤,如重試次數耗盡或不可重試的錯誤。

Asynchronous send

此方法使用回呼函式處理傳送結果,可以非同步地傳送訊息並處理錯誤。

private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
producer.send(record, new DemoProducerCallback());

內容解密:

  1. private class DemoProducerCallback implements Callback:定義一個實作 Callback 介面的類別,用於處理傳送結果。
  2. public void onCompletion(RecordMetadata recordMetadata, Exception e):當傳送完成時呼叫此方法,若發生錯誤,則印出異常資訊。
  3. producer.send(record, new DemoProducerCallback());:傳送訊息並指定回呼函式,以非同步方式處理結果。

Kafka 生產者組態與訊息傳遞控制

在 Kafka 中,生產者(Producer)是負責將訊息傳送到 Kafka 叢集的元件。為了確保訊息的可靠傳遞和提高生產者的效能,Kafka 提供了多種組態引數來控制生產者的行為。

回呼函式(Callback)的使用

在傳送訊息時,可以使用回呼函式來處理傳送結果。回呼函式需要實作 org.apache.kafka.clients.producer.Callback 介面,該介面定義了一個 onCompletion() 方法。當 Kafka 傳回錯誤時,onCompletion() 方法將被呼叫,並傳入一個非空的異常物件。

new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

內容解密:

  1. ProducerRecord 物件被建立,包含主題(Topic)、鍵(Key)和值(Value)。
  2. producer.send() 方法被呼叫,將 ProducerRecord 物件傳送到 Kafka 叢集。
  3. DemoProducerCallback 物件被傳入 send() 方法,用於處理傳送結果。

組態生產者

Kafka 生產者有多個組態引數,可以用來控制其行為。以下是一些重要的組態引數:

client.id

  • 用於標識客戶端和應用程式的字串。
  • 用於日誌記錄、指標收集和配額管理。

acks

  • 控制多少個分割槽副本必須接收到訊息,生產者才認為寫入成功。
  • 有三個可選值:01all
acks=0
  • 生產者不會等待代理伺服器的回應,直接認為訊息傳送成功。
  • 如果代理伺服器沒有接收到訊息,生產者不會知道,訊息將會遺失。
  • 可以達到很高的吞吐量。
acks=1
  • 生產者會等待長官者副本接收到訊息後,才認為寫入成功。
  • 如果長官者副本當機,生產者會收到錯誤回應,可以重試傳送訊息。
acks=all
  • 生產者會等待所有同步副本接收到訊息後,才認為寫入成功。
  • 這是最安全的模式,可以確保訊息不會遺失。
// acks 組態範例
Properties props = new Properties();
props.put("acks", "all");

內容解密:

  1. acks 引數控制生產者的可靠性。
  2. 不同的 acks 值會影響生產者的效能和可靠性。
  3. 需要根據具體的使用場景選擇合適的 acks 值。

訊息傳遞時間控制

Kafka 生產者有多個組態引數,可以用來控制訊息傳遞的時間。以下是一些重要的組態引數:

max.block.ms

  • 控制生產者在呼叫 send() 方法時,最多可以阻塞多少毫秒。
  • 當緩衝區已滿或後設資料不可用時,send() 方法會被阻塞。
// max.block.ms 組態範例
Properties props = new Properties();
props.put("max.block.ms", "5000");

內容解密:

  1. max.block.ms 引數控制生產者的阻塞時間。
  2. max.block.ms 到期後,send() 方法會丟擲異常。

圖示:Kafka 生產者訊息傳遞流程

@startuml
note
  無法自動轉換的 Plantuml 圖表
  請手動檢查和調整
@enduml

此圖示展示了 Kafka 生產者的訊息傳遞流程,包括 send() 方法的呼叫和回呼函式的處理。

Kafka 生產者組態引數詳解

Kafka 生產者提供了多項組態引數,以控制生產者的行為和效能。瞭解這些引數有助於最佳化生產者的表現和提高系統的可靠性。

超時相關組態

delivery.timeout.ms

此引數控制從記錄準備好傳送到 broker 回應或客戶端放棄的時間,包括重試的時間。它應該大於 linger.msrequest.timeout.ms。如果超過此時間,callback 將被呼叫並附帶一個異常。

request.timeout.ms

此引數控制生產者在傳送資料時等待伺服器回應的時間。如果超時,生產者將重試或呼叫 callback 並附帶一個 TimeoutException

max.block.ms

當生產者的傳送緩衝區滿或後設資料不可用時,此引數控制 send() 方法阻塞的最大時間。超過此時間,將丟擲一個超時異常。

重試相關組態

retriesretry.backoff.ms

retries 引數控制生產者在遇到暫時性錯誤時重試的次數。retry.backoff.ms 控制重試之間的等待時間。建議不要直接使用這些引數,而是組態 delivery.timeout.ms 以控制總的重試時間。

批次和緩衝區相關組態

linger.ms

此引數控制在傳送當前批次之前等待額外訊息的時間。透過設定 linger.ms 大於 0,可以增加吞吐量並減少每條訊息的開銷。

buffer.memory

此引數設定生產者用於緩衝等待傳送到 broker 的訊息的記憶體量。如果應用程式傳送訊息的速度快於它們可以被傳遞到伺服器的速度,生產者可能會耗盡空間,導致 send() 呼叫阻塞。

batch.size

此引數控制每個批次使用的記憶體量(以位元組為單位)。當批次滿時,所有訊息將被傳送。但是,生產者不一定會等待批次變滿。

壓縮相關組態

compression.type

此引數可以設定為 snappygziplz4zstd,以啟用相應的壓縮演算法。壓縮可以減少網路利用率和儲存需求。

其他重要組態

max.in.flight.requests.per.connection

此引數控制在單個連線上可以傳送的最大未確認請求數量。

重試和錯誤處理的最佳實踐

  • 不要在應用程式邏輯中處理重試,而應該依賴生產者的重試機制。
  • 透過組態 delivery.timeout.ms 來控制總的重試時間,而不是直接設定 retriesretry.backoff.ms
  • 對於非重試性錯誤,應在應用程式邏輯中進行處理。

範例程式碼與解析

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("delivery.timeout.ms", "120000"); // 設定 delivery.timeout.ms 為 120 秒
props.put("request.timeout.ms", "30000"); // 設定 request.timeout.ms 為 30 秒
props.put("linger.ms", "10"); // 設定 linger.ms 為 10 毫秒
props.put("compression.type", "snappy"); // 啟用 Snappy 壓縮

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // 處理異常
        } else {
            // 處理成功傳送的情況
        }
    }
});
producer.close();

程式碼解析:

  1. 建立 Kafka 生產者組態屬性:首先,建立一個 Properties 物件來存放 Kafka 生產者的組態屬性。
  2. 設定 Kafka 叢集連線資訊:透過 bootstrap.servers 屬性指定 Kafka 叢集的連線資訊。
  3. 設定訊息確認機制:使用 acks 屬性設定訊息確認機制為 all,確保所有副本都確認收到訊息。
  4. 設定金鑰與值的序列化器:指定金鑰和值的序列化器為字串序列化器。
  5. 設定傳輸超時與壓縮:組態 delivery.timeout.msrequest.timeout.mslinger.mscompression.type 等屬性,以最佳化生產者的效能和行為。
  6. 建立 Kafka 生產者例項:使用組態好的屬性建立一個 KafkaProducer 例項。
  7. 傳送訊息:建立一個 ProducerRecord 物件,並使用 send() 方法將訊息傳送到指定的主題。
  8. 處理回撥:提供一個 Callback 實作來處理訊息傳送的結果,包括成功和失敗的情況。
  9. 關閉生產者:最後,關閉 Kafka 生產者例項以釋放資源。

內容解密:

  • delivery.timeout.ms:設定為 120 秒,以允許足夠的時間來處理重試和 broker 回應。
  • request.timeout.ms:設定為 30 秒,以控制等待 broker 回應的時間。
  • linger.ms:設定為 10 毫秒,以增加吞吐量和減少每條訊息的開銷。
  • compression.type:啟用 Snappy 壓縮,以減少網路利用率和儲存需求。
  • Callback 處理:提供了一個回撥實作來處理訊息傳送的結果,包括例外處理和成功處理。