Unity Catalog 提供資料血緣追蹤功能,讓開發者能清楚掌握資料在不同資料集之間的流動與轉換過程。藉由 Catalog Explorer 圖形化介面,可以直觀地檢視資料集與欄位之間的依賴關係,快速理解資料處理流程。此外,也能透過查詢系統表格,例如 system.access.table_lineage,以程式化的方式取得資料血緣資訊,方便整合至其他系統或進行自動化分析。對於複雜的資料處理流程,這項功能有助於識別潛在的資料品質問題和影響範圍,提升資料治理效率。更進一步,本文介紹如何使用 Terraform 將 DLT 管道定義為程式碼,實作基礎設施即程式碼(IaC)。透過 Terraform,可以自動化 DLT 管道的佈署、維護和管理,簡化操作流程,並提高佈署效率與可靠性。

使用 Unity Catalog 檢視資料血緣關係

您現在應該清楚地看到兩個上游表格如何連線形成 combined_table 表格。

圖 7.6 – 點選血緣圖上的連線連結可以生成血緣連線資訊

接下來,點選連線上游表格與下游表格 combined_table 的箭頭,以顯示更多關於血緣連線的詳細資訊。您會注意到側邊面板會開啟,顯示有關血緣連線的資訊,例如源和目標表格,但它還會顯示這些資料資產在 Databricks Data Intelligence 平台上的各種其他物件中的使用情況。例如,UI 面板將列出這些資料集目前如何在筆記本、工作流程、DLT 管道和 DBSQL 查詢中被利用。在這種情況下,我們只使用資料生成器筆記本生成了這些表格,因此它是血緣連線資訊中列出的唯一物件。

圖 7.7 – 可以從血緣圖檢視資料集之間的連線詳細資訊

也可以使用目錄資源管理器追蹤列的血緣關係。在同一個血緣圖中,點選 combined_table 表格中的不同列以顯示血緣資訊。例如,透過點選 description 表格列,血緣圖將被更新,以清晰地顯示 description 列是如何計算的。在這種情況下,該列是透過將文字字串與父表格中的 category 列以及子表格中的藝術家名稱串聯起來計算的。

圖 7.8 – 可以透過點選列來顯示上游血緣連線,從而追蹤列的血緣關係

正如您所見,從目錄資源管理器生成血緣圖提供了 Unity Catalog 中資料集之間最新關係的準確快照。這些關係可以幫助我們識別資料更改對下游依賴項的影響,例如更改列的資料型別或刪除資料集等。

在下一節中,我們將瞭解資料血緣關係如何幫助我們識別資料集之間的關係,發現利用這些資料集的相關筆記本,並避免在組織中引入破壞性更改。

識別依賴關係和影響

在本文中,我們將再次利用目錄資源管理器中的血緣圖 UI,更好地瞭解更改特定列的資料型別和值將如何影響下游資料集和 Databricks 工作區中的下游流程(如筆記本和工作流程)。

首先,讓我們在 Databricks 工作區中建立一個新的筆記本,其中將包含新的 DLT 管道的定義。我們的 DLT 管道中的第一個資料集將攝取儲存在預設 Databricks 檔案系統(DBFS)下的 /databricks-datasets 目錄中的商業航空公司航班資訊原始 CSV 檔案。每個 Databricks 工作區都將有權存取此資料集。

建立一個新的筆記本單元格,並新增以下程式碼片段,用於定義我們資料管道中的銅表:

import dlt

@dlt.table(
    name="commercial_airliner_flights_bronze",
    comment="The commercial airliner flight data dataset located in `/databricks-datasets/`"
)
def commercial_airliner_flights_bronze():
    path = "/databricks-datasets/airlines/"
    return (spark.readStream
            .format("csv")
            .schema(schema)
            .option("header", True)
            .load(path))

內容解密:

此段程式碼定義了一個名為 commercial_airliner_flights_bronze 的 DLT 表格,用於讀取儲存在 /databricks-datasets/airlines/ 目錄下的 CSV 檔案。該表格使用 spark.readStream 方法以流式讀取 CSV 檔案,並指定了 schema 和 header 選項。這裡使用了 @dlt.table 裝飾器來定義 DLT 表格。

我們希望透過新增有關商業航空公司飛機的資訊來增強航班資料。建立一個新的筆記本單元格,並新增以下程式碼片段,該程式碼片段定義了一個靜態參考表格,其中包含有關流行商業航空公司飛機的資訊,包括製造商名稱、飛機型號、原產國和燃油容量等:

commercial_airliners = [
    ("Airbus A220", "Canada", 2, 2013, 2016, 287, 287, 5790),
    ("Airbus A330neo", "Multinational", 2, 2017, 2018, 123, 123, 36744),
    ("Airbus A350 XWB", "Multinational", 2, 2013, 2014, 557, 556, 44000),
    ("Antonov An-148/An-158", "Ukraine", 2, 2004, 2009, 37, 8, 98567),
    ("Boeing 737", "United States", 2, 1967, 1968, 11513, 7649, 6875),
    ("Boeing 767", "United States", 2, 1981, 1982, 1283, 764, 23980),
    ("Boeing 777", "United States", 2, 1994, 1995, 1713, 1483, 47890),
    ("Boeing 787 Dreamliner", "United States", 2, 2009, 2011, 1072, 1069, 33340),
    ("Embraer E-Jet family", "Brazil", 2, 2002, 2004, 1671, 1443, 3071),
    ("Embraer E-Jet E2 family", "Brazil", 2, 2016, 2018, 81, 23, 3071)
]

commercial_airliners_schema = "jet_model string, Country_of_Origin string, Engines int, First_Flight int, Airline_Service_Entry int, Number_Built int, Currently_In_Service int, Fuel_Capacity int"

airliners_df = spark.createDataFrame(
    data=commercial_airliners,
    schema=commercial_airliners_schema
)

內容解密:

此段程式碼建立了一個包含商業航空公司飛機資訊的靜態 DataFrame。該 DataFrame 使用 spark.createDataFrame 方法建立,並指定了 schema 和資料。

接下來,我們將把航空公司飛機參考表格儲存到 Unity Catalog 中先前建立的 schema:

airliners_table_name = f"{catalog_name}.{schema_name}.{table_name}"

(airliners_df.write
 .format("delta")
 .mode("overwrite")
 .option("mergeSchema", True)
 .saveAsTable(airliners_table_name))

內容解密:

此段程式碼將航空公司飛機參考表格儲存到 Unity Catalog 中的指定表格名稱。使用 saveAsTable 方法以 Delta Lake 的格式儲存 DataFrame,並指定了 overwrite 和 mergeSchema 的選項。

讓我們在資料管道中新增另一步驟,將靜態的商業航空公司飛機參考表格與航空公司航班資料流進行連線。在新的筆記本單元格中,建立以下使用者定義函式(UDF),該函式將為商業航空公司資料集中的每個條目生成尾號:

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

@udf(returnType=StringType())
def generate_jet_model():
    import random
    # UDF 的實作細節

內容解密:

此段程式碼定義了一個名為 generate_jet_model 的 UDF,用於生成尾號。該 UDF 使用 @udf 裝飾器定義,並指定了傳回型別為 StringType。具體實作細節待補充。

使用 Unity Catalog 檢視資料血統

在 Databricks Data Intelligence 平台上,資料血統(Data Lineage)是一個重要的功能,它能夠幫助我們追蹤資料在不同資料集之間的流動和轉換。在本章中,我們將探討如何使用 Unity Catalog 來檢視資料血統。

建立 DLT 管道

首先,我們需要建立一個 DLT(Delta Live Tables)管道來處理商業航班資料。我們將建立一個新的筆記本,並在其中新增以下程式碼:

commercial_jets = [
    "Airbus A220",
    "Airbus A320",
    "Airbus A330",
    "Airbus A330neo",
    "Airbus A350 XWB",
    "Antonov An-148/An-158",
    "Boeing 737",
    "Boeing 767",
    "Boeing 777",
    "Boeing 787 Dreamliner",
    "Comac ARJ21 Xiangfeng",
    "Comac C919",
    "Embraer E-Jet family",
    "Embraer E-Jet E2 family",
    "Ilyushin Il-96",
    "Sukhoi Superjet SSJ100",
    "Tupolev Tu-204/Tu-214"
]

import random
def generate_jet_model():
    random_index = random.randint(0, 16)
    return commercial_jets[random_index]

@dlt.table(
    name="commercial_airliner_flights_silver",
    comment="The commercial airliner flight data augmented with randomly generated jet model and used fuel amount."
)
def commercial_airliner_flights_silver():
    return (dlt.read_stream("commercial_airliner_flights_bronze")
            .withColumn("jet_model", generate_jet_model())
            .join(spark.table(airliners_table_name), ["jet_model"], "left"))

內容解密:

這段程式碼定義了一個 DLT 表格 commercial_airliner_flights_silver,它讀取 commercial_airliner_flights_bronze 表格的資料,並增加了一個新的欄位 jet_model,該欄位的值是隨機從 commercial_jets 列表中選取的。然後,它將這個新的表格與 airliners_table_name 表格進行左連線。

建立計算碳足跡的筆記本

接下來,我們需要建立一個新的筆記本來計算商業航班的碳足跡。我們將在這個筆記本中新增以下程式碼:

def calc_carbon_footprint(fuel_consumed_gallons):
    # 3.1kg 的 CO2 是由每 1kg 的燃料燃燒產生的
    # 因此,我們將燃料品質乘以 3.1 來估計 CO2 的排放量
    # 來源:https://ecotree.green/en/calculate-flight-co2
    # 1 加侖的航空燃油大約重 3.03907 公斤
    return (fuel_consumed_gallons * 3.03907) * 3.1

