在資料工程中,資料驗證是確保資料品質的關鍵步驟。本文將介紹 Spark 提供的三種資料驗證模式:PERMISSIVEDROPMALFORMEDFAILFAST,並說明如何使用它們處理格式不正確或損壞的資料。此外,我們也會探討結構描述(Schema)的應用,包含如何定義、使用及維護結構描述,以確保資料符合預期的格式和型別,進而提升資料處理效率及資料分析的準確性。透過結構描述的建立與應用,能有效提升資料處理的可靠度,避免因為資料格式錯誤造成後續流程的錯誤。理解和正確運用這些驗證機制和工具,能有效提升資料管線的穩定性和資料分析的可靠性。

資料驗證的重要性與實務應用

在資料處理的過程中,資料驗證是一個至關重要的步驟。良好的資料驗證機制可以確保資料的品質和可靠性,從而提高資料分析的準確性和效率。本文將探討資料驗證的不同模式、結構描述(Schemas)的應用,以及如何建立和維護結構描述。

資料驗證模式

在處理資料時,經常會遇到格式不正確或損壞的資料。Apache Spark 提供了多種模式來處理這種情況,包括 PERMISSIVEDROPMALFORMEDFAILFAST

  • PERMISSIVE 模式:此模式允許讀取損壞的資料,並將損壞的記錄放入一個特定的欄位中。這種模式適合於需要檢查和除錯損壞記錄的情況。
    # 使用 PERMISSIVE 模式讀取資料
    df = spark.read.option("mode", "PERMISSIVE").json("data.json")
    

內容解密:

  • 這段程式碼展示瞭如何使用 PERMISSIVE 模式讀取 JSON 檔案。

  • PERMISSIVE 模式會嘗試讀取儘可能多的資料,並將無法解析的記錄標記為損壞。

  • 這種模式對於需要檢查和除錯損壞記錄的情況非常有用。

  • DROPMALFORMED 模式:此模式會丟棄損壞的記錄,只保留正確的資料。

    # 使用 DROPMALFORMED 模式讀取資料
    df = spark.read.option("mode", "DROPMALFORMED").json("data.json")
    

內容解密:

  • 這段程式碼展示瞭如何使用 DROPMALFORMED 模式讀取 JSON 檔案。

  • DROPMALFORMED 模式會丟棄無法解析的記錄,只保留正確的資料。

  • 這種模式適合於不需要處理損壞記錄的情況。

  • FAILFAST 模式:此模式在遇到損壞的記錄時立即丟擲異常,停止資料讀取。

    # 使用 FAILFAST 模式讀取資料
    df = spark.read.option("mode", "FAILFAST").json("data.json")
    

內容解密:

  • 這段程式碼展示瞭如何使用 FAILFAST 模式讀取 JSON 檔案。
  • FAILFAST 模式在遇到無法解析的記錄時立即丟擲異常。
  • 這種模式適合於對資料品質要求非常高的情況。

結構描述(Schemas)的應用

結構描述是用於定義資料的結構和約束條件的重要工具。它可以幫助進行資料驗證,確保資料符合預期的格式和型別。

結構描述的作用

  1. 資料型別檢查:結構描述可以檢查資料欄位的型別是否正確。
  2. 必填欄位檢查:可以定義哪些欄位是必填的,從而檢查資料是否完整。
  3. 限制資料範圍:透過結構描述,可以限制資料的範圍,例如數值範圍或字串長度。

如何建立結構描述

建立結構描述的方法有多種,可以根據具體的需求選擇合適的方法。

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType

# 定義結構描述
schema = StructType([
    StructField("user", StringType(), nullable=False),
    StructField("location", ArrayType(StringType()), nullable=False),
    StructField("image_files", StringType(), nullable=True),
    StructField("description", StringType(), nullable=True),
    StructField("count", IntegerType(), nullable=True)
])

# 使用結構描述讀取資料
df = spark.read.schema(schema).json("data.json")

內容解密:

  • 這段程式碼展示瞭如何定義一個結構描述,並使用它來讀取 JSON 檔案。
  • 結構描述定義了每個欄位的名稱、型別和是否可為空。
  • 使用結構描述可以確保讀取的資料符合預期的結構和約束條件。

結構描述的維護

結構描述需要根據資料源的變化進行更新,以確保其準確性和有效性。以下是一些維護結構描述的最佳實踐:

  1. 自動生成結構描述:可以根據樣本資料自動生成初始的結構描述。
  2. 定期更新結構描述:隨著資料源的變化,定期檢查和更新結構描述。
  3. 與資料生產者協調:與資料的生產者保持溝通,及時瞭解資料結構的變化。

