Python 的 ETL 工具 Luigi 和 Airflow 提供了建構複雜資料管線的有效方法。Luigi 擅長任務管理和依賴解析,Airflow 則以工作流程排程和監控見長。兩種工具都支援 Python 指令碼,方便整合現有程式碼。AWS 雲端服務則為 ETL 處理提供了豐富的資源,從儲存服務 S3、RDS、Redshift 到運算服務 EC2、無伺服器運算平台 Lambda,以及全託管 ETL 服務 Glue,都能有效提升 ETL 效率。S3 與 EC2 的協同運作更能實作靈活的應用程式建置,滿足不同資料處理需求。

Python 中的 ETL 工具與工作流程管理平台

Luigi:Python 中的 ETL 工具

Luigi 是一個開源的 Python 套件,用於構建複雜的批次作業資料管線。它最初在 Spotify 開發,用於管理組織的資料工作流程,現已成為資料工程和資料科學團隊中廣泛使用的工具。

Luigi 的主要特點

  • 任務管理:Luigi 提供了一種清晰的方式來定義任務、輸入、輸出和依賴關係,使資料管線的流程易於理解。
  • 依賴解析:Luigi 自動解析依賴關係,確保任務按照正確的順序執行。
  • 工作流程視覺化:Luigi 內建視覺化工具,可以生成工作流程的視覺表示,使任務狀態和管線進度一目瞭然。
  • 靈活性:Luigi 提供了一個靈活的框架,用於定義任務和工作流程,可以根據特定需求自訂管線。

安裝和使用 Luigi

在 PyCharm 終端機中使用 pipenv 環境安裝 Luigi:

pipenv install luigi

將現有的 ETL 程式碼轉換為 Luigi

需要為每個 ETL 步驟定義 Luigi 任務,指定輸入、輸出和處理邏輯。同時需要定義任務依賴關係和工作流程結構,並執行 Luigi 排程器來執行任務。

使用 Luigi 重構 ETL 管線

chapter_08/tools/ 目錄下,開啟 06_luigi_pipeline.py 檔案。其中定義了三個任務,分別對應 ETL 的每個步驟:ExtractCrashesExtractVehiclesExtractPeople 用於讀取 CSV 檔案中的資料並過濾必要的欄位;TransformCrashesTransformVehiclesTransformPeople 用於轉換資料並輸出清理後的 CSV 檔案;LoadCrashesLoadVehiclesLoadPeople 用於將資料載入 PostgreSQL 資料表中。

class ChicagoDMV(luigi.Task):
    def requires(self):
        return [LoadCrashes(), LoadVehicles(), LoadPeople()]

執行 Luigi 管線需要將程式碼儲存為 Python 指令碼,例如 06_luigi_pipeline.py,然後使用 luigi 命令從命令列執行:

luigi --module load ChicagoDMV --local-scheduler

工作流程管理平台

當資料需求擴大時,需要使用工作流程管理平台來簡化和自動化資料管線佈署。常見的工作流程管理平台包括:

  • Apache Airflow:一個以程式設計方式建立、排程和監控工作流程的平台。
  • Apache Nifi:一個易於使用、功能強大且可靠的系統,用於處理和分發資料。
  • Prefect:一個用於在 Python 中構建、佈署和管理工作流程的平台。

Apache Airflow

Apache Airflow 是一個開源平台,允許使用者以程式設計方式建立、排程和監控工作流程。它最初在 Airbnb 開發,用於幫助管理公司複雜的資料處理管線。

Airflow 使用有向無環圖(DAG)結構來構建工作流程,使資料流透過管線的視覺化變得容易。

此圖示說明瞭一個簡單的 DAG 結構,其中任務按照特定的順序執行。

在 Python 中使用 Airflow 重構 ETL 管道

Apache Airflow 是一種強大的工作流程管理工具,專為建立、排程和監控複雜的工作流程而設計。在本章中,我們將探討如何使用 Airflow 重構現有的 ETL(提取、轉換、載入)管道。

Airflow 的主要功能

Airflow 的一個關鍵特性是其可擴充套件性。使用者可以輕鬆地編寫自定義運算元或掛鉤,以與新的系統整合或執行自定義任務。Airflow 還擁有豐富的外掛生態系統,提供額外的功能,例如與雲端提供者或第三方工具的整合。

Airflow 具有一個網頁介面,允許使用者監控工作流程的狀態、檢視任務日誌,並在需要時手動觸發任務。它還支援電子郵件通知和 Slack 整合,以便在工作流程失敗時提醒使用者。

安裝和開發 Airflow

要安裝 Airflow,可以使用 pip 直接將其安裝到本地環境中:

pip install apache-airflow

