在現代資料架構中,系統經常需要同時處理交易型操作與大規模批次處理。傳統上,這兩種需求由不同的工具鏈與介面來滿足,導致程式碼庫的複雜性與維護成本增加。本文旨在探討一種整合性 API 設計模式,將基於 doobie 的純函數式資料庫存取層,與 Apache Spark 的分散式資料處理框架進行結合。透過在既有的 Scala Database 介面中擴展 JDBC 讀寫功能,我們不僅保留了原有 API 的簡潔性與類型安全性,更賦予其直接操作 Spark DataFrame 的能力。這種設計策略實現了資料庫操作的雙重模式,讓應用程式能依據情境彈性選擇精細的記錄級操作或高效的平行資料傳輸,從而打造出更具擴展性與一致性的資料工程解決方案。

使用Spark讀寫更新資料庫API

隨著上一節討論的更改,玄貓更新後的Database API如下所示:

package com.blackcat.dewithscala.utils

import com.blackcat.dewithscala.Opaque
import doobie._
import doobie.implicits._
import cats.effect.unsafe.implicits.global // 引入全局執行上下文

sealed trait Database {
def driver: String
def scheme: String
def host: String
def port: String
def name: String
def jbdcURL: String
def username: Opaque
def password: Opaque

def records[T: Read](selectStatement: String): List[T]
def runDDL(statement: String): Database
def runDML(statement: String): Database = runDDL(statement) // DML與DDL的實現相同
}

此圖示:擴展後的資料庫介面

@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

package "com.blackcat.dewithscala.utils" {
interface Database {
+ driver: String
+ scheme: String
+ host: String
+ port: String
+ name: String
+ jbdcURL: String
+ username: Opaque
+ password: Opaque
+ records[T: Read](selectStatement: String): List<T>
+ runDDL(statement: String): Database
+ runDML(statement: String): Database
}
object Database {
+ apply(name: String): Database
}
class DatabaseImplementation {
- db: com.blackcat.dewithscala.DatabaseConfig
+ driver: String
+ scheme: String
+ host: String
+ port: String
+ name: String
+ jbdcURL: String
+ username: Opaque
+ password: Opaque
+ records[T: Read](selectStatement: String): List<T>
+ runDDL(statement: String): Database
+ runDML(statement: String): Database
}
}

package "com.blackcat.dewithscala" {
class DatabaseConfig <<case class>>
class Opaque
}

Database <|-- DatabaseImplementation
DatabaseImplementation ..> DatabaseConfig
DatabaseImplementation ..> Opaque
DatabaseImplementation ..> doobie.Read
DatabaseImplementation ..> doobie.Transactor
DatabaseImplementation ..> cats.effect.IO
Database "1" -- "1" DatabaseImplementation : 實現
Database "1" -- "1" Database.apply : 創建
@enduml

看圖說話:

此圖示展示了擴展後的資料庫介面Database及其實現DatabaseImplementationDatabase特徵現在包含了更多與資料庫操作相關的方法,如records用於查詢資料並映射到Scala類型,以及runDDLrunDML用於執行DDL和DML語句。DatabaseImplementation作為其具體實現,內部依賴於com.blackcat.dewithscala.DatabaseConfig來獲取連接細節,並利用doobie函式庫(如doobie.Readdoobie.Transactorcats.effect.IO)來執行實際的資料庫操作。這種設計將底層的doobie實現細節封裝在DatabaseImplementation中,對外只暴露簡潔且功能豐富的Database介面,極大地提高了程式碼的模組化和易用性,同時也保證了操作的函數式純粹性。

這是一個好的開始,但它缺乏與Spark協作的介面。如果玄貓可以透過資料庫物件執行Spark讀寫操作,那將會很好。為此,讓玄貓向介面添加以下方法:

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} // 引入Spark相關類別

