Kafka 的設定管理機制具備多層級的彈性,允許開發者針對主題、客戶端和代理伺服器進行細粒度的組態。透過覆寫預設設定,可以針對特定應用場景最佳化效能和可靠性。理解不同層級的設定選項以及如何使用命令列工具進行管理,對於構建和維護穩定的 Kafka 應用至關重要。主題層級的設定控制訊息格式、日誌保留和分割區複製等行為,而客戶端層級的設定則管理生產者和消費者的速率配額。代理伺服器設定則影響整個叢集的運作,例如最小同步複本數和最大連線數等。

Kafka 設定覆寫與管理

Kafka 提供了多層級的設定管理,包括主題(Topic)、客戶端(Client)以及代理伺服器(Broker)等不同層面的設定。這些設定可以透過不同的方式進行覆寫和管理,以滿足不同使用場景的需求。

主題層級設定

在 Kafka 中,主題層級的設定用於控制特定主題的行為。以下是一些常見的主題層級設定:

訊息處理相關設定

  • message.format.version:指定代理伺服器寫入磁碟時的訊息格式版本,必須是有效的 API 版本號。
  • message.timestamp.difference.max.ms:允許訊息時間戳與代理伺服器接收時間戳之間的最大差異(毫秒)。僅在 message.timestamp.type 設定為 CreateTime 時有效。
  • message.timestamp.type:決定寫入磁碟的訊息時間戳型別。目前支援 CreateTime(客戶端指定的時間戳)和 LogAppendTime(代理伺服器寫入分割區的時間)。

日誌壓縮與保留相關設定

  • min.cleanable.dirty.ratio:日誌壓縮器嘗試壓縮分割區的頻率,以未壓縮日誌段與總日誌段的比例表示。僅適用於日誌壓縮主題。
  • min.compaction.lag.ms:訊息在日誌中保持未壓縮的最小時間。
  • retention.bytes:為該主題保留的訊息量(位元組)。
  • retention.ms:為該主題保留訊息的時間(毫秒)。

分割區與複製相關設定

  • min.insync.replicas:分割區被視為可用的最小同步複本數。
  • unclean.leader.election.enable:是否允許不乾淨的長官者選舉。如果設為 false,將不允許不乾淨的長官者選舉,以避免資料遺失。

日誌段相關設定

  • segment.bytes:寫入單個日誌段的訊息量(位元組)。
  • segment.index.bytes:單個日誌段索引的最大大小(位元組)。
  • segment.ms:每個分割區的日誌段輪替頻率(毫秒)。
  • segment.jitter.ms:在輪替日誌段時新增到 segment.ms 的隨機最大毫秒數。

設定範例與解密

# 設定範例
message.format.version=3.0
message.timestamp.type=CreateTime
min.insync.replicas=2

內容解密:

  1. message.format.version=3.0:此設定指定了 Kafka 代理伺服器寫入磁碟時的訊息格式版本為 3.0。這確保了訊息儲存格式的一致性,並且與客戶端的 API 版本相容。
  2. message.timestamp.type=CreateTime:此設定表示寫入磁碟的訊息時間戳是客戶端指定的時間。這對於需要精確時間戳的應用程式非常重要。
  3. min.insync.replicas=2:此設定要求每個分割區至少有兩個同步複本。這提高了資料的可靠性和可用性,因為即使一個複本失敗,資料仍然可以被存取。

客戶端與使用者設定覆寫

對於 Kafka 客戶端和使用者,可以覆寫一些預設設定,主要涉及各種配額型別。常見的可覆寫設定包括生產者和消費者的位元組速率配額。

客戶端相關設定

  • consumer_bytes_rate:單個客戶端 ID 在一秒內從單個代理伺服器允許消費的訊息量(位元組)。
  • producer_bytes_rate:單個客戶端 ID 在一秒內向單個代理伺服器允許生產的訊息量(位元組)。
  • controller_mutations_rate:建立主題請求、建立分割區請求和刪除主題請求的變異速率。該速率由建立或刪除的分割區數量累積。

設定範例與解密

# kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config "controller_mutations_rate=10" --entity-type clients --entity-name <client ID> --entity-type users --entity-name <user ID>

