在數據驅動的商業環境中,即時處理串流數據是獲取競爭優勢的關鍵。然而,建構穩健的串流管道常面臨數據一致性、延遲與容錯等挑戰。本文探討一種結合 Apache Spark Structured Streaming 與 Delta Lake 的現代數據架構,旨在解決傳統串流處理的痛點。藉由 Delta Lake 的 ACID 事務與高效合併操作,開發者能在串流情境下實現可靠的數據寫入與更新。文章將拆解從原始數據攝取、中繼層的精煉轉換,到最終服務層的業務價值聚合,呈現一個端到端的即時數據處理方案,為複雜的數據工程需求提供實踐指引。

第十三章:構建串流管道:Spark與Scala的即時資料處理

數據攝取:從Kafka到Delta Lake的串流旅程

最後,串流作業被啟動並設置為等待終止,這意味著它將一直運行直到手動停止

玄貓的程式碼展示了使用Apache Spark Structured Streaming串流數據Kafka攝取到Delta Lake儲存層的典型設置,並包含必要的配置和設置以確保數據的可靠性和安全性

讓玄貓繼續在玄貓的白銀層中轉換玄貓的數據

轉換數據:從青銅到白銀的數據精煉

玄貓的白銀層程式碼將處理玄貓的設備數據,扁平化轉換去重來自玄貓青銅層的數據

請參閱以下程式碼:

val reprocess: Boolean = args(0).toBoolean

val bronzeSource: String = "./src/main/scala/com/packt/dewithscala/
chapter13/data/bronze/data/"

val target: String = "./src/main/scala/com/packt/dewithscala/
chapter13/data/silver/"

此程式碼根據命令列參數定義了一個reprocess布林變數,並設置了青銅層白銀層數據源檔案路徑,如下所示:

val bronzeData: DataFrame = spark.read.format("delta").
load(bronzeSource)

在這裡,該過程將位於bronzeSource路徑的Delta Lake青銅層中的數據載入到名為bronzeDataDataFrame中。

接下來,以下程式碼定義了一個用於解析bronzeDatavalue欄位中的JSON數據模式

val jsonSchema: StructType = StructType(
Seq(
StructField("device_id", StringType),
StructField("country", StringType),

構建串流管道:Spark與Scala應用程式

264

StructField("event_type", StringType),
StructField("event_ts", TimestampType)
)
)

它指定了數據的結構,包含device_idcountryevent_typeevent_ts等欄位。

現在,以下程式碼處理並轉換bronzeData中的數據:

val updateSilver: DataFrame = bronzeData
.select(from_json(col("value"), jsonSchema).alias("value"))
.select(
col("value.country"),
col("value.event_type"),
col("value.event_ts")
)
.dropDuplicates("device_id", "country", "event_ts")

它從value欄位中的JSON數據中提取欄位,並根據device_idcountryevent_ts欄位刪除重複的行,創建一個名為updateSilverDataFrame

接下來,以下程式碼檢查reprocess變數的值:

