Kafka 叢集管理涉及多個導向,從主題的建立、修改到消費者群組的管理和動態組態調整,都需要管理員掌握相關工具和技術。有效地管理 Kafka 叢集對於確保系統的穩定性、可擴充套件性和效能至關重要。本文將探討如何使用 Kafka 提供的命令列工具,例如 kafka-topics.shkafka-consumer-groups.shkafka-configs.sh,來執行各種管理任務,並提供一些 Shell 指令碼範例,以協助管理員自動化執行這些任務,提升管理效率。瞭解這些工具的使用方法和注意事項,能有效地管理 Kafka 叢集,確保系統的穩定執行。

管理 Kafka 叢集

管理 Kafka 叢集需要額外的工具來對主題、組態等進行管理變更。Kafka 提供了多個命令列介面(CLI)工具,用於對叢集進行管理操作。這些工具以 Java 類別實作,並提供了一組指令碼以正確呼叫這些類別。雖然這些工具提供了基本功能,但在更複雜的操作中可能會顯得不足,或是在較大規模的使用中變得難以操作。本章節將介紹 Apache Kafka 開放原始碼專案中提供的基礎工具。有關社群開發的進階工具的更多資訊,請參閱 Apache Kafka 網站。

授權管理操作

雖然 Apache Kafka 實作了身份驗證和授權來控制主題操作,但預設組態並未限制這些工具的使用。這意味著這些 CLI 工具可以在無需任何身份驗證的情況下使用,從而允許在沒有安全檢查或稽核的情況下執行主題變更等操作。請務必確保在您的佈署中僅限管理員使用這些工具,以防止未經授權的變更。

主題操作

kafka-topics.sh 工具提供了對大多數主題操作的便捷存取。它允許您建立、修改、刪除和列出叢集中的主題資訊。雖然有些主題組態可以透過此命令進行,但它們已被棄用,建議使用更穩健的 kafka-config.sh 工具進行組態變更。要使用 kafka-topics.sh 命令,您必須透過 --bootstrap-server 選項提供叢集連線字串和埠。在下面的範例中,叢集連線字串是在 Kafka 叢集中的其中一個主機上本地執行的,我們將使用 localhost:9092

檢查版本

許多 Kafka 的命令列工具依賴於執行的 Kafka 版本來正確操作。這包括一些命令可能會將資料儲存在 ZooKeeper 中,而不是連線到 broker 本身。因此,確保您使用的工具版本與叢集中的 broker 版本相符非常重要。最安全的方法是在 Kafka broker 本身執行工具,使用佈署的版本。

建立新主題

使用 --create 命令建立新主題時,需要提供多個必要引數來在叢集中建立新主題。即使某些引數可能已經組態了 broker 級別的預設值,也必須在使用此命令時提供。額外的引數和組態覆寫也是可能的,使用 --config 選項,但將在本章後面介紹。以下是三個必要引數的清單:

  • --topic:您要建立的主題名稱。
  • --replication-factor:在叢集中維護的主題副本數量。
  • --partitions:要為主題建立的分割槽數量。

良好的主題命名慣例

主題名稱可以包含字母數字字元、下劃線、破折號和句點;但是,不建議在主題名稱中使用句點。Kafka 內部的指標會將句點字元轉換為下劃線字元(例如,“topic.1” 在指標計算中變為 “topic_1”),這可能會導致主題名稱衝突。

另一個建議是避免使用雙下劃線開頭命名主題。按照慣例,Kafka 內部操作的主題是以雙下劃線命名慣例建立的(例如 __consumer_offsets 主題,用於追蹤消費者群組偏移儲存)。因此,不建議使用雙下劃線命名慣例開頭的主題名稱,以避免混淆。

建立新主題很簡單。執行 kafka-topics.sh 如下:

# kafka-topics.sh --bootstrap-server <connection-string>:<port> --create --topic <string> --replication-factor <integer> --partitions <integer>

該命令將導致叢集建立具有指定名稱和分割槽數量主題。對於每個分割槽,叢集將適當地選擇指定數量的副本。這意味著,如果叢集設定為機架感知副本分配,則每個分割槽的副本將位於不同的機架中。如果不希望進行機架感知分配,請指定 --disable-rack-aware 命令列引數。

例如,建立一個名為 “my-topic” 的主題,具有八個分割槽,每個分割槽有兩個副本:

# kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --replication-factor 2 --partitions 8
Created topic "my-topic".

使用 –if-exists 和 –if-not-exists 引數

在自動化中使用 kafka-topics.sh 時,您可能希望在建立新主題時使用 --if-not-exists 引數,以避免在主題已經存在時傳回錯誤。

雖然為 --alter 命令提供了 --if-exists 引數,但不建議使用它。使用此引數將導致命令在正在變更的主題不存在時不傳回錯誤。這可能會遮蔽應該建立但不存在的主題所產生的問題。

列出叢集中的所有主題

--list 命令列出叢集中的所有主題。清單以每行一個主題的格式顯示,無特定順序,這對於產生完整的主題清單非常有用。

以下是 --list 選項列出叢集中所有主題的範例:

# kafka-topics.sh --bootstrap-server localhost:9092 --list
my-topic
__consumer_offsets
another-topic

內容解密:

  • 本段落主要介紹瞭如何使用 kafka-topics.sh 工具來管理 Kafka 主題,包括建立、列出和修改主題。
  • 重點說明瞭使用 --create--list 和其他引數來執行不同操作的方法。
  • 同時強調了正確組態和安全性的重要性,例如限制對管理工具的存取。

Kafka 主題操作流程圖示

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Kafka叢集管理工具實務操作

package "Linux Shell 操作" {
    package "檔案操作" {
        component [ls/cd/pwd] as nav
        component [cp/mv/rm] as file
        component [chmod/chown] as perm
    }

    package "文字處理" {
        component [grep] as grep
        component [sed] as sed
        component [awk] as awk
        component [cut/sort/uniq] as text
    }

    package "系統管理" {
        component [ps/top/htop] as process
        component [systemctl] as service
        component [cron] as cron
    }

    package "管線與重導向" {
        component [| 管線] as pipe
        component [> >> 輸出] as redirect
        component [$() 命令替換] as subst
    }
}

nav --> file : 檔案管理
file --> perm : 權限設定
grep --> sed : 過濾處理
sed --> awk : 欄位處理
pipe --> redirect : 串接命令
process --> service : 服務管理

note right of pipe
  命令1 | 命令2
  前者輸出作為後者輸入
end note

@enduml

此圖示展示了使用 kafka-topics.sh 工具進行不同操作的流程,包括建立新主題和列出現有主題。

詳細解說:

  1. 使用者啟動 kafka-topics.sh 工具。
  2. 指定所需的操作,例如建立新主題或列出現有主題。
  3. 若選擇建立新主題,則需要提供必要的引數,如主題名稱、複本因子和分割槽數量。
  4. 若選擇列出主題,則直接執行 --list 命令以顯示所有現有主題。
  5. 在建立新主題時,系統會檢查提供的引數是否正確。若正確,則成功建立新主題;若不正確,則傳回錯誤訊息。

Kafka 主題管理與操作詳解

Kafka 的主題(Topic)管理是確保系統高效運作的關鍵部分。透過 kafka-topics.sh 工具,管理員可以列出、描述、修改甚至增加主題的分割區,以滿足不斷變化的業務需求。

列出所有主題

使用 --list 引數可以列出 Kafka 叢集中的所有主題。例如:

# kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
my-topic
other-topic

這裡會列出內部主題 __consumer_offsets,如果需要排除內部主題,可以使用 --exclude-internal 引數。

內容解密:

  • --list 用於列出所有主題。
  • --bootstrap-server 指定 Kafka 叢集的連線地址。
  • --exclude-internal 用於排除內部主題。

描述主題詳情

使用 --describe 引數可以取得某個或多個主題的詳細資訊,包括分割區數量、組態覆寫和每個分割區的副本分配情況。

例如,描述名為 my-topic 的主題:

# kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
Topic: my-topic PartitionCount: 8 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: my-topic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: my-topic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
...

輸出結果包括主題的分割區數量、複製因子、組態引數以及每個分割區的 Leader 和副本資訊。

內容解密:

  • --describe 用於取得主題的詳細資訊。
  • PartitionCountReplicationFactor 分別表示分割區數量和複製因子。
  • Configs 列出主題的組態引數。
  • 每個分割區的 LeaderReplicasIsr(In-Sync Replicas)狀態。

篩選主題資訊

