Deequ 作為一個根據 Spark 的開源函式庫,提供了一套機制來確保資料的正確性和完整性。它與 AWS 生態系統緊密整合,具備高擴充套件性,並支援狀態計算和內建異常檢測功能。然而,Scala 的學習曲線、有限的整合測試適用性和缺乏直觀的使用者介面是其主要限制。在實際應用中,可以利用 Deequ 的 VerificationSuiteCheck 類別定義各種檢查條件,例如資料行數、欄位完整性、唯一性等。此外,Apache Airflow 可用於建立資料品品檢查和斷路器,透過 SLA Miss Callback 和 SQL 檢查運算元等功能,實作主動監控和預警。同時,結合機器學習演算法和統計方法,可以進一步提升資料監控和異常檢測的能力,確保資料的可靠性。

運用 Deequ 進行資料品質驗證與單元測試

在資料工程領域中,資料品質的管理至關重要。Deequ 是由 Amazon 開發的開源函式庫,專門用於資料品質驗證和單元測試。它根據 Apache Spark,並提供了一套完善的機制來檢查資料的正確性和完整性。

Deequ 的優勢

  1. 與 AWS 的整合: Deequ 能夠無縫地與 AWS 生態系統整合,特別是在使用 AWS Glue 時。由於其與 AWS 的緊密整合,對於已經大量採用 AWS 技術堆疊的團隊來說,Deequ 成為了一個非常有吸引力的選擇。

  2. 高擴充套件性: 由於 Deequ 是根據 Scala 開發的,它能夠充分利用 Scala 的任務協調和平行處理能力,使其在大規模資料集上表現出色。資料儲存在 Scala DataFrames 中,這使得 Deequ 能夠高效地處理大資料。

  3. 狀態計算: Deequ 能夠計算指標的後設資料,並將這些後設資料儲存起來。當有新的資料進入時,它能夠增量式地重新計算關鍵指標。對於動態且龐大的資料流,這一特性使得 Deequ 尤其具有優勢。

  4. 內建異常檢測: Deequ 的一大亮點是其內建的異常檢測功能。相較於 Great Expectations 這類別工具,Deequ 的異常檢測更加深入,能夠檢測執行中的指標平均值和偏差,提供了額外的精確度。

Deequ 的限制

  1. Scala 的學習曲線: 對於非資料工程領域的人員來說,Scala 並不是一門容易上手的語言。雖然 PyDeequ 為 Python 使用者提供了一個解決方案,但對於習慣使用 Python 的資料科學家來說,直接使用 Deequ 仍可能存在一定的門檻。

  2. 有限的整合測試適用性: 與 dbt testing 不同,Deequ 主要針對單元測試設計,雖然它能夠靈活地對任意批次的資料進行測試,但要將其用於類別似整合測試的場景,可能需要投入更多的開發時間。

  3. 缺乏直觀的使用者介面: Deequ 的設計更側重於功能性而非介面美觀。對於那些依賴於視覺化報表(如 Great Expectations 的 Data Docs)的組織來說,Deequ 可能顯得有些簡陋。

使用 Deequ 進行單元測試的範例

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}

val verificationResult = VerificationSuite()
  .onData(data)
  .addCheck(
    Check(CheckLevel.Error, "unit testing my data")
      .hasSize(_ == 5) // 預期資料有5行
      .isComplete("id") // id欄位不應為NULL
      .isUnique("id") // id欄位應唯一
      .isComplete("productName") // productName欄位不應為NULL
      .isContainedIn("priority", Array("high", "low")) // priority欄位只應包含"high"和"low"
      .isNonNegative("numViews") // numViews欄位不應包含負值
      .containsURL("description", _ >= 0.5) // 至少一半的description欄位應包含URL
      .hasApproxQuantile("numViews", 0.5, _ <= 10) // 半數的numViews應小於或等於10
  )
  .run()

內容解密:

  1. 匯入必要的套件:首先,我們匯入了 Deequ 的 VerificationSuiteCheck 類別,這些是進行資料驗證的核心元件。

  2. 建立驗證套件:透過 VerificationSuite() 建立一個驗證例項,這是定義資料檢查的入口點。

  3. 指定待測資料:使用 .onData(data) 將待測資料傳入驗證套件中。

  4. 新增檢查條件:透過 .addCheck() 新增檢查條件。在這個範例中,我們定義了一系列的檢查,例如資料行數、欄位完整性、唯一性、數值範圍等。

  5. 執行驗證:最後,呼叫 .run() 執行驗證套件,Deequ 會將這些檢查轉換為一系列的 Spark 任務並執行,傳回驗證結果。

利用 Apache Airflow 管理資料品質

除了使用 Deequ 進行資料品質驗證外,Apache Airflow 也是一個非常有力的工具,可以用來建立資料品品檢查和斷路器。Apache Airflow 能夠以程式設計的方式編寫、排程和監控資料工作流程,提供了在資料管線中加入多個檢查點的能力。

排程服務層級協定(SLA)

在 Apache Airflow 中,可以為任務設定 SLA(Service Level Agreement),定義任務的最長執行時間。如果任務超出這個時間,就會被標記為“SLA Missed”,並且可以透過各種通知管道(如 Slack、電子郵件等)進行通知。