reprocess match {
case true =>
updateSilver.write
.format("delta")
.mode("overwrite")
.save(s"${target}silver_devices")
case _ => {
val silverTarget = DeltaTable.forPath(spark, s"{$target}silver_
devices")

silverTarget
.as("devices")
.merge(
updateSilver.as("update"),
"update.country AND devices.event_ts = update.event_ts"
)
.whenNotMatched()
.insertAll()
.execute()
}
}

創建服務層

265

如果reprocesstrue,它將覆蓋整個白銀層。如果reprocess是其他任何值,它將使用Delta Lakemerge操作更新白銀層,根據指定的條件將updateSilver中的數據合併到現有的白銀層中。

總而言之,此程式碼從青銅層獲取數據,應用轉換,並更新白銀層,同時根據reprocess的值考慮是重新處理還是增量更新。它利用Delta Lake有效地管理數據更新和轉換

創建服務層:構建黃金層數據

玄貓管道的最後一步將是寫入玄貓的服務層,稱為黃金層

請參閱以下程式碼:

private def writeDelta(df: DataFrame, tableName: String) = {
df.write
.format("delta")
.mode("overwrite")
.save(s"${target}${tableName}")
}

此程式碼定義了一個可重用函數writeDelta,它接受一個DataFrame (df)和一個表名tableName作為參數。它將DataFrame寫入具有指定表名的Delta Lake表,如果該表已存在,則覆蓋現有數據

接下來,請看以下程式碼:

此圖示:白銀層數據轉換與更新流程

看圖說話:

此圖示詳細展示了數據從青銅層流向白銀層的轉換與更新過程,這是串流管道數據精煉的關鍵步驟。它突顯了Spark如何處理原始數據,進行結構化去重,並利用Delta Lake高級功能高效管理數據更新

  1. 讀取青銅層數據
  • 流程從Delta Lake青銅層開始。Spark應用程式使用spark.read.format("delta").load(bronzeSource)指令,將青銅層中未經處理的原始數據載入為一個DataFrame。這些數據通常以key-value對的形式存在,其中value是一個包含IoT設備事件的JSON字串。
  1. JSON解析與扁平化
  • 載入的原始DataFrame會經過JSON解析步驟。程式碼使用from_json(col("value"), jsonSchema)函數,根據預先定義的jsonSchema來解析value欄位中的JSON字串。
  • 解析後,數據會被扁平化,從一個複雜的JSON結構轉換為一個結構化的DataFrame,其中包含device_idcountryevent_typeevent_ts等直接可用的欄位。
  1. 數據去重
  • 在數據被寫入白銀層之前,會執行去重操作。由於Kafka至少一次傳遞語義,數據可能會出現重複。
  • dropDuplicates("device_id", "country", "event_ts")指令會根據這三個關鍵欄位來識別並移除重複的行,確保白銀層數據的唯一性品質。這一步產生了updateSilver DataFrame
  1. Delta Lake合併/覆蓋邏輯
  • 這是將處理後的數據寫入白銀層的關鍵決策點,由reprocess布林變數控制:
  • 完全覆蓋(overwrite)模式:如果reprocesstrue,表示需要完全重新處理重建白銀層。此時,updateSilver DataFrame將使用mode("overwrite")寫入Delta Lake白銀層,替換所有現有數據
  • 增量合併(merge)模式:如果reprocessfalse,則採用增量更新策略。程式碼使用Delta Lakemerge操作(DeltaTable.forPath().merge())。merge操作能夠根據指定的匹配條件(例如update.country AND devices.event_ts = update.event_ts)來智能地更新插入刪除數據。在這裡,它主要用於插入新數據whenNotMatched().insertAll()),實現高效的增量更新,這在處理持續串流數據時至關重要。

這個流程展示了如何將原始、可能重複的串流數據轉換為清洗、結構化且去重白銀層數據,並利用Delta Lake的強大功能來實現靈活高效的數據管理。

第十三章:構建串流管道:Spark與Scala的即時資料處理

創建服務層:構建黃金層數據

val silverSource: String = "./src/main/scala/com/packt/dewithscala/
chapter13/data/silver/ silver_devices"

val target: String = "./src/main/scala/com/packt/dewithscala/
chapter13/data/gold/"

val silverData: DataFrame = spark.read.format("delta").
load(silverSource)

在上述程式碼中,定義了白銀層黃金層檔案路徑。玄貓的程式碼將位於silverSource路徑的Delta Lake白銀層中的數據載入到名為silverDataDataFrame中。

現在,玄貓處理來自silverData的數據,以按日期和事件類型計算事件數量

val case1Df: DataFrame = silverData
.groupBy(to_date($"event_ts").alias("event_date"), $"event_type")
.agg(count($"event_type").alias("event_count"))

writeDelta(
case1Df,
"event_by_date"
)

此程式碼按event_ts列的日期部分和event_type列對數據進行分組,然後計算每個日期每個事件類型事件數量。結果使用writeDelta函數寫入名為event_by_dateDelta Lake表。

接下來,玄貓需要編寫程式碼來查找最新的設備狀態

private val windowSpec =
Window.partitionBy("device_id").orderBy(desc("event_ts"))

private val dfWindowedRank = silverData
.withColumn("dense_rank", dense_rank().over(windowSpec))

val case2Df: DataFrame = dfWindowedRank
.filter("dense_rank = 1")
.drop("dense_rank")

writeDelta(
case2Df,
"latest_device_status"
)

此程式碼創建了一個視窗規範,用於按device_id分區並按event_ts降序排序。然後,它根據視窗規範在每個分區內計算密集排名DataFrame被過濾,只保留密集排名為1的行,代表最新的設備狀態。結果使用writeDelta函數寫入名為latest_device_statusDelta Lake表。

