資料工程中,管線協調至關重要。本篇以 AWS Step Function 示範如何建構事件驅動的資料管線,並整合 Lambda 函式處理資料,搭配 SNS 傳送狀態通知。首先,我們利用 EventBridge 監聽 S3 儲存桶的檔案上傳事件,觸發 Step Function 狀態機執行預先定義的流程。過程中,Lambda 函式負責實際的資料處理邏輯,若發生錯誤則透過 SNS 傳送通知。此外,文章也探討 Amazon Athena 的應用,包含如何使用 SQL 查詢資料湖中的資料、使用 CTAS 進行資料格式轉換、以及如何透過分割和分桶策略最佳化查詢效能。透過這些技巧,可以有效地管理和最佳化資料湖中的資料查詢,降低成本並提升效率。
使用 AWS Step Function 協調資料管線的實作練習
在前面的章節中,我們已經建立了一個簡單的資料處理管線。在本文中,我們將使用 AWS Step Function 來協調這個管線。Step Function 是一種無伺服器的函式,可以用來協調多個 AWS 服務。
建立 Step Function 狀態機
- 首先,我們需要建立一個新的 Step Function 狀態機。狀態機是一種用來描述工作流程的抽象概念。
- 在 AWS 管理主控台中,導航到 Step Function 服務。
- 點選「建立狀態機」按鈕。
- 選擇「設計您的工作流程」選項。
設定狀態機
- 在設計工作流程的畫面中,我們可以看到一個初始狀態。點選「新增狀態」按鈕來新增一個新的狀態。
- 將第一個狀態命名為「處理 CSV」。這個狀態將會呼叫一個 Lambda 函式來處理 CSV 檔案。
- 設定「處理 CSV」狀態的下一個狀態為「SNS 發布」。這個狀態將會在「處理 CSV」狀態失敗時被觸發。
新增錯誤處理
- 我們現在可以為「處理 CSV」狀態新增錯誤處理。點選「處理 CSV」狀態,然後在右側點選「錯誤處理」標籤。
- 在「捕捉錯誤」下,點選「新增捕捉器」按鈕。選擇「States.ALL」作為錯誤型別,並選擇「SNS 發布」作為回退狀態。將結果路徑設定為「$.Payload」。這意味著如果 Lambda 函式失敗,錯誤訊息將會被新增到 JSON 物件中的「Payload」鍵下,並傳遞給 SNS 通知狀態。
設定成功和失敗狀態
- 在左側,點選「流程」標籤,然後拖曳「成功」狀態到「處理 CSV」狀態下方。
- 然後,拖曳「失敗」狀態到「SNS 發布」狀態下方。這樣,如果工作流程失敗,我們的 Step Function 將會顯示為失敗。
建立 EventBridge 規則
為了觸發 Step Function 狀態機,我們需要建立一個 EventBridge 規則。EventBridge 是一種無伺服器的事件匯流排,可以用來建立事件驅動的工作流程。
設定 CloudTrail 和 EventBridge
- 首先,我們需要設定 CloudTrail 來記錄 S3 物件層級的事件。
- 導航到 CloudTrail 服務,然後點選「建立追蹤」按鈕。
- 設定追蹤名稱為「s3-data-events」,並選擇客戶管理的 AWS KMS 金鑰。
設定 S3 資料事件
- 在「事件型別」下,取消選擇「管理事件」,並選擇「資料事件」。
- 在「S3 資料事件」下,取消選擇「讀取」和「寫入」選項,並選擇特定的儲存桶。
建立 EventBridge 規則觸發 Step Function
- 導航到 EventBridge 服務,然後點選「規則」。
- 點選「建立規則」按鈕,並設定規則名稱為「dataeng-s3-trigger-rule」。
- 在「定義模式」下,選擇「事件模式」,然後選擇「由服務預先定義的模式」。
設定事件模式和目標
- 選擇 AWS 作為服務提供者,並選擇 S3 作為服務名稱。
- 選擇物件層級的操作,並選擇特定的操作(例如 PutObject、CopyObject 和 CompleteMultipartUpload)。
- 在「選擇目標」下,選擇 Step Function 狀態機作為目標。
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title AWS Step Function 協調資料管線實作
package "資料庫架構" {
package "應用層" {
component [連線池] as pool
component [ORM 框架] as orm
}
package "資料庫引擎" {
component [查詢解析器] as parser
component [優化器] as optimizer
component [執行引擎] as executor
}
package "儲存層" {
database [主資料庫] as master
database [讀取副本] as replica
database [快取層] as cache
}
}
pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中
master --> replica : 資料同步
note right of cache
Redis/Memcached
減少資料庫負載
end note
@enduml此圖示說明瞭 Step Function 狀態機的工作流程。
透過以上步驟,我們成功地建立了一個使用 AWS Step Function 和 EventBridge 的事件驅動資料管線。當新的檔案上傳到 S3 儲存桶時,EventBridge 規則將會觸發 Step Function 狀態機,進而執行資料處理工作流程。
使用 AWS Step Function 協調資料管線
在上一章中,我們探討了資料工程師工作的重要組成部分:設計和協調資料管線。首先,我們檢視了資料管線的一些核心概念,例如排程和根據事件的管線,以及如何處理失敗和重試。
然後,我們研究了四種不同的 AWS 服務,可用於建立和協調資料管線。這包括 Amazon Data Pipeline、AWS Glue Workflows、Amazon Managed Workflows for Apache Airflow(MWAA)和 AWS Step Function。我們討論了這些服務的適用案例,以及它們的優缺點。
接著,在本章的實作部分,我們建立了一個根據事件的管線。我們使用了兩個 AWS Lambda 函式進行處理,並使用 Amazon SNS 主題傳送有關失敗的通知。然後,我們將這些資料管線的元件組合到一個由 AWS Step Function 協調的狀態機中。我們還研究瞭如何處理錯誤。
到目前為止,我們已經研究瞭如何設計資料管線的高層架構,並檢視了用於攝取、轉換和消費資料的服務。在本章中,我們將這些概念結合起來,形成了一個協調的資料管線。
在本文的剩餘章節中,我們將探討一些用於資料消費的服務,包括用於臨時 SQL 查詢的服務、用於資料視覺化的服務,以及用於從資料中提取額外洞察的機器學習和人工智慧服務的概述。
測試根據事件的資料協調管線
要測試我們的管線,需要將檔案上傳到我們的 clean-zone S3 儲存桶。一旦檔案上傳,Amazon EventBridge 中建立的規則將觸發我們的 Step Function 狀態機:
- 瀏覽至 Amazon S3 服務,位於 https://s3.console.aws.amazon.com/s3。
- 從儲存桶清單中,點選
dataeng-clean-zone-<initials>儲存桶。 - 可以選擇在此儲存桶中建立一個新的資料夾進行測試。
- 點選「上傳」,然後「新增檔案」。瀏覽您的電腦以尋找具有 CSV 副檔名的檔案(如果找不到,請建立一個新的空檔案,並確保該檔案以 CSV 副檔名儲存)。
- 保留其他設定不變,然後點選「上傳」。
- 瀏覽至 AWS Step Function 服務,位於 https://console.aws.amazon.com/states。
- 點選我們先前建立的狀態機(ProcessFilesStateMachine)。從「執行」清單中,檢視狀態機是否成功或失敗。點選執行的「名稱」屬性以取得更多詳細資訊。
程式碼例項與解析
import random
def lambda_handler(event, context):
random_number = random.randint(0, 2)
if random_number == 0:
# 模擬失敗
return {
'statusCode': 500,
'statusMessage': 'Failed'
}
else:
# 模擬成功
return {
'statusCode': 200,
'statusMessage': 'Succeeded'
}
程式碼解密:
random.randint(0, 2):生成一個介於0到2之間的隨機整數,用於模擬任務的成功或失敗。if random_number == 0:當隨機數為0時,表示任務失敗,傳回500狀態碼。else:當隨機數不為0時,表示任務成功,傳回200狀態碼。
下一章預覽
在下一章中,我們將探討 Amazon Athena 服務,該服務用於對資料湖中的資料進行臨時 SQL 查詢。
第11章:使用 Amazon Athena 進行臨時查詢
Amazon Athena 是一種無伺服器、完全託管的服務,允許您使用 SQL 直接查詢資料湖中的資料,以及查詢其他各種資料函式庫。它不需要任何設定,成本完全根據完成查詢所需的資料掃描量。
在本章中,我們將探討 Athena,研究如何使用 Athena 直接查詢資料湖中的資料,使用 Query Federation 查詢其他資料來源的資料,以及 Athena 如何提供工作群組功能以協助治理和成本管理。
Amazon Athena:資料湖的即時 SQL 分析
簡介 Amazon Athena
結構化查詢語言(SQL)於 1970 年代在 IBM 發明,至今仍是查詢資料的極為流行的語言。每天,全球數百萬人直接使用 SQL 探索各種資料函式庫中的資料,還有更多人使用在底層使用 SQL 查詢資料函式庫的應用程式(無論是商業應用程式、行動應用程式或其他應用程式)。
多年間,美國國家標準協會(ANSI)制定了各種版本的 ANSI-SQL 標準,供資料函式庫供應商用於建立符合 ANSI-SQL 標準的資料函式庫。資料函式庫供應商通常聲稱其資料函式庫與大部分 ANSI-SQL 相容,這意味著不同的資料函式庫引擎支援 ANSI-SQL 標準的不同方面。
社群媒體網路 Facebook 擁有非常龐大的資料集和複雜的資料分析需求,發現 Hadoop 生態系統中的現有工具無法滿足其需求。因此,Facebook 建立了一個內部解決方案,以便能夠使用標準的 ANSI SQL 語義在其龐大的資料集上執行 SQL 查詢,並於 2013 年將其作為開源解決方案 Presto 發布。
2016 年底,AWS 宣佈推出 Amazon Athena,一項新服務,使客戶能夠直接查詢存在於 Amazon S3 中的結構化和半結構化資料。在發布公告中,Amazon 表示 Athena 是 Presto 的託管版本,具有完整的標準 SQL 支援。這為 AWS 客戶提供了 Presto SQL 分析引擎的功能作為無伺服器服務。
SQL 大致分為兩個部分:
- 資料定義語言(DDL),用於建立和修改資料函式庫物件。
- 資料操作語言(DML),用於查詢和操作資料。
2021 年,AWS 將 Amazon Athena 引擎升級至 v2,該版本根據 HiveQL 用於 DDL 陳述式,Presto 版本 0.217 用於 DML 陳述式。
Amazon Athena 需要一個相容 Hive 的資料目錄來提供被查詢資料的中繼資料。目錄提供了一個邏輯檢視(包含表格的資料函式庫,這些表格由列和具有特定資料型別的欄組成),並將其對映到儲存在 Amazon S3 中的實體檔案。Athena 最初有自己的資料目錄,但現在需要使用 AWS Glue 資料目錄作為其相容 Hive 的資料儲存。
最佳化 Amazon Athena 查詢的技巧和竅門
將原始資料攝入資料湖後,我們可以立即在 AWS Glue 資料目錄中為該資料建立一個表格(使用 Glue 爬蟲或使用 Athena 執行 DDL 陳述式來定義表格)。建立表格後,我們可以開始使用 Amazon Athena 對資料執行 SQL 查詢來探索表格。
然而,原始資料通常以純文字格式(如 CSV 或 JSON)攝入。雖然我們可以查詢此格式的資料以進行臨時資料探索,但如果我們需要對大型資料集執行複雜查詢,則這些原始格式的查詢效率不高。還有一些方法可以最佳化我們編寫的 SQL 查詢,以充分利用底層的 Athena 查詢引擎。
Amazon Athena 的成本根據為解析查詢而掃描的壓縮資料量,因此任何可以減少掃描資料量的方法都可以提高查詢效能並降低查詢成本。
在本文中,我們將回顧幾種最佳化分析以提高效能和降低成本的方法。
常見的檔案格式和佈局最佳化
資料工程師可以對原始檔案套用的最具影響力和最容易的轉換是將原始檔案轉換為最佳化的檔案格式,並以最佳化的方式組織檔案佈局。
將原始來源檔案轉換為最佳化的檔案格式
正如我們在第 7 章《轉換資料以最佳化分析》中所討論的,諸如 Apache Parquet 之類別的檔案格式是為分析而設計的,比原始資料格式(如 CSV 或 JSON)更具效能。因此,將原始來源檔案轉換為 Parquet 之類別的格式是資料工程師可以做的最重要的事情之一,以提高 Athena 查詢的效能。請參閱第 7 章《轉換資料以最佳化分析》的「最佳化檔案格式」一節,以更全面地瞭解 Apache Parquet 檔案的好處。
-- 建立一個新的 Parquet 格式表格
CREATE TABLE my_table_parquet (
column1 string,
column2 int,
column3 double
)
STORED AS PARQUET
LOCATION 's3://my-bucket/my-table-parquet/';
內容解密:
上述 SQL 陳述式展示瞭如何使用 CREATE TABLE 陳述式建立一個新的 Parquet 格式表格。其中,STORED AS PARQUET 指定了儲存格式為 Parquet,而 LOCATION 指定了表格在 S3 中的儲存位置。這種格式比 CSV 或 JSON 更適合於分析查詢,能夠提高 Athena 查詢的效能。
管理治理和成本與 Amazon Athena Workgroups
Amazon Athena Workgroups 提供了一種管理查詢和成本的方法,可以根據不同的使用者或團隊分配不同的 Workgroup,以實作資源的有效管理和隔離。
建立和管理 Athena Workgroup
要建立一個新的 Athena Workgroup,您可以使用 AWS 管理主控台、AWS CLI 或 SDK。以下是一個使用 AWS CLI 建立新 Workgroup 的範例:
aws athena create-work-group --name my-workgroup --configuration "ResultConfiguration={OutputLocation=s3://my-bucket/athena-results/}"
切換 Workgroup 和執行查詢
建立 Workgroup 後,您可以切換到該 Workgroup 以執行查詢。以下是一個使用 AWS CLI 切換 Workgroup 的範例:
aws athena start-query-execution --query-string "SELECT * FROM my_table" --work-group my-workgroup
內容解密:
上述 AWS CLI 命令展示瞭如何建立一個新的 Athena Workgroup 以及如何切換到該 Workgroup 以執行查詢。其中,create-work-group 命令用於建立新的 Workgroup,而 start-query-execution 命令用於在指定的 Workgroup 中執行查詢。這使得管理和隔離不同使用者或團隊的查詢變得更加容易。
最佳化 Amazon Athena 查詢的技巧與訣竅
在第 7 章「轉換資料以最佳化分析」中,我們回顧瞭如何使用 AWS Glue 服務將檔案轉換為最佳化格式。然而,Amazon Athena 也可以使用稱為「建立表作為選擇」(CTAS)的概念來轉換檔案。
使用 CTAS 進行資料轉換
透過 CTAS 陳述式,您可以指示 Athena 根據對不同表的 SQL 選擇陳述式建立新表。以下是一個例子,假設 customers_csv 是根據從資料函式庫匯入資料湖的資料建立的表,且資料以 CSV 格式儲存。如果我們想建立該表的 Parquet 版本以有效查詢它,可以使用 Athena 執行以下 SQL 陳述式:
CREATE TABLE customers_parquet
WITH (
format = 'Parquet',
parquet_compression = 'SNAPPY')
AS SELECT *
FROM customers_csv;
內容解密:
CREATE TABLE customers_parquet:建立一個名為customers_parquet的新表。WITH (format = 'Parquet', parquet_compression = 'SNAPPY'):指定新表的格式為 Parquet,並使用 Snappy 壓縮演算法進行壓縮。AS SELECT * FROM customers_csv:將customers_csv表中的所有資料複製到新表中。
如果您定期匯入特定的資料集(例如每天晚上),在大多數場景中,組態和排程 AWS Glue 作業來執行轉換到 Parquet 格式是有意義的。但是,如果您正在對各種資料集進行臨時探索性工作,或是一次性的資料載入,那麼您可能需要考慮使用 Amazon Athena 來執行轉換。
分割資料集
分割是另一種可以顯著提高分析查詢效能的方法。常見的資料分割策略是根據與日期相關的列進行分割。例如,在我們的銷售表中,我們可以有 YEAR、MONTH 和 DAY 列,分別反映特定銷售交易的年、月和日。當資料寫入 S3 時,所有與特定日期相關的銷售資料將被寫入相同的 S3 字首路徑。
我們的分割資料集可能如下所示:
/datalake/transform_zone/sales/YEAR=2021/MONTH=9/DAY=29/sales1.parquet
/datalake/transform_zone/sales/YEAR=2021/MONTH=9/DAY=30/sales1.parquet
/datalake/transform_zone/sales/YEAR=2021/MONTH=10/DAY=1/sales1.parquet
/datalake/transform_zone/sales/YEAR=2021/MONTH=10/DAY=2/sales1.parquet
內容解密:
- 分割資料集的結構清晰地反映了日期層級,便於根據日期進行查詢最佳化。
當使用 WHERE 子句根據一個或多個分割列過濾查詢結果時,分割提供了顯著的效能優勢。例如,如果資料分析師需要查詢 2021 年 9 月最後一天的總銷售額,他們可以執行以下查詢:
SELECT sum(SALE_AMOUNT)
FROM SALES
WHERE YEAR = '2021' AND MONTH = '9' AND DAY = '30'
內容解密:
SELECT sum(SALE_AMOUNT):計算SALE_AMOUNT列的總和。FROM SALES:指定查詢的表為SALES。WHERE YEAR = '2021' AND MONTH = '9' AND DAY = '30':根據YEAR、MONTH和DAY列過濾資料,只選擇 2021 年 9 月 30 日的銷售資料。
根據我們的分割策略,前面的查詢只需要讀取單個 S3 字首 /datalake/transform_zone/sales/YEAR=2021/MONTH=9/DAY=30 中的檔案。
其他根據檔案的最佳化
除了使用最佳化的檔案格式(如 Apache Parquet)和分割資料外,還有多種其他策略可以微調效能。
最佳化檔案大小
為了最佳化分析查詢,應避免有大量的小檔案。對於 S3 中的每個檔案,分析引擎需要開啟檔案、讀取 Parquet 後設資料、掃描檔案內容並關閉檔案。列出大量檔案並處理每個檔案會導致顯著的 I/O 開銷。最佳化分析的最佳檔案大小應在 128 MB 至 1024 MB 之間。
分桶
分桶是一種與分割相關的概念,您可以根據指定列的雜湊值將資料行分組到指定數量的桶中。目前,Athena 與 Spark 中使用的分桶實作不相容,因此您應該使用 Athena CTAS 陳述式來分桶您的資料。
分割投影
在具有大量分割的場景中,Athena 從 Glue 目錄讀取所有關於分割的資訊會導致顯著的開銷。為了提高效能,您可以組態分割投影,提供一個組態模式來反映您的分割。Athena 可以使用此組態資訊來確定可能的分割值,而無需從目錄中讀取分割資訊。