from datetime import timedelta

# 設定SLA為1小時
sla = timedelta(hours=1)

# 定義sla_miss_callback
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    # 自定義SLA錯過時的處理邏輯
    pass

# 在Task中使用sla和sla_miss_callback
my_task = PythonOperator(
    task_id='my_task',
    python_callable=my_python_callable,
    sla=sla,
    sla_miss_callback=sla_miss_callback
)

內容解密:

  1. 匯入必要的模組:首先,我們從 datetime 模組匯入 timedelta,用於定義時間間隔。

  2. 定義SLA:透過 timedelta 定義了一個 SLA,例如設定為1小時。

  3. 定義SLA錯過回呼函式:定義了一個 sla_miss_callback 函式,用於自定義當 SLA 被錯過時的處理邏輯。

  4. 在任務中使用SLA:在定義 Apache Airflow 任務時,將定義好的 SLA 和回呼函式傳入相應的引數中。

使用Apache Airflow進行資料品質管理

在現代資料驅動的企業中,資料品質的管理至關重要。資料下線(data downtime)可能導致嚴重的業務中斷和財務損失。Apache Airflow作為一個強大的工作流程管理工具,不僅能夠協調資料處理流程,還能整合資料品品檢查,以主動預防資料問題的發生。

SLA Miss Callback:主動監控資料處理流程

Apache Airflow提供了SLA Miss Callback功能,允許使用者定義當任務未在預定時間內完成時觸發的回撥函式。這使得團隊能夠及時發現並回應資料處理中的延遲或失敗。

SLA Miss Callback範例程式碼

from datetime import datetime, timedelta
import pendulum
from airflow import DAG
from airflow.decorators import task

def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(
        "The callback arguments are: ",
        {
            "dag": dag,
            "task_list": task_list,
            "blocking_task_list": blocking_task_list,
            "slas": slas,
            "blocking_tis": blocking_tis,
        },
    )

@dag(
    schedule_interval="*/2 * * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    sla_miss_callback=sla_callback,
    default_args={'email': "email@example.com"},
)
def example_sla_dag():
    @task(sla=timedelta(seconds=10))
    def sleep_20():
        """Sleep for 20 seconds"""
        time.sleep(20)

    @task
    def sleep_30():
        """Sleep for 30 seconds"""
        time.sleep(30)

    sleep_20() >> sleep_30()

dag = example_sla_dag()

內容解密:

  1. sla_callback函式:定義了一個回撥函式,當任務錯過SLA時會被呼叫。該函式列印出相關的回撥引數,包括DAG、任務列表、阻塞任務列表、SLA物件和阻塞的任務例項。
  2. @dag裝飾器:定義了一個DAG,設定了排程間隔、開始日期、是否補跑歷史任務、SLA Miss Callback函式以及預設引數。
  3. sleep_20sleep_30任務:兩個簡單的任務,分別睡眠20秒和30秒。sleep_20任務設定了SLA為10秒,如果該任務執行時間超過10秒,將觸發SLA Miss Callback。

電路斷路器(Circuit Breaker)機制:預防資料品質問題

電路斷路器是一種設計模式,用於防止系統在檢測到潛在問題時繼續執行可能導致更大問題的操作。在資料處理流程中,電路斷路器可以在檢測到資料品質問題時暫停資料管道的執行,從而防止錯誤資料的傳播。

電路斷路器運作狀態

  • 電路關閉(Circuit Closed):資料正常流經管道。
  • 電路開啟(Circuit Open):資料流被中斷,管道停止執行。

實作電路斷路器的關鍵要素

  1. 資料血緣(Data Lineage):瞭解資料的來源、處理過程和去向。
  2. 資料剖析(Data Profiling):對資料進行統計分析,以識別潛在的品質問題。
  3. 自動觸發機制:根據資料剖析的結果自動觸發電路斷路器。

在Apache Airflow中安裝電路斷路器

要在Apache Airflow中實作電路斷路器,可以採用以下方法:

  1. 設定DAG的catchup引數為False
  2. 使用LatestOnlyOperator運算元來控制DAG的執行。
  3. 編寫自定義的Python程式碼,在資料品品檢查失敗時觸發電路斷路器。

SQL檢查運算元:主動檢查資料品質

SQL檢查運算元是Apache Airflow中用於檢查資料品質的另一種機制。它允許使用者定義SQL查詢來驗證資料是否符合預期的品質標準。

SQL檢查運算元範例

from airflow.providers.common.sql.operators.sql import SQLCheckOperator

SQLCheckOperator(
    task_id="orange_card_data_row_quality_check",
    sql="row_quality_blue_bank_data_check.sql",
    params={"dropoff_datetime": "2021-01-01"},
)

內容解密:

  1. SQLCheckOperator:建立了一個SQL檢查運算元,用於執行指定的SQL查詢來檢查資料品質。
  2. task_id:該任務的唯一識別碼。
  3. sql:要執行的SQL查詢檔案。
  4. params:傳遞給SQL查詢的引數。

