在資料驅動的時代,有效率的 ETL 流程至關重要。AWS 提供豐富的雲端服務,協助工程師構建穩健且可擴充套件的資料管道。本文將介紹 AWS Glue、Lambda、Step Functions 等核心服務,並結合程式碼範例與實務操作步驟,引導讀者逐步掌握 AWS 雲端 ETL 的精髓。從本地開發環境的建置,到 S3 儲存桶的操作,再到 Lambda 函式的設計與 Step Functions 工作流程的協調,本文提供全方位的技術指引,協助讀者在 AWS 雲端上開發高效的資料處理流程。

AWS 雲端 ETL 流程工具概覽

在現代資料處理和分析的領域中,AWS 提供了一系列強大的工具來支援 ETL(提取、轉換、載入)流程。這些工具不僅能夠有效地處理大規模資料集,還能實作自動化和可擴充套件的資料工作流程。

AWS Glue:全託管的 ETL 服務

AWS Glue 是 AWS 提供的一種全託管的 ETL 服務,能夠簡化資料的提取、轉換和載入流程。它支援多種資料來源和目的地,包括 Amazon S3、Amazon RDS 和 Amazon DynamoDB。Glue 提供了視覺化的介面來設計和管理 ETL 工作流程,並且能夠自動生成所需的程式碼。

程式碼範例:使用 AWS Glue 處理資料

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# 從 S3 讀取資料
datasource = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table")

# 對資料進行轉換
transformed_data = ApplyMapping.apply(frame = datasource, mappings = [("column1", "string", "column1", "string")])

# 將轉換後的資料寫入 S3
glueContext.write_dynamic_frame.from_options(frame = transformed_data, connection_type = "s3", connection_options = {"path": "s3://my-bucket/output/"}, format = "csv")

job.commit()

內容解密:

  1. 初始化 AWS Glue 環境:首先,我們需要初始化 AWS Glue 的執行環境,包括 SparkContext 和 GlueContext。
  2. 從 S3 讀取資料:使用 create_dynamic_frame.from_catalog 方法從指定的資料函式庫和表格中讀取資料。
  3. 對資料進行轉換:使用 ApplyMapping.apply 方法對資料進行轉換,這裡我們簡單地將 column1 的資料型別從 string 對映到 string
  4. 將轉換後的資料寫入 S3:使用 write_dynamic_frame.from_options 方法將轉換後的資料寫入指定的 S3 路徑。

AWS Lambda:無伺服器運算服務

AWS Lambda 是一種無伺服器的運算服務,能夠在不需管理伺服器的情況下執行程式碼。它支援多種程式語言,包括 Python、Node.js 和 Java,並且能夠與其他 AWS 服務(如 S3、DynamoDB 和 RDS)進行整合。Lambda 適合用於簡單的 ETL 工作流程,需要即時處理、過濾或豐富資料。

AWS Step Functions:工作流程自動化服務

AWS Step Functions 是一種工作流程自動化服務,能夠協調多個 Lambda 函式、服務和任務,形成一個狀態機。它提供了視覺化的介面來定義和監控狀態機,並且支援多種狀態型別,如任務、選擇、等待和平行。Step Functions 適合用於構建複雜和長時間執行的 ETL 工作流程。

AWS 大資料工具

除了上述的 ETL 工具外,AWS 還提供了一系列大資料工具,包括 Amazon EMR 和 Amazon Kinesis。這些工具能夠有效地處理和分析大規模的資料集,為企業提供有價值的洞察和資料驅動的決策支援。

Amazon EMR:大資料處理平台

Amazon EMR(Elastic MapReduce)是一種大資料處理平台,能夠使用 Apache Hadoop、Apache Spark 等開源工具處理大規模的資料集。EMR 提供了可擴充套件和成本效益高的解決方案,能夠平行處理大量資料。

Amazon Kinesis:即時串流資料處理

Amazon Kinesis 是一種即時串流資料處理服務,能夠處理和分析即時的資料串流。它提供了可擴充套件和持久的解決方案,能夠使用 Kinesis Data Streams、Kinesis Data Firehose 或 Kinesis Data Analytics 等工具。

AWS 工具在 ETL 流程中的應用

建立免費的 AWS 帳戶以進行學習與開發

在學習新的雲端工具時,彈性可擴充套件的環境固然方便,但也很容易導致意外的費用支出。為了在學習過程中控制成本,我們將透過建立 AWS 免費層(Free Tier)環境來進行探索。

