在資料工程中,資料驗證是確保資料品質的關鍵步驟。本文將介紹 Spark 提供的三種資料驗證模式:PERMISSIVE、DROPMALFORMED 和 FAILFAST,並說明如何使用它們處理格式不正確或損壞的資料。此外,我們也會探討結構描述(Schema)的應用,包含如何定義、使用及維護結構描述,以確保資料符合預期的格式和型別,進而提升資料處理效率及資料分析的準確性。透過結構描述的建立與應用,能有效提升資料處理的可靠度,避免因為資料格式錯誤造成後續流程的錯誤。理解和正確運用這些驗證機制和工具,能有效提升資料管線的穩定性和資料分析的可靠性。
資料驗證的重要性與實務應用
在資料處理的過程中,資料驗證是一個至關重要的步驟。良好的資料驗證機制可以確保資料的品質和可靠性,從而提高資料分析的準確性和效率。本文將探討資料驗證的不同模式、結構描述(Schemas)的應用,以及如何建立和維護結構描述。
資料驗證模式
在處理資料時,經常會遇到格式不正確或損壞的資料。Apache Spark 提供了多種模式來處理這種情況,包括 PERMISSIVE、DROPMALFORMED 和 FAILFAST。
PERMISSIVE模式:此模式允許讀取損壞的資料,並將損壞的記錄放入一個特定的欄位中。這種模式適合於需要檢查和除錯損壞記錄的情況。# 使用 PERMISSIVE 模式讀取資料 df = spark.read.option("mode", "PERMISSIVE").json("data.json")
內容解密:
這段程式碼展示瞭如何使用
PERMISSIVE模式讀取 JSON 檔案。PERMISSIVE模式會嘗試讀取儘可能多的資料,並將無法解析的記錄標記為損壞。這種模式對於需要檢查和除錯損壞記錄的情況非常有用。
DROPMALFORMED模式:此模式會丟棄損壞的記錄,只保留正確的資料。# 使用 DROPMALFORMED 模式讀取資料 df = spark.read.option("mode", "DROPMALFORMED").json("data.json")
內容解密:
這段程式碼展示瞭如何使用
DROPMALFORMED模式讀取 JSON 檔案。DROPMALFORMED模式會丟棄無法解析的記錄,只保留正確的資料。這種模式適合於不需要處理損壞記錄的情況。
FAILFAST模式:此模式在遇到損壞的記錄時立即丟擲異常,停止資料讀取。# 使用 FAILFAST 模式讀取資料 df = spark.read.option("mode", "FAILFAST").json("data.json")
內容解密:
- 這段程式碼展示瞭如何使用
FAILFAST模式讀取 JSON 檔案。 FAILFAST模式在遇到無法解析的記錄時立即丟擲異常。- 這種模式適合於對資料品質要求非常高的情況。
結構描述(Schemas)的應用
結構描述是用於定義資料的結構和約束條件的重要工具。它可以幫助進行資料驗證,確保資料符合預期的格式和型別。
結構描述的作用
- 資料型別檢查:結構描述可以檢查資料欄位的型別是否正確。
- 必填欄位檢查:可以定義哪些欄位是必填的,從而檢查資料是否完整。
- 限制資料範圍:透過結構描述,可以限制資料的範圍,例如數值範圍或字串長度。
如何建立結構描述
建立結構描述的方法有多種,可以根據具體的需求選擇合適的方法。
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
# 定義結構描述
schema = StructType([
StructField("user", StringType(), nullable=False),
StructField("location", ArrayType(StringType()), nullable=False),
StructField("image_files", StringType(), nullable=True),
StructField("description", StringType(), nullable=True),
StructField("count", IntegerType(), nullable=True)
])
# 使用結構描述讀取資料
df = spark.read.schema(schema).json("data.json")
內容解密:
- 這段程式碼展示瞭如何定義一個結構描述,並使用它來讀取 JSON 檔案。
- 結構描述定義了每個欄位的名稱、型別和是否可為空。
- 使用結構描述可以確保讀取的資料符合預期的結構和約束條件。
結構描述的維護
結構描述需要根據資料源的變化進行更新,以確保其準確性和有效性。以下是一些維護結構描述的最佳實踐:
- 自動生成結構描述:可以根據樣本資料自動生成初始的結構描述。
- 定期更新結構描述:隨著資料源的變化,定期檢查和更新結構描述。
- 與資料生產者協調:與資料的生產者保持溝通,及時瞭解資料結構的變化。
資料驗證的重要性與實作方法
在資料處理流程中,資料驗證是一個至關重要的步驟。它確保了資料的正確性、完整性和一致性,從而避免了下游處理的錯誤和異常。本文將介紹使用Spark和JSON Schema進行資料驗證的方法。
使用Spark進行資料驗證
Spark提供了強大的資料處理能力,同時也支援資料驗證。我們可以透過定義Schema來驗證資料是否符合預期。
定義Schema
首先,我們需要定義一個Schema來描述資料的結構。下面是一個使用Spark定義Schema的例子:
df = (spark
.read
.option("inferSchema", True)
.json("initial_source_data.json"))
source_schema = df.schema
輸出的Schema如下:
StructType(
[StructField("count", LongType(),True),
StructField("description", StringType(),True),
StructField("user", StringType(),False),
StructField("img_files", ArrayType(StringType(),True),True),
StructField("location", ArrayType(StringType(),True),False)]
)
修改Schema以進行驗證
如果我們希望確保某些欄位不為空,可以修改Schema中的nullable屬性。例如,將description欄位的nullable屬性設為False:
StructField("description", StringType(),False)
比較推斷的Schema與預期的Schema
當讀取新的資料時,我們可以比較推斷的Schema與預期的Schema是否一致。如果不一致,則表示資料結構發生了變化。
df = (spark
.read
.option("inferSchema", True)
.json('string_location.json'))
inferred_schema = df.schema
inferred_schema == source_schema
如果比較結果為False,則表示Schema不匹配。我們可以進一步檢查哪些欄位不匹配:
source_info = {f.name: f for f in source_schema.fields}
for f in inferred_schema.fields:
if f.name not in source_info.keys():
print(f"New field in data source {f}")
elif f != source_info[f.name]:
source_field = source_info[f.name]
print(f"Field mismatch for {f.name} Source schema: {source_field}, Inferred schema: {f}")
輸出結果如下:
Field mismatch for location
Source schema: StructField(location,ArrayType(StringType,true),true),
Inferred schema: StructField(location,StringType,true)
使用JSON Schema進行資料驗證
JSON Schema是一種用於描述JSON資料結構的規範。我們可以使用JSON Schema來驗證資料是否符合預期。
生成JSON Schema
可以使用線上工具或手動生成JSON Schema。下面是一個生成的JSON Schema例子:
{
"type": "object",
"properties": {
"location": {
"type": "array",
"minItems": 2,
"items": {"type": "string"}
}
}
}
使用JSON Schema進行驗證
使用Python的jsonschema函式庫可以對資料進行驗證:
from jsonschema import validate
validate(short_location, updated_schema)
如果資料不符合Schema,則會丟擲ValidationError異常。
內容解密:
- 定義和使用Schema:文中介紹瞭如何使用Spark和JSON Schema來定義資料結構,並用於資料驗證。
- 修改Schema屬性:展示瞭如何修改Spark Schema中的
nullable屬性,以及如何使用JSON Schema的minItems屬性來限制陣列長度。 - 比較Schema:演示瞭如何比較推斷的Schema與預期的Schema,以檢測資料結構的變化。
- JSON Schema驗證:介紹瞭如何使用JSON Schema對JSON資料進行驗證,並給出了具體的錯誤示例。
進一步改進方向
- 自動化Schema維護:文中提到了自動化Schema維護的重要性,可以考慮使用自動化工具或流程來更新和維護Schema。
- 擴充套件驗證邏輯:除了基本的Schema驗證外,還可以考慮新增更多的驗證邏輯,例如業務規則驗證等。
經濟高效的資料管線基礎
在資料工程領域,確保資料管線的穩定性和可靠性至關重要。隨著雲端基礎設施、第三方服務和分散式軟體開發的複雜性增加,資料管線面臨著多種潛在的失敗機制。無論是源資料或管執行緒式碼的變更、錯誤的憑證、資源爭用還是雲端服務的可用性問題,都可能導致資料停機或資料品質問題。
架構設計的重要性
為瞭解決這些問題,我們需要建立具有冪等性(Idempotency)的資料管線。冪等性是指無論執行多少次相同的操作,其結果都保持一致。這對於支援重試機制和避免資料重複至關重要。
批次處理的冪等性
在批次處理中,可以透過刪除-寫入(delete-write)和資料函式庫交易來實作冪等性。這些方法確保批次要麼完全處理,要麼完全不處理,從而提供乾淨的重試起點,避免資料重複。此外,在資料接收端強制執行唯一性約束(如主鍵)也可以防止資料重複。
串流處理的冪等性
在串流處理中,冪等性可以透過建構保證根據源資料的唯一ID的生產者,以及只處理每個唯一鍵一次的消費者來實作。在建構這些系統時,必須考慮訊息如何被消費、確認和重試。持久儲存冪等性資料有助於在中斷和佈署期間保持冪等性。
檢查點機制(Checkpointing)
除了冪等性之外,檢查點機制是另一個基礎設計元素。檢查點提供了已知狀態,以便在發生故障時可以從該狀態重試,同時也為除錯提供了資訊。透過自動刪除檢查點資料,可以在不犧牲效能和雲端資源成本的前提下受益於此技術。
自動重試機制
有了冪等性和檢查點機制的基礎,就可以實作自動重試,從而使管線能夠從暫時性問題中還原。這減少了資源浪費和手動干預,並且可以利用廉價、可中斷的計算資源。
重試時機的選擇
在尋找使用重試機制的機會時,應考慮那些可能暫時失敗並在合理時間內還原的流程。例如,資料函式庫連線問題可能是暫時的,而無效的查詢則無論重試多少次都不會成功。
非阻塞重試
為了保持效能和避免浪費計算週期,應使用非阻塞重試。也就是說,重試之間應該有適當的等待時間,以避免立即重試導致的問題。
結語
總之,設計經濟高效且可靠的資料管線需要結合多種技術,包括冪等性、檢查點機制和自動重試。這些技術不僅可以減少資源浪費和手動干預,還可以提高資料品質和可靠性,從而為企業帶來更大的價值。透過仔細設計和實施這些技術,我們可以建立更強健、更高效的資料管線,以滿足現代資料驅動業務的需求。
圖表翻譯: 此圖表展示了設計具有冪等性的資料管線的流程。首先檢查系統是否具有冪等性,如果是,則執行操作並實施檢查點機制和自動重試。如果不是,則需要重新設計以實作冪等性。這種設計方法確保了資料管線的可靠性和效率。
建立有效的開發環境
如同其他軟體系統,資料管線需要在軟體開發生命週期中建立開發和測試環境。結合雲端服務、資料來源、接收端及其他相依元件,資料管線的環境包含許多變動因素,這些因素可能導致成本增加和管理混亂。本章將探討如何建立有效的開發環境,從本地開發技術到測試和預發布環境的設定,為資料管線的變更做好生產環境的準備。
本章首先概述資料環境和軟體環境之間的差異,並說明如何結合這些概念來為資料管線建立環境層級。你將瞭解如何在平衡成本、複雜度和功能需求的同時,滿足開發、測試和資料使用者的需求。
本章的第二部分重點介紹本地開發環境的設計,包括最佳實踐,以幫助你充分利用容器技術並避免常見陷阱。雖然本地開發一詞意味著一個完全在開發者機器上執行的環境,但實際上,使用資料管線和雲端服務時,你可能需要連線到外部資源。為了降低這些成本和複雜性,本章將介紹限制對外部服務依賴的策略。
當你需要為本地開發建立雲端資源時,必須有可靠的方法在資源不再需要時清理它們。本章最後提供了有關銷毀資源和限制持續性成本的建議。
環境
如果你從事軟體開發,你可能熟悉在不同環境中佈署程式碼,並在透過各種測試和品質要求後提升程式碼。同樣地,在開發資料管線時,除了考慮程式碼外,還需要考慮在這些環境中使用的資料。本文將說明如何結合資料環境和軟體開發環境的需求,從而確保從程式碼和資料處理的角度來看都是可靠的。
軟體環境
圖 5-1 展示了一個軟體環境層級的例子。這裡有四個層級——DEV、TEST、STAGING 和 PROD——程式碼會隨著透過額外的測試和驗證而逐步佈署,直到最終釋出到生產環境(PROD)。為了了解程式碼如何在各個層級之間移動,讓我們來看一個測試和提升程式碼的範例流程。
程式碼提升流程
提升流程通常始於合併請求。當你建立一個合併請求時,持續整合(CI)管線會執行單元測試,作為程式碼功能的第一道檢查。如果這些測試透過,程式碼就可以被合併。
一旦程式碼透過你的合併請求審核者的批准並被提交,它就會被佈署到 DEV 環境。DEV 環境可以用於執行整合測試,在這裡你驗證程式碼在連線到資料函式庫、API 和其他所需服務時的預期運作。一旦你的整合測試透過,程式碼就會被提升到 TEST 環境。進入 TEST 環境的程式碼足夠穩定,可以進行品質保證(QA)測試。
當你對 QA 結果感到滿意時,就可以建立一個新的釋出候選版本。程式碼從 TEST 環境移動到 STAGING,在這裡它可以執行一段額外的時間,然後再被佈署到 PROD。
圖表翻譯: 此圖示展示了程式碼在不同環境層級之間的流動過程,從 DEV 環境開始,透過單元測試和整合測試,最終佈署到 PROD 環境。
資料驗證的重要性
如同第4章所述,資料驗證是確保資料品質的關鍵步驟。在資料管線中實施自動化的資料驗證,可以幫助你在資料進入下游系統之前檢測和處理問題,從而避免了手動除錯的麻煩。
資料驗證技術
本章介紹了多種資料驗證技術,包括檢查資料形狀和格式、使用結構描述來驗證屬性名稱、型別和存在性等。這些技術可以幫助你確保資料的一致性和準確性。
環境規劃與資料管道開發的重要性
在軟體開發和資料處理領域中,建立有效的環境規劃是確保產品品質的關鍵。環境規劃不僅涉及程式碼的開發流程,也與資料邏輯的測試和驗證息息相關。適當的環境設計能夠提高開發效率、降低錯誤率,並確保最終產品符合預期。
資料環境的重要性
在開發資料邏輯時,資料環境的規劃同樣至關重要。以一個分析平台為例,開發團隊需要不同的資料環境層級來測試查詢陳述式的有效性。這些環境通常包括開發(DEV)、測試(TEST)、驗證(VALIDATION)和生產(PROD)等不同層級。
資料環境的分層架構
- 開發環境(DEV):包含少量的樣本資料,用於初步驗證查詢結果。
- 測試環境(TEST):具備較大的資料樣本,用於進一步測試和最佳化查詢陳述式。
- 驗證環境(VALIDATION):包含與生產環境相同或相似的資料,用於最終驗證查詢結果的正確性。
- 生產環境(PROD):最終上線的環境,承載實際的資料和查詢任務。
在這個案例中,分析師首先在DEV環境中驗證查詢陳述式,然後在TEST環境中進行更全面的測試,最後在VALIDATION環境中與PROD環境進行結果比對,以確保查詢結果的正確性。
資料管道環境的混合設計
資料管道的開發需要結合軟體開發和資料環境的最佳實踐。這種混合設計允許開發者逐步測試程式碼更新和資料處理的正確性。
資料管道環境的規劃
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title 資料驗證模式與結構描述應用
package "系統架構" {
package "前端層" {
component [使用者介面] as ui
component [API 客戶端] as client
}
package "後端層" {
component [API 服務] as api
component [業務邏輯] as logic
component [資料存取] as dao
}
package "資料層" {
database [主資料庫] as db
database [快取] as cache
}
}
ui --> client : 使用者操作
client --> api : HTTP 請求
api --> logic : 處理邏輯
logic --> dao : 資料操作
dao --> db : 持久化
dao --> cache : 快取
note right of api
RESTful API
或 GraphQL
end note
@enduml圖表翻譯: 此圖示展示了資料管道從開發到生產的流程,每一階段使用的資料量逐漸增加,以確保最終上線的穩定性和正確性。
在STAGING環境中,使用接近生產環境的資料進行測試,可以大幅提高對最終上線結果的信心。同時,這也提供了一個機會,在較長時間內執行管道,以評估效能問題。
環境規劃的挑戰與解決方案
在同時支援軟體開發和資料環境的需求下,規劃一個有效的環境架構至關重要。如果缺乏前瞻性的規劃,開發團隊可能會面臨諸多限制。
不足的環境設計帶來的問題
曾經有一個分析平台的開發團隊,由於環境設計不足而遇到了問題。最初,他們只有三個環境:DEV、TEST和PROD。DEV環境同時用於佈署合併後的程式碼和測試分支;TEST環境則被用作測試和預發布環境;PROD則是生產環境。
當分析師要求TEST環境的資料與PROD保持一致時,這種設計的缺陷就顯現出來了。這導致了資源競爭和功能衝突,最終影響了開發效率和產品品質。