資料驗證的重要性與實作方法

在資料處理流程中,資料驗證是一個至關重要的步驟。它確保了資料的正確性、完整性和一致性,從而避免了下游處理的錯誤和異常。本文將介紹使用Spark和JSON Schema進行資料驗證的方法。

使用Spark進行資料驗證

Spark提供了強大的資料處理能力,同時也支援資料驗證。我們可以透過定義Schema來驗證資料是否符合預期。

定義Schema

首先,我們需要定義一個Schema來描述資料的結構。下面是一個使用Spark定義Schema的例子:

df = (spark
.read
.option("inferSchema", True)
.json("initial_source_data.json"))
source_schema = df.schema

輸出的Schema如下:

StructType(
    [StructField("count", LongType(),True),
     StructField("description", StringType(),True),
     StructField("user", StringType(),False),
     StructField("img_files", ArrayType(StringType(),True),True),
     StructField("location", ArrayType(StringType(),True),False)]
)

修改Schema以進行驗證

如果我們希望確保某些欄位不為空,可以修改Schema中的nullable屬性。例如,將description欄位的nullable屬性設為False

StructField("description", StringType(),False)

比較推斷的Schema與預期的Schema

當讀取新的資料時,我們可以比較推斷的Schema與預期的Schema是否一致。如果不一致,則表示資料結構發生了變化。

df = (spark
.read
.option("inferSchema", True)
.json('string_location.json'))
inferred_schema = df.schema
inferred_schema == source_schema

如果比較結果為False,則表示Schema不匹配。我們可以進一步檢查哪些欄位不匹配:

source_info = {f.name: f for f in source_schema.fields}
for f in inferred_schema.fields:
    if f.name not in source_info.keys():
        print(f"New field in data source {f}")
    elif f != source_info[f.name]:
        source_field = source_info[f.name]
        print(f"Field mismatch for {f.name} Source schema: {source_field}, Inferred schema: {f}")

輸出結果如下:

Field mismatch for location
Source schema: StructField(location,ArrayType(StringType,true),true),
Inferred schema: StructField(location,StringType,true)

使用JSON Schema進行資料驗證

JSON Schema是一種用於描述JSON資料結構的規範。我們可以使用JSON Schema來驗證資料是否符合預期。

生成JSON Schema

可以使用線上工具或手動生成JSON Schema。下面是一個生成的JSON Schema例子:

{
    "type": "object",
    "properties": {
        "location": {
            "type": "array",
            "minItems": 2,
            "items": {"type": "string"}
        }
    }
}

使用JSON Schema進行驗證

使用Python的jsonschema函式庫可以對資料進行驗證:

from jsonschema import validate

validate(short_location, updated_schema)

如果資料不符合Schema,則會丟擲ValidationError異常。

內容解密:

  1. 定義和使用Schema:文中介紹瞭如何使用Spark和JSON Schema來定義資料結構,並用於資料驗證。
  2. 修改Schema屬性:展示瞭如何修改Spark Schema中的nullable屬性,以及如何使用JSON Schema的minItems屬性來限制陣列長度。
  3. 比較Schema:演示瞭如何比較推斷的Schema與預期的Schema,以檢測資料結構的變化。
  4. JSON Schema驗證:介紹瞭如何使用JSON Schema對JSON資料進行驗證,並給出了具體的錯誤示例。

進一步改進方向

  1. 自動化Schema維護:文中提到了自動化Schema維護的重要性,可以考慮使用自動化工具或流程來更新和維護Schema。
  2. 擴充套件驗證邏輯:除了基本的Schema驗證外,還可以考慮新增更多的驗證邏輯,例如業務規則驗證等。

經濟高效的資料管線基礎

在資料工程領域,確保資料管線的穩定性和可靠性至關重要。隨著雲端基礎設施、第三方服務和分散式軟體開發的複雜性增加,資料管線面臨著多種潛在的失敗機制。無論是源資料或管執行緒式碼的變更、錯誤的憑證、資源爭用還是雲端服務的可用性問題,都可能導致資料停機或資料品質問題。

架構設計的重要性

為瞭解決這些問題,我們需要建立具有冪等性(Idempotency)的資料管線。冪等性是指無論執行多少次相同的操作,其結果都保持一致。這對於支援重試機制和避免資料重複至關重要。

批次處理的冪等性