內容解密:

  1. 指令解說:此指令用於修改客戶端和使用者的設定,具體來說是修改 controller_mutations_rate 為 10。這限制了客戶端和使用者對 Kafka 叢集進行變異操作的速率,避免過載叢集。
  2. 引數說明
    • --bootstrap-server localhost:9092:指定 Kafka 叢集的引導伺服器地址。
    • --alter:表示要修改現有的設定。
    • --add-config "controller_mutations_rate=10":新增或修改 controller_mutations_rate 設定為 10。
    • --entity-type clients --entity-name <client ID>:指定要修改設定的客戶端 ID。
    • --entity-type users --entity-name <user ID>:指定要修改設定的使用者 ID。

代理伺服器設定覆寫

Kafka 代理伺服器的許多設定可以在執行時動態修改,無需重新佈署 Kafka 叢集。以下是一些重要的代理伺服器設定:

  • min.insync.replicas:調整生產者設定為 acks=all(或 -1)時,成功寫入請求所需的最小同步複本數。
  • unclean.leader.election.enable:允許複本被選為長官者,即使這可能導致資料遺失。在某些情況下,這可以幫助解除 Kafka 叢集的阻塞狀態。
  • max.connections:允許連線到代理伺服器的最大連線數。此外,還可以使用 max.connections.per.ipmax.connections.per.ip.overrides 進行更精細的流量控制。

設定範例與解密

# 設定範例
min.insync.replicas=2
unclean.leader.election.enable=false
max.connections=1000

內容解密:

  1. min.insync.replicas=2:此設定確保生產者傳送訊息時,至少有兩個同步複本確認收到訊息後,才認為寫入成功。這提高了資料的一致性和可靠性。
  2. unclean.leader.election.enable=false:此設定禁止不乾淨的長官者選舉,避免因為選舉不同步的複本作為長官者而導致資料遺失。
  3. max.connections=1000:此設定限制了代理伺服器允許的最大連線數為 1000,防止過多的連線請求耗盡資源。

Kafka 工具使用:組態管理與訊息生產及消費

組態管理

Kafka 提供了一個名為 kafka-configs.sh 的工具,用於管理和檢視 Kafka 叢集中的各種組態。這個工具可以用來檢查特定主題(topic)、代理(broker)或客戶端(client)的組態。

檢視組態覆寫

使用 --describe 命令可以列出所有組態覆寫。例如,檢視名為 “my-topic” 的主題的組態覆寫:

# kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name my-topic
Configs for topics:my-topic are retention.ms=3600000

這個命令只會顯示被覆寫的組態,不會顯示叢集的預設組態。

移除組態覆寫

可以使用 --alter 命令和 --delete-config 引數來刪除組態覆寫,使其還原為叢集預設值。例如,刪除 “my-topic” 主題的 retention.ms 組態覆寫:

# kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms
Updated config for topic: "my-topic".

訊息生產與消費

在與 Kafka 互動時,經常需要手動生產或消費一些範例訊息,以驗證應用程式的行為。Kafka 提供了兩個工具:kafka-console-consumer.shkafka-console-producer.sh,用於與 Kafka 主題互動。

控制檯生產者

kafka-console-producer.sh 工具可以用於向 Kafka 主題寫入訊息。預設情況下,每行讀取一條訊息,使用 tab 字元分隔鍵和值(如果沒有 tab 字元,則鍵為 null)。例如,向名為 “my-topic” 的主題生產四條訊息:

# kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
>Message 1
>Test Message 2
>Test Message 3
>Message 4
>^D

可以使用 --producer-property 選項來傳遞生產者組態選項,例如:

# kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic --producer-property linger.ms=1000

或者使用 --producer.config 選項指定一個包含組態選項的檔案。

生產者組態選項

控制檯生產者有多個命令列引數可用於調整其行為,例如:

  • --batch-size:指定同步傳送訊息時的批次大小。
  • --timeout:在非同步模式下,指定等待批次大小的最大時間。
  • --compression-codec:指定生產訊息時使用的壓縮型別。
  • --sync:同步生產訊息,等待每條訊息被確認後再傳送下一條。
行讀取器選項

