實體化檢視能有效提升查詢效能,尤其適用於需頻繁執行複雜查詢的場景。透過預先計算和儲存查詢結果,可避免重複計算,例如計算房源與旅遊景點的距離。資料管線的協調對於自動化資料處理至關重要,AWS Step Function 提供了彈性的狀態機器,能定義和管理任務的執行順序和依賴關係,並根據任務的執行結果執行不同的操作。在建構資料管線時,需要考量如何觸發管線執行,例如排程型或事件驅動型,並選擇合適的 AWS 服務,例如 AWS Data Pipeline、AWS Glue Workflows 或 AWS Step Function。

資料載入資料集市的實務操作與最佳化

在前面的章節中,我們已經深入瞭解了雲端資料倉儲的設計與最佳化,接下來,我們將透過實際操作來示範如何將資料載入Amazon Redshift叢集並執行查詢。

建立實體化檢視以最佳化查詢效能

當我們的業務代理需要定期執行複雜查詢以找到合適的Airbnb房源時,我們可以建立實體化檢視來儲存清單資料以及房源與特定旅遊景點之間的距離。這樣一來,就可以避免每次查詢時都重新計算距離,從而提高查詢效率。

CREATE MATERIALIZED VIEW listings_touristspot_distance_view AS
WITH touristspots_raw(name, lon, lat) AS (
    (SELECT 'Freedom Tower', -74.013382, 40.712742) UNION
    (SELECT 'Empire State Building', -73.985428, 40.748817)
),
touristspots(name, location) AS (
    SELECT name, ST_Point(lon, lat)
    FROM touristspots_raw
),
accommodation(listing_id, name, room_type, price, location) AS (
    SELECT listing_id, name, room_type, price, ST_Point(longitudes, latitude)
    FROM accommodation_local.listings
)
SELECT 
    touristspots.name AS tourist_spot,
    accommodation.listing_id AS listing_id,
    accommodation.name AS location_name,
    (ST_DistanceSphere(touristspots.location, accommodation.location) / 1000)::decimal(10, 2) AS distance_in_km,
    accommodation.price AS price,
    accommodation.room_type AS room_type
FROM touristspots, accommodation;

內容解密:

  1. 建立實體化檢視:使用CREATE MATERIALIZED VIEW語法建立一個名為listings_touristspot_distance_view的實體化檢視。
  2. 定義臨時表:使用WITH子句定義三個臨時表:touristspots_rawtouristspotsaccommodation
    • touristspots_raw包含旅遊景點的名稱、經度和緯度。
    • touristspotstouristspots_raw中的經緯度轉換為地理點。
    • accommodationaccommodation_local.listings表中提取房源資訊,並將經緯度轉換為地理點。
  3. 計算距離:使用ST_DistanceSphere函式計算旅遊景點與房源之間的距離,並將結果轉換為公里單位,保留兩位小數。
  4. 組合資料:將touristspotsaccommodation表進行笛卡爾積,取得每個房源與每個旅遊景點之間的距離。

查詢實體化檢視

建立實體化檢視後,我們可以直接查詢該檢視以取得所需資料,而無需每次都重新計算距離。

SELECT * 
FROM listings_touristspot_distance_view 
WHERE tourist_spot LIKE 'Empire%' 
ORDER BY distance_in_km 
LIMIT 100;

內容解密:

  1. 查詢實體化檢視:直接從listings_touristspot_distance_view檢視中查詢資料。
  2. 篩選資料:使用WHERE子句篩選出與「Empire State Building」相關的資料。
  3. 排序與限制結果:使用ORDER BY子句按距離排序,並使用LIMIT限制結果為前100筆資料。

資料管道的協調

要建立資料管道,我們需要一個協調引擎來定義和管理任務的順序,以及任務之間的依賴關係。協調引擎還需要足夠智慧,以便根據任務的失敗或成功執行不同的操作,並且能夠定義和執行平行執行的任務,以及按順序執行的任務。

