傳統資料湖在提供大規模、低成本儲存的同時,卻長期面臨資料可靠性與管理複雜度的挑戰,例如缺乏事務支持導致的資料不一致、難以執行的更新與刪除操作。湖倉一體(Lakehouse)架構的出現,旨在結合資料倉儲的可靠性與資料湖的靈活性。本文將聚焦於實現此架構的關鍵技術——Delta Lake。我們將從其底層設計出發,深入探討交易日誌(Transaction Log)如何在物件儲存之上,構建出具備 ACID 事務、Schema 控管與版本控制能力的資料表格式。透過分析其 MERGE、UPDATE 等操作的實現原理,以及時間旅行等進階功能,文章揭示了 Delta Lake 如何將被動的資料儲存轉化為主動、可靠的資料資產,為現代資料工程與分析應用奠定穩固基礎。
第五章:物件儲存與資料湖
深入探討湖倉一體
此圖示:Delta Lake 交易日誌結構範例
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
folder "S3 Bucket/Delta Table Root" as DeltaRoot {
folder "_delta_log" as DeltaLogFolder {
file "00000000000000000000.json" as Log0
file "00000000000000000001.json" as Log1
file "00000000000000000002.json" as Log2
file "00000000000000000003.checkpoint.parquet" as Checkpoint
file "00000000000000000004.json" as Log4
}
file "part-00000-guid1.parquet" as DataFile1
file "part-00001-guid2.parquet" as DataFile2
file "part-00002-guid3.parquet" as DataFile3
file "part-00003-guid4.parquet" as DataFile4
}
Log0 --> DataFile1 : 添加
Log1 --> DataFile2 : 添加
Log2 --> DataFile3 : 刪除 DataFile1, 添加 DataFile3
Checkpoint --> Log0 : 包含 Log0-Log2 的匯總資訊
Checkpoint --> Log1
Checkpoint --> Log2
Log4 --> DataFile4 : 添加 DataFile4
note right of Log0
JSON 格式
記錄操作 (add, remove, metadata, protocol)
包含檔案路徑、大小、統計資訊等
end note
note right of Checkpoint
Parquet 格式
優化中繼資料讀取速度
定期生成
end note
@enduml看圖說話:
此圖示展示了Delta Lake在物件儲存上的交易日誌檔案結構範例。在Delta Table的根目錄下,除了實際的資料檔案(如part-*.parquet)外,還有一個關鍵的_delta_log資料夾。這個資料夾包含了一系列JSON格式的交易日誌檔案(如00000000000000000000.json),每個檔案都記錄了一次對資料表的操作(例如添加、刪除資料檔案、更新Schema等)。這些JSON檔案構成了資料表的完整操作歷史。為了優化中繼資料的讀取效率,Delta Lake會定期將這些JSON日誌檔案匯總並儲存為Parquet格式的檢查點檔案(如00000000000000000003.checkpoint.parquet)。當應用程式需要讀取資料表時,它會先讀取最新的檢查點檔案,然後再處理檢查點之後的JSON日誌檔案,從而快速重建資料表的最新狀態。這種結構是Delta Lake實現ACID事務、時間旅行和Schema演進的基礎。
透過交易日誌,應用程式可以獲取該版本中包含的所有物件。預設行為是讀取當前交易,這不過是資料表的當前資料。這解決了兩個早期的問題:
- 讀取者將獲得資料表的一致隔離讀取,其他寫入者無法再次修改該版本。即使寫入者不斷寫入資料表,讀取者也將被固定在他們發出讀取時正在讀取的當前版本。
- 這解決了中繼資料擴展問題。讀取者將獲得所需物件的列表,並且能夠以透明的方式完全跳過檔案列表步驟。這允許讀取者以分散式方式啟動,並完全繞過驅動程式瓶頸。
由於Schema作為交易日誌的一部分儲存,因此現在可以在Delta位置強制執行。這意味著寫入者不能再盲目地將資料推送到不符合當前Schema的位置。讀取者可以安全地讀取他們不理解的資料。當然,某些情況需要一種安全的方式來演進Schema,因為資料會發生變化。Delta允許選擇更改資料表並添加欄位。或者,透過設定Spark配置,寫入者可以隨著新欄位的添加自動演進Schema。
交易日誌允許讀取者與資料表的寫入者隔離。這使得寫入者能夠擁有更強大的操作能力。Delta資料表可以接受傳統的語句,例如DELETE、UPDATE和MERGE,而不會導致不一致的行為。交易要麼完全成功,要麼失敗並回滾。此外,可以對包含小檔案的資料表發出壓縮,以減少許多小檔案的讀取延遲。這種壓縮還可以與在資料表上添加叢集索引相結合,提供超越簡單分割區的另一層讀取增強功能。然後,管線可以根據高基數的欄位進行叢集,以共同定位資料並提供比傳統日期欄位分割區更快的資料跳過機制。
時間旅行等額外邊緣功能透過Delta Lake的內建功能得以實現,這允許查詢資料表的舊快照視圖,並在處理管線中出現問題(例如錯誤)時將資料表回滾到舊版本。資料表的每次更改也可以選擇性地用於處理增量更改,而不僅僅是簡單的追加。將資料表中所有更改捕獲為饋送稱為變更資料饋送(Change Data Feed)。如果啟用此功能,則每個更新、刪除和插入操作都將被記錄下來,以進行更細粒度的下游處理,這在串流應用程式中通常很有用。請參閱以下程式碼,了解如何在現有Delta資料表上啟用此功能:
ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
讓玄貓看看一些Spark處理命令來了解一些Delta範例。為了寫入Delta資料表,寫入命令只需將格式指定為Delta:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("fs.s3a.aws.credentials.provider","com.amazonaws.auth.profile.ProfileCredentialsProvider")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.master("local[*]")
.getOrCreate()
val DF = spark.read.option("header","true").csv("s3a://scala-data-engineering/my/first/")
DF.write.format("delta").mode(SaveMode.Overwrite).save("s3a://scala-data-engineering/my/first_delta")
這段程式碼展示了如何初始化一個支援Delta Lake的SparkSession,並將一個CSV檔案讀取為DataFrame,然後以Delta格式將其寫入S3。關鍵配置包括spark.sql.extensions和spark.sql.catalog.spark_catalog,它們分別註冊了Delta SparkSession擴展和Delta Catalog,使得Spark能夠理解和操作Delta表。
要更新資料表中的條目,可以使用更新命令:
第五章:物件儲存與資料湖
深入探討湖倉一體
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath(spark, "s3a://scala-data-engineering/my/first_delta")
deltaTable.update(
col("id") === 1,
Map("name" -> lit("jane"))
)
spark.read.format("delta").load("s3a://scala-data-engineering/my/first_delta").show()
這段程式碼首先透過DeltaTable.forPath方法獲取一個DeltaTable實例,指向之前創建的Delta表。然後,它使用update方法更新id為1的記錄,將name欄位的值更新為"jane"。最後,再次讀取Delta表並顯示其內容,以驗證更新是否成功。
插入或更新操作稱為Upsert,可以從來源DataFrame使用merge命令完成。這個更新DataFrame可以來自任何來源,但對於這個簡單的範例,玄貓創建了一個靜態的:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}
val rowData = Seq(Row(1,"john"),
Row(2,"jane"))
val schema = StructType( Array(
StructField("id", IntegerType,true),
StructField("name", StringType,true)
))
val updatesDF = spark.createDataFrame(rowData,schema)
val deltaTable = DeltaTable.forPath(spark, "s3a://scala-data-engineering/my/first_delta").as("target")
deltaTable.merge(
updatesDF.as("updates"),
"target.id = updates.id")
.whenMatched
.updateExpr(
Map("name" -> "updates.name"))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"name" -> "updates.name"))
.execute()
spark.read.format("delta").load("s3a://scala-data-engineering/my/first_delta").show()
這段程式碼展示了Delta Lake的強大MERGE操作,它結合了插入和更新的功能。首先,玄貓創建了一個包含更新資料的updatesDF。然後,使用deltaTable.merge方法,將updatesDF與目標Delta表進行合併。合併條件是target.id = updates.id。whenMatched子句指定當ID匹配時執行更新操作,這裡更新了name欄位。whenNotMatched子句指定當ID不匹配時執行插入操作,插入新的id和name。這使得資料同步變得非常高效和原子化。
最後,當移除謂詞條件時,資料可以選擇性地刪除或完全刪除:
deltaTable.delete(col("id") === 1) // 刪除 id 為 1 的記錄
這段程式碼使用delete方法,並透過col("id") === 1指定了一個謂詞,表示只刪除id為1的記錄。如果沒有提供謂詞,delete操作將刪除表中的所有記錄。
除了簡單的資料操作命令之外,Delta還提供了更進階的功能,例如時間旅行、還原、叢集索引和壓縮等。為了查看時間旅行和還原命令,首先了解Delta資料表的歷史機制會很有幫助。讓玄貓使用以下程式碼來查看Delta資料表的歷史:
deltaTable.history()
.select("version","timestamp","operation","operationParameters","operationMetrics")
.show(false)
此命令將顯示資料表上發生的所有操作以及有關這些操作的中繼資料,例如添加、更新或刪除了多少行。
此圖示:操作輸出
這段程式碼執行後,會輸出一個表格,類似於:
+-------+--------------------+---------+--------------------+--------------------+
|version| timestamp|operation| operationParameters| operationMetrics|
+-------+--------------------+---------+--------------------+--------------------+
| 0|2023-10-27 10:00:...| WRITE|{mode -> Overwrite...|{numFiles -> 1, n...|
| 1|2023-10-27 10:05:...| UPDATE|{predicate -> (id...|{numUpdatedRows ->...|
| 2|2023-10-27 10:10:...| MERGE|{predicate -> (ta...|{numTargetRowsCop...|
| 3|2023-10-27 10:15:...| DELETE|{predicate -> (id...|{numDeletedRows ->...|
+-------+--------------------+---------+--------------------+--------------------+
透過歷史資訊,Delta資料表可以透過版本號或時間戳還原到以前的版本:
deltaTable.restoreToVersion(0) // 還原到版本 0
// 或者
// deltaTable.restoreToTimestamp("2023-10-27 10:00:00") // 還原到指定時間戳
上述功能的組合匯集了物件儲存的可擴展性、可靠性和成本效益,同時還提供了以一致和可靠的方式對其執行穩健操作命令的能力。這使得資料工程管線可以擺脫傳統的資料庫處理機制,這些機制可能昂貴且難以擴展。相反,需要緩慢變更維度、刪除和更新等功能的管線可以在商用儲存介質上擴展。
湖倉一體需要超越能夠在各種資料上進行穩健的資料工程。僅僅Delta本身只能提供一致性和實現資料庫功能的機制。資料轉換後,它需要能夠用於商業智慧 (BI) 和機器學習 (ML) 用例,而無需將資料複製到不同的系統。透過湖倉一體架構,組織可以消除資料重複、整合治理模型、降低複雜性並降低成本。
讓玄貓繼續討論串流資料以及如何將其與物件儲存一起使用。
串流資料
串流資料通常是一個被誤解的話題,因為串流通常被認為是即時資料處理所必需的。這種處理級別需要某種類型的計算資源持續運行,以使資料盡可能保持最新,並且被認為非常昂貴。
一些工程團隊會因為預算限制而避免這種架構,並且因為用例只需要資料以某種頻率保持最新,例如每天、每小時、每天兩次等等。雖然這在許多情況下是正確的,但它忽略了串流架構的主要目的,即增量處理。這種處理是資料工程的聖杯,因為處理的資料越少通常意味著管線的成本越低。
本節將展示如何從不同來源進行串流,並將這些串流處理到不同的目的地或接收器。
在當今的資料格局中,有許多不同的方法來建立串流架構。為了。
此圖示:Delta Lake 的核心特性
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
package "Delta Lake 核心特性" as DeltaFeatures {
component "ACID 事務保證" as ACID
component "Schema 強制與演進" as SchemaEvolution
component "時間旅行 (Time Travel)" as TimeTravel
component "統一批次與串流" as UnifiedBatchStream
component "資料版本控制" as DataVersioning
component "優化操作 (Upsert/Delete/Merge)" as OptimizedOps
component "資料壓縮與索引" as CompactionIndexing
component "變更資料饋送 (Change Data Feed)" as CDF
}
package "底層技術" as UnderlyingTech {
component "物件儲存" as ObjectStorage
component "Parquet 檔案" as ParquetFiles
component "交易日誌" as TransactionLog
}
UnderlyingTech --> DeltaFeatures : 提供基礎
ObjectStorage --> TransactionLog : 儲存日誌
ObjectStorage --> ParquetFiles : 儲存資料
TransactionLog --> ACID : 實現事務
TransactionLog --> SchemaEvolution : 管理 Schema
TransactionLog --> TimeTravel : 記錄版本
TransactionLog --> DataVersioning : 實現版本控制
UnifiedBatchStream -- DataVersioning : 依賴版本控制
OptimizedOps -- ACID : 依賴事務
CompactionIndexing -- TransactionLog : 透過日誌管理
CDF -- TransactionLog : 捕獲日誌變更
note right of ACID
原子性、一致性、隔離性、持久性
end note
note right of TimeTravel
查詢歷史版本
回滾到舊版本
end note
note right of UnifiedBatchStream
批次和串流處理同一份資料
end note
@enduml看圖說話:
此圖示清晰地描繪了Delta Lake的核心特性及其與底層技術的關係。Delta Lake建立在物件儲存之上,並利用Parquet檔案作為實際資料的儲存格式,而其創新的核心在於交易日誌。這個交易日誌是實現所有進階功能的關鍵,包括:提供ACID事務保證,確保資料操作的可靠性;實現Schema強制與演進,有效管理資料結構的變化;提供時間旅行和資料版本控制,允許查詢歷史資料和回滾;以及支持優化的資料操作如Upsert、Delete和Merge。此外,Delta Lake還能進行資料壓縮與索引以提升查詢效能,並透過變更資料饋送功能捕獲所有資料變更,為串流應用提供增量處理能力。這些特性共同使得Delta Lake能夠統一批次與串流處理,為湖倉一體架構提供了堅實的技術基礎,極大地提升了資料處理的效率、可靠性和靈活性。
縱觀現代資料工程的演進軌跡,Delta Lake 的出現不僅是技術的升級,更是一次典範轉移的關鍵突破。它巧妙地解決了傳統物件儲存「僅可追加」的先天限制,與資料倉儲成本高昂、擴展性不足的兩難困境。透過引入以交易日誌為核心的ACID事務層,Delta Lake 在保留物件儲存的成本效益與橫向擴展能力的同時,賦予了資料湖過去難以企及的可靠性與操作彈性,例如更新、刪除與合併等細粒度操作。
這項創新不僅統一了批次與串流處理的底層基礎,更預示著資料架構將朝向更簡潔、更即時的「湖倉一體」型態深度整合。未來,基於變更資料饋送(Change Data Feed)的增量處理模式,將可能從特定場景的應用,演變為資料管線設計的主流思維,大幅降低端到端延遲與運算成本。
綜合評估後,玄貓認為,掌握以交易日誌為核心的儲存格式,已非單純的技術選項,而是現代資料工程師構築高效能、高可靠性資料解決方案的關鍵基石,也是實現真正意義上湖倉一體的必要前提。