在現代資料工程實踐中,確保數據流的穩定性與可靠性是至關重要的環節。單次的資料驗證雖然能捕捉當下的錯誤,卻無法揭示數據隨時間變化的趨勢或潛在的系統性問題。為此,建立一套能夠持久化儲存並分析歷史資料品質指標的機制,成為實現主動式資料治理的關鍵。本文將深入探討 Amazon Deequ 函式庫如何透過其指標儲存庫(MetricsRepository)功能,從單純的資料剖析與約束檢查,進階到基於歷史數據的趨勢分析與異常偵測。我們將展示如何將驗證結果系統性地儲存,並利用這些歷史紀錄設定動態基準,自動識別資料量、分佈或格式的異常波動,從而將資料品質監控從被動反應提升為主動預防的層次。

第七章:資料剖析與資料品質

範例 7.8

以下是一個ResultKey類型的範例鍵,玄貓將在玄貓的範例中使用:

val key = ResultKey(System.currentTimeMillis(), Map("tag" -> "metricsRepository"))

範例 7.9

對於玄貓的範例,玄貓將使用前面章節中使用的航班資料。玄貓將定義玄貓希望資料遵守的約束,並將相應的指標儲存在玄貓將查詢的儲存庫中。

有幾種方法可以查詢儲存在指標儲存庫中的結果,如下面的範例所示。與之前一樣,玄貓將從定義DataFrame開始:

val session = Spark.initSparkSession("de-with-scala")
val db = Database("my_db")
val df = db
.multiPartitionRead(
session = session,
dbTable = "my_db.flights",
partitionCol = "day_of_week",
upperBound = "7",
lowerBound = "1",
7
)
.filter(col("airline") === lit("US")) // 過濾出航空公司為 "US" 的航班

範例 7.10

然後,玄貓將使用一個臨時檔案定義一個指標儲存庫

val session: SparkSession = Spark.initSparkSession("de-with-scala")
val db: Database = Database("my_db")
val df: DataFrame = db
.multiPartitionRead(
session = session,
dbTable = "my_db.flights",
partitionCol = "day_of_week",
upperBound = "7",
lowerBound = "1",
7
)
.filter(col("airline") === lit("US")) // 過濾出航空公司為 "US" 的航班

範例 7.11

在下一步中,玄貓將使用MetricsRepository創建一個VerificationResult類別:

val verificationResult = VerificationSuite()
.onData(df)
.addCheck(
Check(CheckLevel.Error, "checks on flights data") // 定義一個錯誤級別的檢查
.isComplete("airline") // 檢查 airline 欄位是否完整
.isComplete("flight_number") // 檢查 flight_number 欄位是否完整
.isContainedIn("cancelled", Array("0", "1")) // 檢查 cancelled 欄位是否只包含 "0" 或 "1"
.isNonNegative("distance") // 檢查 distance 欄位是否非負
.isContainedIn("cancellation_reason", Array("A", "B", "C", "D")) // 檢查 cancellation_reason 欄位是否只包含 "A", "B", "C", "D"
)
.useRepository(fileRepo) // 使用檔案儲存庫
.saveOrAppendResult(key) // 儲存或附加結果
.run()

範例 7.12

最後,玄貓可以查詢指標儲存庫並查看結果:

// 獲取過去 100 秒內的所有指標
val metricsAsDF = fileRepo
.load()
.after(System.currentTimeMillis() - 100000)
.getSuccessMetricsAsDataFrame(session)
metricsAsDF.show()

// 根據鍵獲取指標
val metricsAsMap = fileRepo
.loadByKey(key)
.get
.metricMap
metricsAsMap.foreach { case (a, b) =>
println(s"For '${b.instance}' ${b.name} is ${b.value.get}")
}

// 根據標籤值獲取指標
val metricsJSON = fileRepo
.load()
.withTagValues(Map("tag" -> "metricsRepository"))
.getSuccessMetricsAsJson()
println(metricsJSON)

範例 7.13

metricsAsDF.show()將以表格格式列印指標

圖7.3 – 作為 DataFrame 的指標

metricsAsMap.foreach將列印指標

圖7.4 – 使用 foreach 列印指標

println(metricsJSON)將以JSON格式列印指標