玄貓現在有一個運作中的串流處理,讓玄貓在下一節中繼續進行編排

編排玄貓的串流處理:Databricks Workflows應用

玄貓在上一章的批次處理中使用了Argo進行編排,因此讓玄貓使用Databricks Workflows來展示在雲端環境中排程玄貓流程的另一種方式。請參閱第二章以了解如何使用Databricks社群版

運用從前幾章中學到的知識來構建您的Spark jar並將其部署到Databricks可以訪問的位置。這可以是雲端儲存帳戶或您的Databricks File System (DBFS)

前往您的Databricks工作區,並在左側導航中尋找工作流程(Workflows)

編排玄貓的串流處理

267

圖13.3 – Databricks導航介面

以下是創建新工作流程以編排您的管道的步驟:

  1. 在您的Databricks左側導航中,點擊工作流程(Workflows)。玄貓流程的第一步將是創建一個作業來運行玄貓的串流處理。透過點擊**「創建作業」來創建一個新工作流程,然後輸入您的jar位置,以及類別名稱套件**。請參閱以下螢幕截圖範例:

圖13.4 – Databricks工作流程中的串流任務

構建串流管道:Spark與Scala應用程式

268

  1. 接下來,玄貓將使用一個特殊觸發器來確保只有一個進程持續運行。在作業的右側區域,找到排程與觸發器(Schedule & Triggers)部分並點擊添加觸發器(Add trigger)

圖13.5 – Databricks排程與觸發器

  1. 從下拉列表中選擇連續觸發器類型(Continuous trigger type)

圖13.6 – Databricks觸發器類型

  1. 接下來,請確保您選擇活動觸發器狀態(Active trigger status),因為這將在您保存時立即啟動該過程:

圖13.7 – Databricks觸發器狀態

  1. 點擊保存(Save),您的流程將開始。連續觸發器是一種特殊類型的觸發器,它將確保此流程始終運行。由於玄貓的流程是串流管道,這是一個理想的觸發器

  2. 接下來,玄貓希望創建另一個管道,該管道排程每15分鐘運行一次,以處理玄貓從青銅層到白銀層再到黃金層的數據

此圖示:黃金層數據生成與更新流程

看圖說話:

此圖示闡明了從白銀層數據生成和更新黃金層數據的流程,這是串流管道最終服務層。它展示了Spark如何從已清洗和去重的白銀層中提取業務價值,並將其轉化為兩個不同用途的黃金層表

  1. 讀取白銀層數據
  • 流程從Delta Lake白銀層開始。Spark應用程式讀取silverSource路徑下的Delta Lake表,將其載入為silverData DataFrame。這個DataFrame包含了經過清洗、去重且結構化的設備狀態事件
  1. 聚合計算(每日事件數)
  • 為了滿足**「識別每天設備狀態的總數」的業務需求,silverData會進行聚合操作**。
  • 程式碼使用groupBy(to_date($"event_ts").alias("event_date"), $"event_type")事件日期事件類型進行分組。
  • 然後,使用agg(count($"event_type").alias("event_count"))計算每個組的事件總數,生成case1Df
  • case1Df隨後透過writeDelta函數寫入Delta Lake黃金層表event_by_date,模式為覆蓋(overwrite)。這確保了每日統計數據始終是最新的。
  1. 最新狀態計算(設備狀態)
  • 為了滿足**「識別任何設備的當前狀態」的業務需求,silverData會進行視窗函數計算**。
  • 首先定義一個視窗規範Window.partitionBy("device_id").orderBy(desc("event_ts")),這表示按device_id分組,並在每個分組內按event_ts降序排序(最新事件在前)。
  • 接著,使用withColumn("dense_rank", dense_rank().over(windowSpec))為每個設備的事件分配一個密集排名
  • 最後,透過filter("dense_rank = 1")選取排名為1的行,即每個設備的最新事件,生成case2Df
  • case2Df隨後透過writeDelta函數寫入Delta Lake黃金層表latest_device_status,模式同樣為覆蓋(overwrite)。這確保了每個設備的當前狀態始終是最新的。
  1. Delta Lake寫入器(覆蓋模式)
  • writeDelta函數是一個通用的寫入工具,它將處理後的DataFrame覆蓋模式寫入指定的Delta Lake表。這意味著每次運行時,這些黃金層表都會被最新的計算結果完全替換,確保了數據的即時性一致性