在批次處理中,可以透過刪除-寫入(delete-write)和資料函式庫交易來實作冪等性。這些方法確保批次要麼完全處理,要麼完全不處理,從而提供乾淨的重試起點,避免資料重複。此外,在資料接收端強制執行唯一性約束(如主鍵)也可以防止資料重複。

串流處理的冪等性

在串流處理中,冪等性可以透過建構保證根據源資料的唯一ID的生產者,以及只處理每個唯一鍵一次的消費者來實作。在建構這些系統時,必須考慮訊息如何被消費、確認和重試。持久儲存冪等性資料有助於在中斷和佈署期間保持冪等性。

檢查點機制(Checkpointing)

除了冪等性之外,檢查點機制是另一個基礎設計元素。檢查點提供了已知狀態,以便在發生故障時可以從該狀態重試,同時也為除錯提供了資訊。透過自動刪除檢查點資料,可以在不犧牲效能和雲端資源成本的前提下受益於此技術。

自動重試機制

有了冪等性和檢查點機制的基礎,就可以實作自動重試,從而使管線能夠從暫時性問題中還原。這減少了資源浪費和手動干預,並且可以利用廉價、可中斷的計算資源。

重試時機的選擇

在尋找使用重試機制的機會時,應考慮那些可能暫時失敗並在合理時間內還原的流程。例如,資料函式庫連線問題可能是暫時的,而無效的查詢則無論重試多少次都不會成功。

非阻塞重試

為了保持效能和避免浪費計算週期,應使用非阻塞重試。也就是說,重試之間應該有適當的等待時間,以避免立即重試導致的問題。

結語

總之,設計經濟高效且可靠的資料管線需要結合多種技術,包括冪等性、檢查點機制和自動重試。這些技術不僅可以減少資源浪費和手動干預,還可以提高資料品質和可靠性,從而為企業帶來更大的價值。透過仔細設計和實施這些技術,我們可以建立更強健、更高效的資料管線,以滿足現代資料驅動業務的需求。

圖表翻譯: 此圖表展示了設計具有冪等性的資料管線的流程。首先檢查系統是否具有冪等性,如果是,則執行操作並實施檢查點機制和自動重試。如果不是,則需要重新設計以實作冪等性。這種設計方法確保了資料管線的可靠性和效率。

建立有效的開發環境

如同其他軟體系統,資料管線需要在軟體開發生命週期中建立開發和測試環境。結合雲端服務、資料來源、接收端及其他相依元件,資料管線的環境包含許多變動因素,這些因素可能導致成本增加和管理混亂。本章將探討如何建立有效的開發環境,從本地開發技術到測試和預發布環境的設定,為資料管線的變更做好生產環境的準備。

本章首先概述資料環境和軟體環境之間的差異,並說明如何結合這些概念來為資料管線建立環境層級。你將瞭解如何在平衡成本、複雜度和功能需求的同時,滿足開發、測試和資料使用者的需求。

本章的第二部分重點介紹本地開發環境的設計,包括最佳實踐,以幫助你充分利用容器技術並避免常見陷阱。雖然本地開發一詞意味著一個完全在開發者機器上執行的環境,但實際上,使用資料管線和雲端服務時,你可能需要連線到外部資源。為了降低這些成本和複雜性,本章將介紹限制對外部服務依賴的策略。

當你需要為本地開發建立雲端資源時,必須有可靠的方法在資源不再需要時清理它們。本章最後提供了有關銷毀資源和限制持續性成本的建議。

環境

如果你從事軟體開發,你可能熟悉在不同環境中佈署程式碼,並在透過各種測試和品質要求後提升程式碼。同樣地,在開發資料管線時,除了考慮程式碼外,還需要考慮在這些環境中使用的資料。本文將說明如何結合資料環境和軟體開發環境的需求,從而確保從程式碼和資料處理的角度來看都是可靠的。

軟體環境

圖 5-1 展示了一個軟體環境層級的例子。這裡有四個層級——DEV、TEST、STAGING 和 PROD——程式碼會隨著透過額外的測試和驗證而逐步佈署,直到最終釋出到生產環境(PROD)。為了了解程式碼如何在各個層級之間移動,讓我們來看一個測試和提升程式碼的範例流程。

程式碼提升流程

提升流程通常始於合併請求。當你建立一個合併請求時,持續整合(CI)管線會執行單元測試,作為程式碼功能的第一道檢查。如果這些測試透過,程式碼就可以被合併。

