在現代大規模數據處理管線中,確保資料品質是一項核心挑戰,傳統手動抽樣檢驗不僅效率低落,也難以應對複雜多變的資料來源。本文旨在闡述如何利用建構於 Apache Spark 之上的 Deequ 函式庫,將資料品質管理從被動的修正轉變為主動的預防性策略。文章聚焦於 Deequ 的兩大核心功能:其一為資料剖析與自動約束建議,此功能可自動掃描資料集,發掘潛在的資料規律與異常模式,並生成對應的品質檢查程式碼,大幅降低了規則定義的初期門檻;其二為明確的約束定義與驗證框架,讓開發者能根據業務邏輯,編寫嚴謹的資料品質測試套件。此整合性方法論,使資料品質監控得以無縫嵌入資料工程流程,確保數據在進入下游分析應用前的準確性與一致性。
第七章:資料剖析與資料品質
利用自動約束建議
Deequ提供了一個強大的功能,它可以分析資料並建議可以作為檢查應用的約束。為了了解它是如何工作的,玄貓將再次使用航班資料。在第四章中,玄貓定義了一個與資料庫協作的介面,玄貓將用它來創建一個DataFrame。然後,玄貓將把這個DataFrame傳遞給ConstraintSuggestionRunner,以便Deequ建議約束。
以下是完整的程式碼:
package com.packt.dewithscala.chapter7
import com.packt.dewithscala.utils._
import com.amazon.deequ.suggestions.ConstraintSuggestionRunner
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
object ConstraintSuggestion extends App {
val session: SparkSession = Spark.initSparkSession("de-with-scala")
val db: Database = Database("my_db")
Data Profiling and Data Quality
152
val df: DataFrame = db
.multiPartitionRead(
session = session,
dbTable = "my_db.flights",
partitionCol = "day_of_week",
upperBound = "7",
lowerBound = "1",
7
)
.filter(col("airline") === lit("US")) // 過濾出航空公司為 "US" 的航班
// constraint suggestions
val suggestionsResult: ConstraintSuggestionResult =
ConstraintSuggestionRunner()
.onData(df)
.addConstraintRules(Rules.DEFAULT) // 添加預設的約束規則
.run()
suggestionsResult.columnToSuggestions.foreach { case (column, suggestions) =>
suggestions.foreach { suggestion =>
println(
s"Constraint suggestion for '$column':\t${suggestion.
description}\n" +
s"The corresponding scala code is ${suggestion.
codeForConstraint}"
)
}
}
}
範例 7.2
以下片段顯示了截斷的輸出:
[info] Constraint suggestion for 'wheels_on': 'wheels_on' is not null
[info] The corresponding scala code is .isComplete("wheels_on")
[info] Constraint suggestion for 'departure_delay': 'departure_delay' is not null
[info] The corresponding scala code is .isComplete("departure_delay")
[info] Constraint suggestion for 'origin_airport': 'origin_airport' is not null
Defining constraints
153
[info] The corresponding scala code is .isComplete("origin_airport")
[info] Constraint suggestion for 'origin_airport': 'origin_airport'
has value range 'CLT', 'PHX', 'PHL', 'DCA', 'BOS', 'LGA', 'MCO',
'LAX', 'TPA', 'ORD', 'FLL', 'DFW', 'LAS', 'ATL', 'SFO', 'IAH', 'PIT',
'DEN', 'MSP', 'EWR', 'BWI', 'PBI', 'RSW', 'MIA', 'RDU', 'SAN', 'SEA',
'DTW', 'JFK', 'JAX', 'SLC', 'MCI' for at least 89.0% of values
[info] The corresponding scala code is .isContainedIn("origin_
airport", Array("CLT", "PHX", "PHL", "DCA", "BOS", "LGA", "MCO",
"LAX", "TPA", "ORD", "FLL", "DFW", "LAS", "ATL", "SFO", "IAH", "PIT",
"DEN", "MSP", "EWR", "BWI", "PBI", "RSW", "MIA", "RDU", "SAN", "SEA",
"DTW", "JFK", "JAX", "SLC", "MCI"), _ >= 0.89, Some("It should be
above 0.89!"))
[info] Constraint suggestion for 'day_of_week': 'day_
of_week' is not null
[info] The corresponding scala code is .isComplete("day_of_week")
[info] Constraint suggestion for 'day_of_week': 'day_of_week' has
value range '5', '4', '1', '2', '3', '7', '6'
[info] The corresponding scala code is .isContainedIn("day_of_week",
Array("5", "4", "1", "2", "3", "7", "6"))
[info] Constraint suggestion for 'day_of_week': 'day_of_week' has no
negative values
[info] The corresponding scala code is .isNonNegative("day_of_week")
前面的片段包含Deequ建議的約束的詳細資訊:
wheels_on欄位沒有任何null值。因此,玄貓可能希望對該欄位定義一個非空檢查,使用isComplete("wheels_on"),Deequ已將其列印到控制台。- 對於89%的記錄,
origin_airport具有列出的值之一。
此圖示:Deequ 自動約束建議流程
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "資料準備" as DataPrep {
rectangle "原始航班資料 (MySQL)" as RawFlightData
component "SparkSession 初始化" as SparkInit
component "資料讀取與過濾 (US 航空公司)" as DataReadFilter
rectangle "df: DataFrame (已過濾航班資料)" as FilteredDF
}
package "Deequ 自動約束建議" as DeequAutoSuggest {
component "ConstraintSuggestionRunner().onData(df)" as SuggestionRunnerInit
component "addConstraintRules(Rules.DEFAULT)" as AddDefaultRules
component "run() -> ConstraintSuggestionResult" as RunSuggestion
component "columnToSuggestions.foreach { case (column, suggestions) => ... }" as ProcessSuggestions
rectangle "建議的約束列表 (含 Scala 程式碼)" as SuggestedConstraints
}
DataPrep --> DeequAutoSuggest
RawFlightData --> SparkInit
SparkInit --> DataReadFilter
DataReadFilter --> FilteredDF
FilteredDF --> SuggestionRunnerInit
SuggestionRunnerInit --> AddDefaultRules
AddDefaultRules --> RunSuggestion
RunSuggestion --> ProcessSuggestions
ProcessSuggestions --> SuggestedConstraints : 輸出自動建議的資料品質約束
note right of SuggestedConstraints
- 自動識別資料模式並生成相應的 Deequ 約束
- 包含非空檢查、值域檢查、非負數檢查等
- 提供可直接使用的 Scala 程式碼片段
end note
@enduml看圖說話:
此圖示展示了Deequ如何自動建議資料品質約束的完整流程。首先,在資料準備階段,玄貓從MySQL資料庫中的原始航班資料開始。透過SparkSession的初始化,玄貓讀取並過濾出美國航空公司的航班資料,形成一個名為df的DataFrame。這個FilteredDF是進行約束建議的基礎。
接下來,在Deequ自動約束建議階段,玄貓將這個FilteredDF傳遞給ConstraintSuggestionRunner。玄貓透過addConstraintRules(Rules.DEFAULT)來啟用Deequ的預設約束建議規則,然後執行run()方法。run()方法會深入分析df的資料模式,並根據這些模式生成一系列潛在的資料品質約束。這些建議的結果會被封裝在ConstraintSuggestionResult中。最後,玄貓透過迭代columnToSuggestions來處理這些建議,並將每個建議的描述和對應的Scala程式碼片段列印出來,形成建議的約束列表。
這個流程的關鍵在於Deequ能夠自動識別資料中的潛在問題點,例如欄位是否應為非空、值是否應在特定範圍內、數值是否應為非負數等,並直接提供可執行的Scala程式碼,極大地簡化了資料品質規則的定義過程。這使得資料工程師可以更高效地建立和維護資料品質檢查。
第七章:資料剖析與資料品質
day_of_week欄位的值必須介於1到7之間,依此類推。
在本節中,玄貓研究了Deequ如何建議約束。在下一節中,玄貓將看到如何使用這些建議來定義實際要在資料集上執行的檢查。
定義約束
在上一節中,玄貓研究了Deequ如何自動建議約束的範例,以及玄貓如何收集各種資料指標。玄貓現在將定義玄貓期望DataFrame通過的實際約束。在下面的程式碼中,玄貓定義了玄貓期望航班資料通過的以下約束:
airline欄位不應包含任何NULL值。flight_number欄位不應包含任何NULL值。cancelled欄位應僅包含0或1。distance欄位不應包含任何負值。cancellation_reason欄位應僅包含A、B、C或D。
如果所有檢查都通過,那麼玄貓會在控制台列印「data looks good」;否則,玄貓會列印約束以及結果狀態。
這是它的程式碼。作為第一步,玄貓將使用玄貓在MySQL中載入的flights表格創建一個DataFrame:
val session = Spark.initSparkSession("de-with-scala")
val db = Database("my_db")
val df = db
.multiPartitionRead(
session = session,
dbTable = "my_db.flights",
partitionCol = "day_of_week",
upperBound = "7",
lowerBound = "1",
7
)
.filter(col("airline") === lit("US")) // 過濾出航空公司為 "US" 的航班
範例 7.3
定義好DataFrame後,玄貓可以定義玄貓希望DataFrame遵守的約束:
val verificationResult = VerificationSuite()
.onData(df)
.addCheck(
Check(CheckLevel.Error, "checks on flights data") // 定義一個錯誤級別的檢查
.isComplete("airline") // 檢查 airline 欄位是否完整
.isComplete("flight_number") // 檢查 flight_number 欄位是否完整
.isContainedIn("cancelled", Array("0", "1")) // 檢查 cancelled 欄位是否只包含 "0" 或 "1"
.isNonNegative("distance") // 檢查 distance 欄位是否非負
.isContainedIn("cancellation_reason", Array("A", "B", "C", "D")) // 檢查 cancellation_reason 欄位是否只包含 "A", "B", "C", "D"
)
.run()
範例 7.4
最後,玄貓可以列印結果:
verificationResult.status match {
case CheckStatus.Success => println("data looks good") // 所有檢查都通過
case _ => // 有檢查失敗
val constraintResults = verificationResult.checkResults.flatMap(check => check._2) // 從檢查結果中提取約束結果
constraintResults
.filter(_.status != ConstraintStatus.Success) // 過濾出失敗的約束
.foreach { checkResult =>
println(s"${checkResult.status}") // 列印約束狀態
println(checkResult.message.getOrElse("")) // 列印錯誤訊息
}
}
範例 7.5
運行此程式碼會將以下內容列印到控制台:
[info] for ComplianceConstraint(Compliance(cancellation_reason
contained in A,B,C,D,`cancellation_reason` IS NULL OR `cancellation_
reason` IN ('A','B','C','D'),None)) the check result was Failure
[info] Value: 0.020466497244797825 does not meet the constraint
requirement!
此失敗的原因是cancellation_reason也包含空白。事實上,訊息「Value: 0.020466497244797825 does not meet the constraint requirement」顯示只有2%的記錄通過了檢查。
在本節中,玄貓研究了如何定義玄貓期望資料集遵守的約束。在下一節中,玄貓將研究實際儲存Deequ收集的指標的機制,例如Amazon S3和Hadoop分散式檔案系統(HDFS)。
持久化資料品質指標使玄貓能夠運行分析以查看趨勢並發現資料中的任何波動。創建一個記憶體中儲存庫很簡單,如下一個範例所示:
val inMemoryRepo = new InMemoryMetricsRepository()
範例 7.6
同樣,玄貓可以按如下方式創建一個基於檔案的儲存庫:
val fileRepo = FileSystemMetricsRepository(sparkSession, filePath)
範例 7.7
每次運行的指標都使用ResultKey類型的鍵儲存。ResultKey定義為具有以下簽名的案例類別:
case class ResultKey(dataSetDate: Long, tags: Map[String, String] = Map.empty)
此圖示:Deequ 約束定義與結果儲存流程
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "資料準備與約束定義" as DataPrepAndConstraintDef {
rectangle "df: DataFrame (已過濾航班資料)" as FilteredDF
component "VerificationSuite().onData(df)" as VerificationInit
component "addCheck(Check(CheckLevel.Error, \"checks on flights data\"))" as AddCheck
component "isComplete(\"airline\")" as IsCompleteAirline
component "isComplete(\"flight_number\")" as IsCompleteFlightNumber
component "isContainedIn(\"cancelled\", Array(\"0\", \"1\"))" as IsContainedCancelled
component "isNonNegative(\"distance\")" as IsNonNegativeDistance
component "isContainedIn(\"cancellation_reason\", Array(\"A\", \"B\", \"C\", \"D\"))" as IsContainedCancellationReason
component "run() -> VerificationResult" as RunVerification
rectangle "verificationResult: 結果物件" as VerificationResultObj
}
package "結果處理與指標儲存" as ResultHandlingAndMetricsStorage {
component "verificationResult.status match { ... }" as MatchStatus
component "println(\"data looks good\")" as PrintSuccess
component "filter(_.status != ConstraintStatus.Success)" as FilterFailedConstraints
component "foreach { checkResult => println(...) }" as PrintFailureDetails
component "InMemoryMetricsRepository()" as InMemoryRepo
component "FileSystemMetricsRepository(sparkSession, filePath)" as FileSystemRepo
component "ResultKey(dataSetDate: Long, tags: Map[String, String])" as ResultKeyDef
rectangle "持久化指標儲存庫 (記憶體/檔案)" as MetricsRepository
}
FilteredDF --> VerificationInit
VerificationInit --> AddCheck
AddCheck --> IsCompleteAirline
IsCompleteAirline --> IsCompleteFlightNumber
IsCompleteFlightNumber --> IsContainedCancelled
IsContainedCancelled --> IsNonNegativeDistance
IsNonNegativeDistance --> IsContainedCancellationReason
IsContainedCancellationReason --> RunVerification
RunVerification --> VerificationResultObj
VerificationResultObj --> MatchStatus
MatchStatus --> PrintSuccess
MatchStatus --> FilterFailedConstraints
FilterFailedConstraints --> PrintFailureDetails
InMemoryRepo --> MetricsRepository
FileSystemRepo --> MetricsRepository
ResultKeyDef --> MetricsRepository : 定義指標儲存鍵
note right of VerificationResultObj
- 包含所有約束檢查的結果狀態
- 揭示資料是否符合預期品質標準
end note
note right of MetricsRepository
- 支援記憶體或檔案系統儲存
- 允許長期追蹤資料品質趨勢與波動
end note
@enduml看圖說話:
此圖示詳細闡述了Deequ中約束定義、驗證及結果儲存的完整流程。在資料準備與約束定義階段,玄貓從FilteredDF開始,這是玄貓已過濾的航班資料。玄貓使用VerificationSuite().onData(df)初始化一個驗證套件,並透過addCheck方法添加一個包含多個具體約束的檢查。這些約束包括:airline和flight_number的完整性檢查、cancelled欄位值是否包含在"0", "1"中的範圍檢查、distance欄位的非負數檢查,以及cancellation_reason欄位值是否包含在"A", "B", "C", "D"中的範圍檢查。所有這些約束都設定為CheckLevel.Error,表示任何失敗都將被視為錯誤。執行run()方法後,會生成一個VerificationResult物件,其中包含了所有檢查的狀態和詳細結果。
進入結果處理與指標儲存階段,玄貓首先透過verificationResult.status來判斷整體驗證結果。如果所有檢查都成功,則列印「data looks good」;否則,玄貓會過濾出所有失敗的約束,並列印其狀態和詳細錯誤訊息,例如圖示中顯示的cancellation_reason欄位的失敗案例。為了長期追蹤資料品質,Deequ提供了指標儲存庫的機制。玄貓可以選擇創建一個InMemoryMetricsRepository用於臨時儲存,或創建一個FileSystemMetricsRepository將指標持久化到檔案系統(如S3或HDFS)。這些指標都使用ResultKey進行索引,ResultKey包含dataSetDate和tags,方便玄貓日後查詢和分析資料品質趨勢。這個流程確保了資料在進入下游系統之前,其品質得到嚴格的控制和監測。
好的,這是一份為「第七章:資料剖析與資料品質」撰寫的玄貓風格結論。
第七章結論:從數據洞察到品質契約的工程實踐
從效能評估視角來看,本章深入剖析了 Deequ 如何將抽象的資料品質概念,轉化為具體、可衡量的工程實踐。Deequ 的自動約束建議(Constraint Suggestion)與手動定義驗證套件(Verification Suite),共同構建了從探索到執行的完整路徑。前者是高效的起點,能快速描繪資料輪廓,但其價值在於啟發而非最終規範;後者則是將隱性的業務規則顯性化,構建起數據流的關鍵防線。真正的挑戰在於融合兩者,將機器發現的模式與深刻的領域知識結合,避免因未預期的資料狀況(如 cancellation_reason 欄位的空值)導致的驗證盲點,從而確保數據的根本可靠性。
展望未來,Deequ 的核心潛力不僅是單次攔截錯誤,更在於透過指標持久化機制(Metrics Repository)建立的資料品質時間序列。這將驅動團隊從被動的「問題修復」思維,轉向主動的「趨勢預警」與「品質退化分析」,為整個數據平台建立起一套自我監控與持續優化的免疫系統。
玄貓認為,對於追求卓越數據績效的團隊,應將這些品質約束視為與業務邏輯同等重要的「品質即程式碼」(Quality-as-Code),並將其整合進持續交付流程中,這才是保障數據資產長期價值的根本之道。