Delta Live Tables(DLT)簡化了資料轉換流程,提供資料品質控管、自動化資料譜系追蹤等功能。本文將探討 Unity Catalog 整合應用,示範如何建立目錄、分配許可權,確保資料安全。同時,文章也將深入 DLT 管道設定,包含產品版本選擇、執行模式設定以及叢集組態最佳化,並說明如何透過 %pip 安裝 Python 套件、從外部資源載入依賴。最後,文章將以實際案例示範如何使用 DLT 實作 SCD Type 2 變更,包含 Auto Loader 增量載入、資料轉換、外部資料函式庫整合等步驟,提供讀者更全面的 DLT 應用。
使用Delta Live Tables進行資料轉換的高階應用
建立集中式資料儲存位置
在建立新的DLT(Delta Live Tables)資料管道時,第一步是定義一個集中式的資料儲存位置。Unity Catalog提供了一個簡單的方法來建立新的目錄(Catalog)。可以透過多種方法來完成,例如使用Catalog Explorer UI、從筆記本中執行SQL陳述式,或使用Databricks REST API。
透過Databricks Catalog Explorer UI建立新目錄
- 首先,點選導航側邊欄中的Catalog Explorer標籤來導航到Catalog Explorer。
- 接下來,點選Create Catalog按鈕。
- 給目錄一個有意義的名稱。
- 選擇Standard作為目錄型別。
- 最後,點選Create按鈕來建立新的目錄。
分配目錄許可權
Unity Catalog的一個好處是預設情況下資料是安全的。換句話說,除非明確允許,否則儲存在Unity Catalog中的資料的存取許可權預設被拒絕。若要在新建立的目錄中建立新的表格,需要授予建立和操作新表格的許可權。
重要注意事項
如果您是目標目錄和架構物件的建立者和擁有者,同時也是DLT管道的建立者和擁有者,那麼您不需要執行以下GRANT陳述式。GRANT陳述式旨在展示在典型的Unity Catalog中繼資料儲存中跨多個群組和使用者共用資料資產所需的許可權型別。
首先,讓我們授予使用目錄的許可權。從一個新的筆記本中,執行以下SQL語法來授予使用新建立的目錄的許可權,其中my_user是Databricks使用者的名稱,而chp2_transforming_data是在前面的例子中建立的目錄的名稱:
%sql
GRANT USE CATALOG, CREATE SCHEMA ON CATALOG `chp2_transforming_data`
TO `my_user`;
接下來,我們需要建立一個將儲存DLT管道輸出資料集的架構。從同一個筆記本中,執行以下SQL陳述式來建立新的架構:
%sql
USE CATALOG `chp2_transforming_data`;
CREATE SCHEMA IF NOT EXISTS `ride_hailing`;
USE SCHEMA `ride_hailing`;
然後,執行以下陳述式來授予在新建立的架構中建立物化檢視的許可權:
%sql
GRANT USE SCHEMA, CREATE TABLE, CREATE MATERIALIZED VIEW ON SCHEMA
`ride_hailing` TO `my_user`;
到目前為止,您應該已經瞭解了Unity Catalog如何使對資料管道資料集應用一致的資料安全性變得簡單,從而為資料管理員提供了一系列選項來在其組織內強制實施資料集許可權。
資料管道設定
到目前為止,我們只討論瞭如何使用DLT框架來宣告表格、檢視和對到達資料的轉換。然而,執行特定資料管道的計算資源也在將最新資料落地到湖倉(lakehouse)中發揮了重要作用。在本文中,我們將討論不同的資料管道設定以及如何控制計算資源,如叢集,在執行時。
DLT產品版本
DLT產品版本告訴DLT框架您的資料管道將使用哪一組功能。較高的產品版本將包含更多的功能,因此Databricks將評估更高的價格(DBU)。
Databricks為DLT管道提供了三種型別的產品版本,按功能集排名,從最少功能到最多功能:
- Core:Core是基礎產品版本。這個產品版本適用於只向流式表格追加新資料的流式工作負載。資料預期(data quality enforcement)和應用變更資料擷取(change data capture)的工具在這個產品版本中不可用。
- Pro:Pro產品版本是Core之上的下一個版本。這個生產版本適用於向流式表格追加新資料並使用APPLY CHANGES命令應用更新和刪除的流式工作負載。然而,在這個產品版本中,資料品質預期不可用。
- Advanced:Advanced產品版本是最具特色的產品版本。在這個生產版本中,資料品質預期是可用的,同時支援向流式表格追加新資料以及應用在上游資料來源中發生的插入、更新和刪除。
管道執行模式
DLT提供了一種方式來通知系統對資料管道的更改是實驗性的。此功能稱為資料管道環境模式。有兩種可用的環境模式——開發(development)和生產(production)。主要區別在於計算資源的行為。
在開發環境模式下,如果遇到失敗,資料流任務不會自動重試。這允許資料工程師在臨時開發週期中干預並糾正任何程式錯誤。此外,在開發模式下發生故障時,執行資料管道更新的叢集將保持執行狀態。這允許資料工程師檢視叢集的日誌和指標,並且還可以防止每次管道執行時叢集重新組態和初始化叢集執行時的耗時過程,這取決於雲供應商,可能需要10到15分鐘才能完成。
設定環境模式
可以從DLT UI設定資料管道環境模式,方法是點選DLT UI中資料管道頂部導航列上的環境模式切換開關。
或者,也可以使用Databricks REST API設定環境模式。在下面的程式碼片段中,我們將使用Python requests函式庫向Databricks DLT pipelines REST API傳送PUT請求,以設定DLT管道的開發模式。請注意,端點URL將根據您的Databricks工作區佈署而更改,以下程式碼片段僅為示例:
import requests
response = requests.put(
# 請替換成您的工作區URL和管道ID
'https://your-workspace-url.cloud.databricks.com/api/2.0/pipelines/your-pipeline-id',
headers={'Authorization': 'Bearer your-access-token'},
json={'development': True}
)
內容解密:
此段程式碼使用了Python的requests函式庫向Databricks的REST API發送了一個PUT請求,以更新DLT管道的組態。請求的URL包含了工作區的URL和管道的ID,需要根據實際情況進行替換。請求頭中包含了授權資訊,使用了Bearer Token進行身份驗證。請求體是一個JSON物件,其中包含了要更新的組態項,在此例中是將development模式設定為True。
圖表翻譯:
此圖示展示了DLT管道執行模式的切換過程。在開發模式下,叢集會保持執行以便於除錯,而在生產模式下,則會根據設定的重試策略進行自動重試。
圖表翻譯: 此圖表呈現了DLT管道執行模式的主要區別。在開發模式下,系統會保持叢集執行,以便於開發者進行除錯和錯誤修正;而在生產模式下,系統則會根據設定的策略進行自動重試,以提高整體的可靠性和穩定性。
Delta Live Tables(DLT)組態與執行解析
DLT 是 Databricks Data Intelligence Platform 上的一個無版本產品功能。換言之,Databricks 管理著資料管道所使用的底層 Databricks Runtime(DBR)。此外,Databricks 會自動將資料管道叢集升級至最新的穩定執行版本。執行時升級非常重要,因為它們引入了錯誤修復、新效能功能和其他增強功能。
DLT 執行時升級與例外處理
DLT 管道有一個 Channel 設定,允許資料工程師選擇兩個通道選項之一:Current 和 Preview。Preview 通道允許資料工程師組態資料管道,以使用最新的實驗性執行時,其中包含新的效能功能和其他增強功能。然而,由於這是一個實驗性的執行時,因此不建議在生產環境中執行的資料管道使用 Preview 通道的 Databricks 執行時。相反,建議使用前者,即 Current,它選擇 Databricks 執行時的最新穩定版本。
生產模式下的 DLT 自動執行時升級例外處理
圖表翻譯: 此圖示展示了在生產模式下,DLT 如何處理執行時升級失敗的情況。當執行時升級失敗時,DLT 會嘗試降級至較低的執行時版本並重試執行資料管道。
管道叢集型別
每個資料管道將具有兩個相關聯的叢集:一個用於執行資料集更新,另一個用於執行表維護任務。這些叢集的設定在管道設定中使用 JSON 叢集組態定義來表達。
叢集組態範例
{
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
},
{
"label": "updates",
"node_type_id": "i4i.xlarge",
"driver_node_type_id": "i4i.2xlarge",
"spark_conf": {
"spark.sql.shuffle.partitions": "auto"
}
}
]
}
內容解密:
此 JSON 組態定義了兩個叢集組態:一個預設組態和一個更新組態。預設組態將應用於更新和維護 DLT 叢集,而更新組態僅應用於更新 DLT 叢集。在預設組態中,我們啟用了自動縮放,並指定了叢集的最小和最大工作節點數量。在更新組態中,我們指定了更新叢集的驅動程式和工作節點的例項型別,並啟用了 Auto-Optimized Shuffle(AOS)效能功能。
重點整理
- DLT 管道具有自動執行時升級功能,以確保使用最新的穩定版本。
- 可以透過 Channel 設定選擇 Current 或 Preview 通道。
- 生產環境下的 DLT 管道應使用 Current 通道,以確保穩定性。
- DLT 管道具有自動例外處理功能,可以在執行時升級失敗時降級至較低的執行時版本。
- 每個資料管道具有兩個相關聯的叢集:更新叢集和維護叢集。
- 叢集組態可以使用 JSON 定義,包括自動縮放、例項型別和 Spark 組態等。
使用Delta Live Tables進行資料轉換的高階應用
叢集組態與計算資源管理
在資料工程領域中,叢集組態是最佳化資料處理管線(Data Pipeline)效能的關鍵。Delta Live Tables(DLT)提供了靈活的叢集組態選項,允許資料工程師根據特定的工作負載進行調優,以獲得更好的效能。
傳統計算與無伺服器計算的比較
DLT支援兩種計算模式:傳統計算和無伺服器計算。傳統計算提供了對計算資源的最大控制權,但需要手動管理叢集的各個方面,如自動擴充套件行為、執行引擎選擇(Photon或Catalyst)和叢集標籤等。無伺服器計算則抽象了底層的叢集設定,由Databricks自動管理計算資源,提供了更快的叢集啟動速度和更高的安全性。
{
"clusters": [{
"label": "updates",
"node_type_id": "i4i.xlarge",
"driver_node_type_id": "i4i.2xlarge"
}]
}
無伺服器計算的優勢
無伺服器計算模式適合於需要快速反應處理需求變化的場景,並且可以減少基礎設施維護的負擔。此外,無伺服器執行還支援連續處理模式下的物化檢視更新。
載入外部依賴
DLT提供了多種方式來安裝執行時依賴,包括使用%pip命令、從工作區檔案或Databricks Repo載入模組,以及使用叢集初始化指令碼。
使用 %pip 命令安裝 Python 依賴
%pip install numpy pandas scikit-learn /Volumes/tradingutils/tech-analysis-utils-v1.whl
載入外部依賴的最佳實踐
- 將依賴安裝陳述式放在筆記本的最頂部,以便快速參照。
- 支援安裝Python函式庫,但不支援安裝JVM函式庫。
資料管線處理模式
DLT支援兩種資料管線處理模式:觸發式處理和連續處理。
觸發式處理模式
觸發式處理模式會更新管線中的資料集,然後立即終止叢集,釋放計算資源。這種模式適合於臨時任務或按計劃執行的任務。
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title Delta Live Tables 資料轉換進階應用
package "系統架構" {
package "前端層" {
component [使用者介面] as ui
component [API 客戶端] as client
}
package "後端層" {
component [API 服務] as api
component [業務邏輯] as logic
component [資料存取] as dao
}
package "資料層" {
database [主資料庫] as db
database [快取] as cache
}
}
ui --> client : 使用者操作
client --> api : HTTP 請求
api --> logic : 處理邏輯
logic --> dao : 資料操作
dao --> db : 持久化
dao --> cache : 快取
note right of api
RESTful API
或 GraphQL
end note
@enduml圖表翻譯: 此圖示展示了觸發式處理模式的工作流程。當觸發事件發生時,系統會執行管線更新,然後終止叢集,釋放資源。
連續處理模式
連續處理模式則持續更新管線中的資料集,適合於需要實時資料處理的場景。
使用Delta Live Tables實作SCD Type 2變更的實作練習
本章節將結合目前所學的知識,建立一個DLT資料管線,套用SCD Type 2變更至下游資料集。
練習目標
- 使用Databricks Auto Loader增量載入寫入雲端儲存帳戶原始登入區的JSON檔案。
- 對下游欄位進行轉換,並與外部Postgres資料函式庫的資料進行合併。
實作步驟
Step 1:產生模擬的計程車行程資料
首先,需要從本章的GitHub倉函式庫中複製相關的筆記本。執行Generate Mock Taxi Trip Data筆記本,以產生模擬的計程車行程資料,並將其儲存為多個JSON檔案於雲端儲存帳戶。
Step 2:建立DLT資料管線定義
建立一個新的筆記本,例如命名為Taxi Trips DLT Pipeline。匯入DLT Python模組以定義資料集和依賴關係。
import dlt
import pyspark.sql.functions as F
Step 3:定義串流資料表
使用cloudFiles資料來源監聽原始登入區的新檔案事件,建立一個串流資料表來擷取計程車行程JSON資料。
SCHEMA_LOCATION = "/tmp/chp_02/taxi_data_chkpnt"
RAW_DATA_LOCATION = "/tmp/chp_02/taxi_data/"
@dlt.table(
name="raw_taxi_trip_data",
comment="Raw taxi trip data generated by the data generator notebook"
)
def raw_taxi_trip_data():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", SCHEMA_LOCATION)
.load(RAW_DATA_LOCATION)
)
#### 內容解密:
spark.readStream.format("cloudFiles"):使用Auto Loader讀取雲端儲存中的檔案。.option("cloudFiles.format", "json"):指定讀取的檔案格式為JSON。.option("cloudFiles.schemaLocation", SCHEMA_LOCATION):指定schema的儲存位置,用於追蹤schema變更。.load(RAW_DATA_LOCATION):載入指定路徑下的JSON檔案。
Step 4:套用SCD Type 2變更
定義一個新的串流資料表taxi_trip_data_merged,並使用apply_changes()函式套用SCD Type 2變更。
dlt.create_streaming_table("taxi_trip_data_merged")
dlt.apply_changes(
target="taxi_trip_data_merged",
source="raw_taxi_trip_data",
keys=["trip_id"],
sequence_by=F.col("sequence_num"),
apply_as_deletes=F.expr("op_type = 'D'"),
except_column_list=["op_type", "op_date", "sequence_num"],
stored_as_scd_type=2
)
#### 內容解密:
dlt.create_streaming_table("taxi_trip_data_merged"):建立一個新的串流資料表。dlt.apply_changes():套用變更至目標資料表。target="taxi_trip_data_merged":指定目標資料表。source="raw_taxi_trip_data":指定來源資料表。keys=["trip_id"]:指定用於識別記錄的主鍵。sequence_by=F.col("sequence_num"):指定用於排序的欄位。apply_as_deletes=F.expr("op_type = 'D'"):指定刪除條件。except_column_list=["op_type", "op_date", "sequence_num"]:指定要排除的欄位。stored_as_scd_type=2:指定SCD的型別為Type 2。
Step 5:轉換和豐富資料
對上游資料表的欄位進行轉換,例如四捨五入浮點數型別的欄位,並將trip_distance欄位拆分為英里和公里兩個欄位。同時,連線到遠端的Postgres資料函式庫,讀取最新的計程車司機資訊。
@dlt.table(
name="raw_driver_data",
comment="Dataset containing info about the taxi drivers"
)
def raw_driver_data():
postgresdb_url = f"jdbc:postgresql://{POSTGRES_HOSTNAME}:{POSTGRES_PORT}/{POSTGRES_DB}"
conn_props = {
"user": POSTGRES_USERNAME,
# 其他連線屬性
}
# 讀取Postgres資料函式庫中的資料
#### 內容解密:
@dlt.table():定義一個新的資料表。postgresdb_url:Postgres資料函式庫的連線URL。conn_props:連線屬性,包括使用者名稱等。