要執行 Airflow DAG,需要連線到網頁伺服器或直接將指令碼匯入 Airflow 工作區。以下是執行 Airflow 管道的步驟:

  1. 啟動 Airflow 網頁伺服器:airflow webserver
  2. 啟動 Airflow 排程器:airflow scheduler
  3. 在網頁瀏覽器中存取 Airflow 網頁介面:http://localhost:8080
  4. 在 Airflow 網頁介面中建立新的 DAG
  5. 定義 DAG 引數,例如 default_args
  6. 定義任務,例如使用 PythonOperator
  7. 定義任務依賴關係,例如使用 >> 運算元
  8. 儲存 DAG 並啟動它
  9. 監控 DAG 狀態和任務日誌
  10. 測試 DAG,例如手動執行或使用觸發器

重構 ETL 管道

在本章中,我們將使用 Airflow 重構現有的 ETL 管道。首先,我們需要定義 DAG 引數和任務。然後,我們可以使用 PythonOperator 定義任務,並使用 >> 運算元定義任務依賴關係。

例如,以下是定義 DAG 和任務的程式碼:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'first_airflow_pipeline',
    'depends_on_past': False,
    'start_date': datetime(2023, 8, 13),
    'retry_delay': timedelta(minutes=5),
    'catchup': False,
}

dag = DAG('chicago_dmv', default_args=default_args, schedule_interval=timedelta(days=1))

task_extract_crashes = PythonOperator(
    task_id='extract_crashes',
    python_callable=extract_data,
    op_kwargs={'crash_filepath': config_data['crash_filepath']},
    dag=dag
)

task_transform_crashes = PythonOperator(
    task_id='transform_crashes',
    python_callable=transform_crash_data,
    op_kwargs={'crash_df': "{{ task_instance.xcom_pull(task_ids='extract_crashes') }}"},
    dag=dag
)

task_load_crash = PythonOperator(
    task_id='load_crash',
    python_callable=load_data,
    op_kwargs={'df': "{{ task_instance.xcom_pull(task_ids='transform_crash') }}"},
    dag=dag
)

task_extract_crashes >> task_transform_crashes
task_transform_crashes >> task_load_crash

程式碼解密:

  • 定義 DAG 引數和任務:使用 default_argsDAG 定義 DAG,使用 PythonOperator 定義任務。
  • 定義任務依賴關係:使用 >> 運算元定義任務依賴關係,例如 task_extract_crashes >> task_transform_crashes
  • 使用 XCom 系統傳遞任務結果:使用 task_instance.xcom_pull 方法傳遞任務結果,例如 {{ task_instance.xcom_pull(task_ids='extract_crashes') }}

AWS 工具在 ETL 處理流程的應用

AWS(Amazon Web Services)是企業資料整合系統中最廣泛使用的平台之一,其靈活的收費標準與多樣化的應用服務,使其對於大規模企業和小規模個人專案都非常有用。本章節將重點介紹 AWS 免費層級(Free Tier)資源,並探討如何利用這些資源進行 ETL(Extract, Transform, Load)處理流程。

AWS 中的常見資料儲存工具

AWS 提供了多種備受推崇的資料儲存服務,包括 Amazon Relational Database Service(RDS)、Amazon Redshift、Amazon Simple Storage Service(S3)和 Amazon Elastic Compute Cloud(EC2)。這些服務因其可靠性、可擴充套件性和安全性而在業界廣泛認可和使用。

本章節將使用的 AWS 工具

AWS 工具用途描述
AWS Lambda無需佈建或管理伺服器即可執行程式碼,非常適合事件驅動的資料處理。
Amazon EC2在雲端提供可擴充套件的運算能力,用於執行應用程式。
Amazon RDS受管理的關係型資料函式庫服務,適合結構化資料的儲存和檢索。
Amazon S3物件儲存服務,非常適合儲存和檢索大量資料。
AWS Glue完全受管理的 ETL 服務,適合資料編目和 ETL 作業。
AWS Step Functions將多個 AWS 服務協調成無伺服器工作流程,適合複雜的 ETL 任務。
Amazon Kinesis收集、處理和分析即時資料流,適合即時分析。
Amazon Redshift資料倉儲服務,非常適合大規模資料分析。
AWS Data Pipeline協調和自動化資料移動和轉換,適合複雜的 ETL 工作流程。

詳細介紹 AWS 中的常見資料儲存工具

Amazon RDS

