在資料工程中,資料轉換是不可或缺的環節。AWS 提供了多種服務來滿足不同規模和複雜度的資料轉換需求。對於輕量級的轉換任務,AWS Lambda 提供了無伺服器的執行環境,可以快速佈署和執行程式碼。而對於大規模資料處理,AWS Glue 則提供了無伺服器的 Spark 和 Python shell 環境,可以高效地處理大量資料。此外,AWS Glue Data Catalog 和爬蟲可以幫助我們更好地管理和理解資料結構,簡化 ETL 流程。在管線協調方面,AWS Glue 工作流程和 Step Functions 提供了不同的選擇,可以根據需求選擇合適的工具來構建和管理複雜的資料處理流程。
AWS 資料轉換服務
一旦資料被擷取到適當的 AWS 服務(如 Amazon S3),管線的下一個階段需要對資料進行轉換,以最佳化其用於分析並使其可供資料消費者使用。
AWS Lambda:輕量級資料轉換
AWS Lambda 提供了一個無伺服器的環境,用於執行程式碼。您可以透過與超過 140 個其他 AWS 服務整合來觸發 Lambda 函式執行程式碼,並且只需為程式碼執行的時間付費,計費單位為毫秒,並根據為函式分配的記憶體量計算。
使用案例
在資料工程領域,Lambda 的常見使用案例包括對傳入資料進行驗證或輕量級處理和轉換。例如,如果合作夥伴在一天中傳送 CSV 檔案,您可以觸發 Lambda 函式在每次接收到新檔案時執行,驗證檔案是否為有效的 CSV 檔案,對其中一列進行計算並更新資料函式庫,然後將檔案移至不同的儲存桶,以便稍後進行批次處理。
import boto3
import csv
s3 = boto3.client('s3')
def lambda_handler(event, context):
# 從事件中取得檔案名稱
file_name = event['Records'][0]['s3']['key']
# 從 S3 下載檔案
s3.download_file('source-bucket', file_name, '/tmp/' + file_name)
# 開啟檔案並驗證其內容
with open('/tmp/' + file_name, 'r') as file:
reader = csv.reader(file)
for row in reader:
# 對每一列進行處理
print(row)
# 將檔案移至不同的儲存桶
s3.copy_object(CopySource='source-bucket/' + file_name, Bucket='destination-bucket', Key=file_name)
s3.delete_object(Bucket='source-bucket', Key=file_name)
return {
'statusCode': 200,
'statusMessage': 'OK'
}
內容解密:
- 事件觸發:Lambda 函式由 S3 事件觸發,當有新檔案上傳到指定的 S3 儲存桶時,會自動執行 Lambda 函式。
- 檔案下載:使用
s3.download_file方法從 S3 下載新上傳的檔案到 Lambda 函式的臨時目錄/tmp/。 - CSV 驗證與處理:使用 Python 的
csv模組開啟下載的檔案,並逐行讀取內容進行驗證或處理。 - 檔案轉移:處理完成後,使用
s3.copy_object將檔案複製到另一個 S3 儲存桶,並使用s3.delete_object從原始儲存桶中刪除檔案,以完成檔案的轉移。
AWS Glue:無伺服器的 Spark 處理
AWS Glue 提供了一個無伺服器的環境,用於執行 Apache Spark 或 Python shell 工作,用於進行資料轉換和處理。Apache Spark 是用於分散式處理大型資料集的開源引擎,非常適合需要高效能和高效處理的大規模資料處理任務。
使用案例
AWS Glue 可用於對大型資料集進行 ETL(提取、轉換、載入)操作,將分散在不同來源的資料整合、清理和轉換後載入到目標系統,如 Amazon S3 或 Amazon Redshift 中。
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# 從 S3 中讀取 CSV 資料
datasource = glueContext.create_dynamic_frame.from_catalog(database="my_database", table_name="my_table")
# 對資料進行轉換
transformed_data = ApplyMapping.apply(frame=datasource, mappings=[("column1", "string", "column1", "string")])
# 將轉換後的資料寫入 S3
glueContext.write_dynamic_frame.from_options(frame=transformed_data, connection_type="s3", connection_options={"path": "s3://my-bucket/output/"}, format="parquet")
job.commit()
內容解密:
- 初始化 Glue Context:建立 GlueContext 物件,這是與 AWS Glue 服務互動的主要介面。
- 讀取資料:使用
create_dynamic_frame.from_catalog方法從 AWS Glue 資料目錄中讀取指定的表格(table),這裡假設表格已經被爬蟲(Crawler)建立。 - 套用對映:使用
ApplyMapping.apply對讀取的 DynamicFrame 進行欄位對映,將來源欄位對映到目標欄位,可以在此步驟進行必要的欄位型別轉換。 - 寫出資料:使用
write_dynamic_frame.from_options將轉換後的 DynamicFrame 以 Parquet 格式寫入指定的 S3 路徑。 - 提交任務:呼叫
job.commit()提交 Glue Job,表示任務成功完成。
AWS 資料工程師工具箱:資料處理與轉換服務
AWS 提供多種服務來支援資料工程師進行資料的處理與轉換。這些服務包括 AWS Glue、Amazon EMR 等,旨在簡化資料處理流程並提高效率。
AWS Glue:無伺服器資料處理引擎
AWS Glue 是一種無伺服器的資料處理服務,支援兩種不同的執行引擎:單節點的 Glue Python shell 和多節點的 Glue Apache Spark 叢集。這兩種引擎都能夠處理存放在 Amazon S3 中的資料,並與 AWS Glue Data Catalog 整合。
圖 3.1:Glue Python shell 和 Glue Spark 引擎
這兩種引擎的使用取決於具體的資料處理需求。Glue Python shell 適合輕量級的資料處理任務,而 Glue Apache Spark 叢集則適合大規模的資料處理和分析任務。
資料處理單位(DPU)與計費
AWS Glue 的計費根據所組態的資料處理單位(DPU)數量以及程式碼執行的時間長度。使用者無需自行佈署或管理伺服器,只需指定所需的 DPU 數量即可。
AWS Glue Data Catalog:資料的邏輯檢視
AWS Glue Data Catalog 提供了一個邏輯檢視,用於描述存放在 Amazon S3 中的資料。它允許使用者直接在 ETL(Extract, Transform, Load)程式碼中參照目錄中的物件。
圖 3.2:Amazon S3 儲存桶中的 CSV 檔案
圖 3.3:AWS Glue Data Catalog 中的員工表格邏輯檢視
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title AWS 資料轉換服務與協調工具
package "AWS 雲端架構" {
package "網路層" {
component [VPC] as vpc
component [Subnet] as subnet
component [Security Group] as sg
component [Route Table] as rt
}
package "運算層" {
component [EC2] as ec2
component [Lambda] as lambda
component [ECS/EKS] as container
}
package "儲存層" {
database [RDS] as rds
database [DynamoDB] as dynamo
storage [S3] as s3
}
package "服務層" {
component [API Gateway] as apigw
component [ALB/NLB] as lb
queue [SQS] as sqs
}
}
apigw --> lambda
apigw --> lb
lb --> ec2
lb --> container
lambda --> dynamo
lambda --> s3
ec2 --> rds
container --> rds
vpc --> subnet
subnet --> sg
sg --> rt
@endumlAWS Glue Data Catalog 相容於 Hive metastore,能夠與多種 AWS 服務和第三方產品整合,如 Amazon Athena 和 Amazon EMR。
AWS Glue 爬蟲:自動推斷資料結構
AWS Glue 爬蟲可以自動檢查資料來源(如 S3 中的路徑),推斷資料結構和其他相關資訊,並將這些資訊填充到 Glue Data Catalog 中。
使用爬蟲自動填充目錄
- 設定爬蟲指向 S3 中的特定路徑。
- 爬蟲檢查檔案,識別檔案型別(如 CSV 或 Parquet)。
- 使用分類別器推斷檔案結構(欄位名稱和型別)。
- 將推斷出的結構資訊新增到 Glue Data Catalog。
Amazon EMR:Hadoop 生態系統處理概述
Amazon EMR 提供了一個受管理的平台,用於執行開源的大資料處理工具,如 Apache Spark、Apache Hive 等。使用者可以根據需求選擇適合的工具和組態。
為何選擇 Amazon EMR 或 AWS Glue?
- AWS Glue 提供無伺服器的環境,適合快速佈署和執行 Apache Spark 程式碼,對使用者經驗要求較低。
- Amazon EMR 提供更細緻的組態選項,適合需要高度自訂和控制 Apache Spark 環境的使用者。
內容解密:
- AWS Glue 和 Amazon EMR 都支援 Apache Spark,但它們在組態彈性、成本和易用性方面存在差異。
- 使用者應根據具體需求選擇適合的服務,以達到最佳的效能和成本效益。
AWS 資料工程師工具箱:大資料管線協調工具
在前一章節中,我們探討了用於分析的資料管理架構。在本章節中,我們將深入瞭解 AWS 提供的用於協調大資料管線的服務。
AWS Glue 工作流程:協調 Glue 元件
AWS Glue 是一種服務,包含多個元件,例如伺服器端的 Apache Spark 或 Python shell 環境,用於執行 ETL 轉換;Glue 資料目錄,提供 Amazon S3 資料湖中資料的集中邏輯表示;以及 Glue 爬蟲,可組態為檢查特定位置的檔案,自動推斷檔案結構,並將檔案新增至 AWS Glue 資料目錄。
圖 3.5:AWS Glue 工作流程
AWS Glue 工作流程是一種功能,旨在協助協調各種 AWS Glue 元件。工作流程包含一系列有序步驟,可以執行 Glue 爬蟲和 Glue ETL 作業(Spark 或 Python shell)。下圖顯示了一個簡單的 Glue 工作流程視覺表示:
此工作流程協調以下任務:
- 執行 Glue 爬蟲,將新擷取的資料從資料湖的原始區域新增至 Glue 資料目錄。
- Glue 爬蟲完成後,觸發 Glue ETL 作業,將原始 CSV 資料轉換為 Parquet 格式,並寫入資料湖的整理區域。
- Glue 作業完成後,觸發另一個 Glue 爬蟲,將新轉換的資料在整理區域新增至 Glue 資料目錄。
每個工作流程步驟都可以檢索和更新工作流程的狀態資訊,使一個步驟能夠提供狀態資訊,供後續步驟使用。例如,工作流程可能執行多個 ETL 作業,每個 ETL 作業都可以更新狀態資訊,例如輸出檔案的位置,供後續工作流程步驟使用。
程式碼範例:定義 AWS Glue 工作流程
import boto3
glue = boto3.client('glue')
# 定義工作流程
workflow_name = 'my_workflow'
response = glue.create_workflow(
Name=workflow_name,
Description='My workflow'
)
# 定義工作流程圖
workflow_graph = {
'Nodes': [
{
'Type': 'Crawler',
'Name': 'my_crawler',
'CrawlerName': 'my_crawler_name'
},
{
'Type': 'Job',
'Name': 'my_etl_job',
'JobName': 'my_etl_job_name'
}
],
'Edges': [
{
'Source': 'my_crawler',
'Destination': 'my_etl_job'
}
]
}
# 建立工作流程圖
response = glue.create_workflow_graph(
Name=workflow_name,
Graph=workflow_graph
)
#### 內容解密:
1. **建立 AWS Glue 使用者端**:使用 `boto3` 函式庫建立與 AWS Glue 的連線,以便操作 AWS Glue 資源。
2. **定義工作流程名稱和描述**:指定工作流程的名稱和描述,以便於識別和管理。
3. **建立工作流程**:使用 `create_workflow` 方法建立新的工作流程。
4. **定義工作流程圖**:描述工作流程中的節點(例如爬蟲和 ETL 作業)以及它們之間的邊(即執行順序)。
5. **建立工作流程圖**:使用 `create_workflow_graph` 方法將定義好的工作流程圖與工作流程關聯起來。
### AWS Step Functions:複雜工作流程
AWS Step Functions 是另一種用於協調大資料管線的服務。它允許您建立複雜的工作流程,可以與多個 AWS 服務整合。Step Functions 是無伺服器的,這意味著您不需要佈署或管理任何基礎設施,並且只需根據使用量付費。
#### 圖 3.6:AWS Step Functions 狀態機
下圖顯示了一個 Step Functions 狀態機的範例:
此狀態機執行以下步驟:
1. 當檔案上傳到特定的 Amazon S3 儲存桶時,CloudWatch 事件被觸發,並啟動狀態機,傳入包含新上傳檔案位置的 JSON 物件。
2. 第一步驟「處理傳入檔案」執行一個 Glue Python shell 作業,該作業接收上傳檔案的位置作為輸入,並處理傳入檔案(例如,將 CSV 轉換為 Parquet 格式)。Python 函式的輸出指示檔案處理是否成功,如果成功,還包括寫入 Parquet 檔案的 S3 路徑。此資訊包含在傳遞給下一步驟的 JSON 中。
3. 「作業是否成功?」步驟是選擇型別。它檢查傳遞給該步驟的 JSON 資料,如果 `jobStatus` 欄位設定為 `succeeded`,則分支到「執行 AWS Glue 爬蟲」。如果 `jobStatus` 欄位設定為 `failed`,則分支到「作業失敗」。
#### 程式碼範例:定義 AWS Step Functions 狀態機
```json
{
"StartAt": "ProcessIncomingFile",
"States": {
"ProcessIncomingFile": {
"Type": "Task",
"Resource": "arn:aws:glue:REGION:ACCOUNT_ID:job/my_etl_job",
"Next": "DidJobSucceed?"
},
"DidJobSucceed?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.jobStatus",
"StringEquals": "succeeded",
"Next": "RunAWSGlueCrawler"
},
{
"Variable": "$.jobStatus",
"StringEquals": "failed",
"Next": "JobFailed"
}
]
}
}
}
#### 內容解密:
1. **定義狀態機起始步驟**:指定狀態機的起始步驟為「處理傳入檔案」。
2. **描述狀態機中的狀態**:定義狀態機中的每個狀態,包括任務型別、使用的資源以及下一步驟。
3. **使用選擇狀態進行條件分支**:根據前一步驟的輸出進行條件判斷,決定後續執行的步驟。
4. **整合 AWS Glue ETL 作業**:在狀態機中使用 AWS Glue ETL 作業進行資料處理。