kafka.tools.ConsoleProducer$LineMessageReader 類別負責讀取標準輸入並建立生產者記錄。它有多個有用的選項,可以使用 --property 命令列選項傳遞給控制檯生產者,例如:

  • ignore.error:設定為 false 時,如果 parse.key 為 true 且沒有鍵分隔符,則會丟擲異常。預設為 true。
  • parse.key:設定為 false 時,總是將鍵設定為 null。預設為 true。
  • key.separator:指定讀取時訊息鍵和訊息值之間的分隔字元。預設為 tab 字元。

自訂行讀取行為

你可以提供自己的類別給 Kafka,以自訂讀取行的方法。該類別必須擴充套件 kafka.common.MessageReader,並負責建立 ProducerRecord。使用 --line-reader 選項在命令列上指定你的類別,並確保包含該類別的 JAR 在類別路徑中。

注意事項

在使用控制檯生產者和消費者時,應避免編寫包裝這些工具的應用程式,因為這類別應用程式很脆弱,容易丟失訊息。建議直接使用 Java 使用者端程式函式庫或其他語言的第三方使用者端程式函式庫,它們直接使用 Kafka 協定。

Kafka 命令列工具使用

Kafka 提供了一系列命令列工具來幫助管理員和開發者與 Kafka 叢集互動。其中,kafka-console-consumer.sh 是一個非常有用的工具,用於從 Kafka 叢集中的一個或多個主題消費訊息。

使用 Kafka Console Consumer

kafka-console-consumer.sh 允許使用者從指定的主題消費訊息,並將訊息列印到標準輸出中,預設情況下,訊息以原始位元組的形式輸出,不包含鍵(key),且無任何格式化。

基本使用選項

要開始使用 kafka-console-consumer.sh,需要提供連線到 Kafka 叢集的字串、要消費的主題名稱,以及想要消費的訊息時間範圍。

  • --bootstrap-server:指定連線到 Kafka 叢集的伺服器地址。
  • --topic:指定要消費的單一主題。
  • --whitelist:指定一個正規表示式,用於比對所有要消費的主題。

版本相容性

確保使用的 kafka-console-consumer.sh 版本與 Kafka 叢集版本相符是非常重要的。舊版本的 console consumer 可能會因為與叢集或 ZooKeeper 的互動方式不正確而損壞叢集。

使用範例

以下是一個從比對字首 “my” 的所有主題(在此例中只有 “my-topic”)消費訊息的範例:

# kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist 'my.*' --from-beginning
Message 1
Test Message 2
Test Message 3
Message 4
^C

自訂消費者組態

除了基本的命令列選項外,還可以透過兩種方式將正常的消費者組態選項傳遞給 console consumer:

  1. 使用 --consumer.config <config-file> 指定一個包含組態選項的檔案。
  2. 使用 --consumer-property <key>=<value> 在命令列上直接指定組態選項。

其他常用選項

  • --formatter <classname>:指定用於解碼訊息的訊息格式化器類別。
  • --from-beginning:從最舊的偏移量開始消費訊息。
  • --max-messages <int>:在離開前消費的最大訊息數量。
  • --partition <int>:只從指定的分割區消費訊息。
  • --offset:指定要消費的偏移量 ID,或使用 “earliest” 或 “latest” 指定消費起始點。
  • --skip-message-on-error:在處理錯誤時跳過訊息,而不是停止。

訊息格式化器選項

除了預設的格式化器外,還有其他三種可用的格式化器:

  • kafka.tools.LoggingMessageFormatter:使用記錄器輸出訊息,而不是標準輸出。
  • kafka.tools.ChecksumMessageFormatter:只列印訊息的校驗和。
  • kafka.tools.NoOpMessageFormatter:消費訊息但不輸出。

使用範例

# kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist 'my.*' --from-beginning --formatter kafka.tools.ChecksumMessageFormatter
checksum:0
checksum:0
checksum:0
checksum:0

DefaultMessageFormatter 的屬性

kafka.tools.DefaultMessageFormatter 支援多個屬性,可以透過 --property 命令列選項傳遞,如下表所示:

屬性描述
print.timestamp設定為 true 以顯示每條訊息的時間戳(如果可用)。
print.key設定為 true 以顯示訊息鍵以及值。
print.offset設定為 true 以顯示訊息偏移量以及值。
print.partition設定為 true 以顯示訊息被消費的分割區。
key.separator指定在列印時用於分隔訊息鍵和訊息值的分隔字元。
line.separator指定用於分隔訊息的分隔字元。
key.deserializer提供一個類別名稱,用於在列印前反序列化訊息鍵。
value.deserializer提供一個類別名稱,用於在列印前反序列化訊息值。