--describe 命令結合其他引數可以篩選出特定的主題或分割區,例如:

  • --topics-with-overrides:僅顯示具有與叢集預設值不同的組態的主題。
  • --under-replicated-partitions:顯示副本不同步的分割區。
  • --at-min-isr-partitions:顯示 ISR 數量達到最小值的分割區。
  • --under-min-isr-partitions:顯示 ISR 數量低於最小值的分割區。
  • --unavailable-partitions:顯示沒有 Leader 的分割區。

例如,查詢 ISR 數量達到最小值的分割區:

# kafka-topics.sh --bootstrap-server localhost:9092 --describe --at-min-isr-partitions
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0
...

內容解密:

  • 各個篩選引數用於診斷叢集問題。
  • --at-min-isr-partitions 用於檢查處於最小 ISR 狀態的分割區。

增加分割區數量

隨著業務增長,可能需要增加主題的分割區數量以擴充套件處理能力。使用 --alter 命令可以實作這一點。

例如,將 my-topic 的分割區數量增加到 16:

# kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 16
# kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
Topic: my-topic PartitionCount: 16 ReplicationFactor: 2 Configs: segment.bytes=1073741824
...

內容解密:

  • --alter 命令用於修改主題組態。
  • --partitions 用於指定新的分割區數量。
  • 增加分割區後,使用 --describe 命令驗證變更結果。

Kafka 主題管理與消費者群組操作

Kafka 是一種高效能的分散式訊息佇列系統,廣泛應用於大資料處理和實時資料流處理。在 Kafka 中,Topic(主題)是用於組織和分類別訊息的關鍵概念,而 Consumer Group(消費者群組)則是實作訊息消費的協調和管理的重要機制。

主題管理

新增與調整分割槽

在 Kafka 中,主題的分割槽(Partition)數量決定了其可擴充套件性和平行處理能力。新增分割槽可以透過 kafka-topics.sh 工具進行操作。然而,對於帶有鍵值(Keyed)的訊息主題,增加分割槽會改變鍵值到分割槽的對映關係,從而對消費者造成影響。因此,對於帶有鍵值的訊息主題,建議在建立時就確定好分割槽數量,以避免後續調整帶來的問題。

減少分割槽

目前,Kafka 不支援直接減少主題的分割槽數量。刪除分割槽將導致資料丟失和客戶端視角的不一致性。如果需要減少分割槽,建議刪除原主題並重新建立,或者建立一個新的主題並將生產流量切換到新主題。

刪除主題

刪除主題可以釋放叢集資源,包括磁碟空間、檔案控制程式碼和記憶體等。執行刪除操作前,需要確保 delete.topic.enable 組態項為 true。刪除操作是非同步的,可能需要一段時間才能完成。建議操作員一次刪除一到兩個主題,以避免對控制器造成過大負擔。

消費者群組管理

列出和描述消費者群組

kafka-consumer-groups.sh 工具可用於管理消費者群組,包括列出群組、描述特定群組、刪除群組或重置偏移量等操作。使用 --list 引數可以列出所有消費者群組,而使用 --describe--group 引數可以取得特定群組的詳細資訊,包括其消費的主題、分割槽、偏移量等。

# 列出所有消費者群組
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 描述特定消費者群組
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer

刪除消費者群組

使用 --delete 引數可以刪除整個消費者群組,包括其儲存的所有偏移量。在執行刪除操作前,需要確保群組內沒有活躍的消費者成員。

重點注意事項

  1. 刪除主題是不可逆的操作,將導致所有訊息被刪除,請謹慎操作。
  2. 調整帶有鍵值的訊息主題的分割槽數量可能會影響消費者的消費行為,建議在建立主題時就規劃好分割槽數量。
  3. 刪除消費者群組前,請確保群組內沒有活躍的消費者成員,以避免錯誤發生。

程式碼範例與解析

# 建立主題
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 4 --replication-factor 2

# 描述主題
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

內容解密:

  1. kafka-topics.sh 是用於管理 Kafka 主題的命令列工具。
  2. --bootstrap-server localhost:9092 指定了 Kafka 叢集的連線地址。
  3. --create 引數用於建立新主題。
  4. --topic my-topic 指定了要建立的主題名稱。
  5. --partitions 4 設定了主題的分割槽數量為 4。
  6. --replication-factor 2 設定了每個分割槽的副本因子為 2,以提高資料的可用性和容錯性。
  7. --describe 引數用於取得指定主題的詳細資訊,包括其分割槽、分佈和副本狀態等。