Amazon RDS(https://aws.amazon.com/free/database/)是一種完全受管理的關係型資料函式庫服務,提供在雲端執行和管理關係型資料函式庫的靈活性。RDS 支援多種流行的資料函式庫引擎,如 MySQL、PostgreSQL、Oracle、SQL Server 和 MariaDB。它提供諸如自動備份、時間點還原、讀取副本、多可用區域佈署以及與其他 AWS 服務(如 S3、Glue 和 Lambda)的整合等功能。RDS 為執行各種型別的資料函式庫工作負載(如線上交易處理(OLTP)、線上分析處理(OLAP)或交易處理)提供了一個可擴充套件且可靠的平台。

Amazon Redshift

AWS Redshift(https://aws.amazon.com/pm/redshift/)是一種完全受管理的資料倉儲服務,允許以可擴充套件且成本效益高的方式儲存和分析大量資料。Redshift 可用作需要聚合、報告或資料倉儲的 ETL 工作流程的目標。它提供根據 SQL 的介面、自動壓縮、分佈、備份以及與各種商業智慧(BI)工具(如 Tableau 和 Power BI)的整合。

Amazon S3

AWS S3(https://aws.amazon.com/pm/serv-s3/)是一種非常優秀的物件儲存服務,提供從世界任何地方高效儲存和檢索資料的方式。只要具備正確的憑證,使用者可以在任何時間、從網路上的任何位置,使用簡單的網頁服務介面儲存和檢索任意數量的資料。根據其檔案,S3 的設計目標是達到 99.999999999% 的耐久性和 99.99% 的可用性(這是非常高的可靠性!),並提供無限制的儲存容量,沒有最小或最大物件大小限制。S3 支援各種資料型別和格式,包括文字、影像、音訊、影片和二進位制資料,並提供版本控制、生命週期策略、加密和存取控制等功能。由於其極高的靈活性和可擴充套件性,S3 經常被用作資料湖或資料中心,用於資料的攝取、處理和分發到各種應用程式和服務。

Amazon EC2

AWS EC2(https://aws.amazon.com/pm/ec2/)是一種雲端服務,提供虛擬運算資源,如 CPU、記憶體、儲存和網路,按需提供。EC2 允許使用者快速輕鬆地在雲端啟動和管理虛擬伺服器(稱為例項),使用多種作業系統,如 Linux、Windows 和 macOS。EC2 提供多種例項型別,從通用型到高效能,並提供多種定價選項,包括按需、預留和競價例項。最後,EC2 為執行各種型別的應用程式(如網頁伺服器、資料函式庫和機器學習(ML)模型)提供了一個可擴充套件且可靠的平台。

程式碼範例與詳細解說

以下是一個使用 AWS SDK for Python(Boto3)來建立 S3 儲存桶的簡單範例:

import boto3

# 建立 S3 客戶端
s3 = boto3.client('s3')

# 定義儲存桶名稱
bucket_name = 'my-s3-bucket'

# 建立 S3 儲存桶
try:
    response = s3.create_bucket(Bucket=bucket_name)
    print(f"儲存桶 {bucket_name} 建立成功!")
except Exception as e:
    print(f"建立儲存桶失敗:{e}")

內容解密:

  1. 匯入 Boto3 函式庫:首先,我們需要匯入 Boto3,這是 AWS SDK for Python,讓我們能夠與 AWS 服務互動。
  2. 建立 S3 客戶端:使用 boto3.client('s3') 方法建立一個 S3 客戶端,用於與 S3 服務進行互動。
  3. 定義儲存桶名稱:指定要建立的 S3 儲存桶的名稱。在實際應用中,應確保名稱的唯一性。
  4. 建立 S3 儲存桶:呼叫 create_bucket 方法,並傳入儲存桶名稱,以建立新的 S3 儲存桶。
  5. 錯誤處理:透過 try-except 結構捕捉並處理可能發生的異常,例如儲存桶名稱已被使用的錯誤。

AWS 雲端服務在 ETL 處理中的關鍵角色

AWS 提供了一套完整的雲端服務,以滿足不同資料儲存需求。無論是使用 Amazon RDS 的關聯式資料函式庫、Amazon Redshift 的資料倉儲,還是 Amazon S3 的物件儲存,或是利用 Amazon EC2 進行可擴充套件運算,AWS 的全面性資料儲存服務使工程師能夠在安全且可擴充套件的環境中有效地儲存、處理和分析資料。

靈活應用程式建置:S3 與 EC2 的協同運作

在探討用於自動化 ETL 開發的 AWS 應用程式之前,瞭解如何使用 AWS 的核心服務是至關重要的。S3 和 EC2 是 AWS 提供的兩項核心服務,經常被用來共同建置可擴充套件且靈活的雲端應用程式。

結合 S3 與 EC2 的優勢

S3 和 EC2 可以協同使用,建立一個強大且靈活的平台,用於在雲端中啟動可輕易擴充套件的應用程式。S3 可作為 EC2 例項的儲存後端,透過 S3 API、HTTP 或 CLI 存取資料。透過將 S3 用作 ETL 工作流程的資料來源或目標,可以在不同 S3 儲存桶中的臨時位置擷取、處理和儲存資料。EC2 例項可以直接從 S3 資料存取轉換後的輸出資料,無需跨越憑證牆複製或行動資料。

資料管道建置範例

資料擷取

S3 可用作來自各種來源(如物聯網裝置、記錄檔或批次處理)的資料擷取的著陸區。EC2 例項可用於執行自定義指令碼或應用程式,從來源讀取資料、轉換後上傳至 S3。這些資料可以根據資料型別和存取需求,使用不同的字首、資料夾或儲存桶策略在 S3 中儲存和組織。

資料轉換

S3 可用作需要轉換、聚合或豐富的中間或處理資料的儲存。EC2 例項可用於執行 ETL 工作流程,從 S3 讀取資料,使用 Apache Spark、Python 或 SQL 等工具進行轉換,然後將結果寫回 S3。這種方法可用於批次和串流處理,取決於資料速度和延遲需求。

資料分析

S3 可用作資料分析的資料來源或目標,使用 Amazon Athena、Amazon Redshift 或 Apache Hive 等工具。EC2 例項可用於執行查詢、生成報告或使用 Tableau、Power BI 或 Amazon QuickSight 等商業智慧工具視覺化資料。這種方法可用於即席和排程分析,取決於業務需求和資料來源。

資料儲存

S3 可用作長期儲存解決方案,用於資料歸檔、備份或災難還原(DR)。EC2 例項可用於執行備份指令碼或工具,將資料從 EC2 例項或其他資料來源複製到 S3。這種方法可以提供一種成本效益高且可擴充套件的解決方案,用於長時間儲存大量資料,具有高永續性和可用性。

AWS Glue:全託管的 ETL 服務

AWS Glue 是 AWS 提供的一項全託管的 ETL 服務,能夠自動發現、編目資料,並提供任務排程、錯誤處理和監控等功能。由於 Glue 是 AWS 服務,因此它直接與超過 70 種來源資料格式以及流行的 AWS 資料目標位置(如 Amazon S3、Amazon RDS 和 Amazon Redshift)整合。最重要的是,Glue 使用無伺服器架構,提供內建的高用性和按需付費的計費模式,從而提高敏捷性。

自定義轉換與 Apache Spark 環境

Glue 支援多種語言,如 Python、Scala 和 Java,用於建立自定義轉換。它還包含一個受管理的 Apache Spark 環境,可用於執行 PySpark 或 Scala 工作流程,以在雲端環境中使用平行處理技術處理大型資料集。Glue 可用於建置可擴充套件、可靠且成本效益高的資料管道,適用於各種使用場景,如資料倉儲、資料遷移和資料湖處理。

無伺服器運算與工作流程自動化

AWS Glue、AWS Lambda 和 AWS Step Functions 是 AWS 提供的三項雲端服務,提供無伺服器運算和工作流程自動化功能。這些服務使資料工程師和分析師能夠建立端對端的可擴充套件、可靠且成本效益高的資料管道。

#### 內容解密:

此段落描述了 AWS Glue 在 ETL 處理中的關鍵角色,以及其與其他 AWS 服務(如 S3 和 EC2)的整合優勢。同時,也介紹了 AWS Lambda 和 AWS Step Functions 在無伺服器運算和工作流程自動化方面的功能。這些服務共同為建置靈活、可擴充套件和高效的資料管道提供了強大的支援。

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Python ETL 工具與 AWS 雲端服務應用

package "Python 應用架構" {
    package "應用層" {
        component [主程式] as main
        component [模組/套件] as modules
        component [設定檔] as config
    }

    package "框架層" {
        component [Web 框架] as web
        component [ORM] as orm
        component [非同步處理] as async
    }

    package "資料層" {
        database [資料庫] as db
        component [快取] as cache
        component [檔案系統] as fs
    }
}

main --> modules : 匯入模組
main --> config : 載入設定
modules --> web : HTTP 處理
web --> orm : 資料操作
orm --> db : 持久化
web --> cache : 快取查詢
web --> async : 背景任務
async --> fs : 檔案處理

note right of web
  Flask / FastAPI / Django
end note

@enduml

此圖示說明瞭使用 S3 和 EC2 建置資料管道的流程,包括資料擷取、處理、轉換、分析和視覺化等步驟。

#### 內容解密:

此結論總結了 AWS 在 ETL 處理中的關鍵角色,以及其提供的各種雲端服務如何協同工作以滿足不同資料需求。同時,也強調了這些服務在建置靈活、可擴充套件和高效的資料管道方面的重要性。