在資料密集型應用盛行的今日,確保資料管線的穩定性和資料品質至關重要。雖然端對端測試能直接驗證整個流程,但其測試時間長且測試覆寫率有限。因此,本文著重探討如何在資料管線中有效運用單元測試,提升程式碼品質和開發效率。單元測試能針對特定程式碼單元進行獨立驗證,快速發現錯誤並降低除錯成本。搭配模擬技術,更能減少對外部服務的依賴,加快測試速度並提升開發效率。此外,本文也將探討如何結合單元測試與整合測試,達到更全面的測試覆寫率。

資料管線單元測試的重要性與實務應用

在資料管線的開發過程中,測試是確保系統穩定性和資料品質的關鍵步驟。端對端測試(end-to-end testing)作為主要的測試策略,具有其吸引力,因為它不需要建立模擬物件(mocks)來模擬資料函式庫或雲端服務的互動;直接使用這些服務及其資料。然而,這種方法的缺點是測試時間較長,且對底層元件的測試覆寫率較低。

負載測試的價值

負載測試可以展示在處理大規模資料時,基礎設施的效能表現,幫助識別潛在的資源不足問題,避免在生產環境中遇到這些問題。有些資料管線中的錯誤只有在處理大量資料時才會出現。例如,曾經有一個處理客戶資料的管線,在遇到一個新的大型客戶資料時出現了問題,因為該客戶的資料具有之前未曾遇到的特性,導致資料轉換需要重新設計以支援大規模處理。

單元測試的需求評估

在探討資料管線單元測試的不同方面之前,讓我們透過一個例子來瞭解如何確定需要單元測試的部分。圖7-2展示了圖7-1中資料管線的一部分,其中「Temp Storage」儲存桶用於促進重試機制。假設臨時資料刪除步驟在生產環境中失敗,導致每次擷取作業後,資料在儲存桶中累積。

評估是否需要設定單元測試時,需要考慮以下幾點:

  • 該程式碼未按預期執行時會有什麼影響?
  • 是否可以透過整合測試充分驗證?
  • 是否可以有意義地測試該單元的功能?
  • 未來程式碼更新導致錯誤的可能性有多大?

單元測試的案例分析

刪除操作的單元測試

考慮「Enrich with social」步驟,它將社交媒體資料函式庫中的資料與「Extract species」步驟的結果進行左連線(left join)。如果刪除操作失敗,且「Enrich with social」讀取了「Temp Storage」儲存桶中的所有資料進行合併,就會導致舊資料與新的資料函式庫匹配結果重新擷取,產生重複的調查資料記錄,從而導致資料品質問題。此外,隨著「Temp Storage」儲存桶中的資料增長,執行連線操作的計算成本也會增加。

測試策略

可以透過以下方式測試刪除操作:

  • 在整合測試中,斷言管線完成時「Temp Storage」儲存桶為空。
  • 透過模擬雲端服務操作來進行單元測試,這樣可以加快測試速度、減少雲端資源的使用,並將測試整合到CI/CD流程中,防止程式碼更新破壞刪除操作。

理想情況下,應該同時使用單元測試和整合測試:單元測試幫助隔離驗證刪除功能,而整合測試則幫助發現基礎設施層面的問題,例如IAM策略不當導致無法刪除儲存桶中的資料。

資料管線中適合單元測試的區域

資料管線執行多種操作,包括資料處理、轉換、驗證、查詢、連線服務、建立/修改/刪除資料、與雲端元件互動,以及提供可觀察性和警示資訊。在制定單元測試計畫時,思考這些不同的區域有助於找出適合單元測試的候選專案。

資料邏輯

任何對流經管線的資料進行操作的程式碼都是單元測試的良好候選物件。這包括資料驗證(如圖7-1中的「Validate data」)和資料轉換程式碼(如「Extract species」和「Enrich with social」)。Lambda函式中識別夜鷺(night heron)資料的程式碼也是需要單元測試的資料邏輯之一。