資料監控與異常檢測:確保資料可靠性的關鍵技術

隨著資料驅動的決策在企業中的重要性日益增加,資料的可靠性成為了企業成功的關鍵因素之一。資料異常可能源於多種原因,包括系統故障、人為錯誤或外部因素。為了應對這些挑戰,資料監控和異常檢測技術應運而生。本文將探討資料監控和異常檢測的核心概念、技術原理以及如何在實際資料管道中實施這些技術。

為什麼需要資料監控和異常檢測?

在資料管道的各個階段,資料可能因為多種原因而出現問題。無論是在資料收集、轉換還是儲存階段,資料問題都可能導致下游的分析結果錯誤,進而影響企業的決策。傳統的資料測試和品品檢查雖然能夠解決部分已知問題,但對於未知的異常情況卻無能為力。這時,資料監控和異常檢測技術就顯得尤為重要。

異常檢測的核心概念

異常檢測是指識別出資料中不符合預期模式的事件或觀察結果。這些異常可能由多種原因引起,例如技術故障、資料漂移或系統變更。異常檢測技術透過分析資料的歷史行為,建立預期模型,從而識別出與這些預期不符的資料。

常見的異常型別

  1. 資料量異常:資料量的突然增加或減少,可能指示資料收集或處理過程中的問題。
  2. 資料更新頻率異常:資料更新頻率的變化,可能指示資料來源或處理流程中的問題。
  3. 資料分佈異常:資料分佈的變化,例如某些欄位的數值範圍或類別分佈發生變化,可能影響下游分析結果。
  4. 結構異常:資料結構的變化,例如欄位名稱或資料型別的變更,可能導致資料處理流程的中斷。

如何實施資料監控和異常檢測?

實施資料監控和異常檢測需要結合技術工具和業務理解。以下是一些關鍵步驟:

  1. 定義監控指標:根據業務需求和資料特性,定義需要監控的指標,例如資料量、更新頻率、資料分佈等。
  2. 選擇合適的工具:選擇適合的技術工具來實施監控和異常檢測,例如使用機器學習演算法或統計方法來識別異常。
  3. 建立預警機制:當檢測到異常時,能夠及時通知相關人員進行處理。
  4. 持續最佳化和調整:根據監控結果和業務變化,不斷最佳化和調整監控策略。

實施資料監控和異常檢測的技術細節

使用機器學習進行異常檢測

機器學習是實施異常檢測的一種強大技術。透過訓練模型學習資料的正常模式,機器學習模型能夠有效地識別出異常資料。常見的機器學習演算法包括 Isolation Forest、One-Class SVM 和 Autoencoders。

from sklearn.ensemble import IsolationForest
import pandas as pd

# 假設 df 是我們的資料框
df = pd.read_csv('data.csv')

# 初始化 Isolation Forest 模型
model = IsolationForest(contamination=0.01)

# 擬合模型
model.fit(df)

# 預測異常
df['anomaly'] = model.predict(df)

# 列出異常資料
anomalies = df[df['anomaly'] == -1]
print(anomalies)

統計方法進行異常檢測

統計方法透過建立資料的統計模型來檢測異常。例如,可以使用統計過程控制(SPC)中的控制圖來監控資料的變化。

import numpy as np
import matplotlib.pyplot as plt

# 假設 data 是我們的資料序列
data = np.random.normal(size=1000)

# 計算平均值和標準差
mean = np.mean(data)
std_dev = np.std(data)

# 繪製控制圖
plt.plot(data)
plt.axhline(mean, color='red', linestyle='--')
plt.axhline(mean + 3*std_dev, color='green', linestyle='--')
plt.axhline(mean - 3*std_dev, color='green', linestyle='--')
plt.show()

#### 內容解密:

上述 Python 程式碼展示瞭如何使用 Isolation Forest 演算法和統計方法進行異常檢測。Isolation Forest 是一種無監督學習演算法,透過構建多個決策樹來隔離異常點。統計方法則透過建立資料的統計模型來檢測異常,例如使用控制圖監控資料是否超出預期範圍。

實施資料監控和異常檢測的挑戰

  1. 資料複雜性:現代資料管道通常涉及多種資料來源和複雜的處理流程,這增加了監控和異常檢測的難度。
  2. 動態環境:業務需求和資料特性可能會隨時間變化,需要監控策略能夠動態調整。
  3. 假陽性和假陰性:異常檢測系統需要平衡假陽性(將正常資料誤判為異常)和假陰性(將異常資料誤判為正常)的風險。

隨著技術的進步,資料監控和異常檢測技術將變得更加智慧和自動化。未來,我們可以預見以下幾個發展趨勢:

  1. 更先進的機器學習技術:新的機器學習演算法和模型將進一步提高異常檢測的準確性和效率。
  2. 自動化監控:監控系統將變得更加自動化,能夠自動調整引數和策略以適應變化的資料環境。
  3. 整合多源資料:未來的監控系統將能夠整合來自多個來源的資料,提供更全面的監控視野。

透過不斷地技術創新和實踐,企業將能夠更好地應對資料挑戰,確保資料的可靠性和可用性。