在本章中,我們將探討如何使用不同的協調引擎來管理資料管道。首先,我們將研究管道協調的一些核心概念,然後回顧AWS中用於協調資料管道的幾個不同選項。在本章的實作活動中,我們將使用AWS Step Function服務來協調資料管道。

在本章中,我們將涵蓋以下主題:

  • 瞭解管道協調的核心概念
  • 檢查AWS中協調管道的選項
  • 實作 – 使用AWS Step Function協調資料管道

技術需求

要完成本章的實作練習,您需要一個AWS帳戶,其中具有管理員許可權的使用者(正如第1章《資料工程介紹》中所介紹的)。我們將使用各種AWS服務,包括AWS Lambda、AWS Step Function和Amazon Simple Notification Service(SNS)。

瞭解管道協調的核心概念

在第5章《資料工程管道架構》中,我們設計了一個資料管道的高階概述。我們研究了潛在的資料來源,討論了可能需要的資料轉換型別,並研究瞭如何使轉換後的資料可供我們的資料消費者使用。

資料管道是什麼,如何協調它?

一個簡單的定義是,資料管道是一組需要按特定順序執行的資料處理任務。有些任務可能需要按順序執行,而其他任務可能可以平行執行。您也可以將這些任務的排序稱為工作流程。

資料管道協調是指自動執行資料管道工作流程中涉及的任務,管理不同任務之間的依賴關係,並確保管道在預期時間執行。

有向無環圖是什麼?

在討論資料管道時,您可能會聽到有向無環圖(DAG)這個術語。如果您在Google上搜尋這個術語,您可能會發現很多關於DAG的複雜數學解釋。這是因為這個術語不僅適用於資料管道,還用於定義許多不同型別的有序流程。例如,DAG也用於設計編譯器。

DAG的一個簡單解釋是,它代表節點之間的連線,節點之間的流動始終只發生在一個方向上,並且永遠不會迴圈回早期的節點(無環意味著不是一個迴圈)。

以下圖表顯示了一個簡單的DAG:

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title 資料載入資料集市實務最佳化

package "資料庫架構" {
    package "應用層" {
        component [連線池] as pool
        component [ORM 框架] as orm
    }

    package "資料庫引擎" {
        component [查詢解析器] as parser
        component [優化器] as optimizer
        component [執行引擎] as executor
    }

    package "儲存層" {
        database [主資料庫] as master
        database [讀取副本] as replica
        database [快取層] as cache
    }
}

pool --> orm : 管理連線
orm --> parser : SQL 查詢
parser --> optimizer : 解析樹
optimizer --> executor : 執行計畫
executor --> master : 寫入操作
executor --> replica : 讀取操作
cache --> executor : 快取命中

master --> replica : 資料同步

note right of cache
  Redis/Memcached
  減少資料庫負載
end note

@enduml

此圖示:代表了一個簡單的有向無環圖,其中節點之間的連線代表了任務之間的依賴關係。

內容解密:

此圖表顯示了一個簡單的DAG,其中事件A觸發事件B和C,事件B觸發事件F,事件C和B觸發事件D,事件D觸發事件E。在這個例子中,事件F永遠不能迴圈回事件A、B或C,因為這會破壞DAG定義的無環部分。

雖然某些協調工具要求將資料管道定義為DAG,但AWS Step Function允許在狀態機器的定義中迴圈,因此根據Step Function的管道不需要強制遵循DAG的定義。

管道協調的核心概念

我們現在需要將ETL過程中涉及的各個步驟結合起來,以實作資料處理的操作化和自動化。但在深入研究用於實作這一目標的AWS服務之前,讓我們先研究一下管道協調的一些關鍵概念。

沒有規定資料管道必須被定義為DAG,雖然某些協調工具要求這樣做。例如,Apache Airflow要求將管道定義為DAG,如果管道定義中存在迴圈,則無法執行。但是,AWS Step Function允許在狀態機器的定義中包含迴圈,因此根據Step Function的管道不需要是DAG。

import boto3

# 定義 AWS Step Function 的客戶端
sfn_client = boto3.client('stepfunctions')

