在現代資料湖倉一體(Lakehouse)架構中,白銀層扮演著關鍵的轉化與淨化角色,其主要職責是將原始、混亂的青銅層資料,提升為結構化且具備業務價值的可信資料集。傳統的資料處理流程往往將品質驗證視為事後或獨立的步驟,然而這種模式容易導致錯誤資料擴散。本文探討的實務方法,將資料品質驗證內嵌於批次管道的核心環節。透過整合 Deequ 這類專門的驗證框架,開發者能以宣告式程式碼定義業務規則,將抽象的品質要求轉化為具體的程式約束。此模式不僅提升了資料管道的穩健性,更重要的是在資料生成之初便建立起品質閘門,確保進入分析應用的資料具備高度可靠性與一致性。
第十二章:批次資料管道建構實務
資料轉換與品質驗證
"left"
)
.join(dimCountry, col("iso_2_char") === col("country"), "left")
.join(
dimCampaigns,
col("marketing_campaign") === col("campaign_code"),
"left"
)
joinedData DataFrame是透過左連接explodedBronzeCE、dimConversionEvents、dimCountry和dimCampaigns DataFrame來創建的。玄貓使用左連接是因為玄貓希望從左表中獲取所有記錄,無論在右表中是否存在匹配項。連接是基於特定的列,例如conversion_event和event_type、iso_2_char和country等。
此時,玄貓希望刪除非標準列並保留玄貓透過連接引入的列:
val silverConversionEvents = joinedData.select(
"country_id",
"country_name",
"iso_3_char",
"iso_2_char",
"campaign_id",
"campaign_code",
"product_group",
"conversion_event_id",
"conversion_event",
"date"
)
silverConversionEvents DataFrame是從joinedData中透過選擇特定的列(例如country_id、country_name和iso_3_char)派生而來的。
檢查資料品質
247
玄貓現在透過將其寫入Delta表來持久化玄貓的白銀層:
def writeDelta(reprocess: Boolean, df: DataFrame) = {
df.write
.format("delta")
.mode(reprocess match {
case false => "append"
case _ => "overwrite"
})
.save(s"${target}conversion_events")
}
writeDelta函數被呼叫以將silverConversionEvents DataFrame寫入Delta表。在這種情況下,它被設置為覆蓋現有資料(writeDelta(true, silverConversionEvents)),表示該表將被silverConversionEvents DataFrame中的資料替換。
讓玄貓看看如何為玄貓的資料添加資料品質檢查,以確保玄貓的最終用戶在他們的分析中可以信任這些資料。
資料品質驗證
正如在「探索湖倉一體架構」部分中提到的,現在是時候考慮玄貓希望玄貓的資料遵守的規則和約束了。這些規則通常由業務相關者定義。例如,玄貓希望玄貓的青銅層資料遵守以下規則:
- 至少50%的記錄應該是下載內容、活動註冊或問卷回應轉換事件類型。
- 至少90%的記錄必須指定產品組。
- 所有記錄都必須存在國家名稱。
玄貓將僅在所有上述檢查都通過後才寫入白銀層。
對於這個範例,玄貓將創建一個DeequChecks類,它有一個runIfSuccess方法。這個方法接受一段程式碼,該程式碼僅在呼叫者中定義的約束評估為真時才執行:
package com.packt.dewithscala.chapter12
import com.amazon.deequ._
import com.amazon.deequ.checks.CheckStatus
批次資料管道建構實務:Spark與Scala應用
248
import org.apache.spark.sql.SparkSession
final case class DeequChecks(
verificationResult: VerificationResult,
session: SparkSession
) {
def runIfSuccess(body: => Unit) = verificationResult.status match {
case CheckStatus.Success =>
body
VerificationResult
.successMetricsAsDataFrame(session, verificationResult)
.show(false)
case _ =>
val constraintResults =
verificationResult.checkResults.flatMap { case (_, checkResult) =>
// 這裡需要補齊處理 checkResult 的邏輯
// 假設這裡會收集所有約束檢查的結果
// 為了範例完整性,這裡需要一個實際的實現
// 例如:
checkResult.constraintResults
}
val deequValidationError = constraintResults
.filter(_.status != ConstraintStatus.Success)
.map { checkResult =>
s"""|
${checkResult.status}
|${checkResult.message.getOrElse("")}""".stripMargin
}
.mkString("\n")
VerificationResult
.successMetricsAsDataFrame(session, verificationResult)
.show(false)
throw new Exception(deequValidationError)
}
}
為了實際操作,讓玄貓創建一個VerificationResult物件,其中包含本節開頭概述的所有約束:
val verificationResult = VerificationSuite()
.onData(silverConversionEvents)
檢查資料品質
249
.addCheck(
Check(CheckLevel.Error, "silver layer checks!")
.isContainedIn(
"conversion_event",
此圖示:資料品質檢查流程與Deequ應用
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
actor "業務相關者" as BusinessStakeholder
rectangle "Spark 應用程式 (白銀層)" as SilverApp
database "白銀層 (Delta Lake)" as SilverLayer
component "Deequ 檢查套件" as Deequ
cloud "日誌與告警系統" as LogAlertSystem
BusinessStakeholder --> SilverApp : 定義資料品質規則
note right of BusinessStakeholder
- 至少 50% 記錄為特定事件類型
- 至少 90% 記錄有產品組
- 所有記錄必須有國家名稱
end note
SilverApp --> Deequ : 載入 silverConversionEvents DataFrame
Deequ --> Deequ : 執行 VerificationSuite()
note right of Deequ
- .onData(silverConversionEvents)
- .addCheck(Check(CheckLevel.Error, "silver layer checks!"))
- .isContainedIn("conversion_event", Seq("Download Content", "Event Registration", "Survey Response"))
- .hasCompleteness("product_group", _ >= 0.9)
- .hasCompleteness("country_name", _ == 1.0)
end note
Deequ --> SilverApp : 返回 VerificationResult
alt 檢查成功 (CheckStatus.Success)
SilverApp --> SilverLayer : 寫入資料 (writeDelta 函數)
SilverApp --> SilverApp : 執行 runIfSuccess 中的 body 程式碼
SilverApp --> LogAlertSystem : 記錄成功指標
else 檢查失敗 (CheckStatus.Error)
SilverApp --> LogAlertSystem : 記錄錯誤並觸發告警
SilverApp --> SilverApp : 拋出例外 (throw new Exception(deequValidationError))
SilverApp --x SilverLayer : 阻止資料寫入
end alt
@enduml看圖說話:
此圖示闡述了在白銀層進行資料品質檢查的流程,特別強調了如何整合Deequ這個強大的資料品質檢查套件來確保資料的可靠性。整個流程旨在保證只有符合預期品質標準的資料才能被寫入白銀層,進而影響後續的分析和決策。
- 資料品質規則定義:
- 流程始於業務相關者定義資料品質規則。這些規則是基於業務需求,例如:特定轉換事件類型的記錄比例、產品組欄位的完整性,以及國家名稱欄位的存在性。這些規則是資料可信度的基石。
- Deequ檢查套件的應用:
- 在
Spark應用程式的白銀層中,將載入的silverConversionEvents DataFrame傳遞給Deequ的VerificationSuite()。 Deequ允許玄貓定義一系列檢查(Check)和約束(Constraint)。圖中註釋展示了如何添加這些檢查:isContainedIn:檢查conversion_event欄位的值是否包含在預定義的特定事件類型集合中,並可設定最小比例。hasCompleteness:檢查product_group欄位的完整性(例如,至少90%的記錄不能為空)。hasCompleteness:檢查country_name欄位的完整性(例如,所有記錄都必須有國家名稱,即100%完整性)。
- 執行檢查與結果返回:
Deequ執行所有定義的檢查,並返回一個VerificationResult物件給Spark應用程式。這個結果包含了所有檢查的狀態(成功或失敗)和詳細訊息。
- 根據檢查結果進行決策:
Spark應用程式會根據VerificationResult的狀態來決定後續操作:- 檢查成功(CheckStatus.Success):如果所有資料品質檢查都成功通過,則
Spark應用程式會執行runIfSuccess方法中的body程式碼,這通常意味著將silverConversionEvents DataFrame寫入白銀層的Delta表。同時,成功指標會被記錄到日誌與告警系統中。 - 檢查失敗(CheckStatus.Error):如果任何一項檢查失敗,
Spark應用程式將阻止資料寫入白銀層,並拋出一個包含詳細錯誤訊息的例外。此時,錯誤訊息會被記錄到日誌與告警系統,並觸發告警,通知相關人員介入處理。
透過這種方式,Deequ與白銀層的整合,為資料管道提供了一個自動化、可配置且強大的資料品質保障機制,確保了只有高品質、可信賴的資料才能進入後續的分析流程。
縱觀數據驅動決策的實踐路徑,本章所展示的白銀層品質驗證,已遠非單純的技術操作,而是一套關乎組織信任與決策韌性的系統性修養。將Deequ這類自動化驗證框架嵌入資料管道,其核心價值在於將抽象的「數據可信度」,轉化為具體、可量化且自動執行的「品質門禁」。相較於傳統在問題發生後才進行的數據稽核,這種「事前預防、即時攔截」的模式,從根本上杜絕了劣質數據污染下游分析鏈的風險,保障了商業智慧與機器學習模型的輸出品質。
更重要的是,它建立了一條技術實踐與業務規則之間的清晰橋樑,讓數據治理不再是空泛的口號,而是能被精準執行的程式碼。展望未來2-3年,這類自動化品質驗證將持續「左移」,從白銀層前推至青銅層,成為數據接收當下的標準程序,形成更為穩固的數據防線。
玄貓認為,將數據品質視為數據產品的核心功能,而非事後附加的檢查,是高階管理者在推動數據文化時必須建立的關鍵思維。這不僅是對技術卓越的投資,更是對組織決策品質與長期競爭力的根本承諾。