在資料工程中,資料轉換是不可或缺的環節。AWS 提供了多種服務來滿足不同規模和複雜度的資料轉換需求。對於輕量級的轉換任務,AWS Lambda 提供了無伺服器的執行環境,可以快速佈署和執行程式碼。而對於大規模資料處理,AWS Glue 則提供了無伺服器的 Spark 和 Python shell 環境,可以高效地處理大量資料。此外,AWS Glue Data Catalog 和爬蟲可以幫助我們更好地管理和理解資料結構,簡化 ETL 流程。在管線協調方面,AWS Glue 工作流程和 Step Functions 提供了不同的選擇,可以根據需求選擇合適的工具來構建和管理複雜的資料處理流程。

AWS 資料轉換服務

一旦資料被擷取到適當的 AWS 服務(如 Amazon S3),管線的下一個階段需要對資料進行轉換,以最佳化其用於分析並使其可供資料消費者使用。

AWS Lambda:輕量級資料轉換

AWS Lambda 提供了一個無伺服器的環境,用於執行程式碼。您可以透過與超過 140 個其他 AWS 服務整合來觸發 Lambda 函式執行程式碼,並且只需為程式碼執行的時間付費,計費單位為毫秒,並根據為函式分配的記憶體量計算。

使用案例

在資料工程領域,Lambda 的常見使用案例包括對傳入資料進行驗證或輕量級處理和轉換。例如,如果合作夥伴在一天中傳送 CSV 檔案,您可以觸發 Lambda 函式在每次接收到新檔案時執行,驗證檔案是否為有效的 CSV 檔案,對其中一列進行計算並更新資料函式庫,然後將檔案移至不同的儲存桶,以便稍後進行批次處理。

import boto3
import csv

s3 = boto3.client('s3')

def lambda_handler(event, context):
    # 從事件中取得檔案名稱
    file_name = event['Records'][0]['s3']['key']
    
    # 從 S3 下載檔案
    s3.download_file('source-bucket', file_name, '/tmp/' + file_name)
    
    # 開啟檔案並驗證其內容
    with open('/tmp/' + file_name, 'r') as file:
        reader = csv.reader(file)
        for row in reader:
            # 對每一列進行處理
            print(row)

    # 將檔案移至不同的儲存桶
    s3.copy_object(CopySource='source-bucket/' + file_name, Bucket='destination-bucket', Key=file_name)
    s3.delete_object(Bucket='source-bucket', Key=file_name)

    return {
        'statusCode': 200,
        'statusMessage': 'OK'
    }

內容解密:

  1. 事件觸發:Lambda 函式由 S3 事件觸發,當有新檔案上傳到指定的 S3 儲存桶時,會自動執行 Lambda 函式。
  2. 檔案下載:使用 s3.download_file 方法從 S3 下載新上傳的檔案到 Lambda 函式的臨時目錄 /tmp/
  3. CSV 驗證與處理:使用 Python 的 csv 模組開啟下載的檔案,並逐行讀取內容進行驗證或處理。
  4. 檔案轉移:處理完成後,使用 s3.copy_object 將檔案複製到另一個 S3 儲存桶,並使用 s3.delete_object 從原始儲存桶中刪除檔案,以完成檔案的轉移。

AWS Glue:無伺服器的 Spark 處理

AWS Glue 提供了一個無伺服器的環境,用於執行 Apache Spark 或 Python shell 工作,用於進行資料轉換和處理。Apache Spark 是用於分散式處理大型資料集的開源引擎,非常適合需要高效能和高效處理的大規模資料處理任務。

使用案例

AWS Glue 可用於對大型資料集進行 ETL(提取、轉換、載入)操作,將分散在不同來源的資料整合、清理和轉換後載入到目標系統,如 Amazon S3 或 Amazon Redshift 中。

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# 從 S3 中讀取 CSV 資料
datasource = glueContext.create_dynamic_frame.from_catalog(database="my_database", table_name="my_table")

# 對資料進行轉換
transformed_data = ApplyMapping.apply(frame=datasource, mappings=[("column1", "string", "column1", "string")])

# 將轉換後的資料寫入 S3
glueContext.write_dynamic_frame.from_options(frame=transformed_data, connection_type="s3", connection_options={"path": "s3://my-bucket/output/"}, format="parquet")

job.commit()

內容解密:

  1. 初始化 Glue Context:建立 GlueContext 物件,這是與 AWS Glue 服務互動的主要介面。
  2. 讀取資料:使用 create_dynamic_frame.from_catalog 方法從 AWS Glue 資料目錄中讀取指定的表格(table),這裡假設表格已經被爬蟲(Crawler)建立。
  3. 套用對映:使用 ApplyMapping.apply 對讀取的 DynamicFrame 進行欄位對映,將來源欄位對映到目標欄位,可以在此步驟進行必要的欄位型別轉換。
  4. 寫出資料:使用 write_dynamic_frame.from_options 將轉換後的 DynamicFrame 以 Parquet 格式寫入指定的 S3 路徑。
  5. 提交任務:呼叫 job.commit() 提交 Glue Job,表示任務成功完成。