# 定義狀態機器的 ARN
state_machine_arn = 'arn:aws:states:REGION:ACCOUNT_ID:stateMachine:STATE_MACHINE_NAME'

# 定義輸入資料
input_data = {'key': 'value'}

# 啟動狀態機器的執行
response = sfn_client.start_execution(
    stateMachineArn=state_machine_arn,
    input=json.dumps(input_data)
)

# 取得執行的 ARN
execution_arn = response['executionArn']

print(f'執行的 ARN:{execution_arn}')

內容解密:

此程式碼段使用Boto3函式庫定義了AWS Step Function的客戶端,並啟動了狀態機器的執行。它首先匯入必要的函式庫,然後定義了狀態機器的ARN和輸入資料。最後,它使用start_execution方法啟動狀態機器的執行,並取得執行的ARN。

此程式碼的作用是啟動一個AWS Step Function狀態機器的執行,並將輸入資料傳遞給狀態機器。它使用了Boto3函式庫,這是AWS的Python SDK,用於與AWS服務進行互動。程式碼中使用的start_execution方法是Step Function客戶端的一部分,用於啟動狀態機器的執行。

資料管線協調的核心概念理解

如何觸發資料管線執行?

資料管線的觸發主要有兩種型別:排程型管線事件驅動型管線。傳統上,管線多依照排程執行,例如每天、每小時或每15分鐘執行一次,這種方式在批次導向的管線中仍然常見。舉例來說,第二個管線可能是每天執行一次,將當天接收到的合作夥伴檔案進行整合與豐富化處理。

然而,現今許多管線被設計為事件驅動型,即在特定事件完成後才觸發管線執行。事件驅動的工作流程能有效減少資料可用與管線處理之間的延遲。例如,若預計在凌晨4點到6點之間接收必要資料檔案,可以將管線排程在6點執行;但若某些日子裡資料在5點前就已全部到位,使用事件驅動觸發便能讓管線提早執行。

在前述例子中,第一個管線便是事件驅動型,當合作夥伴上傳新檔案時便觸發執行。在AWS中,有很強大的功能支援事件驅動活動,例如根據檔案寫入特定S3儲存桶的事件來觸發管線或其他活動。

使用清單檔案作為管線觸發器

在資料管線領域,清單檔案常用來記錄一批檔案的相關資訊。例如,當接收合作夥伴傳來的檔案時,若他們每小時傳送數百個小型CSV檔案,可能希望將一批檔案一起處理並轉換為單一Parquet檔案。

此時,可以要求合作夥伴在每批次檔案傳送結束時附上清單檔案,列出所有傳輸檔案的名稱,並包含驗證資料,如檔案大小或SHA-256雜湊值。接著,組態S3事件通知,使其僅在名稱以「manifest」開頭的檔案寫入儲存桶時才觸發管線。

管線啟動後,第一步是讀取清單檔案,並逐一驗證清單中所列檔案是否存在,甚至可以計算每個檔案的SHA-256雜湊值並與清單中的值進行比對。驗證完成後,再執行ETL任務,將所有檔案讀入並以Parquet格式輸出。這種方式仍被視為事件驅動型管線,雖然不是對每個檔案上傳事件做出反應,而是對一批上傳完成的事件進行回應。

如何處理管線中的步驟失敗?

在自動化管線處理流程中,正確處理失敗情況至關重要。同時,也需要確保與每個步驟相關的日誌檔案易於存取。本文將探討失敗處理和日誌記錄的一些重要概念。

資料管線常見的失敗原因

