在資料串流平台如 AWS Kinesis 和 Apache Kafka 中,即使資料處於原始狀態,仍可透過內建機制和自定義邏輯進行監控和品品檢查。AWS Kinesis 可利用 Lambda 函式進行預處理和資料驗證,例如檢查資料格式或特定條件。Apache Kafka 則可透過 JMX 規範取得串流指標,並使用 JConsole 或程式碼直接存取指標資料。分析資料轉換 (ETL) 過程中的資料品質確保也至關重要。ETL 的三個步驟:提取、轉換和載入,其中轉換步驟最為關鍵,涉及資料清理、格式轉換、欄位計算等操作。確保 ETL 流程的資料品質,需要測試和警示系統,例如 dbt、WhereScape 或 Informatica 等工具。資料測試在早期發現資料品質問題方面至關重要,常見的測試包括空值檢查、數量驗證、分佈分析、唯一性檢查和已知不變數驗證。dbt 和 Great Expectations 是常用的資料測試工具,dbt 提供 SQL 驅動的單元測試和通用測試,而 Great Expectations 則使用 Python 定義資料驗證邏輯。
管理運算元據轉換於AWS Kinesis與Apache Kafka
在處理運算元據轉換時,雖然資料處於原始狀態,但並不意味著完全無法監控。許多資料串流和處理應用程式提供內建的警示功能,並且可以根據需求組態更複雜的警示。以下章節將介紹一些具體的技術範例,展示如何在流行的資料處理工具中實作內建的資料品品檢查。
AWS Kinesis
AWS Kinesis串流是由AWS Lambda函式管理的。你可以組態Lambda函式進行各種預處理任務,它們的普遍性使得資料品質保證可以內建於這些預處理過程中。AWS Lambda函式可以用.NET(PowerShell、C#)、Go、Java、Node.js、Python和Ruby等語言撰寫,只需將其上傳到AWS控制檯即可呼叫。
要將AWS Lambda連線到正在執行的AWS Kinesis例項,你需要在Kinesis應用程式頁面中選擇“連線到來源”,然後選擇“使用AWS Lambda進行記錄預處理”。在那裡,你將有機會建立一個新的Lambda函式,該函式在任何應用程式SQL程式碼執行之前執行,或在Amazon建立傳入資料的架構快照之前執行。
import boto3
# 定義Lambda函式處理Kinesis事件
def lambda_handler(event, context):
# 遍歷Kinesis記錄
for record in event['Records']:
# 取得記錄的資料
data = record['kinesis']['data']
# 在這裡新增資料品品檢查邏輯
# 例如:檢查資料是否符合特定的格式或條件
if not validate_data(data):
# 如果資料無效,則丟擲異常或進行其他錯誤處理
raise Exception("無效的資料")
return {
'statusCode': 200,
'statusMessage': 'OK'
}
# 資料驗證函式範例
def validate_data(data):
# 假設資料需要以特定字元開頭
return data.startswith('VALID')
內容解密:
此段程式碼展示了一個用於處理AWS Kinesis事件的AWS Lambda函式範例。該函式遍歷傳入的Kinesis記錄,對每條記錄的資料進行檢查。在lambda_handler
函式中,你可以新增自定義的資料品品檢查邏輯,例如呼叫validate_data
函式來驗證資料是否符合預期。如果資料無效,可以丟擲異常或執行其他錯誤處理措施。
Apache Kafka
Apache Kafka是一個具有高學習曲線的應用程式,它為Kafka Streams、Producers和Consumers提供了大量的粒度設定。Confluent、Instacluster和AWS提供了Apache Kafka的完全託管版本,使得團隊能夠更容易地上手使用這個強大的串流框架,並且通常能夠在開箱即用的情況下處理一些必要的資料停機預防。
Apache Kafka預設透過JMX(Java Management Extensions)規範報告串流指標。你可以使用JConsole等圖形工具視覺化JMX資料。或者,你可以直接存取KafkaStreams Java類別例項,並使用KafkaStreams#metrics()方法存取指標。
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
// 初始化Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaStreams streams = new KafkaStreams(getTopology(), props);
// 取得Kafka Streams的指標
streams.metrics().forEach((metricName, metricValue) -> {
System.out.println(metricName + ": " + metricValue);
});
內容解密:
此Java程式碼片段展示瞭如何初始化一個Kafka Streams應用程式並取得其指標。首先,組態Kafka Streams應用程式的屬性,包括應用程式ID和引導伺服器地址。然後,建立KafkaStreams例項,並透過呼叫streams.metrics()
方法取得指標。這些指標可以用於監控應用程式的效能和資料品質。
執行分析資料轉換
我們使用“分析資料轉換”這個術語來指代對分析資料進行的轉換。這也可以應用於運算元據和分析資料來源之間的資料整合層,例如在S3資料湖和Redshift資料倉儲之間組態的AWS Glue。由於分析資料與運算元據在幾個關鍵方面有所不同,因此在轉換這些資料時,也需要關注不同的方面。
確保ETL過程中的資料品質
在許多情況下,“ETL”這個術語被用作分析資料轉換的同義詞。ETL代表“提取-轉換-載入”,描述了一個在具有複雜資料的組織中變得越來越普遍的三步驟過程:
在提取步驟中,原始資料從多個上游來源匯出並移動到一個臨時區域。這類別來源的例子包括MySQL和NoSQL伺服器、CRM系統或資料湖中的原始檔案。
接下來,在轉換步驟中,這是ETL中最為關鍵的部分,臨時區域中的資料按照資料工程師的規範進行合併和處理。在某些情況下,轉換步驟可能很簡單,幾乎等同於複製源資料。在其他情況下,轉換可能相當密集。我們將在下一節討論這些轉換可能包含的內容。
最後,在載入步驟中,我們將轉換後的資料從臨時區域移動到目的地,通常是資料倉儲中的特定表。
-- ETL過程中的資料轉換範例
CREATE TABLE transformed_data AS
SELECT
column1,
column2,
-- 對資料進行轉換,例如:資料型別轉換、欄位計算等
CAST(column3 AS INTEGER) AS transformed_column3
FROM
raw_data;
內容解密:
此SQL陳述式展示了在ETL過程中的資料轉換範例。透過建立一個新的表transformed_data
,並從raw_data
表中選擇資料,同時對某些欄位進行必要的轉換,例如資料型別的轉換,以確保資料的品質和一致性。
資料轉換過程中的資料品質確保
正如我們之前所說,無論是在ETL還是ELT過程中,「轉換」步驟可能是最為密集的,並且在不同的應用中會有所不同。ETL是指首先將資料載入暫存伺服器,然後再載入目標系統的過程,而ELT(提取-載入-轉換)則要求資料直接載入目標系統。ETL為資料工程師提供了在資料推播到生產環境之前進行驗證的機會,而ELT則實作了更快的處理——如果你不適當地進行測試和監控,資料品質就會降低。
資料轉換的原因
進行來源資料轉換可能有以下幾個原因:
- 可能只是為了重新命名欄位以符合目標位置的schema要求。
- 可能需要過濾、聚合和總結、去重複或其他方式清理和合並來源資料。
- 可能需要執行型別和單位轉換,例如將不同的貨幣欄位標準化為美元和浮點數型別。
- 可能在此步驟中對敏感資料欄位進行加密,以滿足行業或政府法規要求。
- 對我們來說最重要的,我們可能在此步驟中進行資料治理稽核或資料品品檢查。
警示與測試
像所有軟體和資料應用程式一樣,像dbt、WhereScape或Informatica這樣的ETL系統也容易出現故障。您需要一個強大的測試和警示系統來在高容量生產環境中執行這些應用程式。在本文中,我們將討論ETL/ELT系統典型的警示型別以及一些資料品質的最佳實踐。許多資料轉換系統具有內建的資料品質機制。這些機制可能採用單元測試的形式、對管線健康狀況的可見性指標、警示或其他形式。在接下來的章節中,我們將介紹一些流行的轉換工具的內建工具,以及一些提供資料品質的附加工具。
資料測試的重要性
測試資料在發現資料品質問題方面發揮著至關重要的作用,甚至在資料進入生產資料管線之前就已進行。透過測試,工程師預測可能會出現的問題,並編寫邏輯來預先檢測問題。資料測試是驗證組織對資料的假設的過程,無論是在生產之前還是生產過程中。編寫檢查諸如唯一性和非空值等基本測試是組織測試其對來源資料的基本假設的一種方式。組織通常還會確保資料符合其團隊的工作格式,並且資料滿足其業務需求。
常見的資料品質測試
一些最常見的資料品質測試包括:
- 空值:是否有任何值是未知的(NULL)?
- 數量:我是否獲得了任何資料?資料量是否過多或過少?
- 分佈:我的資料是否在可接受的範圍內?給定欄位中的值是否在範圍內?
- 唯一性:是否有任何值是重複的?
- 已知不變數:這兩個物件是否基本不同,例如,利潤是否始終是收入和成本之間的差額?
測試工具
根據我們的經驗,目前最好的兩個測試資料的工具是dbt測試和Great Expectations(作為一個更通用的工具)。這兩個工具都是開源的,可以在資料品質問題到達利益相關者手中之前發現它們。雖然dbt不是一個測試解決方案,但如果您已經使用該框架來建模和轉換資料,那麼它的開箱即用測試功能效果很好。
要執行資料品質測試,您需要做兩件簡單的事情:
- 將轉換後的資料載入臨時暫存表/資料集。
- 執行測試以確保暫存表中的資料符合生產環境的要求(即,您需要回答「這是否看起來像可靠的資料?」)。
如果資料品質測試失敗,則會向負責該資產的資料工程師或分析師傳送警示,並且不會執行管線。這使資料工程師能夠在影響終端使用者/系統之前捕捉意外的資料品質問題。資料測試可以在轉換之前和轉換過程中的每個步驟之後進行。
dbt單元測試
dbt是現代ELT最流行的選擇之一,其工具擴充套件了向轉換後的表格新增單元測試的功能。dbt run
命令使用SQL執行模型轉換,而dbt test
則執行轉換後模型的單元測試。dbt單元測試可以在自定義SQL查詢中定義,並分配給.yml
schema檔案中的個別模型。
dbt單元測試範例
-- Refunds have a negative amount, so the total amount should
-- always be >=0.
-- Therefore, return records where this isn't true to make the test fail.
SELECT
order_id,
SUM(amount) AS total_amount
FROM
{{ ref('fct_payments') }}
GROUP BY
1
HAVING
NOT(total_amount >= 0)
內容解密:
在這個範例中,我們定義了一個dbt單元測試來驗證fct_payments
模型中的付款記錄是否沒有負值。測試查詢會找出總金額小於0的訂單記錄,如果存在這樣的記錄,測試就會失敗。這個測試確保了資料的正確性和一致性。
dbt測試型別
dbt中有兩種測試型別:
- 單一測試:這些是參考特定模型的獨立SQL測試。如果您用SQL編寫一個單一測試並將其儲存到測試目錄(由
test-paths
組態變數指示),則每次呼叫dbt test
時都會執行它。 - 通用測試:這些測試定義在
.yml
檔案中,並對模型中的資料進行斷言。
結語
在資料轉換過程中確保資料品質至關重要。透過使用dbt等工具進行資料測試和警示,可以在資料進入生產環境之前捕捉潛在的資料品質問題,從而確保資料的可靠性和一致性。在實際應用中,根據具體的業務需求和資料特性選擇合適的測試策略和工具,是確保資料品質的關鍵。
資料測試工具比較:dbt、Great Expectations與Deequ的技術深度分析
在現代資料工程領域中,資料測試是確保資料品質的關鍵步驟。本文將探討三種主流的資料測試工具:dbt、Great Expectations和Deequ。我們將分析它們的技術特點、優勢、限制以及實際應用場景。
dbt測試:SQL驅動的資料驗證
dbt(Data Build Tool)提供了一套根據SQL的測試框架,允許資料工程師定義和執行資料品品檢查。dbt的測試機制主要根據以下四個預設的通用測試:
- unique:驗證特定欄位的值是否唯一
- not_null:檢查特定欄位是否包含空值
- accepted_values:驗證欄位值是否屬於預定義的集合
- relationships:檢查表之間的參照完整性
dbt測試的實作範例
{% test not_null(model, column_name) %}
SELECT *
FROM {{ model }}
WHERE {{ column_name }} IS NULL
{% endtest %}
內容解密:
此範例展示瞭如何使用dbt的測試巨集來檢查特定欄位是否包含空值。測試邏輯如下:
model
引數指定要測試的資料模型column_name
引數指定要檢查的欄位名稱- 查詢邏輯是選取指定模型中指定欄位為空的所有記錄
- 如果查詢結果不為空,則測試失敗,代表資料中存在空值
dbt測試的優點在於其與資料轉換流程緊密整合,但其侷限性包括:
- 需要手動維護測試邏輯
- 複雜測試可能增加工程資源負擔
- 測試失敗時需要深入分析根本原因
Great Expectations:Python驅動的資料驗證
Great Expectations是一個開源的Python函式庫,提供了一套靈活的資料驗證機制。它允許資料工程師使用Python定義資料的預期狀態,並進行驗證。
Great Expectations驗證範例
expect_column_values_to_be_between(
column="zip_code",
min_value=1,
max_value=99999
)
內容解密:
此範例展示瞭如何使用Great Expectations驗證zip_code
欄位的值是否在指定範圍內。驗證邏輯如下:
column
引數指定要驗證的欄位名稱min_value
和max_value
引數定義了預期的值範圍- 驗證邏輯會檢查欄位中的每個值是否落在指定範圍內
- 如果有值超出範圍,驗證將失敗並記錄相關統計資訊
Great Expectations的優勢包括:
- 使用Python定義驗證邏輯,靈活性高
- 提供Data Docs功能,視覺化驗證結果
- 支援多種資料來源的整合
但同時也存在一些限制:
- 需要Python環境,可能不適合純SQL的資料處理流程
- 相較於dbt,Great Expectations是一個獨立的工具,需要額外的學習曲線
Deequ:根據Apache Spark的資料單元測試
Deequ是由AWS開發的開源函式庫,專門用於大規模資料的單元測試。它根據Apache Spark構建,能夠處理各種格式的資料。
Deequ測試範例
val verificationResult = VerificationSuite()
.onData(df)
.addCheck(
Check(CheckLevel.Error, "unit testing my data")
.hasSize(_ == 1000)
.isComplete("id")
.isUnique("id")
).run()
內容解密:
此範例展示瞭如何使用Deequ對DataFrame進行多重驗證:
hasSize(_ == 1000)
驗證資料集的記錄數是否為1000isComplete("id")
檢查"id"欄位是否完整(不包含空值)isUnique("id")
驗證"id"欄位的值是否唯一- 驗證結果將彙總在
verificationResult
中,提供詳細的檢查報告
Deequ的主要優勢在於其根據Apache Spark,能夠高效處理大規模資料。但其使用Scala作為主要開發語言,可能對Python或SQL使用者造成一定的學習門檻。
技術選型考量與最佳實踐
在選擇合適的資料測試工具時,需要考慮以下因素:
- 現有的技術堆疊:選擇與現有系統整合度最高的工具
- 資料規模:大規模資料建議使用Deequ
- 開發語言:Python使用者適合Great Expectations
- SQL主導的環境:dbt可能是最佳選擇
比較分析圖表
graph LR A[資料測試工具] --> B[dbt] A --> C[Great Expectations] A --> D[Deequ] B --> B1[根據SQL] B --> B2[與轉換流程整合] C --> C1[根據Python] C --> C2[提供Data Docs] D --> D1[根據Apache Spark] D --> D2[支援大規模資料]
圖表翻譯: 此圖表呈現了三種主要資料測試工具的技術特性:
- dbt:根據SQL,與資料轉換流程緊密整合
- Great Expectations:根據Python,提供視覺化的驗證結果(Data Docs)
- Deequ:根據Apache Spark,適合大規模資料處理