一旦程式碼透過你的合併請求審核者的批准並被提交,它就會被佈署到 DEV 環境。DEV 環境可以用於執行整合測試,在這裡你驗證程式碼在連線到資料函式庫、API 和其他所需服務時的預期運作。一旦你的整合測試透過,程式碼就會被提升到 TEST 環境。進入 TEST 環境的程式碼足夠穩定,可以進行品質保證(QA)測試。

當你對 QA 結果感到滿意時,就可以建立一個新的釋出候選版本。程式碼從 TEST 環境移動到 STAGING,在這裡它可以執行一段額外的時間,然後再被佈署到 PROD。

圖表翻譯: 此圖示展示了程式碼在不同環境層級之間的流動過程,從 DEV 環境開始,透過單元測試和整合測試,最終佈署到 PROD 環境。

資料驗證的重要性

如同第4章所述,資料驗證是確保資料品質的關鍵步驟。在資料管線中實施自動化的資料驗證,可以幫助你在資料進入下游系統之前檢測和處理問題,從而避免了手動除錯的麻煩。

資料驗證技術

本章介紹了多種資料驗證技術,包括檢查資料形狀和格式、使用結構描述來驗證屬性名稱、型別和存在性等。這些技術可以幫助你確保資料的一致性和準確性。

環境規劃與資料管道開發的重要性

在軟體開發和資料處理領域中,建立有效的環境規劃是確保產品品質的關鍵。環境規劃不僅涉及程式碼的開發流程,也與資料邏輯的測試和驗證息息相關。適當的環境設計能夠提高開發效率、降低錯誤率,並確保最終產品符合預期。

資料環境的重要性

在開發資料邏輯時,資料環境的規劃同樣至關重要。以一個分析平台為例,開發團隊需要不同的資料環境層級來測試查詢陳述式的有效性。這些環境通常包括開發(DEV)、測試(TEST)、驗證(VALIDATION)和生產(PROD)等不同層級。

資料環境的分層架構

  1. 開發環境(DEV):包含少量的樣本資料,用於初步驗證查詢結果。
  2. 測試環境(TEST):具備較大的資料樣本,用於進一步測試和最佳化查詢陳述式。
  3. 驗證環境(VALIDATION):包含與生產環境相同或相似的資料,用於最終驗證查詢結果的正確性。
  4. 生產環境(PROD):最終上線的環境,承載實際的資料和查詢任務。

在這個案例中,分析師首先在DEV環境中驗證查詢陳述式,然後在TEST環境中進行更全面的測試,最後在VALIDATION環境中與PROD環境進行結果比對,以確保查詢結果的正確性。

資料管道環境的混合設計

資料管道的開發需要結合軟體開發和資料環境的最佳實踐。這種混合設計允許開發者逐步測試程式碼更新和資料處理的正確性。

資料管道環境的規劃

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title 資料驗證模式與結構描述應用

package "系統架構" {
    package "前端層" {
        component [使用者介面] as ui
        component [API 客戶端] as client
    }

    package "後端層" {
        component [API 服務] as api
        component [業務邏輯] as logic
        component [資料存取] as dao
    }

    package "資料層" {
        database [主資料庫] as db
        database [快取] as cache
    }
}

ui --> client : 使用者操作
client --> api : HTTP 請求
api --> logic : 處理邏輯
logic --> dao : 資料操作
dao --> db : 持久化
dao --> cache : 快取

note right of api
  RESTful API
  或 GraphQL
end note

@enduml

圖表翻譯: 此圖示展示了資料管道從開發到生產的流程,每一階段使用的資料量逐漸增加,以確保最終上線的穩定性和正確性。

在STAGING環境中,使用接近生產環境的資料進行測試,可以大幅提高對最終上線結果的信心。同時,這也提供了一個機會,在較長時間內執行管道,以評估效能問題。

環境規劃的挑戰與解決方案

在同時支援軟體開發和資料環境的需求下,規劃一個有效的環境架構至關重要。如果缺乏前瞻性的規劃,開發團隊可能會面臨諸多限制。

不足的環境設計帶來的問題

曾經有一個分析平台的開發團隊,由於環境設計不足而遇到了問題。最初,他們只有三個環境:DEV、TEST和PROD。DEV環境同時用於佈署合併後的程式碼和測試分支;TEST環境則被用作測試和預發布環境;PROD則是生產環境。

當分析師要求TEST環境的資料與PROD保持一致時,這種設計的缺陷就顯現出來了。這導致了資源競爭和功能衝突,最終影響了開發效率和產品品質。