資料管線中的某個步驟可能會因為多種原因而失敗,以下是一些常見的原因:

  • 資料品質問題:如果某個步驟預期接收CSV檔案進行處理,但實際收到的是JSON格式的檔案,而該步驟無法處理這種格式,就會導致嚴重的失敗(即需要解決資料品質問題後才能還原運作的失敗)。
  • 程式碼錯誤:更新作業時,可能會引入語法或邏輯錯誤。雖然在佈署至生產環境前進行程式碼測試非常重要,但測試仍有可能遺漏某些錯誤,這也會導致嚴重失敗,需要重新佈署修復後的程式碼。
  • 端點錯誤:某個步驟可能需要從特定端點讀取或寫入資料(例如從S3讀取檔案或寫入資料倉儲)。有時,這些錯誤是由於暫時性問題,例如暫時性網路錯誤,這可以視為軟性失敗(即透過重試可能得以克服的失敗)。但有些時候,錯誤可能是嚴重失敗,例如作業未組態正確許可權而無法存取端點。

內容解密:

上述段落說明瞭資料管線中常見的三類別錯誤及其成因:

  1. 資料品質問題:這類別錯誤通常發生在預期輸入格式與實際輸入格式不符時,例如預期CSV卻收到JSON格式。
  2. 程式碼錯誤:更新程式碼時引入的語法或邏輯錯誤會導致作業失敗,需要重新測試及佈署修復後的程式碼。
  3. 端點錯誤:讀寫端點時的錯誤可能源於暫時性網路問題(軟性失敗)或許可權組態錯誤(嚴重失敗)。

這些錯誤型別的理解對於設計可靠的資料管線至關重要,能夠幫助開發人員針對不同型別的錯誤採取適當的處理措施,例如重試機制或手動干預。

import boto3
from botocore.exceptions import ClientError

def verify_manifest_file(manifest_file, s3_bucket):
    s3 = boto3.client('s3')
    try:
        # 讀取清單檔案
        response = s3.get_object(Bucket=s3_bucket, Key=manifest_file)
        manifest_content = response['Body'].read().decode('utf-8')
        # 解析清單內容
        files_to_verify = manifest_content.splitlines()
        for file_entry in files_to_verify:
            file_name, file_size, file_hash = file_entry.split(',')
            # 驗證每個檔案是否存在
            try:
                s3.head_object(Bucket=s3_bucket, Key=file_name)
            except ClientError as e:
                if e.response['Error']['Code'] == '404':
                    print(f"File {file_name} not found.")
                    return False
            # 驗證檔案大小和雜湊值
            # ...
        return True
    except ClientError as e:
        print(f"Error reading manifest file: {e}")
        return False

# 使用範例
manifest_file = 'manifest.txt'
s3_bucket = 'my-data-bucket'
if verify_manifest_file(manifest_file, s3_bucket):
    print("All files verified successfully.")
else:
    print("Verification failed.")

內容解密:

這段Python程式碼展示瞭如何驗證S3儲存桶中的清單檔案及其所列檔案:

  1. 程式使用boto3函式庫與S3互動,首先嘗試讀取指定的清單檔案。
  2. 清單檔案內容被假設為以逗號分隔的簡單文字格式,包含檔案名稱、大小和雜湊值。
  3. 對每個清單中的檔案,程式檢查其是否存在於S3中,並可進一步擴充套件以驗證大小和雜湊值。
  4. 若所有檢查透過,則傳回True;否則傳回False並輸出錯誤訊息。
  5. 程式中使用了例外處理來捕捉ClientError,特別是檢查檔案是否存在的head_object呼叫可能會丟出404錯誤。

此範例展示瞭如何在AWS S3環境中實作基本的清單檔案驗證邏輯,能夠確保資料完整性並提前發現問題。

檢視AWS中的資料管線協調選項

在AWS中建立和協調資料管線有多種工具可供選擇,包括AWS Data Pipeline、AWS Step Function、Amazon Managed Workflows for Apache Airflow(MWAA)和AWS Glue Workflows。每種解決方案都有其優缺點,選擇時需要考慮多個因素,如管理努力程度、與目標ETL引擎的整合難易度、日誌記錄、錯誤處理機制、成本和平台獨立性。

AWS Data Pipeline:管理資料來源之間的ETL

AWS Data Pipeline是AWS最早用於建立和協調資料管線的服務之一,於2012年首次發布。使用此服務,您可以在特定的AWS資料來源之間提取、轉換和載入資料,甚至可以處理本地資料來源。