建立 AWS 免費層帳戶的步驟:

  1. 前往 https://aws.amazon.com/free 並點選 Create a Free Account 按鈕,接著選擇 Create a new AWS account
  2. 使用您的電子郵件和自訂的 AWS 帳戶名稱建立根使用者帳戶。
  3. 為了練習目的,請確保建立一個標記為 Always Free 的帳戶,這樣可以在指定的處理能力限制內使用 AWS 的所有工具,以避免意外的費用。

若需進一步瞭解 AWS 免費層的詳細資訊,請參考:https://aws.amazon.com/free/free-tier-faqs/

將 AWS 帳戶連線到本地開發環境

為了進行 ETL 流程的開發,我們需要將 AWS 帳戶與本地環境連線。以下是所需的命令列工具:

本地開發環境所需的工具:

  • AWS CLI:用於與 AWS 服務互動的命令列介面。
  • AWS Serverless Application Model CLI (AWS SAM CLI):用於建置和佈署無伺服器應用程式。
  • Docker:容器化平台,用於封裝和執行應用程式。
  • LocalStack:本地 AWS 雲端環境,用於測試和開發應用程式。

這些工具能夠模擬在本地裝置上存取 AWS 線上介面的體驗,並且可以在佈署到雲端之前測試程式碼,從而節省成本。

安裝與驗證本地開發工具

AWS CLI 的安裝與驗證

AWS CLI 提供了一個統一的命令列介面來控制您的 AWS 帳戶。安裝步驟如下:

  1. 前往 https://aws.amazon.com/cli/ 下載並安裝 AWS CLI。
  2. 在命令提示字元中執行以下命令以驗證安裝:

aws –version


#### Docker 的安裝與驗證

Docker 提供了一個容器化的環境,用於隔離應用程式的執行。安裝步驟如下:

1.  前往 [https://www.docker.com/products/personal/](https://www.docker.com/products/personal/) 下載並安裝 Docker Personal 版本。
2.  在終端機中執行以下命令以驗證安裝:
    ```bash
docker --version
#### Docker容器運作原理:
Docker容器是一種輕量級、可移植的運算環境,能夠封裝應用程式及其依賴項。這種設計使得開發人員能夠在不同環境中一致地佈署和執行應用程式。

```dockerfile

使用官方 Python 映像作為基礎

FROM python:3.9-slim

設定工作目錄

WORKDIR /app

複製 requirements.txt 檔案到容器中

COPY requirements.txt .

安裝 Python 依賴項

RUN pip install –no-cache-dir -r requirements.txt

複製應用程式碼到容器中

COPY . .

開放應用程式的連線埠

EXPOSE 8000

設定容器啟動時執行的命令

CMD [“python”, “app.py”]

    #### 內容解密:
    上述Dockerfile定義了一個Python應用的容器化流程。首先,它使用了官方的Python 3.9映像作為基礎,接著設定了工作目錄並複製了`requirements.txt`檔案到容器中。然後,它安裝了所需的Python依賴項,並將應用程式的程式碼複製到容器內。最後,它開放了8000連線埠並定義了容器啟動時執行的命令。

#### LocalStack 的安裝與驗證

LocalStack 提供了一個本地的 AWS 雲端環境模擬。安裝步驟如下:

1.  在終端機中執行以下命令以安裝 LocalStack:
    ```bash
docker run --rm -it -p 4566:4566 -p 4571:4571 localstack/localstack
  1. 在命令提示字元中執行以下命令以驗證安裝:

aws –endpoint-url=http://localhost:4566 s3 ls


#### AWS SAM CLI 的安裝與驗證

AWS SAM CLI 是用於建置和佈署無伺服器應用程式的框架。安裝步驟如下:

1.  在終端機中執行以下命令以安裝 AWS SAM CLI:
    ```bash
pip install aws-sam-cli
  1. 在命令提示字元中執行以下命令以驗證安裝:

sam –version


## 在AWS上建立ETL Pipeline的教學

在當今的雲端運算環境中,Amazon Web Services(AWS)提供了一套工具,讓資料工程師能夠建立強健、可擴充套件且高效的ETL(Extract, Transform, Load)資料管道。在前一章中,我們介紹了AWS的一些常見資源,並設定了您的本地開發環境以便與AWS工具整合。本章將引導您瞭解如何利用這些工具,在AWS環境中架構和實作有效的ETL資料管道。我們將使用Python Lambda函式和AWS Step Functions建立一個可佈署的ETL資料管道。最後,我們將使用Bonobo、EC2和RDS建立一個可擴充套件的資料管道。這些工具將幫助您的所有資料管道利用雲端的強大功能。

### 本章涵蓋以下主題
* 使用AWS Lambda和Step Functions建立Python資料管道
 + 在本地環境中設定AWS CLI
 + 透過AWS控制檯在AWS中建立S3儲存桶
 + 為每個Lambda函式建立Python指令碼
 + 為Step Functions的狀態機建立JSON指令碼
* 使用Bonobo、EC2和RDS建立可擴充套件的ETL資料管道簡介
 + S3和EC2例項 - 將Python程式碼儲存到EC2例項

### 使用Amazon S3、Lambda和Step Functions建立Python資料管道
在本文中,我們將使用AWS Lambda和Step Functions建立一個簡單的ETL資料管道。AWS Lambda是一種無伺服器運算服務,允許您在不組態或管理伺服器的情況下執行程式碼,而Step Functions提供了一種方法來協調無伺服器的Lambda函式和其他AWS服務成為工作流程。

#### 使用AWS CLI設定環境
點選進入本文GitHub儲存函式庫中的chapter_10目錄,在PyCharm終端機中執行以下命令以組態AWS CLI:
```bash
aws configure

然後,您將被提示輸入存取金鑰ID、秘密存取金鑰、預設區網域名稱和預設輸出格式。使用您的網頁瀏覽器登入到您的AWS管理控制檯以取得以下憑證:

AWS Access Key ID [None]: <您的存取金鑰ID>
AWS Secret Access Key [None]: <您的秘密金鑰ID>
Default region name [None]: us-east-2
Default output format [None]: json

完成這些步驟後,您的AWS CLI就已經組態好了,可以開始執行AWS CLI命令。

在Python中建立“概念驗證”資料管道

使用Python與AWS服務結合是一種本地開發和雲端開發的結合。在將程式碼移到AWS之前,先在本地Python指令碼中編寫並測試每個程式碼塊是一個好主意。這樣,您可以在新增與雲端環境合作的複雜性之前,對程式碼進行預先的健全性檢查。

例如,讓我們編寫一個簡單的資料管道,它匯入三個CSV檔案:traffic_crashes.csv、traffic_crash_vehicle.csv和traffic_crash_people.csv,作為輸入資料。該管道將把檔案匯入為個別的DataFrame,然後根據每個DataFrame的特定列列表匯入每個DataFrame,並傳回一個包含每個DataFrame的字典。在下面的程式碼塊中,您可以看到可以在本地PyCharm環境中建立的“概念驗證”管道程式碼。建議將以下程式碼片段的所有版本儲存在GitHub跟蹤的儲存函式庫中:

import pandas as pd

# 步驟1:讀取資料
crashes_df = pd.read_csv('path/to/my/traffic_crashes.csv')

## 對vehicles_df和people_df重複此操作

# 步驟2:按列列表過濾資料框架
filtered_crashes_df = crashes_df[['CRASH_UNIT_ID', 'CRASH_ID', 'CRASH_DATE']]
filtered_vehicles_df = vehicles_df[['VEHICLE_ID', 'VEHICLE_MAKE', 'VEHICLE_MODEL', 'VEHICLE_YEAR', 'VEHICLE_TYPE']]
filtered_people_df = people_df[['PERSON_ID', 'CRASH_ID', 'CRASH_DATE', 'PERSON_TYPE', 'VEHICLE_ID', 'PERSON_SEX', 'PERSON_AGE']]

# 步驟3:將轉換後的資料放入字典中輸出
transformed_content = {
    'crashes_df': filtered_crashes_df.to_csv(index=False),
    ## 對vehicles_df和people_df重複此操作
}

內容解密:

  1. 匯入必要的函式庫:首先,我們匯入了pandas函式庫,用於資料操作和分析。
  2. 讀取CSV檔案:我們使用pd.read_csv()函式讀取三個CSV檔案,分別代表交通事故、車輛和人員資料。
  3. 資料過濾:我們根據特定的列名過濾了DataFrame,以提取所需的資料。
  4. 轉換資料為CSV格式:我們將過濾後的DataFrame轉換為CSV格式,並將其儲存在字典中,以便於後續處理。

很好!我們已經成功定義了資料管道需要執行的三個主要步驟。現在,讓我們使其適應AWS環境。第一步是將我們的檔案上傳到S3儲存。

在AWS上建立ETL Pipeline的教學

使用Boto3和Amazon S3讀取資料

在第9章中,我們已經介紹了Amazon S3。Amazon S3是亞馬遜的網頁式雲端儲存服務,旨在為開發者提供線上備份和封存資料及應用程式的功能,以簡化網頁規模的運算。

以下步驟提供了透過AWS GUI設定S3儲存桶的高階概述:

  1. 登入AWS管理控制檯,導航至S3服務,並點選「建立儲存桶」按鈕。
  2. 為您的儲存桶輸入一個唯一的名稱(在本章中,我們將參考my-bucket-name)。此名稱在所有AWS資源(Lambda函式、Step Functions等)中必須是全域唯一的。
  3. 選擇要在其中建立儲存桶的區域。這應該與其他AWS資源相同的區域,以最小化延遲和資料傳輸成本。
  4. 為您的儲存桶選擇任何其他選項,例如版本控制、記錄或加密。這些選項將取決於您的特定使用案例和需求。
  5. 點選「建立儲存桶」以建立您的儲存桶。

設定好S3儲存桶後,我們現在可以匯入三個CSV檔案,以便在AWS環境中存取它們。為此,我們可以建立一個新的檔案upload_to_s3.py,並使用boto3 Python套件傳輸檔案:

import boto3

# 步驟1:建立boto3 S3客戶端
s3 = boto3.client('s3')
bucket_name = 'your-bucket-name'

# 步驟2:定義本地環境中的檔案路徑
crashes_path = 'path/to/my/traffic_crashes.csv'
# 為vehicles_path和people_path重複此步驟

# 步驟3:定義“my-bucket-name”S3儲存桶中“traffic”目錄內的輸出檔案路徑
crashes_key = 'traffic/traffic_crashes.csv'
# 為vehicles_key和people_key重複此步驟

# 上傳檔案
s3.upload_file(Filename=crashes_path, Bucket=bucket_name, Key=crashes_key)
# 為vehicles和people重複此步驟

內容解密:

  • 這段程式碼首先匯入了必要的boto3函式庫,並建立了一個S3客戶端。
  • 定義了本地CSV檔案的路徑以及S3儲存桶中的目標路徑。
  • 使用s3.upload_file方法將本地檔案上傳到S3儲存桶中的指定位置。

使用Amazon S3、Lambda和Step Functions建立Python Pipeline

將檔案匯入S3儲存桶後,我們可以重構我們的“概念驗證”資料管道,以參照S3中的檔案路徑,而不是本地位置:

import boto3
import pandas as pd

# 步驟1:建立boto3 S3客戶端
s3 = boto3.client('s3')
bucket_name = 'my-bucket-name'

# 步驟2:定義“my-bucket-name”S3儲存桶中的檔案路徑
crashes_key = 'traffic/traffic_crashes.csv'
# 為vehicles_key和people_key重複此步驟

# 步驟3:使用s3.get_object()參照S3儲存桶中的檔案物件
crashes_response = s3.get_object(Bucket=bucket_name, Key=crashes_key)
# 為vehicles_response和people_response重複此步驟

# 步驟4:讀取資料
crashes_df = pd.read_csv(crashes_response['Body'])
# 為vehicles_df和people_df重複此步驟

內容解密:

  • 這段程式碼修改了原始的“概念驗證”管道,以從S3儲存桶中讀取檔案。
  • 使用s3.get_object方法取得S3物件的參照,然後使用pd.read_csv讀取CSV資料。

AWS Lambda函式

在本文中,我們將探討如何使用AWS Lambda函式在ETL管道中建立模組化步驟。透過將管道分解為更小、更易於管理的步驟,您可以輕鬆地分別管理和最佳化每個步驟。

將程式碼轉換為Step Functions

  1. 建立boto3 S3客戶端:
import boto3

def lambda_handler(event, context):
    s3 = boto3.client('s3')
    return {"s3": s3, "bucket_name": 'my-bucket-name'}
  1. 定義my-bucket-name S3儲存桶中的檔案路徑:
def lambda_handler(event, context):
    bucket_name = event["bucket_name"]
    crashes_key = 'traffic/traffic_crashes.csv'
    # 為vehicles_key和people_key重複此步驟
    return {"bucket_name": bucket_name, "crashes_key": crashes_key, "vehicles_key": vehicles_key, "people_key": people_key}

內容解密:

  • 每個Lambda函式都必須定義在自己的函式檔案中。
  • 第一個Lambda函式建立了S3客戶端並傳回了客戶端和儲存桶名稱。
  • 第二個Lambda函式定義了S3儲存桶中的檔案路徑,並傳回了這些路徑。