在資料工程領域中,構建穩健且可擴充套件的資料管線至關重要。本文將探討如何利用模組化設計和依賴反轉原則來提升資料管線的彈性、可維護性和可擴充套件性。我們將以 Python 和 PySpark 程式碼為例,示範如何將資料儲存邏輯抽象化,並透過依賴注入的方式支援多種雲端儲存平台,例如 AWS S3 和 Google Cloud Storage。同時,我們也將探討如何利用抽象儲存介面簡化單元測試,並根據不同的佈署環境動態選擇合適的儲存機制,提升程式碼的靈活性。
軟體開發策略:模組化設計與依賴反轉
在資料處理流程的開發中,模組化設計與依賴反轉是兩個重要的軟體開發策略。這些策略能夠提升程式碼的彈性、可維護性和可擴充套件性。
模組化設計的實踐
模組化設計的核心思想是將程式碼分解成獨立、可重複使用的模組。在資料處理流程中,這意味著將不同的功能,如資料擷取、轉換和儲存,分離成不同的模組。
例子:儲存資料到 S3
def write_to_s3(data, key):
s3 = boto3.client('s3', region_name='us-east-1')
s3.put_object(Bucket="bucket_name", Key=key, Body=gzip.compress(data))
內容解密:
write_to_s3函式封裝了將資料寫入 S3 的邏輯。- 使用
boto3客戶端與 S3 互動。 - 資料在寫入前使用
gzip壓縮,以減少儲存空間。
依賴反轉原則
依賴反轉原則強調應該依賴於抽象,而不是具體實作。這使得程式碼能夠更容易地適應變化的需求。
例子:抽象儲存介面
from abc import ABC, abstractmethod
class AbstractStorage(ABC):
@abstractmethod
def add(self, data, id):
pass
內容解密:
AbstractStorage是一個抽象基礎類別,定義了儲存資料的介面。add方法是抽象的,需要由子類別實作。
實作 S3 儲存
class S3Storage(AbstractStorage):
def __init__(self, bucket):
self.s3 = boto3.client('s3', region_name='us-east-1')
self.bucket = bucket
def add(self, data, id):
self.s3.put_object(Bucket=self.bucket, Key=id, Body=data)
內容解密:
S3Storage是AbstractStorage的一個實作,用於將資料儲存到 S3。add方法實作了將資料寫入 S3 的邏輯。
多雲支援的實作
透過依賴反轉原則,可以輕鬆地支援多個雲端服務提供者。
例子:GCS 儲存實作
class GCSStorage(AbstractStorage):
def __init__(self, bucket):
self.gcs = storage.Client()
self.bucket = self.gcs.bucket(bucket)
def add(self, data, id):
blob = self.bucket.blob(id)
blob.upload_from_string(data)
內容解密:
GCSStorage是另一個AbstractStorage的實作,用於將資料儲存到 Google Cloud Storage。add方法實作了將資料上傳到 GCS 的邏輯。
動態選擇儲存機制
class CloudStorage(AbstractStorage):
def __init__(self, deploy):
if deploy == 'gcs':
self.cloud_storage = GCSStorage()
elif deploy == 'aws':
self.cloud_storage = S3Storage()
def add(self, data, id):
self.cloud_storage.add(data, id)
內容解密:
CloudStorage類別根據佈署環境動態選擇合適的儲存機制。add方法委託給所選的儲存機制執行。
模組化設計與資料儲存抽象化
在開發資料處理管線時,模組化設計是確保系統可維護性、可擴充套件性和可測試性的關鍵。透過將系統分解為獨立的模組,可以更輕鬆地管理和更新個別元件,而不會影響整個系統的功能。
資料儲存抽象化
在處理資料儲存時,抽象化是實作模組化設計的重要技術。透過定義一個抽象基礎類別(如 AbstractStorage),可以為不同的資料儲存解決方案提供統一的介面。這使得開發者能夠在不同的雲端儲存服務或資料函式庫之間切換,而無需修改主要的業務邏輯程式碼。
實作範例:資料函式庫儲存
以下是一個將資料儲存到資料函式庫的 DatabaseStorage 類別實作範例:
class DatabaseStorage(AbstractStorage):
def __init__(self, connection_string, model_cls):
self.engine = create_engine(connection_string)
self.model_cls = model_cls
def add(self, data, id):
data['id'] = id
with Session(self.engine) as session:
model_inst = self.model_cls(**data)
session.add(model_inst)
session.commit()
在這個範例中,DatabaseStorage 類別繼承自 AbstractStorage,並實作了 add 方法以將資料儲存到指定的資料函式庫表中。
資料模型定義
為了使用 DatabaseStorage,需要定義一個與資料函式庫表對應的資料模型類別,如下所示:
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class AggData(Base):
__tablename__ = "aggregate_data"
id = Column(Integer, primary_key=True)
description = Column(String)
species = Column(String)
使用範例
要使用 DatabaseStorage 儲存聚合資料,可以按照以下方式建立 ProcessBirdData 例項:
bird_data_process = ProcessBirdData(DatabaseStorage(connection_string, AggData))
bird_data_process.create_aggregate_data(data)
測試與模擬
模組化設計的另一個好處是它使得測試變得更加容易。透過建立模擬物件(如 MockStorage),可以測試業務邏輯而不必依賴於實際的資料儲存服務。
MockStorage 實作範例
以下是一個簡單的 MockStorage 類別實作,用於測試目的:
class MockStorage(AbstractStorage):
def __init__(self, bucket):
self.bucket = bucket
def add(self, data, id):
print(f"Would have written {data} to {self.bucket} at {id}")
return (data, f"{self.bucket}/{id}")
測試範例
使用 MockStorage,可以撰寫測試案例來驗證 ProcessBirdData 的行為:
def test_bird_data_process(test_data, expected_data, expected_path):
storage = MockStorage("bucket_name")
bird_data_process = ProcessBirdData(storage)
result, object_path = bird_data_process.create_aggregate_data(test_data)
assert result == expected_data
assert object_path == expected_path
何時進行重構
當程式碼變得難以維護或擴充套件時,重構是必要的。一些跡象表明需要重構,例如:
- 程式碼中存在緊密耦合,使得修改變得困難。
- 資料結構不明確或過於通用,導致程式碼難以理解和維護。
- 無法進行單元測試,或需要連線到實時服務才能執行測試。
使用 DataFrame 的模組化設計
DataFrame 提供了一種功能性的介面,使得操作鏈式呼叫成為可能。這種設計既靈活又具有可擴充套件性。例如,以下是使用 PySpark DataFrame 的一個範例:
r_species = f".*({'|'.join(species_list)}).*"
df = (spark
.read.json('s3://bird_bucket/bird_data.json')
.withColumn("description_lower", f.lower('description'))
.withColumn("species", f.regexp_extract('description_lower', r_species, 1))
.drop("description_lower")
.groupBy("species")
.agg({"count":"sum"})
.write.mode("overwrite").json("s3://bird_bucket/result.json")
)
模組化設計與可組態設計在資料處理流程中的應用
在處理複雜的資料轉換任務時,如何設計出模組化且可組態的程式碼是提升開發效率和可維護性的關鍵。本篇文章將探討如何透過模組化設計和可組態設計來改善資料處理流程,並以具體的 PySpark 程式碼範例來說明其優勢。
模組化設計的重要性
當資料處理流程變得越來越複雜時,將整個流程分解成多個獨立的模組可以顯著提高程式碼的可讀性和可測試性。以下是一個使用 PySpark 處理 JSON 資料的範例:
def apply_species_label(species_list, df):
r_species = f".*({'|'.join(species_list)}).*"
return (df
.withColumn("description_lower", f.lower('description'))
.withColumn("species", f.regexp_extract('description_lower', r_species, 1))
.drop("description_lower")
)
def create_aggregate_data(df):
return (df
.groupBy("species")
.agg({"count":"sum"})
)
內容解密:
apply_species_label函式接收一個物種列表和一個 DataFrame,根據描述欄位提取物種名稱。- 使用
withColumn方法建立一個新的欄位description_lower,將description欄位轉換為小寫。 - 使用正規表示式提取物種名稱並存入
species欄位。 - 最後刪除臨時的
description_lower欄位。
- 使用
create_aggregate_data函式對 DataFrame 進行分組匯總,計算每個物種的總數。- 使用
groupBy方法根據species欄位進行分組。 - 使用
agg方法匯總每個物種的count欄位總和。
- 使用
透過模組化設計提升可測試性
將資料轉換邏輯分解成獨立的函式後,可以對每個函式進行獨立測試,大大簡化了測試案例的編寫和維護。
def test_species_label(spark_context):
data = [{'description': "Saw a night heron"}]
data_df = spark_context.parallelize(data).toDF()
species_list = ['night heron']
result_df = apply_species_label(species_list, data_df)
result = result_df.toPandas().to_dict('list')
assert result['species'][0] == 'night heron'
內容解密:
- 測試
apply_species_label函式,驗證其是否能正確提取物種名稱。- 建立一個測試資料集,包含一筆描述為 “Saw a night heron” 的資料。
- 將測試資料轉換為 DataFrame 並呼叫
apply_species_label函式。 - 將結果轉換為 Pandas DataFrame 並驗證
species欄位的值是否正確。
可組態設計的優勢
透過使用外部組態來控制資料處理流程,可以在不修改程式碼的情況下新增或修改功能。例如,可以根據組態動態生成 Airflow DAGs 和任務。
def apply_species_label(species_list, df):
return df.withColumn(
"species",
f.when(f.col("species_conf") > 0.8, f.col("ml_species"))
.otherwise(<previous implementation>)
)
內容解密:
- 更新
apply_species_label函式,使其根據機器學習模型的信心度決定是否使用模型的預測結果。- 使用
when和otherwise方法根據species_conf欄位的值選擇不同的處理邏輯。 - 當信心度大於 0.8 時,使用機器學習模型的預測結果;否則,使用之前的實作邏輯。
- 使用
軟體開發策略在資料管線中的應用
在建立資料管線的過程中,軟體開發策略扮演著至關重要的角色。為了成功地應用可組態化技術,需要滿足兩個條件:
- 目標流程可以被表達為組態:這意味著我們需要能夠將流程的變化部分抽象出來,形成可組態的引數。
- 程式碼能夠支援所需的組態水平:這要求我們的程式碼具有足夠的靈活性,能夠根據不同的組態進行調整。
以玄貓鳥類別辨識服務為例
讓我們透過一個例子來瞭解如何評估這些條件並建立可組態的管線。假設玄貓團隊決定提供蒼鷺辨識即服務(HIaaS),允許公司將自己的資料帶到服務中進行物種提取,然後儲存在客戶選擇的資料函式庫中。
關鍵步驟的可組態化
在 HIaaS 管線中,「提取物種」步驟可以根據客戶的不同進行組態。這裡的變化量包括源資料桶和輸出資料函式庫。
def run_extract_species(bucket, database):
source_data = read_from_bucket(bucket)
extracted = extract_species(source_data)
store_data(extracted, database)
構建可組態的程式碼函式庫
為了使程式碼函式庫能夠根據桶和資料函式庫資訊進行組態,我們可以利用 AbstractStorage 類別的不同實作來支援不同的雲端儲存提供商和資料函式庫型別。
def run_extract_species(bucket, db, customer_id):
raw = get_data(bucket)
extracted = extract_species(raw)
connection_string = get_connection(customer_id, db)
store_data(extracted, connection_string)
組態示例
{
"customer_id": "1235",
"bucket": "gs://bestco/bird_data",
"db": "postgres"
}
多客戶組態管理
configs = [
{"customer_id": "1235", "bucket": "gs://bestco/bird_data", "db": "postgres"},
{"customer_id": "3423", "bucket": "gs://for_the_birds", "db": "mysql"},
{"customer_id": "0953", "bucket": "s3://dtop324", "db": "postgres"},
]
for config in configs:
run_extract_species(**config)
重要注意事項
- 每個對
run_extract_species的呼叫都會生成一個新的 DAG 執行,這意味著每個客戶的提取過程都是獨立執行的。 - 確保組態不會超出基礎設施的承載能力。在新增新組態之前,評估客戶源資料的大小是非常重要的。
重點回顧
- 模組化設計:透過建立離散的構建塊,使程式碼更具可擴充套件性、易於測試和除錯。
- 可組態化:將變化部分抽象為組態引數,使管線能夠根據不同需求進行調整。
- 資源評估:在新增新組態之前,評估對資源的影響,以避免超出基礎設施承載能力。
透過遵循這些策略,玄貓團隊能夠有效地管理和擴充套件其資料管線,以滿足不斷變化的業務需求。