這個流程有效地將白銀層的詳細事件數據轉化為業務分析和即時查詢所需的高度聚合和精煉的黃金層數據,為決策者提供了清晰、及時的洞察

第十三章:構建串流管道:Spark與Scala的即時資料處理

數據攝取:從Kafka到Delta Lake的串流旅程

最後,串流作業被啟動並設置為等待終止,這意味著它將一直運行直到手動停止

玄貓的程式碼展示了使用Apache Spark Structured Streaming串流數據Kafka攝取到Delta Lake儲存層的典型設置,並包含必要的配置和設置以確保數據的可靠性和安全性

讓玄貓繼續在玄貓的白銀層中轉換玄貓的數據

轉換數據:從青銅到白銀的數據精煉

玄貓的白銀層程式碼將處理玄貓的設備數據,扁平化轉換去重來自玄貓青銅層的數據

請參閱以下程式碼:

val reprocess: Boolean = args(0).toBoolean

val bronzeSource: String = "./src/main/scala/com/packt/dewithscala/
chapter13/data/bronze/data/"

val target: String = "./src/main/scala/com/packt/dewithscala/
chapter13/data/silver/"

此程式碼根據命令列參數定義了一個reprocess布林變數,並設置了青銅層白銀層數據源檔案路徑,如下所示:

val bronzeData: DataFrame = spark.read.format("delta").
load(bronzeSource)

在這裡,該過程將位於bronzeSource路徑的Delta Lake青銅層中的數據載入到名為bronzeDataDataFrame中。

接下來,以下程式碼定義了一個用於解析bronzeDatavalue欄位中的JSON數據模式

val jsonSchema: StructType = StructType(
Seq(
StructField("device_id", StringType),
StructField("country", StringType),

構建串流管道:Spark與Scala應用程式

264

StructField("event_type", StringType),
StructField("event_ts", TimestampType)
)
)

它指定了數據的結構,包含device_idcountryevent_typeevent_ts等欄位。

現在,以下程式碼處理並轉換bronzeData中的數據:

val updateSilver: DataFrame = bronzeData
.select(from_json(col("value"), jsonSchema).alias("value"))
.select(
col("value.country"),
col("value.event_type"),
col("value.event_ts")
)
.dropDuplicates("device_id", "country", "event_ts")

它從value欄位中的JSON數據中提取欄位,並根據device_idcountryevent_ts欄位刪除重複的行,創建一個名為updateSilverDataFrame

接下來,以下程式碼檢查reprocess變數的值:

reprocess match {
case true =>
updateSilver.write
.format("delta")
.mode("overwrite")
.save(s"${target}silver_devices")
case _ => {
val silverTarget = DeltaTable.forPath(spark, s"{$target}silver_
devices")

silverTarget
.as("devices")
.merge(
updateSilver.as("update"),
"update.country AND devices.event_ts = update.event_ts"
)
.whenNotMatched()
.insertAll()
.execute()
}
}

創建服務層

265

如果reprocesstrue,它將覆蓋整個白銀層。如果reprocess是其他任何值,它將使用Delta Lakemerge操作更新白銀層,根據指定的條件將updateSilver中的數據合併到現有的白銀層中。

總而言之,此程式碼從青銅層獲取數據,應用轉換,並更新白銀層,同時根據reprocess的值考慮是重新處理還是增量更新。它利用Delta Lake有效地管理數據更新和轉換

創建服務層:構建黃金層數據

玄貓管道的最後一步將是寫入玄貓的服務層,稱為黃金層

請參閱以下程式碼:

private def writeDelta(df: DataFrame, tableName: String) = {
df.write
.format("delta")
.mode("overwrite")
.save(s"${target}${tableName}")
}

此程式碼定義了一個可重用函數writeDelta,它接受一個DataFrame (df)和一個表名tableName作為參數。它將DataFrame寫入具有指定表名的Delta Lake表,如果該表已存在,則覆蓋現有數據

接下來,請看以下程式碼:

此圖示:白銀層數據轉換與更新流程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

database "Delta Lake (青銅層)" as BronzeLayer
rectangle "Spark 應用程式 (白銀層轉換)" as SilverApp {
component "讀取青銅層數據" as ReadBronze
component "JSON 解析與扁平化" as JsonParse
component "數據去重" as Deduplicate
component "Delta Lake 合併/覆蓋邏輯" as DeltaMerge
}
database "Delta Lake (白銀層)" as SilverLayer

BronzeLayer --> ReadBronze : 載入原始數據
ReadBronze --> JsonParse : 原始 DataFrame (key, value)
JsonParse --> Deduplicate : 結構化 DataFrame (device_id, country, event_type, event_ts)
Deduplicate --> DeltaMerge : 去重後的 DataFrame (updateSilver)

DeltaMerge --> SilverLayer : 寫入/更新白銀層數據

note left of ReadBronze
spark.read.format("delta").load(bronzeSource)
end note

note right of JsonParse
- from_json(col("value"), jsonSchema)
- 提取所需欄位
end note

note right of Deduplicate
- dropDuplicates("device_id", "country", "event_ts")
- 確保數據唯一性
end note

note right of DeltaMerge
- reprocess = true: overwrite 模式
- reprocess = false: merge 模式 (upsert)
- 使用 DeltaTable.forPath().merge()
end note

@enduml

看圖說話:

此圖示詳細展示了數據從青銅層流向白銀層的轉換與更新過程,這是串流管道數據精煉的關鍵步驟。它突顯了Spark如何處理原始數據,進行結構化去重,並利用Delta Lake高級功能高效管理數據更新

  1. 讀取青銅層數據
  • 流程從Delta Lake青銅層開始。Spark應用程式使用spark.read.format("delta").load(bronzeSource)指令,將青銅層中未經處理的原始數據載入為一個DataFrame。這些數據通常以key-value對的形式存在,其中value是一個包含IoT設備事件的JSON字串。
  1. JSON解析與扁平化
  • 載入的原始DataFrame會經過JSON解析步驟。程式碼使用from_json(col("value"), jsonSchema)函數,根據預先定義的jsonSchema來解析value欄位中的JSON字串。
  • 解析後,數據會被扁平化,從一個複雜的JSON結構轉換為一個結構化的DataFrame,其中包含device_idcountryevent_typeevent_ts等直接可用的欄位。
  1. 數據去重
  • 在數據被寫入白銀層之前,會執行去重操作。由於Kafka至少一次傳遞語義,數據可能會出現重複。
  • dropDuplicates("device_id", "country", "event_ts")指令會根據這三個關鍵欄位來識別並移除重複的行,確保白銀層數據的唯一性品質。這一步產生了updateSilver DataFrame
  1. Delta Lake合併/覆蓋邏輯
  • 這是將處理後的數據寫入白銀層的關鍵決策點,由reprocess布林變數控制:
  • 完全覆蓋(overwrite)模式:如果reprocesstrue,表示需要完全重新處理重建白銀層。此時,updateSilver DataFrame將使用mode("overwrite")寫入Delta Lake白銀層,替換所有現有數據
  • 增量合併(merge)模式:如果reprocessfalse,則採用增量更新策略。程式碼使用Delta Lakemerge操作(DeltaTable.forPath().merge())。merge操作能夠根據指定的匹配條件(例如update.country AND devices.event_ts = update.event_ts)來智能地更新插入刪除數據。在這裡,它主要用於插入新數據whenNotMatched().insertAll()),實現高效的增量更新,這在處理持續串流數據時至關重要。

這個流程展示了如何將原始、可能重複的串流數據轉換為清洗、結構化且去重白銀層數據,並利用Delta Lake的強大功能來實現靈活高效的數據管理。

第十三章:構建串流管道:Spark與Scala的即時資料處理

創建服務層:構建黃金層數據

val silverSource: String = "./src/main/scala/com/packt/dewithscala/
chapter13/data/silver/ silver_devices"

val target: String = "./src/main/scala/com/packt/dewithscala/
chapter13/data/gold/"

val silverData: DataFrame = spark.read.format("delta").
load(silverSource)

在上述程式碼中,定義了白銀層黃金層檔案路徑。玄貓的程式碼將位於silverSource路徑的Delta Lake白銀層中的數據載入到名為silverDataDataFrame中。

現在,玄貓處理來自silverData的數據,以按日期和事件類型計算事件數量

val case1Df: DataFrame = silverData
.groupBy(to_date($"event_ts").alias("event_date"), $"event_type")
.agg(count($"event_type").alias("event_count"))

writeDelta(
case1Df,
"event_by_date"
)