sealed trait Database {
// ... (現有方法) ...

def singlePartitionRead(session: SparkSession, dbTable: String): DataFrame
def multiPartitionRead(
session: SparkSession,
dbTable: String,
partitionCol: String,
upperBound: String,
lowerBound: String,
numPartitions: Int
): DataFrame
def singlePartitionWrite(
session: SparkSession,
dbTable: String,
df: DataFrame,
saveMode: SaveMode
): Database
def multiPartitionWrite(
session: SparkSession,
dbTable: String,
df: DataFrame,
numPartitions: Int,

這裡向Database特徵添加了四個新方法,旨在提供與Spark JDBC API整合的讀寫功能。singlePartitionReadmultiPartitionRead用於從資料庫讀取資料到Spark DataFrame,其中multiPartitionRead支援多分割區讀取以提高效能。singlePartitionWritemultiPartitionWrite則用於將Spark DataFrame寫入資料庫,同樣multiPartitionWrite支援多分割區寫入。這些方法的引入使得Database介面不僅能處理doobie的純函數式資料庫操作,還能無縫地與Spark的資料處理能力結合,為大數據場景下的資料庫互動提供了強大的支援。

數據工程核心理論與實踐

第四章:資料庫操作與Spark JDBC API

使用Spark讀寫更新資料庫API
saveMode: SaveMode
): Database

這些方法的實現非常直接,如下所示:

def singlePartitionRead(session: SparkSession, dbTable: String):
DataFrame =
session.read
.format("jdbc")
.option("url", jbdcURL)
.option("user", username.value)
.option("password", password.value)
.option("dbtable", dbTable)
.load()

def multiPartitionRead(
session: SparkSession,
dbTable: String,
partitionCol: String,
upperBound: String,
lowerBound: String,
numPartitions: Int
) = session.read
.format("jdbc")
.option("url", jbdcURL)
.option("user", username.value)
.option("password", password.value)
.option("dbtable", dbTable)
.option("partitionColumn", partitionCol)
.option("numPartitions", numPartitions)
.option("upperBound", upperBound)
.option("lowerBound", lowerBound)
.load()

def singlePartitionWrite(
session: SparkSession,
dbTable: String,
df: DataFrame,
saveMode: SaveMode
) = {
df.write
.format("jdbc")
.option("url", jbdcURL)
.option("user", username.value)
.option("password", password.value)
.option("dbtable", dbTable)
.mode(saveMode)
.save()
this
}

def multiPartitionWrite(
session: SparkSession,
dbTable: String,
df: DataFrame,
numPartitions: Int,
saveMode: SaveMode
) = {
df.write
.format("jdbc")
.option("url", jbdcURL)
.option("user", username.value)
.option("password", password.value)
.option("dbtable", dbTable)
.option("numPartitions", numPartitions)
.mode(saveMode)
.save()
this
}

這些實現直接利用了Spark DataFrameReaderDataFrameWriter的JDBC功能。它們將Database物件中儲存的jbdcURLusernamepassword等連接資訊傳遞給Spark。對於多分割區讀寫,還會額外設定partitionColumnnumPartitionsupperBoundlowerBound等選項,以優化資料的平行處理。寫入方法返回this,支援方法鏈接。

透過這些更新,玄貓將能夠執行單分割區和多分割區讀取,將資料儲存到資料表等,如下例所示:

// 單分割區讀取
db.singlePartitionRead(
session,
"(select * from my_db.flight_count order by number_of_flights desc limit 5) qry"
).show()

// 儲存上限和下限
case class MaxMin(max: Int, min: Int)

// 上限和下限
val bounds = db
.records[MaxMin](
"select max(number_of_flights) max, min(number_of_flights) min from my_db.flight_count"
)
.head

// 多分割區讀取
val df = db.multiPartitionRead(
session = session,
dbTable = "my_db.flight_count",
partitionCol = "number_of_flights",
upperBound = bounds.max.toString,
lowerBound = bounds.min.toString,
numPartitions = 10
)

// 將df寫入兩個新資料表
// 一個使用單分割區寫入
// 另一個使用多分割區寫入
db
.singlePartitionWrite(
session,
"my_db.flight_count_2",
df,
SaveMode.Overwrite
)
.multiPartitionWrite(
session,
"my_db.flight_count_3",
df,
numPartitions = 10,
SaveMode.Overwrite
)

這個範例展示了如何利用Database介面中新增的Spark讀寫方法。它首先執行一個單分割區讀取,從flight_count資料表中讀取前5條航班記錄。接著,它使用doobie的records方法查詢flight_count資料表中number_of_flights的最大值和最小值,以確定多分割區讀取的邊界。然後,它執行一個多分割區讀取,將整個flight_count資料表載入到一個DataFrame中,並指定了分割區列、上下限和分割區數量。最後,它將這個DataFrame分別使用單分割區和多分割區寫入方式,以Overwrite模式寫入兩個新的資料表my_db.flight_count_2my_db.flight_count_3。這整個流程清晰地展示了Database介面如何整合doobie的底層資料庫操作和Spark的強大資料處理能力。

第五章:物件儲存與資料湖

數十年來,企業一直嚴重依賴資料庫和資料倉儲。大約在千禧年之交,網際網路時代開始興起。聯網設備的普及開始呈現出傳統資料庫和資料倉儲無法應對的資料量和多樣性。

在開發一個使用這種大量資料的網頁索引解決方案時,Google在2003年發表了一篇名為**Google檔案系統(GFS)**的論文,這篇論文在接下來的二十年裡塑造了業界的解決方案。

這個解決方案促成了資料湖的發展,進而導致了**湖倉一體(Lakehouses)**的出現。資料湖是一種分散式檔案系統,它提供了一種經濟高效的方法來儲存結構化、非結構化和半結構化資料。

縱觀現代數據工程的多元挑戰,將不同特性的數據處理框架進行無縫整合,已成為提升開發效率與系統韌性的關鍵。本文提出的Database API擴展,正是此趨勢下的卓越實踐。它巧妙地將Doobie的精準、型別安全特性(用於邊界查詢等控制面操作)與Spark JDBC的規模化、平行處理能力(用於大數據量的讀寫)封裝於統一介面下。

這種設計不僅大幅簡化了應用層的調用邏輯,更體現了「為特定任務選擇最佳工具」的架構智慧。然而,我們也必須清醒地認識到,儘管此API優化了互動層,其效能瓶頸最終仍受限於傳統關聯式資料庫的單體架構。當數據的量體、速度與多樣性超越了RDBMS所能承載的極限時,即使是最高效的JDBC封裝也將顯得力不從心。

玄貓認為,此整合模式是優化現有RDBMS與Spark協作的成熟方案,但其內在限制也預示著數據基礎設施的下一次演進。這正是我們即將深入探討的——以Google檔案系統為起點,邁向更具擴展性的物件儲存與資料湖時代。