這些反序列化器類別必須實作 org.apache.kafka.common.serialization.Deserializer,並且 console consumer 將呼叫其 toString 方法來取得要顯示的輸出。通常,您會將這些反序列化器實作為 Java 類別,並透過設定 CLASSPATH 環境變數將其插入到 console consumer 的類別路徑中。

內容解密:

此段落主要講解了 Kafka 中 kafka-console-consumer.sh 工具的使用方法,包括基本使用選項、自訂消費者組態、其他常用選項、以及不同的訊息格式化器。其中詳細介紹瞭如何使用不同的格式化器以及如何透過屬性自訂預設格式化器的行為。這些內容對於管理和除錯 Kafka 叢集非常有幫助。

Kafka 叢集管理與分割槽操作

在使用 Kafka 的過程中,管理和監控消費者群組的偏移量(offsets)以及分割槽(partitions)的狀態是非常重要的。本文將介紹如何使用 Kafka 提供的工具來檢查消費者偏移量、進行分割槽管理以及如何手動變更分割槽的副本分配。

檢查消費者偏移量

為了檢查特定的消費者群組是否正在提交偏移量,或者偏移量提交的頻率,可以使用 Kafka 提供的 kafka-console-consumer.sh 工具來消費內部主題 __consumer_offsets。這個主題儲存了所有消費者群組的偏移量資訊。

要正確解碼這個主題中的訊息,需要使用特定的格式化類別 kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter。以下是一個範例命令,用於從 __consumer_offsets 主題中消費最早的一條訊息:

# kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic __consumer_offsets --from-beginning --max-messages 1 \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--consumer-property exclude.internal.topics=false

內容解密:

  • --bootstrap-server localhost:9092 指定了 Kafka 叢集的連線地址。
  • --topic __consumer_offsets 指定了要消費的主題名稱。
  • --from-beginning 表示從最早的訊息開始消費。
  • --max-messages 1 表示只消費一條訊息。
  • --formatter 指定了解碼訊息的格式化類別。
  • --consumer-property exclude.internal.topics=false 允許消費內部主題。

分割槽管理

Kafka 提供了一些工具來幫助管理分割槽,包括重新選舉長官副本(leader replicas)和手動分配分割槽給代理(brokers)。

長官副本選舉

在 Kafka 中,分割槽有多個副本以確保可靠性,但只有一個副本可以作為長官副本處理所有的生產和消費請求。為了保持叢集的負載平衡,Kafka 提供了自動長官副本平衡的功能。如果這個功能未被啟用,可以使用 kafka-leader-election.sh 工具手動觸發長官副本選舉。

以下是一個範例命令,用於在整個叢集中為所有主題啟動首選長官副本選舉:

# kafka-leader-election.sh --bootstrap-server localhost:9092 \
--election-type PREFERRED --all-topic-partitions

內容解密:

  • --bootstrap-server localhost:9092 指定了 Kafka 叢集的連線地址。
  • --election-type PREFERRED 指定了選舉型別為首選長官副本。
  • --all-topic-partitions 表示對所有主題的所有分割槽進行選舉。

變更分割槽的副本

有時候需要手動變更分割槽的副本分配,例如當叢集新增或移除代理時,或者需要調整某個主題的複製因子。可以使用 kafka-reassign-partitions.sh 工具來完成這個任務。

首先,需要建立一個 JSON 檔案,列出要變更的分割槽和主題。然後,使用這個 JSON 檔案生成變更建議。最後,執行變更並驗證變更結果。

假設有一個四個代理的 Kafka 叢集,現在新增了兩個代理,想要將某些主題的分割槽移動到新的代理上。

首先,建立一個 JSON 檔案 topics.json,內容如下:

{
  "topics": [
    {
      "topic": "foo1"
    },
    {
      "topic": "foo2"
    }
  ],
  "version": 1
}

內容解密:

  • JSON 檔案中列出了要變更的分割槽所屬的主題名稱。
  • version 欄位目前始終為 1。

接下來,使用這個 JSON 檔案生成變更建議,並執行變更。具體命令和步驟請參考 Kafka 的官方檔案。