內容解密:

這段程式碼定義了一個函式 calc_carbon_footprint,它根據燃油消耗量(以加侖為單位)計算碳足跡。該函式首先將燃油消耗量轉換為公斤,然後乘以 3.1 來估計 CO2 的排放量。

使用 Catalog Explorer 檢視資料血統

現在,我們可以使用 Catalog Explorer 來檢視 commercial_airliner_flights_silver 表格的資料血統。我們可以透過點選左側導航欄中的 Catalog Explorer,然後在搜尋框中輸入表格名稱來找到該表格。

此圖示顯示了 commercial_airliner_flights_silver 表格的欄位之間的依賴關係。

圖表翻譯: 此圖表顯示了 commercial_airliner_flights_silver 表格的欄位之間的依賴關係。我們可以看到哪些欄位是從其他表格中繼承而來的,以及哪些欄位是透過轉換或計算得到的。

使用系統表格查詢資料血統

我們還可以使用系統表格來查詢資料血統。例如,我們可以使用以下查詢來取得 commercial_airliner_flights_silver 表格的上游和下游依賴關係:

SELECT *
FROM system.access.table_lineage
WHERE source_table_name LIKE '%commercial_airliners_silver%';

內容解密:

這段 SQL 程式碼查詢了 system.access.table_lineage 系統表格,以取得 commercial_airliner_flights_silver 表格的上游和下游依賴關係。查詢結果將顯示該表格的所有上游和下游表格,以及相關的筆記本連線資訊。

使用 Terraform 佈署、維護和管理 DLT 管道

在 Databricks 中,Terraform 是一種流行的自動化工具,可以將資料管道表示為程式碼,即所謂的基礎設施即程式碼(IaC)。本章將探討如何使用 Terraform 設定本地開發環境,並將不同的資源佈署到 Databricks 工作區。接下來,我們將探討如何使用 Terraform 表示資料管道,以及如何設定 Delta Live Tables(DLT)管道的不同方面。

本章重點

  • 介紹 Databricks 的 Terraform 提供者
  • 設定本地環境
  • 使用 Terraform 設定 DLT 管道
  • 自動化 DLT 管道佈署
  • 實作練習 - 使用 VS Code 佈署 DLT 管道

技術需求

要遵循本章的範例,您需要具備在 Databricks 工作區中建立和啟動全用途叢集的許可權,以便匯入和執行本章附帶的筆記本。所有程式碼範例都可以從本章的 GitHub 儲存函式庫下載。

介紹 Databricks 的 Terraform 提供者

Terraform 是一種開源的佈署自動化工具,可以用於以可重複和可預測的方式自動化雲端基礎設施的佈署。Databricks 提供了一個 Terraform 提供者,用於將 Databricks 工作區和工作區物件佈署到主要的雲端提供者。

使用 Terraform 的優勢

  • 輕鬆在主要雲端提供者之間佈署基礎設施,使在雲端之間遷移變得簡單。
  • 透過專注於定義組態而不是手動佈署和維護資料管道,可以輕鬆擴充套件到數百個資料管道。
  • 管道定義簡潔,使雲端管理員能夠專注於表達應該更改什麼,而不是如何佈署基礎設施。

設定本地 Terraform 環境

在開始將資料管道物件佈署到 Databricks 工作區之前,需要安裝 Terraform 命令列介面(CLI)工具。可以從 HashiCorp 網站免費下載 Terraform CLI。

建立 Terraform 組態檔

首先,建立一個名為 chp8_databricks_terraform 的新目錄。在該目錄中,建立一個新的 Terraform 組態檔,命名為 main.tf。此檔案將用於定義資料管道和其他相關的工作區物件。

# main.tf
provider "databricks" {
  # 設定 Databricks 提供者
}

內容解密:

此程式碼定義了一個基本的 Terraform 組態檔,使用 Databricks 提供者。在 provider "databricks" 區塊中,您需要設定必要的引數以連線到 Databricks 工作區。

使用 Terraform 設定 DLT 管道

可以使用 Terraform 設定 DLT 管道。首先,需要定義一個 databricks_pipeline 資源。

# dlt_pipeline.tf
resource "databricks_pipeline" "this" {
  name = "我的 DLT 管道"
  # 其他設定...
}

內容解密:

此程式碼定義了一個 databricks_pipeline 資源,用於建立一個新的 DLT 管道。在 name 屬性中,指定了管道的名稱。您還可以設定其他屬性,例如 storage, configuration, 和 cluster

自動化 DLT 管道佈署

可以使用 Terraform 自動化 DLT 管道的佈署。首先,需要定義一個 databricks_pipeline 資源,然後使用 terraform apply 命令佈署管道。

# 佈署 DLT 管道
terraform apply

內容解密:

此命令用於佈署使用 Terraform 定義的基礎設施,包括 DLT 管道。Terraform 將根據組態檔中的定義建立或更新資源。