Kafka 的序列化和反序列化機制確保了不同格式資料在叢集內的有效傳輸和處理。常見的序列化方式包括 JSON 和 Avro,開發者也能根據需求自定義序列化邏輯。生產者利用序列化將資料轉換為位元組流,而消費者則利用反序列化將位元組流還原為原始資料。Avro 序列化方案在 Schema 演進方面表現出色,適用於需要資料結構調整的場景。理解 Kafka 的 Topic 管理對於叢集的穩定運作至關重要,這包含了 Topic 的建立、分割槽與副本設定,以及後續的管理和組態調整。Kafka 的資料複製機制則保障了資料的高用性和永續性,透過設定副本數量和使用重新分配工具,可以有效提升資料的安全性。

序列化與反序列化在 Kafka 中的應用

在 Kafka 生態系統中,序列化(Serialization)與反序列化(Deserialization)扮演著至關重要的角色。它們負責將資料轉換成可傳輸的格式,以及將接收到的資料還原成原始格式。正確地實作序列化和反序列化對於確保資料的正確傳輸和處理至關重要。

為什麼需要序列化和反序列化?

Kafka 的設計使其能夠處理多種資料格式。然而,為了在網路上傳輸資料或將其儲存於磁碟上,資料必須被轉換成位元組流的形式。序列化過程正是將物件轉換成位元組流,而反序列化則是將位元組流還原成物件的過程。

常見的序列化與反序列化技術

  1. JSON 序列化與反序列化:JSON(JavaScript Object Notation)是一種輕量級的資料交換格式,易於人類閱讀和編寫,也易於機器解析和生成。在 Kafka 中,可以使用 JSON 格式進行資料的序列化和反序列化。

  2. Avro 序列化與反序列化:Apache Avro 是一種資料序列化系統,它提供了豐富的資料結構型別、緊湊的二進位制資料格式、以及一個用於儲存資料的容器檔案格式。Avro 在 Kafka 中被廣泛使用,特別是在需要 Schema 演化的場景中。

  3. 自定義序列化與反序列化:除了使用現有的序列化格式外,開發者也可以根據具體需求實作自定義的序列化與反序列化邏輯。這種方式提供了最大的靈活性,但也需要更多的開發和維護工作。

在 Kafka 中實作序列化和反序列化

生產者端的序列化

在 Kafka 生產者端,需要將要傳送的資料序列化成位元組流。可以使用現有的序列化函式庫,如 JSON 或 Avro,也可以實作自定義的序列化邏輯。

// 使用 Avro 序列化範例
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class AvroSerializer {
    public byte[] serialize(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}

消費者端的反序列化

在 Kafka 消費者端,需要將接收到的位元組流反序列化成原始的資料格式。同樣,可以使用現有的反序列化函式庫,或實作自定義的反序列化邏輯。

// 使用 Avro 反序列化範例
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

public class AvroDeserializer {
    public GenericRecord deserialize(byte[] data, Schema schema) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);
        return reader.read(null, decoder);
    }
}

內容解密:

上述程式碼範例展示瞭如何使用 Avro 進行序列化和反序列化。首先,在序列化過程中,我們建立了一個 GenericDatumWriter 例項,並使用 EncoderFactory 取得一個二進位制編碼器,將 GenericRecord 物件寫入到 ByteArrayOutputStream 中,最終得到位元組陣列。在反序列化過程中,我們使用 GenericDatumReaderDecoderFactory 取得的二進位制解碼器,將位元組陣列還原成 GenericRecord 物件。這兩個過程都需要依賴於 Avro 的 Schema 定義,以確保資料的一致性和正確性。

Apache Kafka 技術深度解析

主題管理與操作

在 Apache Kafka 中,Topic(主題)是資料儲存和管理的核心單位。正確地建立和管理 Topic 是確保 Kafka 叢集穩定運作的關鍵。

建立新的 Topic

使用 kafka-topics.sh 工具可以建立新的 Topic。在建立 Topic 時,需要考慮以下因素:

  • Topic 名稱:應遵循一定的命名規範,以方便管理和識別。
  • 分割槽數量:決定了 Topic 的平行處理能力。
  • 副本數量:決定了資料的冗餘度和可用性。
