Delta Live Tables(DLT)簡化了資料管線的建置和管理,同時也提供了強大的資料品質管理功能。透過 DLT,我們可以定義資料驗證規則,並在資料處理過程中即時檢查資料是否符合預期。這有助於及早發現和處理資料品質問題,避免不良資料影響下游應用。DLT 提供了多種處理失敗預期的方式,例如警告、丟棄或使管線失敗,可以根據不同的需求選擇合適的策略。此外,DLT 還支援套用多個資料品質預期,並可以將資料驗證規則與 DLT 管道分離,提高程式碼的可維護性和靈活性。
管理資料品質:使用 Delta Live Tables
建立全用途叢集並執行計程車行程資料產生器筆記本
首先,建立一個全用途叢集來執行計程車行程資料產生器筆記本。一旦全用途叢集建立完成,請導航到新的筆記本,並點選 Databricks Data Intelligence Platform 頂部導航列中的叢集下拉式選單。選擇您建立的叢集名稱,並選擇「Attach」以將計程車行程資料產生器筆記本附加到叢集並執行所有儲存格。
計程車行程資料產生器將會在 DBFS 位置附加數個新的 JSON 檔案,其中包含隨機產生的行程資料。
建立新的 DLT 管道定義
現在我們已經產生了新的資料,讓我們建立另一個新的筆記本用於我們的 DLT 管道定義。導航到側邊欄的工作區標籤,深入到您的使用者主目錄,並透過右鍵點選並選擇「Add Notebook」來建立新的筆記本。
為新的筆記本取一個有意義的名稱,例如「第 3 章 - 強制資料品質」。
首先,匯入 DLT Python 模組以及 PySpark 函式:
import dlt
from pyspark.sql.functions import *
定義 bronze 表:yellow_taxi_raw
接下來,讓我們定義一個 bronze 表,yellow_taxi_raw,它將攝入由我們的計程車行程資料產生器寫入 DBFS 位置的計程車行程資料:
@dlt.table(
comment="隨機產生的計程車行程資料集"
)
def yellow_taxi_raw():
path = "/tmp/chp_03/taxi_data"
schema = "trip_id INT, taxi_number INT, passenger_count INT, trip_amount FLOAT, trip_distance FLOAT, trip_date DATE"
return (spark.readStream
.schema(schema)
.format("json")
.load(path))
內容解密:
此段程式碼定義了一個 bronze 表 yellow_taxi_raw,用於讀取計程車行程資料。@dlt.table 是 DLT 的函式註解,用於宣告此函式為一個 streaming 表。spark.readStream 用於讀取 streaming 資料,.schema(schema) 指定了資料的 schema,.format("json") 指定了資料的格式為 JSON,.load(path) 指定了資料的路徑。
定義 silver 表:trip_data_financials
對於我們資料管線的下一層,組織內的利害關係人要求我們提供一種方式,以便他們的業務能夠對傳入的行程資料進行即時財務分析。因此,讓我們新增一個 silver 表,它將轉換傳入的行程資料流,計算我們的計程車公司 Yellow Taxi Corporation 的預期利潤和損失。
讓我們定義我們的 silver 表定義,trip_data_financials。表定義的開始就像任何正常的 streaming 表定義一樣。我們首先定義一個傳回 streaming 表的 Python 函式。接下來,我們使用 DLT 函式註解來宣告此函式為一個 streaming 表,具有可選的名稱 trip_data_financials,以及帶有描述性文字的註解。
@dlt.table(name="trip_data_financials",
comment="來自傳入計程車行程的財務資訊")
@dlt.expect("valid_total_amount", "trip_amount > 0.0")
def trip_data_financials():
return (dlt.readStream("yellow_taxi_raw")
.withColumn("driver_payment", expr("trip_amount * 0.40"))
.withColumn("vehicle_maintenance_fee", expr("trip_amount * 0.05"))
.withColumn("adminstrative_fee", expr("trip_amount * 0.1"))
.withColumn("potential_profits", expr("trip_amount * 0.45")))
內容解密:
此段程式碼定義了一個 silver 表 trip_data_financials,用於轉換 bronze 表 yellow_taxi_raw 中的資料。@dlt.expect 是 DLT 的函式註解,用於強制資料品質限制。在此例中,我們要求 trip_amount 必須大於 0。dlt.readStream 用於讀取 bronze 表中的 streaming 資料,.withColumn 用於新增欄位,expr 用於計算欄位的值。
執行資料管線
讓我們從筆記本中的資料集宣告建立新的資料管線。執行筆記本儲存格,並確保沒有語法錯誤。接下來,Databricks Data Intelligence Platform 將提示您建立新的資料管線。點選「Create pipeline」按鈕以建立新的 DLT 資料管線。
在「Destination settings」下,選擇 Unity Catalog 中的目錄和架構,以儲存管線資料集。在「Compute settings」下,將「Min workers」設為 1,將「Max workers」設為 2。點選「Create」按鈕以接受預設值。最後,點選「Start」按鈕以執行資料管線。
您將被帶到資料流圖的視覺化表示。
圖表翻譯:
此圖示呈現了我們的 NYC Yellow Taxi Corp. 管線的資料流圖。DLT 系統將首先建立並初始化新的 Databricks 叢集,並開始將筆記本中的資料集定義解析為資料流圖。如您所見,DLT 系統將從我們的 DBFS 位置攝入原始行程資料檔案到 streaming 表 yellow_taxi_raw 中。接下來,系統檢測到我們的 silver 表 trip_data_financials 的依賴關係,並立即開始計算我們的 silver 表中的其他四個欄位。在此過程中,我們的資料品質限制正在即時評估傳入的資料。
點選 silver 表,DLT UI 將在右側展開一個窗格,總結 silver 表。點選「Data quality」標籤以檢視資料品質指標。請注意,圖形正在即時更新,因為我們的資料正在被處理。在被資料管線處理的所有資料中,您將注意到大約 10% 的資料未能透過 valid_total_amount 期望 - 這是預期的。資料產生器筆記本將特意將具有負總金額的記錄釋出到我們的雲端儲存位置。我們可以輕鬆地檢視有多少資料正在根據我們定義的資料品質標準進行驗證,以及有多少資料未透過驗證。
管理資料品質:Delta Live Tables 的應用
在前面的章節中,我們已經成功地在 Delta Live Tables(DLT)中編寫了第一個資料品質約束條件。透過幾行程式碼,我們能夠對輸入的資料強制執行資料品質約束,並即時監控資料品質。這為資料工程團隊提供了對資料管線的更多控制權。
對失敗預期的處理
當特定記錄違反了在 DLT 資料集上定義的資料約束時,DLT 可以採取三種型別的動作:
- Warn(警告):當 DLT 遇到表示式違規時,該記錄將被記錄為一個指標,並繼續寫入下游目標資料集。
- Drop(丟棄):當 DLT 遇到表示式違規時,該記錄將被記錄為一個指標,並被阻止進入下游目標資料集。
- Fail(失敗):當 DLT 遇到表示式違規時,整個管線更新將失敗,直到資料工程團隊成員調查並糾正資料違規或可能的資料損壞。
選擇適當的動作取決於個別使用案例以及如何處理不符合資料品質規則的資料。例如,在某些情況下,資料可能不符合定義的資料品質約束,但將違規行記錄在 DLT 系統中並監控資料品質可能符合特定使用案例的要求。另一方面,在某些場景中,特定的資料品質約束必須得到滿足,否則輸入的資料將破壞下游流程。在這種情況下,更積極的動作,如使資料管線執行失敗並回復交易,是適當的行為。
實作範例:因不良資料品質而使管線執行失敗
在某些情況下,您可能希望立即停止資料管線更新的執行,以干預並糾正資料。例如,在 DLT 預期中使用 @dlt.expect_or_fail() 函式裝飾器,可以立即使資料管線執行失敗。
如果操作是表格更新,則交易將立即回復,以防止不良資料的汙染。此外,DLT 將跟蹤有關已處理記錄的其他後設資料,以便資料工程團隊能夠精確地定位資料集中導致失敗的記錄。
更新範例:使 Yellow Taxi Corporation 資料管線失敗
假設我們要更新早期的 Yellow Taxi Corporation 資料管線範例。在這種情況下,負的總金額將破壞下游財務報告。因此,我們希望使管線執行失敗,以便我們的資料工程團隊能夠調查資料中的潛在問題並採取適當的行動,例如手動糾正資料。
在 Delta Live Tables 框架中,調整資料管線的行為就像更新銀級表格定義的函式裝飾器一樣簡單。讓我們更新預期,使用 expect_or_fail 動作:
@dlt.expect_or_fail("valid_total_amount", "trip_amount > 0.0")
完整的銀級表格 trip_data_financials 的定義應如下所示:
@dlt.table(
name="trip_data_financials",
comment="Financial information from completed taxi trips."
)
@dlt.expect_or_fail("valid_total_amount", "trip_amount > 0.0")
def trip_data_financials():
return (
dlt.readStream("yellow_taxi_raw")
.withColumn("driver_payment", expr("trip_amount*0.40"))
.withColumn("vehicle_maintenance_fee", expr("trip_amount*0.05"))
.withColumn("adminstrative_fee", expr("trip_amount*0.1"))
.withColumn("potential_profits", expr("trip_amount*0.45"))
)
程式碼解密:
@dlt.table裝飾器用於定義一個新的 DLT 表格trip_data_financials。@dlt.expect_or_fail裝飾器用於定義一個預期條件valid_total_amount,要求trip_amount必須大於 0.0。如果條件不滿足,則使管線執行失敗。dlt.readStream("yellow_taxi_raw")用於從原始資料來源讀取資料流。.withColumn方法用於新增計算欄位,例如driver_payment、vehicle_maintenance_fee、adminstrative_fee和potential_profits。
套用多個資料品質預期
有時,資料集作者可能希望對資料集的每一行套用多個業務規則或資料品質約束。在這種情況下,DLT 提供了一組特殊的函式裝飾器,用於指定多個資料品質約束定義。
@dlt.expect_all()函式裝飾器可用於組合多個資料品質約束,用於特定的資料集。expect_all_or_drop()可用於指定當輸入資料不符合所有資料品質約束條件時,將其丟棄,不進入目標表格。expect_all_or_fail()將在任何資料品質約束條件不滿足時使管線執行失敗。
圖表翻譯:
此圖示展示了 DLT 如何根據不同的預期條件處理輸入資料,包括 Warn、Drop 和 Fail 三種動作,以及如何套用多個預期條件。DLT 提供了一種靈活的方式來管理和控制資料品質。
透過這些功能,DLT 為資料工程團隊提供了強大的工具,以確保資料品質並滿足業務需求。無論是即時監控還是主動干預,DLT 都能幫助團隊更好地管理和控制資料管線。
使用 Delta Live Tables 管理資料品質
透過 Delta Live Tables 實作資料驗證
讓我們來看看如何丟棄無效的計程車行程資料,以避免進入下游資料集。當資料值不符合驗證標準時,我們可以使用 Delta Live Tables (DLT) 的 expectations 功能來實作資料品質管理。
assertions = {
"total_amount_constraint": "trip_amount > 0.0",
"passenger_count": "passenger_count >= 1"
}
@dlt.table(
name="yellow_taxi_validated",
comment="已驗證的計程車行程資料資料集"
)
@dlt.expect_all_or_drop(assertions)
def yellow_taxi_validated():
return (
dlt.readStream("yellow_taxi_raw")
.withColumn("nyc_congestion_tax", expr("trip_amount * 0.05"))
)
內容解密:
- 定義資料驗證規則:使用 Python 字典
assertions定義了兩個驗證規則,分別檢查trip_amount是否大於 0.0 和passenger_count是否大於或等於 1。 @dlt.expect_all_or_drop裝飾器:這個裝飾器會丟棄不符合assertions中定義的所有驗證規則的資料記錄。yellow_taxi_validated函式:這個函式讀取yellow_taxi_raw資料流,並新增一個名為nyc_congestion_tax的欄位,該欄位的值是trip_amount的 5%。
將資料驗證規則與 DLT 管道分離
在某些情況下,我們希望將資料驗證規則與資料管道定義分開,以便非技術人員可以透過 UI 管理資料品質規則。
建立資料品質規則表
CREATE TABLE IF NOT EXISTS <catalog_name>.<schema_name>.data_quality_rules
(rule_name STRING, rule_expression STRING, dataset_name STRING)
USING DELTA
插入資料品質規則
INSERT INTO data_quality_rules
VALUES
('valid_total_amount', 'trip_amount > 0.0', 'yellow_taxi_raw'),
('valid_passenger_count', 'passenger_count > 0', 'yellow_taxi_raw');
編譯資料品質規則
def compile_data_quality_rules(rules_table_name, dataset_name):
"""讀取 data_quality_rules 表格並轉換為 DLT Expectation 可解釋的格式"""
rules = spark.sql(f"""SELECT * FROM {rules_table_name} WHERE dataset_name='{dataset_name}'""").collect()
rules_dict = {}
if len(rules) == 0:
raise Exception(f"No rules found for dataset '{dataset_name}'")
for rule in rules:
rules_dict[rule.rule_name] = rule.rule_expression
return rules_dict
在 DLT 管道中使用動態載入的資料品質規則
RULES_TABLE = "<catalog_name>.<schema_name>.data_quality_rules"
DATASET_NAME = "yellow_taxi_raw"
@dlt.table(
comment="隨機生成的計程車行程資料"
)
def yellow_taxi_raw():
path = "/tmp/chp_03/taxi_data"
schema = "trip_id INT, taxi_number INT, passenger_count INT, trip_amount FLOAT, trip_distance FLOAT, trip_date DATE"
return (spark.readStream.schema(schema).format("json").load(path))
@dlt.table(
name="yellow_taxi_validated",
comment="已驗證的計程車行程資料資料集"
)
@dlt.expect_all(compile_data_quality_rules(RULES_TABLE, DATASET_NAME))
def yellow_taxi_validated():
return (
dlt.readStream("yellow_taxi_raw")
.withColumn("nyc_congestion_tax", expr("trip_amount * 0.05"))
)
內容解密:
- 建立資料品質規則表:建立一個 Delta 表格
data_quality_rules用於儲存資料品質規則。 - 插入規則:向
data_quality_rules表格中插入具體的資料品質規則。 compile_data_quality_rules函式:這個函式根據指定的dataset_name從data_quality_rules表格中讀取規則,並轉換為 DLT Expectation 可用的格式。- 在 DLT 管道中使用動態規則:透過呼叫
compile_data_quality_rules函式,動態載入資料品質規則並應用於yellow_taxi_validated資料集。
手把手練習 - 將無效資料隔離以進行修正
在這個練習中,我們將建立一個條件式資料流,將不符合資料品質要求的資料隔離,以便後續處理或報告。
import dlt
from pyspark.sql.functions import *
@dlt.table(
name="yellow_taxi_raw",
comment="原始計程車行程資料集"
)
def yellow_taxi_raw():
path = "/tmp/chp_03/taxi_data"
schema = "trip_id INT, taxi_number INT, passenger_count INT, trip_amount FLOAT, trip_distance FLOAT, trip_date DATE"
return (spark.readStream.schema(schema).format("json").load(path))
data_quality_rules = {
"total_amount_assertion": "trip_amount > 0.0",
"passenger_count": "passenger_count >= 1"
}
@dlt.table(
name="yellow_taxi_validated",
comment="應用資料品質規則的驗證表格"
)
@dlt.expect_all_or_drop(data_quality_rules)
def yellow_taxi_validated():
return (
dlt.readStream("yellow_taxi_raw")
.withColumn("nyc_congestion_tax", expr("trip_amount * 0.05"))
)
內容解密:
- 定義原始資料集:定義了一個名為
yellow_taxi_raw的原始資料集,用於讀取 JSON 格式的計程車行程資料。 - 定義資料品質規則:定義了兩個資料品質規則,分別檢查
trip_amount和passenger_count。 - 應用資料品質規則:使用
@dlt.expect_all_or_drop裝飾器,將不符合資料品質規則的資料記錄丟棄。