Kafka 生產者負責將訊息傳送到 Kafka 叢集,而消費者則負責從 Kafka 叢集中讀取訊息。分割槽策略決定了訊息如何分配到不同的分割槽,這對於訊息的順序和負載平衡至關重要。預設的分割槽策略根據鍵的雜湊值,但開發者可以根據業務需求自定義分割槽策略,例如將特定客戶的訊息分配到專用分割槽,以避免熱點問題。生產者攔截器可以在訊息傳送前後執行自定義邏輯,例如統計訊息數量、修改訊息內容等。配額管理可以限制客戶端的生產和消費速率,防止個別客戶端佔用過多資源。消費者群組允許多個消費者共同消費同一個主題,實作負載平衡和高用性。當消費者加入或離開群組時,會觸發 Partition Rebalance,重新分配分割槽的所有權。理解這些機制對於構建高效可靠的 Kafka 應用至關重要。
Kafka 生產者分割槽策略與自定義實作
Kafka 是一種高效能的分散式訊息佇列系統,其生產者(Producer)負責將資料傳送到 Kafka 叢集。在資料傳送過程中,分割槽(Partition)策略扮演著至關重要的角色。預設的分割槽策略是根據雜湊(Hash)的方法,將資料均勻地分配到不同的分割槽中。
預設分割槽策略的特性
預設分割槽策略使用雜湊函式將鍵(Key)對映到分割槽。這種方法的優點是能夠在分割槽數量不變的情況下,保證相同的鍵始終被寫入相同的分割槽。然而,當分割槽數量發生變化時,這種保證就會被破壞。舊的記錄仍然保留在原來的分割槽中,而新的記錄可能會被寫入不同的分割槽。
自定義分割槽策略
在某些情況下,預設的分割槽策略可能無法滿足特定的業務需求。例如,假設某個客戶(Customer)「Banana」的交易量佔據了總交易量的 10% 以上,使用預設的雜湊分割槽策略可能會導致某個分割槽過載。為瞭解決這個問題,可以實作自定義的分割槽策略。
以下是一個自定義分割槽器的範例:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceof String))) {
throw new InvalidRecordException("We expect all messages to have customer name as key");
}
if (((String) key).equals("Banana")) {
return numPartitions - 1; // Banana will always go to last partition
}
// Other records will get hashed to the rest of the partitions
return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
}
@Override
public void close() {}
}
內容解密:
BananaPartitioner類別實作了Partitioner介面,用於自定義分割槽策略。partition方法根據鍵(Key)的值決定分割槽。如果鍵是「Banana」,則將其分配到最後一個分割槽。- 其他記錄將使用雜湊函式分配到剩餘的分割槽中。
configure方法用於組態分割槽器,但在此範例中未被使用。close方法用於關閉分割槽器,但在此範例中未被實作。
標頭(Headers)與攔截器(Interceptors)
除了鍵(Key)和值(Value)之外,Kafka 記錄還可以包含標頭(Headers)。標頭是一種有序的鍵值對集合,用於新增有關記錄的中繼資料。
攔截器(Interceptors)是一種用於修改 Kafka 使用者端應用程式行為的機制。生產者攔截器(ProducerInterceptor)可以用於捕捉有關傳送記錄的資訊、修改記錄或增強記錄。
以下是一個簡單的生產者攔截器範例:
public class CountingProducerInterceptor implements ProducerInterceptor {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
static AtomicLong numSent = new AtomicLong(0);
static AtomicLong numAcked = new AtomicLong(0);
@Override
public void configure(Map<String, ?> map) {
Long windowSize = Long.valueOf((String) map.get("counting.interceptor.window.size.ms"));
executorService.scheduleAtFixedRate(CountingProducerInterceptor::run, windowSize, windowSize, TimeUnit.MILLISECONDS);
}
// ...
}
內容解密:
CountingProducerInterceptor類別實作了ProducerInterceptor介面,用於捕捉有關傳送記錄和確認(Acknowledgement)的資訊。configure方法用於組態攔截器,並啟動一個排程執行緒來定期執行某個任務。- 使用
AtomicLong變數來計數傳送的記錄和收到的確認。
Kafka 生產者攔截器與配額管理
Kafka 提供了多種機制來監控和控制客戶端的行為,包括生產者攔截器和配額管理。這些功能使得管理員能夠對 Kafka 叢集進行更精細的控制,確保系統的穩定性和效能。
生產者攔截器
生產者攔截器是一種機制,允許開發者在不修改客戶端程式碼的情況下,對生產者的行為進行監控和修改。攔截器可以實作 ProducerInterceptor 介面,該介面定義了幾個關鍵方法:
1. onSend(ProducerRecord producerRecord)
此方法在生產者傳送記錄之前被呼叫。它允許攔截器修改或記錄被傳送的資料。
public ProducerRecord onSend(ProducerRecord producerRecord) {
numSent.incrementAndGet();
return producerRecord;
}
內容解密:
onSend方法用於在傳送記錄之前進行攔截。numSent.incrementAndGet();用於增加已傳送記錄的計數。- 該方法傳回原始的
ProducerRecord物件,表示不對記錄進行任何修改。
2. onAcknowledgement(RecordMetadata recordMetadata, Exception e)
此方法在收到 Kafka 的確認回應時被呼叫,用於處理確認結果。
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
numAcked.incrementAndGet();
}
內容解密:
onAcknowledgement方法在收到 Kafka 的確認回應時被呼叫。numAcked.incrementAndGet();用於增加已確認記錄的計數。- 如果發生異常,
e引數將包含相關的異常資訊。
3. close()
此方法在生產者關閉時被呼叫,用於清理資源。
public void close() {
executorService.shutdownNow();
}
內容解密:
close方法在生產者關閉時被呼叫。executorService.shutdownNow();用於立即關閉執行緒池,釋放資源。
組態和使用生產者攔截器
要使用生產者攔截器,需要將其組態新增到生產者的組態中。可以透過以下步驟實作:
- 將包含攔截器的 JAR 檔案新增到類別路徑中。
- 建立一個組態檔案,包含攔截器的類別名和其他相關組態。
- 使用
--producer.config選項執行 Kafka 生產者客戶端,指定組態檔案。
配額管理
Kafka 提供了配額機制,用於限制客戶端的生產和消費速率。配額可以根據客戶端 ID、使用者或其他標準進行設定。
組態配額
配額可以透過修改 Kafka broker 的組態檔案或使用 kafka-configs 命令進行設定。例如:
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024' --entity-name clientC --entity-type clients
內容解密:
- 此命令用於為客戶端
clientC設定生產速率配額為 1024 位元組每秒。 --entity-type clients指定了配額適用的實體型別為客戶端。
節流行為
當客戶端達到其配額限制時,broker 將開始節流客戶端的請求,透過延遲回應來降低客戶端的請求速率。節流行為可以透過多個指標進行監控,例如 produce-throttle-time-avg 和 fetch-throttle-time-max。
Kafka消費者:從Kafka讀取資料
需要從Kafka讀取資料的應用程式會使用KafkaConsumer來訂閱Kafka主題並接收訊息。從Kafka讀取資料與從其他訊息系統讀取資料有些不同,涉及一些獨特的概念和想法。在瞭解如何使用消費者API之前,先了解這些概念是非常重要的。我們將先解釋一些重要的概念,然後透過一些範例展示如何使用消費者API來實作具有不同需求的應用程式。
Kafka消費者概念
要了解如何從Kafka讀取資料,首先需要了解其消費者和消費者群組。以下章節將介紹這些概念。
消費者和消費者群組
假設有一個應用程式需要從Kafka主題讀取訊息,對它們進行一些驗證,然後將結果寫入另一個資料儲存中。在這種情況下,應用程式將建立一個消費者物件,訂閱適當的主題,並開始接收訊息、驗證它們並寫入結果。這可能在一段時間內運作良好,但如果生產者寫入主題的訊息速率超過了應用程式驗證它們的速率怎麼辦?如果僅限於單個消費者讀取和處理資料,應用程式可能會越來越落後,無法跟上傳入訊息的速率。
顯然,需要擴充套件對主題的消費。就像多個生產者可以寫入同一個主題一樣,我們需要允許多個消費者從同一個主題讀取,將資料分攤給他們。Kafka消費者通常是消費者群組的一部分。當多個消費者訂閱同一個主題並屬於同一個消費者群組時,群組中的每個消費者將從主題中的不同分割槽子集接收訊息。
擴充套件消費
讓我們考慮一個具有四個分割槽的主題T1。現在,假設我們建立了一個新的消費者C1,它是群組G1中唯一的消費者,並用它來訂閱主題T1。消費者C1將獲得T1所有四個分割槽的所有訊息。
圖示說明:單一消費者群組與四個分割槽
如果我們向G1新增另一個消費者C2,每個消費者將只獲得兩個分割槽的訊息。也許來自分割槽0和2的訊息會到C1,而來自分割槽1和3的訊息會到消費者C2。
圖示說明:四個分割槽分配給群組中的兩個消費者
如果G1有四個消費者,那麼每個消費者將從單個分割槽讀取訊息。如果我們向單個群組新增的消費者超過了主題中的分割槽數量,那麼一些消費者將閒置並且不會接收任何訊息。
多個消費者群組
除了擴充套件單個應用程式之外,通常有多個應用程式需要從同一個主題讀取資料。實際上,Kafka的主要設計目標之一是使生成到Kafka主題的資料可供組織中的多種使用情況使用。在這些情況下,我們希望每個應用程式都能獲得所有訊息,而不是僅僅是一個子集。為了確保應用程式獲得主題中的所有訊息,請確保應用程式具有自己的消費者群組。與許多傳統的訊息系統不同,Kafka可以在不降低效能的情況下擴充套件到大量的消費者和消費者群組。
程式碼範例:建立Kafka消費者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
內容解密:
此範例程式碼建立了一個Kafka消費者,並訂閱了名為"foo"和"bar"的兩個主題。它使用了一個無限迴圈來不斷地輪詢Kafka代理以取得新的訊息記錄。poll方法用於取得一批訊息記錄,然後對每條記錄進行處理,包括列印其偏移量、鍵和值。這段程式碼演示瞭如何使用Kafka的KafkaConsumer類別來建立一個基本的訊息消費者,用於訂閱和處理Kafka主題中的訊息。
Consumer Groups 與 Partition Rebalance 機制
在 Kafka 中,Consumer Groups 是一種用於實作高用性和可擴充套件性的機制。當一個 Consumer Group 訂閱了一個或多個 Topic 時,Group 中的 Consumers 會共同消費這些 Topic 中的訊息。每個 Consumer 只會消費一部分的 Partition,這樣可以實作負載平衡和提高消費效率。
Consumer Groups 與 Partition Rebalance
當一個新的 Consumer 加入 Group 時,它會開始消費之前由其他 Consumer 消費的 Partition。同樣地,當一個 Consumer 離開或當機時,它所消費的 Partition 會被其他 Consumer 接管。這種 Partition 所有權的轉移被稱為 Rebalance。
Rebalance 提供了一種高用性和可擴充套件性的機制,讓我們可以輕鬆地新增或移除 Consumers。然而,在正常情況下,Rebalance 並不是我們想要的,因為它可能會導致短暫的消費停滯。
Eager Rebalances 與 Cooperative Rebalances
Kafka 中有兩種 Rebalance 策略:Eager Rebalances 和 Cooperative Rebalances。
Eager Rebalances:在 Eager Rebalance 中,所有 Consumers 都會停止消費,放棄它們所擁有的所有 Partition,然後重新加入 Group,並獲得新的 Partition 分配。這種方式會導致整個 Group 的短暫停滯。
+
- | Stop 消費 | +
| | v
- | 重新分配Partition | +
| | v
- | Resume 消費 | +
**圖示說明**:Eager Rebalance 的過程,包括停止消費、重新分配 Partition 和還原消費。
Cooperative Rebalances:Cooperative Rebalance(也稱為 Incremental Rebalance)則是逐步地將一部分 Partition 從一個 Consumer 分配給另一個 Consumer,並允許 Consumers 繼續處理未被重新分配的 Partition。這種方式可以減少停滯時間,特別是在大型 Consumer Group 中。
@startuml skinparam backgroundColor #FEFEFE skinparam componentStyle rectangle
title Kafka生產者消費者與分割槽策略
package “系統架構” { package “前端層” { component [使用者介面] as ui component [API 客戶端] as client }
package "後端層" {
component [API 服務] as api
component [業務邏輯] as logic
component [資料存取] as dao
}
package "資料層" {
database [主資料庫] as db
database [快取] as cache
}
}
ui –> client : 使用者操作 client –> api : HTTP 請求 api –> logic : 處理邏輯 logic –> dao : 資料操作 dao –> db : 持久化 dao –> cache : 快取
note right of api RESTful API 或 GraphQL end note
@enduml
**此圖示**展示了 Cooperative Rebalance 的逐步過程。
### Consumer 如何維護其在 Group 中的成員資格
Consumers 透過向 Kafka Broker(被指定為 Group Coordinator)傳送心跳訊號來維護其在 Group 中的成員資格和 Partition 所有權。只要 Consumer 持續傳送心跳訊號,它就被視為活著的。如果 Consumer 停止傳送心跳訊號一段時間,其會話就會超時,Group Coordinator 就會認為它已經死亡,並觸發 Rebalance。
#### 如何將 Partition 分配給 Consumers
當一個 Consumer 想要加入 Group 時,它會向 Group Coordinator 傳送 JoinGroup 請求。第一個加入 Group 的 Consumer 成為 Group Leader。Group Leader 負責使用 PartitionAssignor 的實作來決定哪些 Partition 應該由哪些 Consumers 處理。然後,Group Leader 將分配結果傳送給 Group Coordinator,後者再將這個資訊傳送給所有 Consumers。
### Static Group Membership
預設情況下,Consumer 在其 Consumer Group 中的身份是暫時的。當 Consumers 離開 Group 時,它們所分配到的 Partition 會被復原;當它們重新加入時,它們會被分配一個新的成員 ID 和一套新的 Partition,透過 Rebalance 協定。