Kafka 生產者的組態調校對於訊息傳輸的效率和可靠性至關重要。理解 max.in.flight.requests.per.connection、max.request.size 等引數的影響,可以有效控制資源使用和吞吐量。訊息的序列化和反序列化則決定了資料交換的效率和跨平台相容性,選擇合適的序列化方式如 Avro,並搭配 Schema Registry 管理結構描述版本,是建構穩健串流應用程式的關鍵。最後,分割槽策略的選擇會直接影響到訊息的順序性和消費者端的負載平衡,需要根據應用場景選擇合適的鍵值和分割槽器。
Kafka 生產者組態與序列化詳解
在 Kafka 中,生產者(Producer)的組態對於確保資料的正確傳輸和處理至關重要。本文將探討 Kafka 生產者的關鍵組態引數、排序保證、以及序列化機制。
生產者組態參解析
max.in.flight.requests.per.connection
此引數控制生產者在未收到伺服器回應的情況下,可以傳送多少批次的訊息到伺服器。較高的設定可以增加記憶體使用量,同時提高吞吐量。根據 Apache 的 wiki 實驗,在單一資料中心環境中,吞吐量在只有 2 個 in-flight 請求時達到最大,但預設值為 5,並且表現相似。
max.request.size
此設定控制生產者傳送的 produce 請求的大小。它限制了可以傳送的最大訊息大小,以及生產者在一個請求中可以傳送的訊息數量。例如,預設的最大請求大小為 1 MB,這意味著可以傳送的最大訊息是 1 MB,或者生產者可以將 1,024 個大小為 1 KB 的訊息批次到一個請求中。
receive.buffer.bytes 和 send.buffer.bytes
這些引數設定了在寫入和讀取資料時使用的 TCP 傳送和接收緩衝區的大小。如果設定為 -1,則使用作業系統的預設值。當生產者或消費者與不同資料中心的 broker 通訊時,增加這些值是一個好主意,因為這些網路連結通常具有較高的延遲和較低的頻寬。
enable.idempotence
啟用冪等生產者(Idempotent Producer)可以保證訊息的順序性,並且避免重試時引入重複訊息。當 enable.idempotence=true 時,生產者會為每個傳送的記錄附加一個序列號。如果 broker 收到具有相同序列號的記錄,它將拒絕第二個副本,生產者將收到無害的 DuplicateSequenceException。
排序保證
Kafka 在 partition 內保留訊息的順序。這意味著,如果訊息從生產者以特定的順序傳送,broker 將以相同的順序將它們寫入 partition,所有消費者都將以相同的順序讀取它們。
設定 retries 和 max.in.flight.requests.per.connection
為了最大化可靠性和效能,需要小心設定 retries 和 max.in.flight.requests.per.connection。設定 retries 為非零值,並且 max.in.flight.requests.per.connection 大於 1,可能會導致訊息順序被顛倒。解決方案是設定 enable.idempotence=true,這保證了訊息順序,並且避免了重試引入的重複訊息。
序列化機制
Kafka 提供了多種序列化器(Serializer),包括整數、位元組陣列等。但是,大多數情況下,使用者需要序列化更通用的記錄。
自定義序列化器
當需要傳送到 Kafka 的物件不是簡單的字串或整數時,使用者可以選擇使用通用的序列化函式庫,如 Avro、Thrift 或 Protobuf,或者為已經使用的物件建立自定義序列化。
以下是一個自定義序列化器的範例,用於序列化 Customer 物件:
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// 無需組態
}
@Override
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializedName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
@Override
public void close() {
// 無需關閉
}
}
使用自定義序列化器
使用自定義的 CustomerSerializer 組態生產者,可以定義 ProducerRecord<String, Customer>,並直接將 Customer 物件傳遞給生產者。
此圖示說明瞭 Kafka 生產者的主要組態選項和序列化機制之間的關係,有助於理解如何根據具體需求進行組態和最佳化。
詳細分析圖示內容:
- 圖示中展示了 Kafka 生產者的主要組態選項,包括效能相關的引數和冪等性設定。
- 同時,也展示了排序保證和序列化機制的相關內容。
- 透過此圖示,讀者可以更直觀地理解 Kafka 生產者的組態和序列化流程。
內容解密:
- Kafka 生產者組態的重要性:正確的組態對於確保資料正確傳輸和處理非常重要。
max.in.flight.requests.per.connection的作用:控制生產者在未收到回應的情況下可以傳送多少批次的訊息。enable.idempotence的優勢:保證訊息順序,避免重試時引入重複訊息。- 自定義序列化器的應用:滿足特定應用需求,對非簡單資料型別進行有效序列化。
- 圖示的作用:直觀展示 Kafka 生產者的組態和序列化流程,有助於讀者理解。
使用Apache Avro進行序列化
在開發Kafka應用程式時,資料序列化的方式對於系統的可維護性和相容性至關重要。直接使用Java序列化雖然方便,但存在許多問題,例如難以維護不同版本的相容性、除錯困難等。因此,建議使用現有的序列化框架,如JSON、Apache Avro、Thrift或Protobuf。本章節將重點介紹Apache Avro,並展示如何使用Avro序列化記錄並將其傳送到Kafka。
Apache Avro簡介
Apache Avro是一種語言中立的資料序列化格式,由Doug Cutting建立,旨在提供一種方式來與大量使用者共用資料檔案。Avro資料由語言無關的結構描述(schema)定義,通常使用JSON描述,而序列化通常是二進位檔案,但也支援序列化為JSON。Avro假設在讀寫檔案時存在結構描述,通常將結構描述嵌入檔案本身。
Avro的優勢
Avro最有趣的特點之一是,當寫入訊息的應用程式切換到新的但相容的結構描述時,讀取資料的應用程式可以繼續處理訊息,而無需任何變更或更新。例如,假設原始結構描述如下:
{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "faxNumber", "type": ["null", "string"], "default": "null"}
]
}
在幾個月後,我們決定升級到新版本,不再包含傳真號碼欄位,而是使用電子郵件欄位。新結構描述如下:
{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": "null"}
]
}
內容解密:
- 結構描述變更:新舊結構描述的變更體現了Avro的相容性設計。舊記錄包含
faxNumber,而新記錄包含email。 - 讀取應用程式的行為:當讀取應用程式遇到使用新結構描述寫入的訊息時,
getName()和getId()方法仍然有效,但getFaxNumber()將傳回null。同樣,當升級後的讀取應用程式遇到使用舊結構描述寫入的訊息時,getEmail()將傳回null。 - 相容性規則:Avro的檔案中包含了相容性規則,確保寫入資料的結構描述和讀取應用程式預期的結構描述必須相容。
- 反序列化器的需求:反序列化器需要存取寫入資料時使用的結構描述,即使它與存取資料的應用程式預期的結構描述不同。
將Avro記錄與Kafka結合使用
與Avro檔案不同,將整個結構描述儲存在每個記錄中通常會使記錄大小增加一倍以上。然而,Avro仍然需要在讀取記錄時存在整個結構描述,因此我們需要將結構描述儲存在其他地方。為此,我們遵循常見的架構模式,使用Schema Registry。Schema Registry不是Apache Kafka的一部分,但有多個開源選項可供選擇。本例中使用Confluent Schema Registry。
Schema Registry的工作流程
- 儲存結構描述:將所有用於將資料寫入Kafka的結構描述儲存在Schema Registry中。
- 儲存結構描述識別符:在產生到Kafka的記錄中儲存結構描述的識別符。
- 反序列化資料:消費者可以使用識別符從Schema Registry中提取記錄並反序列化資料。
程式碼範例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
Producer<String, Customer> producer = new KafkaProducer<>(props);
while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " + customer.toString());
ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getName(), customer);
producer.send(record);
}
內容解密:
Properties組態:組態Kafka生產者的屬性,包括bootstrap.servers、key.serializer、value.serializer和schema.registry.url。- KafkaAvroSerializer的使用:使用Confluent提供的
KafkaAvroSerializer進行Avro序列化。 - 產生Avro物件到Kafka:建立一個Kafka生產者,並不斷產生新的
Customer物件到指定的Kafka主題中。
Kafka 生產者中的序列化與分割槽策略
在建構 Kafka 生產者時,序列化和分割槽是兩個至關重要的概念。正確地選擇序列化方式和分割槽策略對於確保資料的正確性和高效性至關重要。
使用 Avro 進行序列化
Apache Kafka 支援多種序列化格式,其中 Avro 是最為流行的選擇之一。Avro 提供了一種緊湊的二進位制格式,並且與 Schema Registry 整合得非常好,能夠有效地管理和演化資料結構。
使用生成的 Avro 物件進行序列化
在 Kafka 中使用 Avro 進行序列化時,首先需要定義一個 Avro schema。然後,可以使用 Avro 的程式碼生成工具根據 schema 生成相應的 Java 類別。這些生成的類別不是普通的 Java 物件(POJO),而是具有特定方法的 Avro 物件,能夠與 Avro 序列化器無縫合作。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
Producer<String, Customer> producer = new KafkaProducer<>(props);
Customer customer = Customer.newBuilder()
.setId(1)
.setName("exampleCustomer")
.setEmail("example@example.com")
.build();
ProducerRecord<String, Customer> record = new ProducerRecord<>("customerContacts", customer.getName().toString(), customer);
producer.send(record);
內容解密:
- 設定屬性:建立一個
Properties物件,並設定 Kafka 生產者的必要屬性,包括引導伺服器、鍵和值的序列化器,以及 Schema Registry 的 URL。 - 建立生產者:使用設定的屬性建立一個
KafkaProducer例項,指定鍵和值的型別為String和Customer。 - 建立 Customer 物件:使用生成的
Customer類別建立一個新的客戶物件,並設定其屬性。 - 建立 ProducerRecord:建立一個
ProducerRecord,指定主題、鍵和值。鍵在這裡是客戶的名字,值是Customer物件。 - 傳送記錄:使用生產者的
send方法將記錄傳送到 Kafka 主題。
使用通用 Avro 物件進行序列化
除了使用生成的 Avro 物件外,還可以使用通用 Avro 物件(GenericRecord)來進行序列化。這種方式不需要提前生成特定的 Java 類別,而是直接根據提供的 schema 建立物件。
String schemaString = "{\"namespace\": \"customerManagement.avro\", \"type\": \"record\", \"name\": \"Customer\",\"fields\": [{\"name\": \"id\", \"type\": \"int\"},{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"email\", \"type\": [\"null\",\"string\"], \"default\":\"null\" }]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
GenericRecord customer = new GenericData.Record(schema);
customer.put("id", 1);
customer.put("name", "exampleCustomer");
customer.put("email", "example@example.com");
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("customerContacts", customer.get("name").toString(), customer);
producer.send(record);
內容解密:
- 定義 Schema:定義一個 Avro schema 字串,描述了
Customer資料結構。 - 解析 Schema:使用
Schema.Parser解析 schema 字串得到Schema物件。 - 建立通用記錄:建立一個
GenericRecord物件,並根據 schema 設定其欄位值。 - 建立 ProducerRecord:建立一個
ProducerRecord,主題為 “customerContacts”,鍵為客戶的名字,值為剛建立的GenericRecord。 - 傳送記錄:將記錄傳送到 Kafka。
分割槽策略
Kafka 中的分割槽策略決定了訊息如何被分配到不同的分割槽中。預設的分割槽器會根據訊息的鍵來決定分割槽,如果鍵為空,則會隨機選擇一個分割槽。
鍵為空的分割槽策略
當訊息的鍵為空時,預設的分割槽器會採用黏性(sticky)輪詢(round-robin)演算法。這意味著在切換到下一個分割槽之前,會盡可能地填充當前分割槽的批次,從而減少請求數量並提高效能。
鍵不為空的分割槽策略
當訊息的鍵不為空時,預設的分割槽器會根據鍵的雜湊值來決定分割槽。這確保了具有相同鍵的訊息總是被寫入相同的分割槽。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
producer.send(record);
內容解密:
- 建立 ProducerRecord:建立一個
ProducerRecord,主題為 “CustomerCountry”,鍵為 “Laboratory Equipment”,值為 “USA”。 - 傳送記錄:生產者根據鍵 “Laboratory Equipment” 的雜湊值決定分割槽,並將記錄傳送到相應的分割槽。