Kafka 作為高效能分散式訊息佇列,在現代資料架構中扮演關鍵角色。其核心設計理念在於提供高吞吐量、持久化和容錯能力,支援多生產者多消費者模式。Kafka 的叢集架構由多個 Broker 組成,每個主題分為多個分割槽,確保負載平衡和資料冗餘。MirrorMaker 工具則能實作跨叢集資料同步,滿足不同資料隔離和災難復原需求。Kafka 的資料持久化機制和可組態保留策略,保障資料的可靠性和靈活性。
Kafka 不僅是訊息系統,更是資料生態系統的核心樞紐,串聯不同應用程式和系統,實作資料生產者和消費者的解耦。它支援多種型別資料處理,提供結構化資料流,賦能即時資料處理和分析。Kafka 的應用場景廣泛,涵蓋活動追蹤、訊息傳遞、指標和日誌收集、提交日誌和串流處理等。其高效能、可擴充套件性和多生產者、多消費者支援,使其成為企業構建資料管道的首選。Kafka 的誕生源於 LinkedIn 對高吞吐量、持久化訊息系統的需求,解決了早期資料處理系統的諸多瓶頸,並逐步演變為開源專案,在全球範圍內得到廣泛應用。
Apache Kafka 技術深度解析
Apache Kafka 是一種高效能、分散式、具永續性的訊息佇列系統,廣泛應用於大資料處理、即時資料分析等領域。其設計核心在於提供高效的訊息傳遞機制,支援多生產者、多消費者的架構,以及可擴充套件的叢集管理。
叢集架構與分割槽機制
Kafka 的叢集架構由多個 Broker 組成,每個主題(Topic)被劃分為多個分割槽(Partition)。每個分割槽都有一個 Leader 和多個 Follower,Leader 負責處理該分割槽的所有寫入請求,而 Follower 則負責從 Leader 複製資料,以確保資料的冗餘和容錯能力。
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 叢集架構與分割槽機制
rectangle "produce" as node1
rectangle "replicate" as node2
rectangle "consume" as node3
node1 --> node2
node2 --> node3
@enduml內容解密:
此圖示展示了 Kafka 中生產者將訊息傳送至 Leader,然後由 Leader 將資料複製到 Follower 的過程。消費者可以從 Leader 或 Follower 讀取資料,提高了資料讀取的靈活性。
資料持久化與保留策略
Kafka 提供了資料持久化功能,將訊息儲存在磁碟上,並根據設定的保留策略來管理資料的儲存時間或大小。使用者可以根據不同的主題設定不同的保留策略,例如將某些主題的資料保留數天,而將其他主題的資料保留數小時。
多叢集架構與 MirrorMaker
隨著 Kafka 佈署規模的擴大,使用多叢集架構變得越來越普遍。多叢集架構可以用於隔離不同型別的資料、滿足安全需求或實作災難還原。Kafka 提供了 MirrorMaker 工具,用於在不同叢集之間複製資料,從而實作資料的聚合和分析。
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 多叢集架構與 MirrorMaker
rectangle "多叢集架構" as n1
rectangle "MirrorMaker" as n2
n1 --> n2
@enduml內容解密:
此圖示展示瞭如何使用 MirrorMaker 將多個本地叢集的資料聚合到一個中央叢集,並進一步將該叢集的資料複製到其他資料中心,實作跨資料中心的資料同步。
為何選擇 Kafka?
Kafka 的優勢在於其能夠無縫處理多個生產者和消費者,支援高效的訊息傳遞和持久化儲存。其可擴充套件性使得使用者可以輕鬆地從小規模叢集擴充套件到大規模叢集,同時保持系統的高用性和效能。
- 多生產者支援:Kafka 可以處理來自多個生產者的訊息,使其成為聚合來自不同前端系統資料的理想選擇。
- 多消費者支援:Kafka 允許多個消費者獨立讀取訊息流,而不會相互幹擾。
- 根據磁碟的持久化儲存:訊息被寫入磁碟並根據可組態的保留規則進行儲存,確保了資料的永續性和可靠性。
- 可擴充套件性:Kafka 的設計使其能夠輕鬆擴充套件,以處理大量資料和支援高吞吐量的應用場景。
- 高效能:Kafka 在高負載下仍能保持優異的效能,提供亞秒級的訊息延遲。
綜上所述,Apache Kafka 憑藉其強大的功能和靈活的架構,已成為大資料和即時資料處理領域的重要工具。其在多生產者、多消費者支援、資料持久化和可擴充套件性方面的優勢,使其成為許多企業的首選訊息佇列系統。
Apache Kafka 在資料生態系統中的角色
Apache Kafka 不僅是一個訊息系統,更是現代資料生態系統中的核心元件。它提供了不同應用程式和系統之間的資料流通管道,實作資料生產者與消費者之間的解耦。Kafka 的設計使其能夠處理多種型別的資料,並提供結構化的資料流,對於即時資料處理和分析至關重要。
資料生態系統的迴圈
在資料處理環境中,應用程式扮演著不同的角色。有些應用程式負責生成或引入資料到系統中,而其他的則消費這些資料以生成報告、指標或其他資料產品。這種資料的流動形成了一個迴圈,不斷地在不同的系統和應用程式之間傳遞和轉換。Apache Kafka 在這個生態系統中扮演著「迴圈系統」的角色,它負責在不同的基礎設施成員之間傳遞訊息,為所有客戶端提供一致的介面。
使用案例
活動追蹤
Kafka 最初在 LinkedIn 的使用案例是使用者活動追蹤。前端應用程式會生成有關使用者操作的事件訊息,這些訊息被發布到 Kafka 的主題中,然後由後端應用程式消費,用於生成報告、更新搜尋結果或進行其他操作,以提供豐富的使用者經驗。
訊息傳遞
Kafka 也被用於訊息傳遞場景,應用程式可以生成通知訊息而無需關心訊息的格式或傳送方式。一個單獨的應用程式可以讀取所有待傳送的訊息,並統一處理,包括格式化、聚合和根據使用者偏好傳送通知。
指標和日誌收集
Kafka 非常適合收集應用程式和系統的指標和日誌。應用程式可以定期將指標發布到 Kafka 主題中,這些指標可以被監控和預警系統消費,也可以被用於離線分析,如 Hadoop,用於進行長期分析。
提交日誌
由於 Kafka 根據提交日誌的概念,資料函式庫變更可以被發布到 Kafka,主題中的應用程式可以輕鬆地監控這個變更流,以接收即時更新。這個變更日誌流也可以用於將資料函式庫更新複製到遠端系統,或將多個應用程式的變更合併到單一的資料函式庫檢視中。
流處理
流處理是另一個重要的應用領域。Kafka 的訊息可以被流處理框架處理,執行諸如計數指標、分割訊息或使用多源資料轉換訊息等任務。流處理使得對資料的即時處理成為可能,這與 Hadoop 中常見的長時間範圍內的資料聚合不同。
Kafka 的起源
Kafka 是為瞭解決 LinkedIn 的資料管道問題而建立的。它被設計為一個高效能的訊息系統,能夠處理多種型別的資料,並提供關於使用者活動和系統指標的乾淨、結構化的即時資料。
LinkedIn 的問題
LinkedIn 面臨著與本章開頭描述的類別似問題:一個用於收集系統和應用程式指標的系統存在諸多缺陷,包括根據輪詢的指標收集、指標收集間隔大、以及缺乏對應用程式擁有者管理自身指標的支援。這個系統需要大量的人工干預,並且不一致。
Kafka 的誕生與發展
在 LinkedIn,早期的資料處理系統存在多個問題。監控系統和使用者活動追蹤系統各自獨立,且無法共用後端服務。監控系統使用精簡的指標名稱,但不同系統間的指標名稱不一致。同時,使用者活動追蹤系統被設計為一個 HTTP 服務,前端伺服器定期將 XML 格式的批次訊息推播到該服務。這些訊息隨後被傳輸到離線處理平台進行解析和整理。
初期的挑戰
這個系統存在多個缺陷。首先,XML 格式不一致,解析過程計算成本高。其次,變更追蹤的使用者活動型別需要前端和離線處理平台之間的協調工作。即使如此,系統仍然頻繁因架構變更而中斷。此外,該系統根據小時級別的批次處理,無法實作即時追蹤。
監控和使用者活動追蹤無法共用同一個後端服務,因為監控服務過於笨重,資料格式不適合活動追蹤,且監控的輪詢模式與追蹤的推播模式不相容。使用者活動追蹤服務由於其脆弱性和批次處理模式,不適合用於即時監控和警示。然而,這兩類別資料具有許多共同特徵,將它們相關聯(如特定型別的使用者活動如何影回應用程式效能)是非常有價值的。使用者活動的特定型別下降可能指示服務該活動的應用程式存在問題,但處理活動批次的延遲意味著對這些問題的反應遲緩。
探索現有解決方案
起初,團隊研究了現有的開源解決方案,以尋找能夠提供即時資料存取並擴充套件以處理所需訊息流量的新系統。他們使用 ActiveMQ 建立了原型系統,但當時它無法滿足規模需求。此外,對於 LinkedIn 的使用方式,ActiveMQ 是個脆弱的解決方案,他們發現了許多會導致代理程式暫停的缺陷。這些暫停會備份客戶端的連線,並幹擾應用程式向使用者提供請求的能力。因此,團隊決定開發自定義的資料管道基礎設施。
Kafka 的誕生
LinkedIn 的開發團隊由 Jay Kreps 長官,他曾負責開發和開源發布分散式鍵值儲存系統 Voldemort。初始團隊還包括 Neha Narkhede 和後來的 Jun Rao。他們共同致力於建立一個能夠滿足監控和追蹤系統需求,並能擴充套件以滿足未來需求的訊息傳遞系統。主要目標包括:
- 使用推拉模式解耦生產者和消費者
- 為訊息資料提供永續性,以允許多個消費者消費
- 最佳化訊息的高吞吐量
- 允許系統水平擴充套件,以跟上資料流的增長
最終,他們開發了一個發布/訂閱訊息傳遞系統,其介面類別似於典型的訊息傳遞系統,但儲存層更像日誌聚合系統。結合採用 Apache Avro 進行訊息序列化,Kafka 能夠有效地處理每日數十億條訊息的指標和使用者活動追蹤。
開源與商業化
Kafka 於 2010 年底在 GitHub 上作為開源專案發布,並於 2011 年 7 月被接受為 Apache 軟體基金會的孵化專案。2012 年 10 月,Apache Kafka 從孵化器畢業。自此,它不斷被改進,並在 LinkedIn 以外建立了一個強大的貢獻者和提交者社群。如今,Kafka 被用於世界上一些最大的資料管道中,包括 Netflix、Uber 等多家公司。
Kafka 的廣泛採用創造了一個健康的生態系統。全球數十個國家的活躍聚會小組提供了本地的討論和支援,還有許多與 Apache Kafka 相關的開源專案。LinkedIn 繼續維護多個專案,包括 Cruise Control、Kafka Monitor 和 Burrow。Confluent 也發布了多個專案,如 ksqlDB、schema registry 和 REST proxy。
商業參與
2014 年秋天,Jay Kreps、Neha Narkhede 和 Jun Rao 離開 LinkedIn,創立了 Confluent,一家專注於為 Apache Kafka 提供開發、企業支援和培訓的公司。他們還與其他公司(如 Heroku)合作提供 Kafka 的雲端服務。Confluent 透過與 Google 的合作夥伴關係,在 Google Cloud Platform 上提供受管理的 Kafka 叢集,以及在 Amazon Web Services 和 Azure 上提供類別似服務。Confluent 的另一個重要舉措是組織 Kafka Summit 會議系列。自 2016 年開始,每年在美國和倫敦舉辦,Kafka Summit 為社群提供了一個全球性的知識分享平台。
名稱由來
Jay Kreps 解釋了 Kafka 名稱的由來:
我認為,由於 Kafka 是一個為寫入而最佳化的系統,使用一位作家的名字是合理的。我在大學裡上了很多文學課,並且喜歡 Franz Kafka。另外,這個名字對於一個開源專案來說聽起來很酷。所以基本上,它們之間沒有太大的關係。
結語
瞭解了 Kafka 的歷史和發展後,我們可以開始建立自己的資料管道。在下一章中,我們將探討如何安裝和組態 Kafka,包括選擇合適的硬體以及在轉向生產營運時需要注意的事項。
安裝 Kafka 的步驟
在開始使用 Apache Kafka 之前,需要設定環境並安裝必要的元件,包括 Java 和 ZooKeeper。本章節將詳細介紹如何在 Linux 環境下安裝 Kafka,以及一些基本的組態選項和硬體選擇建議。
環境設定
選擇作業系統
Apache Kafka 是一個 Java 應用程式,可以在多個作業系統上執行。雖然 Kafka 可以在 Windows、macOS、Linux 等多個 OS 上執行,但 Linux 是推薦的作業系統。本章節的安裝步驟將重點放在 Linux 環境下的設定和使用。
安裝 Java
在安裝 ZooKeeper 或 Kafka 之前,需要設定好 Java 環境。Kafka 和 ZooKeeper 可以與所有根據 OpenJDK 的 Java 實作(包括 Oracle JDK)一起正常工作。最新版本的 Kafka 支援 Java 8 和 Java 11。建議安裝最新發布的修補程式版本的 Java 環境,因為舊版本可能存在安全漏洞。
# 安裝 JDK 版本 11 更新 10 到 /usr/java/jdk-11.0.10
export JAVA_HOME=/usr/java/jdk-11.0.10
安裝 ZooKeeper
Apache Kafka 使用 Apache ZooKeeper 儲存關於 Kafka 叢集的後設資料,以及消費者客戶端的詳細資訊。ZooKeeper 是一個集中式服務,用於維護組態資訊、命名、提供分散式同步和提供群組服務。
獨立伺服器
ZooKeeper 提供了一個基本的示例組態檔案,可以在大多數情況下正常工作。下面是手動建立一個基本組態的示例:
# tar -zxf apache-zookeeper-3.5.9-bin.tar.gz
# mv apache-zookeeper-3.5.9-bin /usr/local/zookeeper
# mkdir -p /var/lib/zookeeper
# cp > /usr/local/zookeeper/conf/zoo.cfg << EOF
> tickTime=2000
> dataDir=/var/lib/zookeeper
> clientPort=2181
> EOF
# /usr/local/zookeeper/bin/zkServer.sh start
驗證 ZooKeeper 是否正常執行
可以透過連線到客戶端埠並傳送四字母命令 srvr 來驗證 ZooKeeper 是否正確執行:
# telnet localhost 2181
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
srvr
Zookeeper version: 3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: standalone
Node count: 5
Connection closed by foreign host.
ZooKeeper 叢集
ZooKeeper 被設計為以叢集(稱為 ensemble)方式工作,以確保高用性。建議 ensemble 包含奇數個伺服器(例如 3、5 等),因為大多數 ensemble 成員必須正常工作,ZooKeeper 才能回應請求。
組態 ZooKeeper 叢集
要組態 ZooKeeper 叢集中的伺服器,它們必須具有列出所有伺服器的共同組態,並且每個伺服器需要在資料目錄中具有 myid 檔案,以指定伺服器的 ID 號。
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=zoo1.example.com:2888:3888
server.2=zoo2.example.com:2888:3888
server.3=zoo3.example.com:2888:3888
組態解說:
tickTime:ZooKeeper 中使用的基本時間單位(毫秒)。dataDir:儲存資料的目錄。clientPort:客戶端連線的埠。initLimit:允許 follower 連線到 leader 的時間限制。syncLimit:限制 follower 與 leader 不同步的時間。server.X:指定 ensemble 中的伺服器,格式為server.id=host:port:port。
Apache Kafka 與 ZooKeeper 的安裝與組態
Apache Kafka 是一個分散式串流處理平台,而 ZooKeeper 則是用於管理和協調 Kafka 叢集的關鍵元件。本將介紹如何在單一機器上測試 ZooKeeper 叢集,以及如何安裝和組態 Kafka Broker。
組態 ZooKeeper 叢集
要組態 ZooKeeper 叢集,需要在 zoo.cfg 檔案中指定叢集中的所有伺服器。每個伺服器的組態格式如下:
server.X=hostname:peerPort:leaderPort
X是伺服器的 ID 號碼,必須是整數,但不一定是從零開始或連續的。hostname是伺服器的主機名稱或 IP 地址。peerPort是伺服器之間通訊的 TCP 連線埠。leaderPort是進行 Leader 選舉的 TCP 連線埠。
客戶端只需要能夠透過 clientPort 連線到叢集,但叢集中的伺服器必須能夠在所有三個連線埠上相互通訊。
@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333
title 組態 ZooKeeper 叢集
rectangle "peerPort" as node1
rectangle "leaderPort" as node2
node1 --> node2
@enduml此圖示展示了 ZooKeeper 叢集中伺服器之間的通訊關係。
在單一機器上測試 ZooKeeper 叢集
可以在單一機器上測試 ZooKeeper 叢集,方法是將所有主機名稱指定為 localhost,並為每個例項指定唯一的 peerPort 和 leaderPort。此外,還需要為每個例項建立一個單獨的 zoo.cfg 檔案,指定唯一的 dataDir 和 clientPort。
安裝 Kafka Broker
安裝 Kafka 之前,需要先組態好 Java 和 ZooKeeper。可以從 Kafka 官網下載最新的版本。以下範例將 Kafka 安裝在 /usr/local/kafka 目錄下,並組態使用之前啟動的 ZooKeeper 伺服器,將訊息日誌段儲存在 /tmp/kafka-logs 目錄下。
# tar -zxf kafka_2.13-2.7.0.tgz
# mv kafka_2.13-2.7.0 /usr/local/kafka
# mkdir /tmp/kafka-logs
# export JAVA_HOME=/usr/java/jdk-11.0.10
# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
內容解密:
- 解壓縮 Kafka 安裝包:使用
tar命令解壓縮下載的 Kafka 安裝包。 - 移動 Kafka 目錄:將解壓縮後的 Kafka 目錄移動到
/usr/local/kafka。 - 建立日誌目錄:建立
/tmp/kafka-logs目錄用於儲存 Kafka 的訊息日誌段。 - 設定 JAVA_HOME 環境變數:指定 Java 的安裝路徑。
- 啟動 Kafka Broker:使用
kafka-server-start.sh指令碼以守護程式模式啟動 Kafka Broker。
驗證 Kafka Broker
啟動 Kafka Broker 後,可以透過建立測試主題、生產訊息和消費訊息來驗證其工作狀態。
# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test
# /usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
Test Message 1
Test Message 2
^C
# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Test Message 1
Test Message 2
^C
內容解密:
- 建立測試主題:使用
kafka-topics.sh指令碼建立一個名為test的主題。 - 生產訊息:使用
kafka-console-producer.sh指令碼向test主題生產訊息。 - 消費訊息:使用
kafka-console-consumer.sh指令碼從test主題消費訊息。
組態 Broker
Kafka 的預設組態足以執行單機伺服器,但對於大型安裝來說,需要調整多個組態引數。以下是一些重要的 Broker 組態引數:
broker.id:每個 Kafka Broker 必須有一個唯一的整數 ID。listeners:指定 Broker 監聽的 URI 清單。zookeeper.connect:指定用於儲存 Broker 元資料的 ZooKeeper 連線字串。log.dirs:指定儲存訊息日誌段的目錄清單。
這些引數對於在叢集環境中執行 Kafka 至關重要。正確組態這些引數可以確保 Kafka Broker 的高效執行和資料永續性。