此程式碼按event_ts列的日期部分和event_type列對數據進行分組,然後計算每個日期每個事件類型事件數量。結果使用writeDelta函數寫入名為event_by_dateDelta Lake表。

接下來,玄貓需要編寫程式碼來查找最新的設備狀態

private val windowSpec =
Window.partitionBy("device_id").orderBy(desc("event_ts"))

private val dfWindowedRank = silverData
.withColumn("dense_rank", dense_rank().over(windowSpec))

val case2Df: DataFrame = dfWindowedRank
.filter("dense_rank = 1")
.drop("dense_rank")

writeDelta(
case2Df,
"latest_device_status"
)

此程式碼創建了一個視窗規範,用於按device_id分區並按event_ts降序排序。然後,它根據視窗規範在每個分區內計算密集排名DataFrame被過濾,只保留密集排名為1的行,代表最新的設備狀態。結果使用writeDelta函數寫入名為latest_device_statusDelta Lake表。

玄貓現在有一個運作中的串流處理,讓玄貓在下一節中繼續進行編排

編排玄貓的串流處理:Databricks Workflows應用

玄貓在上一章的批次處理中使用了Argo進行編排,因此讓玄貓使用Databricks Workflows來展示在雲端環境中排程玄貓流程的另一種方式。請參閱第二章以了解如何使用Databricks社群版

運用從前幾章中學到的知識來構建您的Spark jar並將其部署到Databricks可以訪問的位置。這可以是雲端儲存帳戶或您的Databricks File System (DBFS)

前往您的Databricks工作區,並在左側導航中尋找工作流程(Workflows)

編排玄貓的串流處理

267

圖13.3 – Databricks導航介面

以下是創建新工作流程以編排您的管道的步驟:

  1. 在您的Databricks左側導航中,點擊工作流程(Workflows)。玄貓流程的第一步將是創建一個作業來運行玄貓的串流處理。透過點擊**「創建作業」來創建一個新工作流程,然後輸入您的jar位置,以及類別名稱套件**。請參閱以下螢幕截圖範例:

圖13.4 – Databricks工作流程中的串流任務

構建串流管道:Spark與Scala應用程式

268

  1. 接下來,玄貓將使用一個特殊觸發器來確保只有一個進程持續運行。在作業的右側區域,找到排程與觸發器(Schedule & Triggers)部分並點擊添加觸發器(Add trigger)

圖13.5 – Databricks排程與觸發器

  1. 從下拉列表中選擇連續觸發器類型(Continuous trigger type)

圖13.6 – Databricks觸發器類型

  1. 接下來,請確保您選擇活動觸發器狀態(Active trigger status),因為這將在您保存時立即啟動該過程:

圖13.7 – Databricks觸發器狀態

  1. 點擊保存(Save),您的流程將開始。連續觸發器是一種特殊類型的觸發器,它將確保此流程始終運行。由於玄貓的流程是串流管道,這是一個理想的觸發器

  2. 接下來,玄貓希望創建另一個管道,該管道排程每15分鐘運行一次,以處理玄貓從青銅層到白銀層再到黃金層的數據

此圖示:黃金層數據生成與更新流程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

database "Delta Lake (白銀層)" as SilverLayer
rectangle "Spark 應用程式 (黃金層生成)" as GoldApp {
component "讀取白銀層數據" as ReadSilver
component "聚合計算 (每日事件數)" as Aggregation1
component "最新狀態計算 (設備狀態)" as Aggregation2
component "Delta Lake 寫入器 (覆蓋)" as DeltaWriter
}
database "Delta Lake (黃金層 - 每日事件數)" as GoldDailyCount
database "Delta Lake (黃金層 - 最新設備狀態)" as GoldLatestStatus

SilverLayer --> ReadSilver : 載入白銀層數據
ReadSilver --> Aggregation1 : DataFrame (silverData)
Aggregation1 --> DeltaWriter : DataFrame (case1Df)
DeltaWriter --> GoldDailyCount : 寫入/覆蓋 `event_by_date` 表

ReadSilver --> Aggregation2 : DataFrame (silverData)
Aggregation2 --> DeltaWriter : DataFrame (case2Df)
DeltaWriter --> GoldLatestStatus : 寫入/覆蓋 `latest_device_status` 表

note left of ReadSilver
spark.read.format("delta").load(silverSource)
end note