AWS 資料工程師工具箱:資料處理與轉換服務

AWS 提供多種服務來支援資料工程師進行資料的處理與轉換。這些服務包括 AWS Glue、Amazon EMR 等,旨在簡化資料處理流程並提高效率。

AWS Glue:無伺服器資料處理引擎

AWS Glue 是一種無伺服器的資料處理服務,支援兩種不同的執行引擎:單節點的 Glue Python shell 和多節點的 Glue Apache Spark 叢集。這兩種引擎都能夠處理存放在 Amazon S3 中的資料,並與 AWS Glue Data Catalog 整合。

圖 3.1:Glue Python shell 和 Glue Spark 引擎

這兩種引擎的使用取決於具體的資料處理需求。Glue Python shell 適合輕量級的資料處理任務,而 Glue Apache Spark 叢集則適合大規模的資料處理和分析任務。

資料處理單位(DPU)與計費

AWS Glue 的計費根據所組態的資料處理單位(DPU)數量以及程式碼執行的時間長度。使用者無需自行佈署或管理伺服器,只需指定所需的 DPU 數量即可。

AWS Glue Data Catalog:資料的邏輯檢視

AWS Glue Data Catalog 提供了一個邏輯檢視,用於描述存放在 Amazon S3 中的資料。它允許使用者直接在 ETL(Extract, Transform, Load)程式碼中參照目錄中的物件。

圖 3.2:Amazon S3 儲存桶中的 CSV 檔案

圖 3.3:AWS Glue Data Catalog 中的員工表格邏輯檢視

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title AWS 資料轉換服務與協調工具

package "AWS 雲端架構" {
    package "網路層" {
        component [VPC] as vpc
        component [Subnet] as subnet
        component [Security Group] as sg
        component [Route Table] as rt
    }

    package "運算層" {
        component [EC2] as ec2
        component [Lambda] as lambda
        component [ECS/EKS] as container
    }

    package "儲存層" {
        database [RDS] as rds
        database [DynamoDB] as dynamo
        storage [S3] as s3
    }

    package "服務層" {
        component [API Gateway] as apigw
        component [ALB/NLB] as lb
        queue [SQS] as sqs
    }
}

apigw --> lambda
apigw --> lb
lb --> ec2
lb --> container
lambda --> dynamo
lambda --> s3
ec2 --> rds
container --> rds
vpc --> subnet
subnet --> sg
sg --> rt

@enduml

AWS Glue Data Catalog 相容於 Hive metastore,能夠與多種 AWS 服務和第三方產品整合,如 Amazon Athena 和 Amazon EMR。

AWS Glue 爬蟲:自動推斷資料結構

AWS Glue 爬蟲可以自動檢查資料來源(如 S3 中的路徑),推斷資料結構和其他相關資訊,並將這些資訊填充到 Glue Data Catalog 中。

使用爬蟲自動填充目錄

  1. 設定爬蟲指向 S3 中的特定路徑。
  2. 爬蟲檢查檔案,識別檔案型別(如 CSV 或 Parquet)。
  3. 使用分類別器推斷檔案結構(欄位名稱和型別)。
  4. 將推斷出的結構資訊新增到 Glue Data Catalog。

Amazon EMR:Hadoop 生態系統處理概述

Amazon EMR 提供了一個受管理的平台,用於執行開源的大資料處理工具,如 Apache Spark、Apache Hive 等。使用者可以根據需求選擇適合的工具和組態。

為何選擇 Amazon EMR 或 AWS Glue?

  • AWS Glue 提供無伺服器的環境,適合快速佈署和執行 Apache Spark 程式碼,對使用者經驗要求較低。
  • Amazon EMR 提供更細緻的組態選項,適合需要高度自訂和控制 Apache Spark 環境的使用者。

內容解密:

  • AWS Glue 和 Amazon EMR 都支援 Apache Spark,但它們在組態彈性、成本和易用性方面存在差異。
  • 使用者應根據具體需求選擇適合的服務,以達到最佳的效能和成本效益。

AWS 資料工程師工具箱:大資料管線協調工具

在前一章節中,我們探討了用於分析的資料管理架構。在本章節中,我們將深入瞭解 AWS 提供的用於協調大資料管線的服務。

AWS Glue 工作流程:協調 Glue 元件

AWS Glue 是一種服務,包含多個元件,例如伺服器端的 Apache Spark 或 Python shell 環境,用於執行 ETL 轉換;Glue 資料目錄,提供 Amazon S3 資料湖中資料的集中邏輯表示;以及 Glue 爬蟲,可組態為檢查特定位置的檔案,自動推斷檔案結構,並將檔案新增至 AWS Glue 資料目錄。

圖 3.5:AWS Glue 工作流程