kafka-topics.sh --create --bootstrap-server <kafka-broker>:9092 --replication-factor 3 --partitions 4 my-topic

內容解密:

  • --create:表示建立新的 Topic。
  • --bootstrap-server:指定 Kafka Broker 的連線地址。
  • --replication-factor:設定 Topic 的副本數量。
  • --partitions:設定 Topic 的分割槽數量。
  • my-topic:要建立的 Topic 名稱。

管理 Topic

可以使用 kafka-topics.sh 工具來管理現有的 Topic,例如列出所有 Topic、描述 Topic 詳情、修改 Topic 組態等。

# 列出所有 Topic
kafka-topics.sh --list --bootstrap-server <kafka-broker>:9092

# 描述 Topic 詳情
kafka-topics.sh --describe --bootstrap-server <kafka-broker>:9092 --topic my-topic

內容解密:

  • --list:列出所有的 Topic。
  • --describe:描述指定 Topic 的詳情,包括分割槽資訊、副本分佈等。

資料複製與高用性

Kafka 透過資料複製機制來確保資料的高用性和永續性。每個分割槽可以有多個副本,分佈在不同的 Broker 上。

設定副本數量

在建立 Topic 時,可以透過 --replication-factor 引數來設定副本數量。

kafka-topics.sh --create --bootstrap-server <kafka-broker>:9092 --replication-factor 3 --partitions 4 my-topic

內容解密:

  • --replication-factor 3:表示每個分割槽有 3 個副本。

調整副本數量

如果需要調整現有 Topic 的副本數量,可以使用 kafka-reassign-partitions.sh 工具。

kafka-reassign-partitions.sh --bootstrap-server <kafka-broker>:9092 --reassignment-json-file reassign.json --execute

內容解密:

  • --reassignment-json-file:指定包含重新分配計劃的 JSON 檔案。
  • --execute:執行重新分配操作。

交易機制

Kafka 的交易機制確保了在生產者和消費者之間資料傳輸的完整性和一致性。

使用交易

要使用 Kafka 的交易功能,需要在生產者端進行相應的組態。

transactional.id=my-transactional-id

內容解密:

  • transactional.id:為生產者設定一個唯一的 Transactional ID,用於標識交易。

時間與視窗操作

在流處理應用中,時間是一個非常重要的概念。Kafka Streams 提供了多種時間相關的操作,包括事件時間、處理時間和日誌附加時間。

時間提取器

可以透過實作 TimestampExtractor 介面來自定義時間提取邏輯。

public class MyTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        // 自定義時間提取邏輯
    }
}

內容解密:

  • extract 方法:根據 ConsumerRecord 和分割槽時間提取時間戳記。

Kafka 技術

組態 TLS

為 Kafka 組態 TLS 是確保叢集安全性的重要步驟。TLS(Transport Layer Security)是一種加密通訊協定,用於保護資料在傳輸過程中的安全性。

為 Brokers 組態 TLS

  1. 生成憑證: 首先,需要為 Kafka brokers 生成 SSL/TLS 憑證。可以使用 OpenSSL 等工具生成自簽名憑證或從受信任的憑證授權單位(CA)取得憑證。

  2. 組態 Broker: 在 server.properties 檔案中,設定以下屬性:

    • listeners=SSL://localhost:9093
    • ssl.keystore.location=/path/to/kafka.server.keystore.jks
    • ssl.keystore.password=keystore_password
    • ssl.key.password=key_password

為 ZooKeeper 組態 SSL

  1. 生成憑證: 為 ZooKeeper 生成 SSL/TLS 憑證。

  2. 組態 ZooKeeper: 在 ZooKeeper 的組態檔案中,啟用 SSL/TLS 並指定相關的 keystore 和 truststore 檔案。

為客戶端生成憑證

客戶端需要使用 broker 的自簽名 CA 來生成自己的憑證,以建立信任的 SSL/TLS 連線。

時間視窗

時間視窗(Time Windows)是 Kafka Streams 中的一個重要概念,用於處理時間相關的資料流。

滾動視窗(Tumbling Window)

滾動視窗是一種特殊的時間視窗,它將資料流分成不重疊的時間段。

交易(Transactions)