Kafka 管理工具實務操作

Kafka 提供了多種管理工具來協助管理員監控、除錯及最佳化叢集運作。本文將介紹如何使用 kafka-consumer-groups.shkafka-configs.sh 工具進行消費者群組管理和動態組態變更。

消費者群組管理

Kafka 的 kafka-consumer-groups.sh 工具可用於管理消費者群組,包括列出所有消費者群組、檢查群組偏移量、管理偏移量等操作。

列出所有消費者群組

要列出 Kafka 叢集中的所有消費者群組,可以使用以下命令:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

這個命令會顯示當前叢集中所有活躍的消費者群組名稱。

檢查消費者群組偏移量

可以使用以下命令檢查特定消費者群組的偏移量:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-consumer --describe

此命令會顯示 my-consumer 群組的詳細偏移訊息,包括每個分割槽的當前偏移量、最新偏移量及滯後量。

重設消費者群組偏移量

在某些情況下,需要重設消費者群組的偏移量。例如,當需要重新處理某些訊息或跳過無法處理的訊息時,可以使用 --reset-offsets 引數:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-consumer --reset-offsets --to-current --dry-run

這個命令會模擬重設偏移量的操作,但不會實際執行。可以透過移除 --dry-run 引數來實際執行重設操作。

匯出和匯入偏移量

Kafka 支援將偏移量匯出到 CSV 檔案,並從 CSV 檔案匯入偏移量。這對於備份和還原偏移量非常有用。

匯出偏移量

要匯出特定主題的偏移量到 CSV 檔案,可以使用以下命令:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --export --group my-consumer --topic my-topic --reset-offsets --to-current --dry-run > offsets.csv

這會將 my-topic 主題的偏移量匯出到 offsets.csv 檔案中。

匯入偏移量

要從 CSV 檔案匯入偏移量,首先需要停止所有相關的消費者。然後,可以使用以下命令:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my-consumer --from-file offsets.csv --execute

這會根據 offsets.csv 檔案中的內容重設 my-consumer 群組的偏移量。

動態組態變更

Kafka 的 kafka-configs.sh 工具允許管理員在不重新啟動叢集的情況下動態變更組態。

修改主題組態

可以使用以下命令修改特定主題的組態:

kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000

這個命令將 my-topic 主題的訊息保留時間修改為 1 小時(3600000 毫秒)。

有效的主題組態鍵值

Kafka 支援多種可動態修改的主題組態。部分重要的組態鍵值包括:

組態鍵值描述
cleanup.policy如果設為 compact,則只保留每個 key 的最新訊息。
compression.typeBroker 寫入訊息批次到磁碟時使用的壓縮型別。
retention.ms訊息保留時間。
max.message.bytes主題中單個訊息的最大大小(以位元組為單位)。

重點整理

  1. 消費者群組管理:使用 kafka-consumer-groups.sh 工具進行列出、檢查及重設消費者群組的偏移量。
  2. 偏移量管理:支援將偏移量匯出到 CSV 檔案並從中匯入,以方便備份和還原。
  3. 動態組態變更:使用 kafka-configs.sh 工具在執行階段修改主題、Broker 等的組態,無需重新啟動叢集。

範例程式碼解密:

以下是一個簡單的 shell 指令碼,用於檢查 Kafka 叢集中的所有消費者群組並將結果輸出到日誌檔案:

#!/bin/bash

# 設定 Kafka bootstrap server 位址
BOOTSTRAP_SERVER="localhost:9092"

# 列出所有消費者群組
echo "Listing all consumer groups..."
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER --list > consumer_groups.log

# 對每個群組進行描述
for group in $(cat consumer_groups.log); do
    echo "Describing consumer group: $group"
    kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER --group $group --describe >> consumer_groups.log
done

echo "Consumer group information has been saved to consumer_groups.log"

內容解密:

  1. 指令碼目的:自動化列出 Kafka 叢集中的所有消費者群組並檢查每個群組的詳細資訊。
  2. 變數設定BOOTSTRAP_SERVER 設定 Kafka 的連線位址。
  3. 列出消費者群組:使用 kafka-consumer-groups.sh 命令列出所有消費者群組並將結果輸出到 consumer_groups.log
  4. 迴圈檢查每個群組:對每個群組執行描述操作,並將結果追加到同一個日誌檔案中。
  5. 完成提示:輸出完成訊息,確認日誌已儲存。