[{"name":"Compliance","tag":"metricsRepository","dataset_
date":1683725262618,"instance":"distance is non-negative","enti-
ty":"Column","value":1.0},{"name":"Completeness","tag":"metricsRepos-
itory","dataset_date":1683725262618,"instance":"flight_number","en-
tity":"Column","value":1.0},{"name":"Compliance","tag":"metricsRepos-
itory","dataset_date":1683725262618,"instance":"cancelled contained
in 0,1","entity":"Column","value":1.0},{"name":"Compliance","tag":"-
metricsRepository","dataset_date":1683725262618,"instance":"can-
cellation_reason contained in A,B,C,D","entity":"Column","value":
0.020466497244797825},{"name":"Completeness","tag":"metricsRepos-
itory","dataset_date":1683725262618,"instance":"airline","enti-
ty":"Column","value":1.0}]

在本節中,玄貓研究了如何使用MetricsRepository來儲存Deequ收集的各種指標。在下一節中,玄貓將研究如何使用這些儲存的指標檢測資料中的異常

檢測異常

此圖示:Deequ 指標儲存與查詢流程

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

package "資料準備與約束執行" as DataPrepAndVerification {
rectangle "df: DataFrame (已過濾航班資料)" as FilteredDF
component "VerificationSuite().onData(df)" as VerificationInit
component "addCheck(Check(...))" as AddChecks
component "useRepository(fileRepo)" as UseFileRepo
component "saveOrAppendResult(key)" as SaveResult
component "run() -> VerificationResult" as RunVerification
rectangle "fileRepo: FileSystemMetricsRepository" as FileSystemRepo
}

package "指標查詢與展示" as MetricsQueryAndDisplay {
component "fileRepo.load().after(...).getSuccessMetricsAsDataFrame(session)" as QueryMetricsAsDF
component "metricsAsDF.show()" as ShowMetricsDF
component "fileRepo.loadByKey(key).get.metricMap" as QueryMetricsByKey
component "metricsAsMap.foreach { ... }" as ShowMetricsMap
component "fileRepo.load().withTagValues(...).getSuccessMetricsAsJson()" as QueryMetricsAsJson
component "println(metricsJSON)" as ShowMetricsJson
}

FilteredDF --> VerificationInit
VerificationInit --> AddChecks
AddChecks --> UseFileRepo
UseFileRepo --> SaveResult
SaveResult --> RunVerification
RunVerification --> FileSystemRepo : 將驗證結果指標儲存至檔案儲存庫

FileSystemRepo --> QueryMetricsAsDF
QueryMetricsAsDF --> ShowMetricsDF : 以 DataFrame 形式顯示指標

FileSystemRepo --> QueryMetricsByKey
QueryMetricsByKey --> ShowMetricsMap : 以 Map 形式顯示指標

FileSystemRepo --> QueryMetricsAsJson
QueryMetricsAsJson --> ShowMetricsJson : 以 JSON 形式顯示指標

note right of FileSystemRepo
- 持久化儲存資料品質指標
- 支援時間範圍、鍵和標籤查詢
end note

note right of ShowMetricsJson
- 提供多種格式展示指標,方便分析與整合
- 為後續異常檢測提供基礎數據
end note

@enduml

看圖說話:

此圖示詳盡地描繪了Deequ資料品質指標的儲存與查詢流程。在資料準備與約束執行階段,玄貓首先準備了FilteredDF,這是經過篩選的航班資料。玄貓使用VerificationSuite定義了一系列資料品質檢查,並將這些檢查應用於FilteredDF。關鍵步驟在於透過useRepository(fileRepo)指定使用一個檔案系統指標儲存庫 (FileSystemMetricsRepository),並利用saveOrAppendResult(key)將驗證結果的指標持久化儲存到這個fileRepo中。run()方法執行這些檢查並將結果儲存起來。

進入指標查詢與展示階段,玄貓展示了多種從fileRepo中檢索和展示指標的方法。玄貓可以透過fileRepo.load().after(...).getSuccessMetricsAsDataFrame(session)查詢特定時間範圍內的成功指標,並以DataFrame的形式展示,方便進行表格化分析。此外,玄貓可以透過fileRepo.loadByKey(key).get.metricMap使用之前儲存的ResultKey來精確地獲取特定運行結果的指標Map,並逐一列印其詳細資訊。最後,玄貓還可以透過fileRepo.load().withTagValues(...).getSuccessMetricsAsJson()根據自定義的標籤值來查詢指標,並將結果以JSON格式輸出,這對於與其他系統整合或進行程式化處理非常有用。這個流程的設計旨在提供靈活的指標儲存和多樣化的查詢方式,為後續的資料品質趨勢分析異常檢測奠定堅實的基礎。

第七章:資料剖析與資料品質

Deequ透過指標儲存庫支援資料中的異常檢測,這在上一節中已涵蓋。例如,玄貓可以創建一個規則來檢查記錄數是否比上次運行增加了50%。如果增加了,則檢查將失敗。

為了向玄貓展示它是如何工作的,玄貓將使用一個虛構的場景,玄貓每天都會收到一批產品以添加到庫存中。玄貓希望檢查玄貓在任何給定一天收到的產品數量是否比上次運行增加了50%。對於這個範例,玄貓將使用一個記憶體中儲存庫來儲存指標。像之前一樣,讓玄貓定義玄貓將在這個範例中使用的DataFrame

val session = Spark.initSparkSession("de-with-scala")
import session.implicits._

val yesterdayDF = Seq((1, "Product 1", 100), (2, "Product 2", 50)).toDF(
"product_id",
"product_name",
"cost_per_unit"
)

val todayDF = Seq(
(3, "Product 3", 70),
(4, "Product 4", 120),
(5, "Product 5", 65),
(6, "Product 6", 40)
).toDF("product_id", "product_name", "cost_per_unit")

範例 7.14

然後玄貓將定義一個記憶體中指標儲存庫和兩個結果鍵:

val repository = new InMemoryMetricsRepository()

val yesterdayKey = ResultKey(
System.currentTimeMillis() - 24 * 60 * 60 * 1000, // 昨天時間戳
Map("tag" -> "yesterday")
)

val todayKey = ResultKey(
System.currentTimeMillis(), // 當前時間戳
Map("tag" -> "now")
)

範例 7.15

然後玄貓運行異常檢查,首先在昨天的資料上使用記憶體中儲存庫來儲存Deequ收集的指標

VerificationSuite()
.onData(yesterdayDF)
.useRepository(repository)
.saveOrAppendResult(yesterdayKey)
.addAnomalyCheck(
RelativeRateOfChangeStrategy(maxRateIncrease = Some(1.5)), // 設定最大增長率為 1.5 (150%)
Size() // 對 Size 指標進行異常檢查
)
.run()

範例 7.16

最後,玄貓使用相同的指標儲存庫定義一個檢查,然後列印結果:

val verificationResult = VerificationSuite()
.onData(todayDF)
.useRepository(repository)
.saveOrAppendResult(todayKey)
.addAnomalyCheck(
RelativeRateOfChangeStrategy(maxRateIncrease = Some(1.5)), // 設定最大增長率為 1.5 (150%)
Size() // 對 Size 指標進行異常檢查
)
.run()

verificationResult.status match {
case CheckStatus.Success => println("data looks good")
case _ =>
val constraintResults = verificationResult.checkResults.flatMap(check => check._2) // 從檢查結果中提取約束結果
constraintResults
.filter(_.status != ConstraintStatus.Success) // 過濾出失敗的約束
.foreach { checkResult =>
println(s"${checkResult.status}") // 列印約束狀態
println(checkResult.message.getOrElse("")) // 列印錯誤訊息
}
}

範例 7.17

一旦運行,它將在玄貓的終端中列印以下輸出:

[info] for AnomalyConstraint(Size(None)) the check result was Failure
[info] Value: 4.0 does not meet the constraint requirement!

在本節中,玄貓研究了Deequ提供的異常檢測。還有各種其他選項,例如SimpleThresholdStrategyAbsoluteChangeStrategy等,以支援各種用例。

看圖說話:

此圖示清晰地展示了Deequ異常檢測的流程,特別是針對資料量變化的監測。在資料準備階段,玄貓首先創建了兩個DataFrameyesterdayDF代表昨日的產品資料,todayDF代表今日的產品資料。玄貓還初始化了一個InMemoryMetricsRepository來儲存指標,並為昨日和今日的資料分別創建了yesterdayKeytodayKey

接著,在異常檢測執行階段,玄貓分兩步進行。首先,玄貓對yesterdayDF運行VerificationSuite,使用InMemoryRepo儲存其Size()指標,並將結果與yesterdayKey關聯。這個步驟是為了建立一個基準。然後,玄貓對todayDF運行相同的VerificationSuite,同樣使用InMemoryRepo儲存其Size()指標,並與todayKey關聯。關鍵在於addAnomalyCheck方法,它配置了RelativeRateOfChangeStrategy,設定maxRateIncrease = Some(1.5),表示如果Size指標的增長率超過150%(即是原來的2.5倍),則視為異常。

最後,在結果處理階段,RunToday的結果會被MatchStatus判斷。如果todayDFSize指標相對於yesterdayDFSize指標增長超過150%,則CheckStatus將為Failure。此時,玄貓會過濾出失敗的約束,並列印出詳細的失敗訊息,明確指出Value: 4.0 does not meet the constraint requirement!,這表示今日的產品數量(4個)相對於昨日(2個)增長了100%,超過了設定的150%閾值,因此觸發了異常警報。這個流程有效地幫助玄貓自動監測資料量的異常波動,確保資料管道的穩定性。

縱觀本章對資料剖析與品質管理的探討,我們清晰地看見一條從「靜態驗證」邁向「動態監控」的演進路徑。Deequ 的核心價值不僅在於提供豐富的資料約束檢查,更在於其創新的 MetricsRepository 機制,它成功地為資料品質賦予了時間的維度,將單次的「資料健康檢查」提升為連續的「數據生命徵象追蹤」。

相較於傳統僅關注當下是否合規的無狀態(stateless)檢查,MetricsRepository 透過持久化指標,使我們得以分析趨勢、比較歷史,並實現如 RelativeRateOfChangeStrategy 這類更具智慧的異常檢測。然而,真正的挑戰在於將業務洞察轉化為精準的異常偵測策略——判斷何為「異常」而非「正常波動」,這考驗著團隊對資料脈絡的深層理解與持續優化的能力。

展望未來,這種自動化的資料品質監控將成為現代數據平台(Data Platform)的基礎設施。它不再是一個孤立的校驗步驟,而是深度整合至 DataOps 流程,成為保障資料可信度、觸發自動化修復、乃至提升決策品質的智慧中樞。

玄貓認為,導入這種具備歷史脈絡的資料品質監控框架,已非錦上添花的選項,而是高階管理者捍衛資料資產、確保其長期價值的核心基石,更是數據驅動型組織邁向成熟的關鍵一步。