note right of Aggregation1
- groupBy(to_date($"event_ts"), $"event_type")
- agg(count($"event_type"))
- 滿足「識別每天設備狀態的總數」需求
end note

note right of Aggregation2
- Window.partitionBy("device_id").orderBy(desc("event_ts"))
- withColumn("dense_rank", dense_rank().over(windowSpec))
- filter("dense_rank = 1")
- 滿足「識別任何設備的當前狀態」需求
end note

note right of DeltaWriter
- writeDelta(df, tableName) 函數
- mode("overwrite")
end note

@enduml

看圖說話:

此圖示闡明了從白銀層數據生成和更新黃金層數據的流程,這是串流管道最終服務層。它展示了Spark如何從已清洗和去重的白銀層中提取業務價值,並將其轉化為兩個不同用途的黃金層表

  1. 讀取白銀層數據
  • 流程從Delta Lake白銀層開始。Spark應用程式讀取silverSource路徑下的Delta Lake表,將其載入為silverData DataFrame。這個DataFrame包含了經過清洗、去重且結構化的設備狀態事件
  1. 聚合計算(每日事件數)
  • 為了滿足**「識別每天設備狀態的總數」的業務需求,silverData會進行聚合操作**。
  • 程式碼使用groupBy(to_date($"event_ts").alias("event_date"), $"event_type")事件日期事件類型進行分組。
  • 然後,使用agg(count($"event_type").alias("event_count"))計算每個組的事件總數,生成case1Df
  • case1Df隨後透過writeDelta函數寫入Delta Lake黃金層表event_by_date,模式為覆蓋(overwrite)。這確保了每日統計數據始終是最新的。
  1. 最新狀態計算(設備狀態)
  • 為了滿足**「識別任何設備的當前狀態」的業務需求,silverData會進行視窗函數計算**。
  • 首先定義一個視窗規範Window.partitionBy("device_id").orderBy(desc("event_ts")),這表示按device_id分組,並在每個分組內按event_ts降序排序(最新事件在前)。
  • 接著,使用withColumn("dense_rank", dense_rank().over(windowSpec))為每個設備的事件分配一個密集排名
  • 最後,透過filter("dense_rank = 1")選取排名為1的行,即每個設備的最新事件,生成case2Df
  • case2Df隨後透過writeDelta函數寫入Delta Lake黃金層表latest_device_status,模式同樣為覆蓋(overwrite)。這確保了每個設備的當前狀態始終是最新的。
  1. Delta Lake寫入器(覆蓋模式)
  • writeDelta函數是一個通用的寫入工具,它將處理後的DataFrame覆蓋模式寫入指定的Delta Lake表。這意味著每次運行時,這些黃金層表都會被最新的計算結果完全替換,確保了數據的即時性一致性

這個流程有效地將白銀層的詳細事件數據轉化為業務分析和即時查詢所需的高度聚合和精煉的黃金層數據,為決策者提供了清晰、及時的洞察

縱觀現代即時數據處理的多元挑戰,本章節所構築的端到端串流管道,已不僅是技術堆疊的展示,更是一套成熟的數據資產管理哲學。它清晰地揭示了從原始訊號到商業洞察的價值提煉路徑,體現了現代數據工程的核心思維。

此架構的突破性在於,透過青銅、白銀、黃金的多層次設計,巧妙地將數據的原始保真性、清洗品質與業務價值徹底解耦,擺脫了傳統ETL流程的脆弱與僵化。然而,其真正的挑戰並非單點技術的實現,而在於對不同處理模式的整合調度——從Kafka的持續串流攝取,到白銀層兼具增量合併(merge)與批次重處理(reprocess)的彈性,再到黃金層的週期性聚合覆蓋。這種設計賦予了數據管道前所未有的靈活性與可維護性,讓管理者能在即時性、準確性與運算成本之間做出精準的策略權衡。

展望未來2-3年,這種將串流與批次處理無縫融合於單一儲存層(Delta Lake)的Lakehouse模式,正迅速從前沿探索走向業界標準。數據的即時性將不再是特殊應用場景的昂貴選項,而是企業數據平台必須具備的基礎能力。

玄貓認為,這套架構不僅是技術的實現,更代表了一種數據工程的思維躍遷。對高階管理者與技術領導者而言,掌握其分層、解耦與彈性調度的設計哲學,將是未來構建穩健、可擴展數據資產,並從中持續獲取競爭優勢的關鍵能力。