在資料工程領域,建構穩固的 ETL 管線至關重要。本文將引導讀者利用 AWS 平台上的美國建築市場資料,開發一個兼具效率和可靠性的 ETL 流程。我們將使用 Python 搭配 boto3、pandas 和 sqlalchemy 等函式庫,示範如何從 S3 擷取資料、進行必要的轉換,最後載入 PostgreSQL 或 Redshift 資料函式庫。程式碼範例中包含了錯誤處理機制,確保管線的穩定性。此外,我們也將介紹如何撰寫單元測試,以驗證 ETL 流程的各個環節,提升程式碼品質和可維護性。最後,文章將提供完整的程式碼和執行步驟,讓讀者可以快速上手,並將其應用於實際的資料處理場景。
使用案例與深入閱讀
技術需求
要有效利用本章提供的資源和程式碼範例,請確保您的系統符合以下技術需求:
- 軟體需求:
- 整合開發環境(IDE):建議使用 PyCharm 作為主要的 Python IDE,我們可能會在本章中對 PyCharm 進行特定參考。然而,您可以自由選擇任何與 Python 相容的 IDE。
- 應安裝 Jupyter Notebooks。
- 應安裝 Python 3.6 或更高版本。
- 應安裝 Pipenv 以管理依賴項。
- GitHub 儲存函式庫:
建立資料管道的實作練習
我們的第一次實作練習旨在加強我們在本文中討論的設計概念和結構。在這個練習中,我們將使用一個電子商務資料集。資料由三個 CSV 檔案組成:orders.csv、products.csv 和 customers.csv。orders.csv 檔案包含每個訂單的詳細資訊,例如訂單 ID、客戶 ID、產品 ID、數量和訂單日期。products.csv 檔案包含每個產品的資訊,例如產品 ID、名稱和價格。最後,customers.csv 檔案包含客戶詳細資訊,包括客戶 ID、名稱和位置。
程式碼重構
# 匯入必要的函式庫
import pandas as pd
from sqlalchemy import create_engine
# 第 1 步:資料提取
# 將 CSV 檔案載入 pandas DataFrame
orders = pd.read_csv('orders.csv')
products = pd.read_csv('products.csv')
customers = pd.read_csv('customers.csv')
# 第 2 步:資料轉換
# 清理資料(例如,處理遺失值,修正資料型別)
orders = orders.dropna()
products = products.dropna()
customers = customers.dropna()
# 合併 orders 和 products DataFrame
data = pd.merge(orders, products, on='product_id')
# 計算每個訂單的總價格
data['total_price'] = data['quantity'] * data['price']
# 合併結果 DataFrame 與 customers DataFrame
data = pd.merge(data, customers, on='customer_id')
# 第 3 步:資料載入
# 建立與 PostgreSQL 資料函式庫的連線
engine = create_engine('postgresql://username:password@localhost:5432/mydatabase')
# 將 DataFrame 載入資料函式庫作為新表格
data.to_sql('EcommerceData', engine, if_exists='replace', index=False)
#### 內容解密:
- 資料提取:使用
pd.read_csv將 CSV 檔案讀入 DataFrame。 - 資料轉換:使用
dropna處理遺失值,並使用pd.merge合併 DataFrame。計算總價格並新增到 DataFrame。 - 資料載入:使用
create_engine建立與 PostgreSQL 資料函式庫的連線,並使用to_sql將 DataFrame 載入資料函式庫。
重構 ETL 管道
本練習的目標是幫助您建立一個 ETL 管道,從 CSV 檔案中提取資料,執行多項轉換(例如資料清理、合併資料和計算新指標),並將結果載入 PostgreSQL 資料函式庫。最終產品將是一個重構的專案目錄,結構如下:
project: ecommerce
├── data
│ ├── orders.csv
│ ├── products.csv
│ └── customers.csv
├── etl
│ ├── __init__.py
│ ├── extract.py
│ ├── transform.py
│ └── load.py
└── config.yaml
└── pipeline.py
紐約黃色計程車資料、ETL 管道和佈署
在這個案例中,我們將建立一個更類別似於您在專業環境中可能遇到的管道。這條管道將包括專業的編碼實踐,例如錯誤處理、易於擴充套件的模組化和單元測試。我們將使用紐約 2021 年黃色計程車行程資料(https://data.cityofnewyork.us/Transportation/2021-Yellow-Taxi-Trip-Data/m6nq-qud6),這是一個開源資料集,比前面的例子中的資料更大、更複雜。它包含有關紐約市計程車行程的詳細資訊,例如上車和下車時間、行程距離、車費金額等。
使用此結構,您將建立一個強健且適合生產的 ETL 管道,從資料中提取資料,執行三到五種我們在第 5 章中討論的資料轉換方法,並將結果載入 PostgreSQL 資料函式庫。完成管道後,您需要新增錯誤處理和單元測試以加強管道。
#### 內容解密:
- 選擇適當的資料集:選擇一個更複雜和更大的開源資料集,例如紐約黃色計程車行程資料。
- 建立強健的 ETL 管道:使用專業的編碼實踐,例如錯誤處理、模組化和單元測試,建立一個適合生產的 ETL 管道。
- 執行多種資料轉換:執行多種資料轉換方法,例如資料清理、合併資料和計算新指標,將原始資料轉換為有用的格式。
建構強健的 ETL 管道於 AWS 中處理美國建築資料
簡介
在本章節中,我們將透過實際案例來建構一個 ETL(提取、轉換、載入)管道,使用來自 AWS Marketplace 的美國建築市場資料。該資料倉儲包含了住宅、商業及太陽能建設專案等豐富內容,為我們的練習提供了良好的基礎。
步驟 1:組態設定
首先,我們需要建立一個名為 config.py 的組態檔案,用於存放所有與 ETL 管道相關的組態變數,例如資料函式庫連線字串、資料表名稱及檔案路徑等。這樣的做法能夠使我們的程式碼更加簡潔且易於維護。
# config.py
DATABASE_CONNECTION = 'postgresql://username:password@localhost:5432/mydatabase'
TABLE_NAME = 'ConstructionData'
FILE_PATH = 'construction_data.parquet'
S3_BUCKET_NAME = 'my-s3-bucket'
內容解密:
DATABASE_CONNECTION:定義了連線到 PostgreSQL 資料函式庫所需的完整字串。TABLE_NAME:指定了資料將被載入的資料表名稱。FILE_PATH:指明瞭本地儲存的 Parquet 檔案路徑。S3_BUCKET_NAME:儲存了 AWS S3 的儲存桶名稱。
步驟 2:ETL 管道指令碼
接下來,我們將建立主要的 ETL 管道指令碼 etl_pipeline.py。該指令碼包含了 ETL 處理流程中的每個步驟:extract_data()、transform_data() 和 load_data()。每個函式都具備錯誤處理機制,以管理執行期間可能出現的問題。
# etl_pipeline.py
import pandas as pd
from sqlalchemy import create_engine
import boto3
from config import DATABASE_CONNECTION, TABLE_NAME, FILE_PATH, S3_BUCKET_NAME
def extract_data(file_path):
# 從 Parquet 檔案讀取資料
df = pd.read_parquet(file_path)
return df
def transform_data(df):
# 進行資料轉換,例如計算新的欄位
df['new_column'] = df['existing_column'] * 2
return df
def load_data(df, table_name, database_connection):
# 將 DataFrame 載入 PostgreSQL 資料函式庫
engine = create_engine(database_connection)
df.to_sql(table_name, engine, if_exists='replace', index=False)
def upload_to_s3(df, bucket_name, file_path):
# 將 DataFrame 上傳至 S3 儲存桶
s3 = boto3.client('s3')
df.to_parquet(file_path, index=False)
s3.upload_file(file_path, bucket_name, file_path)
def run_etl_pipeline():
df = extract_data(FILE_PATH)
df = transform_data(df)
load_data(df, TABLE_NAME, DATABASE_CONNECTION)
upload_to_s3(df, S3_BUCKET_NAME, FILE_PATH)
內容解密:
extract_data():從指定的 Parquet 檔案中提取資料。transform_data():對提取的資料進行必要的轉換。load_data():將轉換後的資料載入指定的 PostgreSQL 資料表。upload_to_s3():將處理後的資料上傳至指定的 S3 儲存桶。
步驟 3:單元測試
最後,為了確保 ETL 管道的可靠性,我們需要撰寫單元測試。這些測試將驗證管道中每個函式的正確性,並幫助我們在未來修改和擴充套件程式碼時捕捉任何問題或迴歸。
# test_etl_pipeline.py
import unittest
from etl_pipeline import extract_data, transform_data, load_data
from config import DATABASE_CONNECTION, TABLE_NAME, FILE_PATH
class TestETLPipeline(unittest.TestCase):
def test_extract_data(self):
df = extract_data(FILE_PATH)
self.assertIsNotNone(df)
self.assertEqual(df.shape[1], 18) # 假設資料有 18 個欄位
def test_transform_data(self):
df = extract_data(FILE_PATH)
df = transform_data(df)
self.assertIn('new_column', df.columns)
def test_load_data(self):
df = extract_data(FILE_PATH)
df = transform_data(df)
load_data(df, TABLE_NAME, DATABASE_CONNECTION)
# 在實際使用中,通常會建立資料函式庫連線並進行驗證
if __name__ == '__main__':
unittest.main()
內容解密:
- 使用
unittest.TestCase建立測試類別TestETLPipeline。 test_extract_data():驗證提取的 DataFrame 不為空且欄位數量正確。test_transform_data():檢查轉換後的 DataFrame 是否包含預期的欄位。test_load_data():驗證資料是否能夠成功載入資料函式庫。
建構AWS上的美國建築資料ETL管線實務
步驟一:資料擷取
首先,我們需要從AWS S3下載美國建築市場資料。為此,我們將使用boto3函式庫來與AWS SDK互動。
import boto3
def extract_data(bucket_name, file_key, local_path):
s3 = boto3.client('s3')
s3.download_file(bucket_name, file_key, local_path)
print(f"已從{bucket_name}下載{file_key}到{local_path}")
內容解密:
boto3.client('s3'):建立一個S3客戶端,用於與S3儲存桶互動。s3.download_file:從指定的S3儲存桶下載檔案到本地路徑。bucket_name、file_key和local_path分別代表S3儲存桶名稱、檔案鍵值和本地儲存路徑。
步驟二:資料轉換
接下來,我們需要對下載的資料進行轉換。假設我們的資料集包含每個建築專案的開始和結束日期,我們可以使用pandas函式庫來計算專案的持續時間。
import pandas as pd
def transform_data(local_path):
df = pd.read_csv(local_path)
df['start_date'] = pd.to_datetime(df['start_date'])
df['end_date'] = pd.to_datetime(df['end_date'])
df['duration'] = df['end_date'] - df['start_date']
df.loc[df['end_date'].isna(), 'duration'] = pd.Timestamp.today() - df['start_date']
df['duration'] = df['duration'].dt.days
print(f"已轉換{local_path}中的資料")
return df
內容解密:
pd.read_csv(local_path):讀取CSV檔案到DataFrame。pd.to_datetime:將字串日期轉換為日期時間格式。df['end_date'] - df['start_date']:計算專案的持續時間。df.loc[df['end_date'].isna(), 'duration'] = pd.Timestamp.today() - df['start_date']:對於尚未結束的專案,使用當前日期計算持續時間。df['duration'].dt.days:將持續時間轉換為天數。
步驟三:資料載入
最後,我們需要將轉換後的資料載入到Amazon Redshift叢集。使用sqlalchemy模組可以實作這一步驟。
from sqlalchemy import create_engine
def load_data(df, table_name, redshift_conn_str):
engine = create_engine(redshift_conn_str)
df.to_sql(table_name, engine, if_exists='replace', index=False)
print(f"已將資料載入{table_name}")
內容解密:
create_engine(redshift_conn_str):建立一個Redshift連線引擎。df.to_sql:將DataFrame寫入Redshift表格。if_exists='replace':如果表格已存在,則替換它。
執行ETL管線
設定好AWS S3環境後,定義必要的引數,然後執行ETL管線。
s3_BUCKET_NAME = "your_s3_bucket_name"
CMDW_FILE_KEY = "your_filename"
LOCAL_PATH = "data/us_construction_extract.csv"
REDSHIFT_TABLE = "your_redshift_table"
REDSHIFT_CONN_STR = "your_redshift_conn_str"
def run_etl_pipeline(bucket_name, file_key, local_path, table_name, redshift_conn_str):
extract_data(bucket_name, file_key, local_path)
df = transform_data(local_path)
load_data(df, table_name, redshift_conn_str)
if __name__ == '__main__':
run_etl_pipeline(bucket_name=s3_BUCKET_NAME,
file_key=CMDW_FILE_KEY,
local_path=LOCAL_PATH,
table_name=REDSHIFT_TABLE,
redshift_conn_str=REDSHIFT_CONN_STR)
內容解密:
- 定義必要的引數,如S3儲存桶名稱、檔案鍵值、本地路徑、Redshift表格名稱和連線字串。
run_etl_pipeline函式依序執行資料擷取、轉換和載入。