Kafka 生產者負責將訊息傳送至 Kafka 叢集,其效能和可靠性至關重要。訊息傳送方式主要有三種:fire-and-forget、同步傳送和非同步傳送,各有其適用場景。理解 acks、retries、linger.ms 等關鍵組態引數,才能有效控制訊息的可靠性、吞吐量和資源使用。此外,正確的錯誤處理策略,例如使用回呼函式和區分可重試錯誤與不可重試錯誤,對於確保系統穩定性也相當重要。
Kafka 生產者實作詳解:傳送訊息的方法與錯誤處理
在建構 Kafka 生產者時,首先需要設定相關的屬性。以下是一個簡單的範例,展示如何初始化一個 Kafka 生產者,並使用不同的方式傳送訊息至 Kafka 叢集。
初始化 Kafka 生產者
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
內容解密:
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");:此行程式碼設定了 Kafka 生產者的key.serializer屬性,指定使用StringSerializer來序列化訊息的鍵。kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");:此行設定了value.serializer屬性,使用StringSerializer來序列化訊息的值。producer = new KafkaProducer<String, String>(kafkaProps);:建立一個新的KafkaProducer例項,指定鍵和值的型別均為String,並傳入之前設定的屬性物件。
傳送訊息至 Kafka 的方法
Kafka 生產者提供了三種主要的訊息傳送方式:Fire-and-forget、Synchronous send 和 Asynchronous send。
Fire-and-forget
此方法直接傳送訊息至 Kafka 叢集,不等待任何確認回應。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
內容解密:
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");:建立一個ProducerRecord物件,指定主題名稱、鍵和值。producer.send(record);:使用send()方法傳送訊息,不等待回應。catch (Exception e):捕捉可能發生的異常,如序列化錯誤或緩衝區已滿等。
Synchronous send
此方法會等待 Kafka 的回應,以確認訊息是否成功傳送。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
內容解密:
producer.send(record).get();:使用send()方法傳送訊息,並呼叫get()方法等待回應,以確認傳送是否成功。catch (Exception e):捕捉任何在傳送過程中發生的錯誤,如重試次數耗盡或不可重試的錯誤。
Asynchronous send
此方法使用回呼函式處理傳送結果,可以非同步地傳送訊息並處理錯誤。
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
producer.send(record, new DemoProducerCallback());
內容解密:
private class DemoProducerCallback implements Callback:定義一個實作Callback介面的類別,用於處理傳送結果。public void onCompletion(RecordMetadata recordMetadata, Exception e):當傳送完成時呼叫此方法,若發生錯誤,則印出異常資訊。producer.send(record, new DemoProducerCallback());:傳送訊息並指定回呼函式,以非同步方式處理結果。
Kafka 生產者組態與訊息傳遞控制
在 Kafka 中,生產者(Producer)是負責將訊息傳送到 Kafka 叢集的元件。為了確保訊息的可靠傳遞和提高生產者的效能,Kafka 提供了多種組態引數來控制生產者的行為。
回呼函式(Callback)的使用
在傳送訊息時,可以使用回呼函式來處理傳送結果。回呼函式需要實作 org.apache.kafka.clients.producer.Callback 介面,該介面定義了一個 onCompletion() 方法。當 Kafka 傳回錯誤時,onCompletion() 方法將被呼叫,並傳入一個非空的異常物件。
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
內容解密:
ProducerRecord物件被建立,包含主題(Topic)、鍵(Key)和值(Value)。producer.send()方法被呼叫,將ProducerRecord物件傳送到 Kafka 叢集。DemoProducerCallback物件被傳入send()方法,用於處理傳送結果。
組態生產者
Kafka 生產者有多個組態引數,可以用來控制其行為。以下是一些重要的組態引數:
client.id
- 用於標識客戶端和應用程式的字串。
- 用於日誌記錄、指標收集和配額管理。
acks
- 控制多少個分割槽副本必須接收到訊息,生產者才認為寫入成功。
- 有三個可選值:
0、1和all。
acks=0
- 生產者不會等待代理伺服器的回應,直接認為訊息傳送成功。
- 如果代理伺服器沒有接收到訊息,生產者不會知道,訊息將會遺失。
- 可以達到很高的吞吐量。
acks=1
- 生產者會等待長官者副本接收到訊息後,才認為寫入成功。
- 如果長官者副本當機,生產者會收到錯誤回應,可以重試傳送訊息。
acks=all
- 生產者會等待所有同步副本接收到訊息後,才認為寫入成功。
- 這是最安全的模式,可以確保訊息不會遺失。
// acks 組態範例
Properties props = new Properties();
props.put("acks", "all");
內容解密:
acks引數控制生產者的可靠性。- 不同的
acks值會影響生產者的效能和可靠性。 - 需要根據具體的使用場景選擇合適的
acks值。
訊息傳遞時間控制
Kafka 生產者有多個組態引數,可以用來控制訊息傳遞的時間。以下是一些重要的組態引數:
max.block.ms
- 控制生產者在呼叫
send()方法時,最多可以阻塞多少毫秒。 - 當緩衝區已滿或後設資料不可用時,
send()方法會被阻塞。
// max.block.ms 組態範例
Properties props = new Properties();
props.put("max.block.ms", "5000");
內容解密:
max.block.ms引數控制生產者的阻塞時間。- 當
max.block.ms到期後,send()方法會丟擲異常。
圖示:Kafka 生產者訊息傳遞流程
@startuml
note
無法自動轉換的 Plantuml 圖表
請手動檢查和調整
@enduml此圖示展示了 Kafka 生產者的訊息傳遞流程,包括 send() 方法的呼叫和回呼函式的處理。
Kafka 生產者組態引數詳解
Kafka 生產者提供了多項組態引數,以控制生產者的行為和效能。瞭解這些引數有助於最佳化生產者的表現和提高系統的可靠性。
超時相關組態
delivery.timeout.ms
此引數控制從記錄準備好傳送到 broker 回應或客戶端放棄的時間,包括重試的時間。它應該大於 linger.ms 和 request.timeout.ms。如果超過此時間,callback 將被呼叫並附帶一個異常。
request.timeout.ms
此引數控制生產者在傳送資料時等待伺服器回應的時間。如果超時,生產者將重試或呼叫 callback 並附帶一個 TimeoutException。
max.block.ms
當生產者的傳送緩衝區滿或後設資料不可用時,此引數控制 send() 方法阻塞的最大時間。超過此時間,將丟擲一個超時異常。
重試相關組態
retries 和 retry.backoff.ms
retries 引數控制生產者在遇到暫時性錯誤時重試的次數。retry.backoff.ms 控制重試之間的等待時間。建議不要直接使用這些引數,而是組態 delivery.timeout.ms 以控制總的重試時間。
批次和緩衝區相關組態
linger.ms
此引數控制在傳送當前批次之前等待額外訊息的時間。透過設定 linger.ms 大於 0,可以增加吞吐量並減少每條訊息的開銷。
buffer.memory
此引數設定生產者用於緩衝等待傳送到 broker 的訊息的記憶體量。如果應用程式傳送訊息的速度快於它們可以被傳遞到伺服器的速度,生產者可能會耗盡空間,導致 send() 呼叫阻塞。
batch.size
此引數控制每個批次使用的記憶體量(以位元組為單位)。當批次滿時,所有訊息將被傳送。但是,生產者不一定會等待批次變滿。
壓縮相關組態
compression.type
此引數可以設定為 snappy、gzip、lz4 或 zstd,以啟用相應的壓縮演算法。壓縮可以減少網路利用率和儲存需求。
其他重要組態
max.in.flight.requests.per.connection
此引數控制在單個連線上可以傳送的最大未確認請求數量。
重試和錯誤處理的最佳實踐
- 不要在應用程式邏輯中處理重試,而應該依賴生產者的重試機制。
- 透過組態
delivery.timeout.ms來控制總的重試時間,而不是直接設定retries和retry.backoff.ms。 - 對於非重試性錯誤,應在應用程式邏輯中進行處理。
範例程式碼與解析
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("delivery.timeout.ms", "120000"); // 設定 delivery.timeout.ms 為 120 秒
props.put("request.timeout.ms", "30000"); // 設定 request.timeout.ms 為 30 秒
props.put("linger.ms", "10"); // 設定 linger.ms 為 10 毫秒
props.put("compression.type", "snappy"); // 啟用 Snappy 壓縮
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 處理異常
} else {
// 處理成功傳送的情況
}
}
});
producer.close();
程式碼解析:
- 建立 Kafka 生產者組態屬性:首先,建立一個
Properties物件來存放 Kafka 生產者的組態屬性。 - 設定 Kafka 叢集連線資訊:透過
bootstrap.servers屬性指定 Kafka 叢集的連線資訊。 - 設定訊息確認機制:使用
acks屬性設定訊息確認機制為all,確保所有副本都確認收到訊息。 - 設定金鑰與值的序列化器:指定金鑰和值的序列化器為字串序列化器。
- 設定傳輸超時與壓縮:組態
delivery.timeout.ms、request.timeout.ms、linger.ms和compression.type等屬性,以最佳化生產者的效能和行為。 - 建立 Kafka 生產者例項:使用組態好的屬性建立一個
KafkaProducer例項。 - 傳送訊息:建立一個
ProducerRecord物件,並使用send()方法將訊息傳送到指定的主題。 - 處理回撥:提供一個
Callback實作來處理訊息傳送的結果,包括成功和失敗的情況。 - 關閉生產者:最後,關閉 Kafka 生產者例項以釋放資源。
內容解密:
- delivery.timeout.ms:設定為 120 秒,以允許足夠的時間來處理重試和 broker 回應。
- request.timeout.ms:設定為 30 秒,以控制等待 broker 回應的時間。
- linger.ms:設定為 10 毫秒,以增加吞吐量和減少每條訊息的開銷。
- compression.type:啟用 Snappy 壓縮,以減少網路利用率和儲存需求。
Callback處理:提供了一個回撥實作來處理訊息傳送的結果,包括例外處理和成功處理。