AWS Glue 工作流程是一種功能,旨在協助協調各種 AWS Glue 元件。工作流程包含一系列有序步驟,可以執行 Glue 爬蟲和 Glue ETL 作業(Spark 或 Python shell)。下圖顯示了一個簡單的 Glue 工作流程視覺表示:

此工作流程協調以下任務:

  • 執行 Glue 爬蟲,將新擷取的資料從資料湖的原始區域新增至 Glue 資料目錄。
  • Glue 爬蟲完成後,觸發 Glue ETL 作業,將原始 CSV 資料轉換為 Parquet 格式,並寫入資料湖的整理區域。
  • Glue 作業完成後,觸發另一個 Glue 爬蟲,將新轉換的資料在整理區域新增至 Glue 資料目錄。

每個工作流程步驟都可以檢索和更新工作流程的狀態資訊,使一個步驟能夠提供狀態資訊,供後續步驟使用。例如,工作流程可能執行多個 ETL 作業,每個 ETL 作業都可以更新狀態資訊,例如輸出檔案的位置,供後續工作流程步驟使用。

程式碼範例:定義 AWS Glue 工作流程

import boto3

glue = boto3.client('glue')

# 定義工作流程
workflow_name = 'my_workflow'
response = glue.create_workflow(
    Name=workflow_name,
    Description='My workflow'
)

# 定義工作流程圖
workflow_graph = {
    'Nodes': [
        {
            'Type': 'Crawler',
            'Name': 'my_crawler',
            'CrawlerName': 'my_crawler_name'
        },
        {
            'Type': 'Job',
            'Name': 'my_etl_job',
            'JobName': 'my_etl_job_name'
        }
    ],
    'Edges': [
        {
            'Source': 'my_crawler',
            'Destination': 'my_etl_job'
        }
    ]
}

# 建立工作流程圖
response = glue.create_workflow_graph(
    Name=workflow_name,
    Graph=workflow_graph
)

#### 內容解密:
1. **建立 AWS Glue 使用者端**使用 `boto3` 函式庫建立與 AWS Glue 的連線以便操作 AWS Glue 資源
2. **定義工作流程名稱和描述**指定工作流程的名稱和描述以便於識別和管理
3. **建立工作流程**使用 `create_workflow` 方法建立新的工作流程
4. **定義工作流程圖**描述工作流程中的節點例如爬蟲和 ETL 作業以及它們之間的邊即執行順序)。
5. **建立工作流程圖**使用 `create_workflow_graph` 方法將定義好的工作流程圖與工作流程關聯起來

### AWS Step Functions:複雜工作流程

AWS Step Functions 是另一種用於協調大資料管線的服務它允許您建立複雜的工作流程可以與多個 AWS 服務整合Step Functions 是無伺服器的這意味著您不需要佈署或管理任何基礎設施並且只需根據使用量付費

#### 圖 3.6:AWS Step Functions 狀態機

下圖顯示了一個 Step Functions 狀態機的範例

此狀態機執行以下步驟
1. 當檔案上傳到特定的 Amazon S3 儲存桶時CloudWatch 事件被觸發並啟動狀態機傳入包含新上傳檔案位置的 JSON 物件
2. 第一步驟處理傳入檔案執行一個 Glue Python shell 作業該作業接收上傳檔案的位置作為輸入並處理傳入檔案例如將 CSV 轉換為 Parquet 格式)。Python 函式的輸出指示檔案處理是否成功如果成功還包括寫入 Parquet 檔案的 S3 路徑此資訊包含在傳遞給下一步驟的 JSON 中
3. 作業是否成功?」步驟是選擇型別它檢查傳遞給該步驟的 JSON 資料如果 `jobStatus` 欄位設定為 `succeeded`,則分支到執行 AWS Glue 爬蟲」。如果 `jobStatus` 欄位設定為 `failed`,則分支到作業失敗」。

#### 程式碼範例:定義 AWS Step Functions 狀態機
```json
{
  "StartAt": "ProcessIncomingFile",
  "States": {
    "ProcessIncomingFile": {
      "Type": "Task",
      "Resource": "arn:aws:glue:REGION:ACCOUNT_ID:job/my_etl_job",
      "Next": "DidJobSucceed?"
    },
    "DidJobSucceed?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.jobStatus",
          "StringEquals": "succeeded",
          "Next": "RunAWSGlueCrawler"
        },
        {
          "Variable": "$.jobStatus",
          "StringEquals": "failed",
          "Next": "JobFailed"
        }
      ]
    }
  }
}

#### 內容解密:
1. **定義狀態機起始步驟**指定狀態機的起始步驟為處理傳入檔案」。
2. **描述狀態機中的狀態**定義狀態機中的每個狀態包括任務型別使用的資源以及下一步驟
3. **使用選擇狀態進行條件分支**根據前一步驟的輸出進行條件判斷決定後續執行的步驟
4. **整合 AWS Glue ETL 作業**在狀態機中使用 AWS Glue ETL 作業進行資料處理