對於資料轉換程式碼,需要測試不同的輸入資料可能性並驗證結果。通常在開發資料邏輯時,會使用理想的樣本資料來制定正確的程式碼,這些樣本資料模擬了預期的「快樂路徑」(happy path)。然而,除了這些預期的情況外,還可能遇到其他情況,例如:

  • 資料集中是否存在空值?如果存在,程式碼應該如何處理?
  • 資料損壞(corrupt data)也是需要考慮的問題。

監控與驗證的重要性

需要記住的是,測試只是整體策略的一部分,其他部分還包括監控和驗證。試圖在單元測試中模擬每一個邊界案例可能會導致過度設計或陷入困境。其他策略,如在下游管線步驟失敗時引發異常、記錄錯誤資料並觸發警示,可以幫助表面化管線問題。

import unittest
from unittest.mock import MagicMock
import boto3

def delete_temp_data(bucket_name):
    s3 = boto3.client('s3')
    s3.delete_objects(Bucket=bucket_name, Delete={'Objects': [{'Key': 'temp_data.txt'}]})

class TestDataPipeline(unittest.TestCase):

    @unittest.mock.patch('boto3.client')
    def test_delete_temp_data(self, mock_boto3_client):
        # #### 內容解密:
        # 此段程式碼模擬了刪除 S3 儲存桶中特設定檔案的操作。首先,我們使用 `unittest.mock.patch` 裝飾器來模擬 `boto3.client` 方法。
        # 在測試函式內部,我們呼叫 `delete_temp_data` 函式,並傳入儲存桶名稱。
        # 然後,我們斷言 `s3.delete_objects` 方法被正確呼叫,包含預期的引數,如儲存桶名稱和要刪除的物件鍵。
        
        mock_s3 = MagicMock()
        mock_boto3_client.return_value = mock_s3
        delete_temp_data('test-bucket')
        mock_s3.delete_objects.assert_called_once_with(
            Bucket='test-bucket',
            Delete={'Objects': [{'Key': 'temp_data.txt'}]}
        )

if __name__ == '__main__':
    unittest.main()

圖表翻譯:

此圖示展示了調查管線中臨時資料的使用情況。臨時資料儲存在「Temp Storage」儲存桶中,用於在「Enrich with social」步驟失敗時進行重試。該步驟將社交媒體資料函式庫中的資料與「Extract species」步驟的結果進行左連線。如果刪除操作失敗,且「Enrich with social」讀取了「Temp Storage」儲存桶中的所有資料,就會導致舊資料與新的匹配結果重新擷取,從而產生重複的調查資料記錄。

此範例突顯了在設計和實施資料管線時仔細考慮各種可能的失敗案例以及相關風險的重要性,同時結合適當的監控和驗證機制,以確保資料品質和系統可靠性。透過適當的單元測試和整合測試,可以有效地發現並解決潛在問題,從而提升整體資料處理流程的穩健性。

單元測試在資料管線中的重要性與實踐

在軟體開發的過程中,單元測試是確保程式碼品質的重要環節。尤其是在資料管線的開發中,單元測試能夠幫助開發者驗證程式碼的正確性、提高程式碼的可維護性,並且促進團隊成員之間的溝通。

在修正程式碼前先進行失敗測試

根據《Pragmatic Programmer》一書中的建議,在嘗試修復錯誤之前,應先建立針對該錯誤的測試案例。這種做法同樣適用於資料驗證的程式碼。開發者應該設計涵蓋驗證檢查透過與失敗兩種情況的測試案例。

範例程式碼與測試

def validate_data(data):
    # 簡化的資料驗證邏輯
    if 'required_field' not in data:
        return False
    return True

def test_validate_data():
    # 測試資料驗證函式
    invalid_data = {'optional_field': 'value'}
    assert not validate_data(invalid_data)
    
    valid_data = {'required_field': 'value'}
    assert validate_data(valid_data)