資料來源和目標

Data Pipeline支援以下AWS資料服務作為來源和目標:

  • Amazon DynamoDB
  • Amazon Relational Database System(RDS)
  • Amazon Redshift
  • Amazon S3

此外,Data Pipeline還能夠讀寫其他JDBC資料儲存,例如本地資料函式庫。

運算服務

可以使用以下運算服務來執行作業以轉換資料:

  • Amazon EC2
  • Amazon EMR
  • 本地運算資源(透過安裝根據Java的Data Pipeline任務執行器軟體)

限制和建議

由於Data Pipeline的更新相對較少,例如最後一次檔案更新是在2018年,預設的EC2例項大多數地區是m1例項家族(儘管可以使用更新的世代,如m5例項),並且任務執行器軟體僅支援Java 1.6和Java 1.8版本。此外,Data Pipeline服務僅在五個AWS區域(北弗吉尼亞、俄勒岡、悉尼、東京和愛爾蘭)受支援。

因此,通常建議使用更新的AWS服務來建立和協調資料管線。

AWS Glue Workflows:協調Glue資源

在第3章《AWS資料工程師工具包》中,我們介紹了AWS Glue Workflows服務。作為AWS Glue服務的一部分,它可用於建立由Glue元件(Glue Crawlers和Glue Spark或Python作業)組成的資料管線。

使用案例

對於僅使用AWS Glue元件建立資料管線的使用案例,Glue Workflows是一個不錯的選擇。例如,您可以使用Glue Workflows建立以下管線:

  1. 執行Glue Crawler,將新擷取的CSV檔案新增到Glue Data Catalog的新分割槽中。
  2. 執行Glue Spark作業,使用目錄讀取新資料,然後將CSV檔案轉換為Parquet檔案。
  3. 執行另一個Glue Crawler,將新轉換的Parquet檔案新增到Glue Data Catalog。
  4. 平行執行兩個Glue作業。一個Glue作業聚合資料並將結果寫入DynamoDB表格。另一個Glue作業建立一個新的豐富資料集,將新資料與現有的參考資料集合併。
  5. 執行另一個Glue Crawler,將新的豐富資料集新增到Glue Catalog。
  6. 執行Glue Python Shell作業,傳送有關作業成功或失敗的通知。

程式碼範例與解密

import boto3

# 初始化 Glue 使用者端
glue = boto3.client('glue')

# 定義 Glue Workflow 的步驟
def lambda_handler(event, context):
    # 啟動 Glue Crawler
    glue.start_crawler(Name='my_crawler')
    
    # 啟動 Glue Spark 作業
    glue.start_job_run(JobName='my_spark_job')
    
    return {
        'statusCode': 200,
        'statusMessage': 'OK'
    }

內容解密:

上述程式碼展示瞭如何使用Python的boto3函式庫與AWS Glue互動。首先,我們初始化了一個Glue使用者端。然後,在Lambda函式中,我們定義了啟動Glue Crawler和Glue Spark作業的步驟。這段程式碼邏輯簡單明瞭,首先透過start_crawler方法啟動名為my_crawler的Glue Crawler,接著使用start_job_run方法啟動名為my_spark_job的Glue Spark作業。最後,函式傳回一個包含狀態碼和狀態訊息的字典,表示操作成功與否。

管線失敗重試策略

在設計管線時,應考慮為失敗的步驟實施重試策略。許多協調工具(如Apache Airflow和AWS Step Function)允許您指定重試次數、重試間隔以及退避率。

重試退避率

重試退避率(也稱為指數退避)會導致重試間隔時間在每次重試時增加。例如,在AWS Step Function中,您可以指定一個BackOffRate值,該值將重試之間的延遲乘以該值。如果您指定重試間隔為10秒,重試退避率為1.5,則Step Function將在第二次重試時等待15秒(10秒 x 1.5),第三次重試時等待22.5秒(15秒 x 1.5),依此類別推。