Kafka 的交易功能確保了訊息的完整性和一致性。

如何使用 Kafka Idempotent Producer

Kafka 的冪等生產者(Idempotent Producer)可以確保訊息不會被重複寫入。

交易如何運作

Kafka 的交易功能根據兩階段提交(Two-Phase Commit)協定,確保了訊息的原子性。

不乾淨的 Leader 選舉

不乾淨的 Leader 選舉(Unclean Leader Election)可能會導致資料丟失或不一致。

測試不乾淨的 Leader 選舉

可以透過模擬故障來測試不乾淨的 Leader 選舉的行為。

Under-Replicated 分割槽

Under-Replicated 分割槽是指某些分割槽的副本沒有跟上 Leader 的進度。

警示陷阱

需要設定警示機制來檢測 Under-Replicated 分割槽,以避免潛在的問題。

使用者活動追蹤

Kafka 可以用於追蹤使用者活動,例如點選流分析等。

ZooKeeper

ZooKeeper 是 Kafka 的一個重要元件,用於管理叢集的元資料和提供一致性保證。

安全考慮

需要對 ZooKeeper 進行安全組態,例如使用 SSL/TLS 加密和授權機制。

作者簡介

本文的作者包括 Gwen Shapira、Todd Palino、Rajini Sivaram 和 Krit Petty,他們都是 Kafka 社群的專家和貢獻者。

Gwen Shapira

Gwen 是 Confluent 的工程長官者,負責長官雲原生 Kafka 團隊。

Todd Palino

Todd 是 LinkedIn 的首席工程師,負責 Kafka 和 ZooKeeper 的架構和維運。

Rajini Sivaram

Rajini 是 Confluent 的首席工程師,負責設計和開發 Kafka 的跨叢集複製功能。

Krit Petty

Krit 是 LinkedIn 的 Kafka 網站可靠性工程經理,負責 Kafka 叢集的擴充套件和最佳化。

封面介紹:藍翅笑翠鳥

書籍《Kafka: The Definitive Guide》的封面動物是藍翅笑翠鳥(學名:Dacelo leachii),屬於翡翠鳥科(Alcedinidae)。這種鳥主要分佈於新幾內亞南部和澳洲北部的非乾旱地區,被歸類別為河翠鳥類別。

外觀特徵

雄性藍翅笑翠鳥具有鮮明的色彩特徵。其下翼和尾羽呈現藍色,因而得名。然而,雌性個體的尾部則呈現紅棕色,並帶有黑色條紋。兩性個體的腹部均為奶油色,並帶有棕色條紋,眼睛虹膜為白色。成年藍翅笑翠鳥的體型相對於其他翠鳥類別較小,體長僅約15至17英寸(約38至43公分),平均體重約260至330克。

食性與獵物

藍翅笑翠鳥的飲食以肉食為主,其獵物種類別會隨著季節變化而有所不同。例如,在夏季,由於蜥蜴、昆蟲和青蛙數量較多,這些成為其主要食物來源。而在較乾燥的月份,淡水螯蝦、魚類別、齧齒動物,甚至小型鳥類別都會成為其獵物。不過,藍翅笑翠鳥並非唯一捕食物件,紅色鷹鵰和赤梟等掠食者也會將其納入獵食範圍。

繁殖習性

藍翅笑翠鳥的繁殖季節主要在九月至十二月之間。牠們會在高樹的樹洞中築巢。養育後代是群體合作的過程,除了父母外,至少會有一個輔助鳥協助育雛。每次產卵約三到四枚,孵化期約26天。雛鳥在孵化後約36天會開始飛行,但前提是牠們能夠在高度競爭和攻擊性的雛鳥期存活下來。在生命的第一週,較大的雛鳥有時會殺死較小的兄弟姐妹。那些倖存下來的雛鳥會接受父母約6至10週的狩獵訓練,才會開始獨立生活。

保護現狀與相關資訊

封面設計資訊

封面插圖由Karen Montgomery根據《English Cyclopaedia》中的黑白雕版畫創作。封面字型採用Gilroy Semibold和Guardian Sans。正文字型為Adobe Minion Pro;標題字型是Adobe Myriad Condensed;程式碼字型則是Dalton Maag的Ubuntu Mono。