內容解密:

  1. validate_data 函式:此函式檢查輸入的 data 是否包含必要的欄位。如果缺少必要的欄位,則傳回 False,否則傳回 True
  2. test_validate_data 函式:這是一個單元測試案例,用於驗證 validate_data 函式的正確性。它檢查了兩種情況:當資料缺少必要欄位時,函式應該傳回 False;當資料包含必要欄位時,函式應該傳回 True

連線外部服務的測試

在資料管線中,與外部服務(如資料函式庫、API、雲端服務等)的連線是潛在的故障點。除了整合測試外,單元測試也能夠幫助驗證這些連線的行為。

考慮可能的錯誤情況

  • 當 Geocoding 服務無法存取時,系統應該如何反應?
  • 如果服務傳回錯誤程式碼或429(請求過多)的狀態碼,應該如何處理?
  • 如果回應中缺少郵政編碼,該如何處理?

範例程式碼與測試

import requests

def get_zip_code(address):
    try:
        response = requests.get(f'https://geocoding-service.com/zipcode?q={address}')
        response.raise_for_status()
        return response.json().get('zip_code')
    except requests.RequestException as e:
        # 處理請求異常
        return None

def test_get_zip_code():
    # 測試取得郵政編碼的函式
    # 模擬成功的回應
    assert get_zip_code('valid_address') is not None
    
    # 模擬服務無法存取的情況
    # 這裡可以使用mock來模擬requests.get的行為
    assert get_zip_code('invalid_address') is None

內容解密:

  1. get_zip_code 函式:此函式向 Geocoding 服務傳送請求以取得指定地址的郵政編碼。它處理了請求過程中可能出現的異常。
  2. test_get_zip_code 函式:這個測試案例驗證了 get_zip_code 函式在不同情況下的行為,包括服務存取成功和失敗的情況。

可觀察性與日誌檢查

單元測試也可以用於驗證日誌訊息和監控資訊的正確性。例如,可以使用 caplog pytest fixture 來檢查日誌訊息中是否包含重要的除錯資訊。

範例程式碼與測試

import logging

def process_data(data):
    try:
        # 處理資料的邏輯
        logging.info('Data processed successfully')
    except Exception as e:
        logging.error(f'Error processing data: {e}')

def test_process_data(caplog):
    # 測試資料處理函式的日誌輸出
    process_data({'valid': 'data'})
    assert 'Data processed successfully' in caplog.text
    
    # 可以新增更多的測試案例來檢查錯誤情況下的日誌輸出

內容解密:

  1. process_data 函式:此函式處理資料並記錄相應的日誌訊息。
  2. test_process_data 函式:這個測試案例使用 caplog fixture 來捕捉日誌輸出,並驗證是否包含預期的訊息。

資料修改操作的測試

對於涉及資料建立、刪除和修改的操作,單元測試需要能夠模擬或實際操作這些資料。

範例程式碼與測試

def save_data_to_storage(data, storage_bucket):
    # 將資料儲存到指定的儲存桶中
    storage_bucket.upload(data)

def test_save_data_to_storage():
    # 測試儲存資料到儲存桶的功能
    test_bucket = create_test_bucket()  # 假設有這個函式來建立測試用的儲存桶
    test_data = {'key': 'value'}
    save_data_to_storage(test_data, test_bucket)
    assert test_bucket.contains(test_data)

內容解密:

  1. save_data_to_storage 函式:此函式將資料儲存到指定的雲端儲存桶中。
  2. test_save_data_to_storage 函式:這個測試案例驗證了 save_data_to_storage 函式能夠正確地將資料儲存到儲存桶中。

資料管道單元測試的最佳實踐

減少測試的依賴性與提升效率

在資料管道的開發過程中,我們面臨了一個核心挑戰:如何在確保測試全面性的同時,減少對外部服務的依賴並提升測試效率。我們的團隊在處理一個涉及多個雲端元件的核心流程時,最初嘗試使用整合測試來驗證行為。然而,這種方法需要建立舊有的表格來測試淘汰流程,並且需要組態引數初始值並驗證其在「更新指標」步驟中的變化。由於需要啟動 EMR 叢集來執行作業,這種測試方式不僅設定複雜,而且耗時較長。

為了降低開銷並加快測試流程,我們為 DynamoDB 和引數儲存建立了模擬(mock)。這種方法使我們能夠進行單元測試,將測試執行時間從半小時縮短到幾秒鐘。透過模擬,我們可以建立所需的任何狀態,並驗證程式碼是否正確地建立、修改或刪除了 DynamoDB 表格和引數儲存中的內容。

處理依賴關係

在資料管道測試中,我們需要在最小化依賴的同時管理眾多的依賴關係。如圖 7-5 所示,我們的團隊最初嘗試使用 DynamoDB 進行測試,但最終決定用模擬來替代這個依賴。

在將資料管道分解為單元進行測試時,我們需要考慮每個單元執行的環境,包括與雲端服務、資料來源和接收端以及測試所需的資料的互動。在前期進行充分的思考,將使我們意識到需要使用或替換哪些依賴項。

以「豐富社交資料」步驟為例(如圖 7-6 所示),這個步驟涉及多個依賴關係,包括從「提取物種」步驟接收資料、與 HoD 資料函式庫進行資料聯結,並將結果儲存在「豐富調查資料」儲存桶中。如果發生錯誤,該步驟會使用來自臨時儲存桶的資料重試計算。

介面

介面突出了執行單元測試所需的元件或連線。介面包括資料來源、資料儲存位置、使用的服務以及管道階段之間的過渡。在「豐富社交資料」步驟中,介面涉及雲端儲存、HoD 資料函式庫和「提取物種」步驟。

資料

考慮單元與之互動的資料將有助於確定如何測試以及需要建立什麼樣的測試資料。「豐富社交資料」步驟使用了來自「提取物種」步驟和 HoD 資料函式庫的資料。在思考資料依賴時,我們需要考慮需要什麼樣的資料來執行被測程式碼。例如,為了測試資料聯結操作,我們需要在兩個資料來源之間有共同的資料。

單元測試計劃示例

識別待測元件

首先,根據「管道區域單元測試」的指導原則,確定資料管道中哪些部分需要進行單元測試。對於資料邏輯,任何在資料透過管道時修改資料的程式碼都需要被測試,例如「驗證資料」和「取得郵政編碼」。

def validate_data(data):
    # 驗證資料邏輯
    if 'required_field' not in data:
        return False
    return True

def get_zip_code(api_response):
    # 從 API 回應中提取郵政編碼
    try:
        zip_code = api_response['zip_code']
        return zip_code
    except KeyError:
        return None

內容解密:

  1. validate_data 函式檢查輸入資料是否包含必要的欄位。如果缺少必要的欄位,則傳回 False,否則傳回 True
  2. get_zip_code 函式嘗試從 API 回應中提取郵政編碼。如果 API 回應中包含 zip_code 欄位,則傳回該值;否則傳回 None

圖表說明

@startuml
skinparam backgroundColor #FEFEFE
skinparam defaultTextAlignment center
skinparam rectangleBackgroundColor #F5F5F5
skinparam rectangleBorderColor #333333
skinparam arrowColor #333333

title 圖表說明

rectangle "資料" as node1
rectangle "豐富後資料" as node2
rectangle "錯誤發生" as node3
rectangle "重試成功" as node4

node1 --> node2
node2 --> node3
node3 --> node4

@enduml

圖表翻譯: 此圖展示了「豐富社交資料」步驟的流程。首先,「提取物種」步驟將資料傳遞給「豐富社交資料」。然後,「豐富社交資料」將豐富後的資料儲存在「豐富調查資料」儲存桶中。如果發生錯誤,它會從臨時儲存桶中重試計算,並在成功後將結果儲存。