Python 提供豐富的函式庫,適合建構 ETL 資料處理流程。本文將以芝加哥交通事故資料為例,示範如何從 CSV 檔案擷取資料,進行必要的清理和轉換,最終載入至 PostgreSQL 資料函式庫。過程中將使用 Pandas 進行資料操作,psycopg2 進行資料函式庫互動,並涵蓋資料去重、缺失值處理、資料型別轉換和 DataFrame 合併等關鍵步驟,確保資料品質和流程可靠性。

在本地環境中為資料載入活動做準備

在上一章中,我們介紹了使用 SQLite3 資料函式庫進行資料載入的基本方法。然而,在實際的資料處理流程中,SQLite3 並不是最常用的資料儲存解決方案。因此,本章將指導您如何在本地環境中建立 PostgreSQL 資料函式庫,以便在後續章節中使用它作為資料載入的輸出位置。

為資料載入活動準備本地環境

本文將使用本地 PostgreSQL 資料函式庫作為資料載入的目的地。PostgreSQL 是一種免費且開源的關聯式資料函式倉管理系統(RDBMS),支援 SQL 相容性。您可以透過 PostgreSQL 的維基百科頁面 瞭解更多相關資訊。

下載並安裝 PostgreSQL

根據您的裝置型別,請選擇相應的下載連結(https://www.enterprisedb.com/downloads/postgres-postgresql-downloads)並按照以下步驟進行安裝:

  1. 下載 postgres.app 安裝程式。
  2. 點選 .dmg 檔案並雙擊方框以啟動下載。
  3. 下載完成後,EDB PostgreSQL 安裝精靈將會啟動。請確保在安裝過程中選擇以下設定:
    • 安裝目錄:/Library/PostgreSQL/15
    • 連線埠:5432
    • 選擇元件:
      • PostgreSQL 伺服器
      • pgAdmin 4
      • Stack Builder
      • 命令列工具
    • 資料目錄:/Library/PostgreSQL/15/data
    • 密碼(請輸入您的主密碼)
    • 進階選項(預設地區)
    • Stack Builder(取消勾選「關閉時開啟」)

開啟 pgAdmin 並新增伺服器

  1. 在本地裝置上開啟 pgAdmin 應用程式,並輸入您在 PostgreSQL 安裝精靈中設定的密碼。

    ### 圖 6.1:PostgreSQL 輸入主密碼的截圖
    
  2. 在首頁的快速連結區域,選擇「新增伺服器」:

    • 名稱:Packt_ETL
    • 主機名稱:localhost
    • 連線埠:5432
    • 使用者名稱:postgre
    • 資料函式庫:chicago_dmv
    • 密碼(請輸入您的主密碼)
    • 連線 URL:postgresql://localhost

在 PostgreSQL 中建立資料結構描述(Schema)

在成功建立 PostgreSQL 環境和資料函式庫後,我們需要為資料表新增資料結構描述(Schema)。由於我們將 chicago_dmv 資料函式庫視為輸出資料位置,因此需要為儲存在特定資料表中的欄位和資料型別定義資料結構描述。

建立 chicago_schema

  1. 在 pgAdmin 中,點選資料函式庫,選擇「建立」 > 「結構描述(Schema)」,然後輸入結構描述名稱 chicago_schema
  2. 定義以下欄位及其屬性:
    名稱資料型別可為 NULL主鍵預設值
    primaryIDint01
    num_columnbigint-6749380428
    string_columnchar-“this is a string”
    json_columnjson-{ “key”: 1 }

這些資料需求定義了每個欄位的預期資料型別,從而確保插入到該資料表的資料必須具有相同的欄位名稱和資料型別。否則,資料匯入過程將會失敗。

使用 Python 建立端對端 ETL 資料處理流程

Python 是一種功能豐富的程式語言,具備多樣化的函式庫和工具,非常適合建立強健、可靠且彈性的 ETL 資料處理流程。到目前為止,本文已逐步介紹了 ETL 流程中的每個步驟。

在本章中,我們將透過實務範例展示如何使用 Python 相關工具建立完整的端對端 ETL 資料處理流程。透過本章的學習,您將能夠從來源 CSV 檔案中擷取資料、執行必要的清理和轉換操作,並將處理後的資料載入到 PostgreSQL 資料表中。

本章任務

  1. 資料擷取:讀取來源 CSV 檔案並將資料儲存到不同的 DataFrame 中。
  2. 資料清理和轉換:對每個 DataFrame 執行關鍵的資料清理和轉換操作,以準備將資料匯入到目標輸出位置:
    • 移除重複的列
    • 處理缺失值
    • 將欄位轉換為適當的資料型別
    • 將多個 DataFrame 合併為單一 DataFrame
    • 刪除不必要的欄位
    • 重新命名欄位以符合輸出資料結構描述
  3. 資料載入:將合併後的 DataFrame 中的資料載入到 PostgreSQL 資料表中:
    • 建立 PostgreSQL 資料函式庫
    • 根據結構描述建立 PostgreSQL 資料表

使用Python建立端對端ETL資料管線

專案介紹

本教學將引導您使用Python建立一個完整的ETL(Extract, Transform, Load)資料管線。該管線將從芝加哥開放資料平台提取資料,經過轉換後載入PostgreSQL資料函式庫。

技術需求

  • Python 3.6或更高版本
  • PyCharm或其他Python相容的IDE
  • Jupyter Notebook
  • Pipenv用於管理依賴項
  • PostgreSQL資料函式庫

專案背景

假設您的工程公司被一家名為SafeRoad的資料科學新創公司僱用,需要建立一個自定義的資料管線,以連線芝加哥開放資料平台。SafeRoad希望分析芝加哥的車禍資料,並找出導致這些事故的因素。

資料介紹

SafeRoad的資料科學團隊提供了所需的資料結構,包括以下表格:

  • Vehicle(車輛)
  • Person(人員)
  • Time(時間)
  • Crash(車禍)

這些表格具有結構化的資料格式,適合儲存在PostgreSQL資料函式庫中。

在PostgreSQL中建立表格

首先,您需要在PostgreSQL資料函式庫中建立所需的表格。

  1. 連線到PostgreSQL伺服器:
psql -U postgres
  1. 建立一個名為chicago_vehicle_crash_data的資料函式庫:
CREATE DATABASE chicago_vehicle_crash_data;
  1. chicago_vehicle_crash_data資料函式庫中建立一個名為chicago_dmv的結構:
CREATE SCHEMA chicago_dmv;
  1. chicago_dmv結構中建立所需的表格,例如VehiclePersonTimeCrash
CREATE TABLE chicago_dmv.Vehicle (
    CRASH_UNIT_ID integer,
    CRASH_ID text,
    CRASH_DATE timestamp,
    VEHICLE_ID integer,
    VEHICLE_MAKE text,
    VEHICLE_MODEL text,
    VEHICLE_YEAR integer,
    VEHICLE_TYPE text
);

CREATE TABLE chicago_dmv.Person (
    PERSON_ID text,
    CRASH_ID text,
    CRASH_DATE timestamp,
    PERSON_TYPE text,
    VEHICLE_ID integer,
    PERSON_SEX text,
    PERSON_AGE integer
);

CREATE TABLE chicago_dmv.Time (
    CRASH_DATE timestamp,
    CRASH_ID text,
    CRASH_HOUR integer,
    CRASH_DAY_OF_WEEK integer,
    CRASH_MONTH integer,
    DATE_POLICE_NOTIFIED timestamp
);

CREATE TABLE chicago_dmv.Crash (
    CRASH_UNIT_ID integer,
    CRASH_ID text,
    PERSON_ID text,
    VEHICLE_ID integer,
    NUM_UNITS numeric,
    TOTAL_INJURIES numeric
);

建立表格程式碼解析:

此段程式碼用於在PostgreSQL資料函式庫中建立所需的表格。首先,建立一個名為chicago_vehicle_crash_data的資料函式庫,然後在該資料函式庫中建立一個名為chicago_dmv的結構。接著,在chicago_dmv結構中建立四個表格:VehiclePersonTimeCrash,每個表格都有其特定的欄位和資料型別。

提取和載入資料

使用Pandas讀取CSV檔案,並將其儲存在DataFrame中:

import pandas as pd

try:
    # 讀取交通車禍CSV檔案並儲存在DataFrame中
    df_crashes = pd.read_csv("data/traffic_crashes.csv")
    # 讀取交通車禍車輛CSV檔案並儲存在DataFrame中
    df_vehicles = pd.read_csv("data/traffic_crash_vehicle.csv")
    # 讀取交通車禍人員CSV檔案並儲存在DataFrame中
    df_people = pd.read_csv("data/traffic_crash_people.csv")
except Exception as e:
    print(f"Error reading CSV files: {e}")

資料提取程式碼解析:

此段程式碼使用Pandas的read_csv函式讀取三個CSV檔案:traffic_crashes.csvtraffic_crash_vehicle.csvtraffic_crash_people.csv,並將其儲存在三個DataFrame中:df_crashesdf_vehiclesdf_people。如果讀取過程中發生錯誤,將列印錯誤訊息。

下一步:轉換和載入資料

在下一節中,我們將介紹如何將提取的資料轉換並載入PostgreSQL資料函式庫中的相應表格。敬請期待!

資料轉換與清理:建立可靠的ETL流程

在前一步驟中,我們成功地將交通意外資料載入Jupyter Notebook。現在,我們需要對資料進行清理和轉換,以確保資料的品質和可靠性。這些步驟對於建立一個可靠的ETL(提取、轉換、載入)流程至關重要。

資料清理與轉換步驟

  1. 移除重複資料:使用drop_duplicates()函式移除每個DataFrame中的重複列。

df = df.drop_duplicates()


2. **處理缺失值**:檢查DataFrame中的缺失值,並根據資料型別進行適當的處理。例如,使用`fillna()`函式將數值型欄位的缺失值替換為平均值,將類別型欄位的缺失值替換為眾數。
   ```python
# 將數值型欄位的缺失值替換為平均值
df.fillna(df.mean(), inplace=True)
# 將類別型欄位的缺失值替換為眾數
df.fillna(df.mode().iloc[0], inplace=True)
  1. 轉換資料型別:使用astype()pd.to_datetime()函式將欄位轉換為適當的資料型別,以便於後續處理。

將CRASH_DATE欄位轉換為日期時間型別

df[‘CRASH_DATE’] = pd.to_datetime(df[‘CRASH_DATE’], format=’%m/%d/%Y’)

將POSTED_SPEED_LIMIT欄位轉換為int32型別

df[‘POSTED_SPEED_LIMIT’] = df[‘POSTED_SPEED_LIMIT’].astype(‘int32’)


4. **合併DataFrame**:使用`merge()`函式根據共同欄位將多個DataFrame合併為一個。
   ```python
# 合併三個DataFrame
merge_01_df = pd.merge(df, df2, on='CRASH_RECORD_ID')
all_data_df = pd.merge(merge_01_df, df3, on='CRASH_RECORD_ID')
  1. 移除不必要的欄位:使用drop()函式移除不需要的欄位,以簡化資料結構。

移除不必要的欄位

df = df[[‘CRASH_UNIT_ID’, ‘CRASH_ID’, ‘CRASH_DATE’, ‘VEHICLE_ID’, ‘VEHICLE_MAKE’, ‘VEHICLE_MODEL’, ‘VEHICLE_YEAR’, ‘VEHICLE_TYPE’, ‘PERSON_ID’, ‘PERSON_TYPE’, ‘PERSON_SEX’, ‘PERSON_AGE’, ‘CRASH_HOUR’, ‘CRASH_DAY_OF_WEEK’, ‘CRASH_MONTH’, ‘DATE_POLICE_NOTIFIED’]]


#### 內容解密:
上述步驟詳細說明瞭如何對交通意外資料進行清理和轉換。首先,我們移除了重複的資料列,以確保資料的唯一性。接著,我們處理了缺失值,對於數值型欄位使用平均值填充,對於類別型欄位使用眾數填充。然後,我們將欄位轉換為適當的資料型別,例如將日期欄位轉換為日期時間型別。之後,我們根據共同欄位合併了多個DataFrame,最後移除了不必要的欄位,以簡化資料結構。

### 將資料載入PostgreSQL表格

完成資料清理和轉換後,下一步是將處理好的資料載入PostgreSQL資料函式庫中,以便客戶能夠輕鬆存取和使用這些資料。我們使用`psycopg2` Python模組來實作這一步驟。

```python
import psycopg2

# 建立與PostgreSQL資料函式庫的連線
conn = psycopg2.connect(database="your_database_name", user="your_username", password="your_password", host="your_host", port="your_port")
# 建立遊標物件
cur = conn.cursor()

定義SQL插入陳述式

我們需要定義SQL插入陳述式,以便將DataFrame中的資料插入到相應的PostgreSQL表格中。

insert_query_vehicle = '''INSERT INTO chicago_dmv.Vehicle (CRASH_UNIT_ID, CRASH_ID, CRASH_DATE, VEHICLE_ID, VEHICLE_MAKE, VEHICLE_MODEL, VEHICLE_YEAR, VEHICLE_TYPE) VALUES (%s, %s, %s, %s, %s, %s, %s, %s);'''
insert_query_person = '''INSERT INTO chicago_dmv.Person (PERSON_ID, CRASH_ID, CRASH_DATE, PERSON_TYPE, VEHICLE_ID, PERSON_SEX, PERSON_AGE) VALUES (%s, %s, %s, %s, %s, %s, %s);'''
insert_query_crash = '''INSERT INTO chicago_dmv.Crash (CRASH_UNIT_ID, CRASH_ID, PERSON_ID, VEHICLE_ID, NUM_UNITS, TOTAL_INJURIES) VALUES (%s, %s, %s, %s, %s, %s);'''

執行插入操作

遍歷合併後的DataFrame,將每一行資料插入到相應的資料函式庫表格中。

for index, row in df.iterrows():
    # 準備Vehicle表格的插入資料
    values_vehicle = (row['CRASH_UNIT_ID'], row['CRASH_ID'], row['CRASH_DATE'], row['VEHICLE_ID'], row['VEHICLE_MAKE'], row['VEHICLE_MODEL'], row['VEHICLE_YEAR'], row['VEHICLE_TYPE'])
    # 執行插入操作
    cur.execute(insert_query_vehicle, values_vehicle)
    
    # 對其他表格重複相同的過程
    
# 提交變更到資料函式庫
conn.commit()
# 關閉遊標和資料函式庫連線
cur.close()
conn.close()

內容解密:

這部分程式碼展示瞭如何將清理和轉換後的資料載入到PostgreSQL資料函式庫中。首先,我們建立了與資料函式庫的連線和遊標物件。然後,我們定義了SQL插入陳述式,用於將DataFrame中的資料插入到相應的表格中。接著,我們遍歷DataFrame,將每一行資料插入到對應的表格中。最後,我們提交了變更並關閉了資料函式庫連線。

使ETL流程可佈署

為了使ETL流程可佈署,我們需要將程式碼組織成模組化的結構,分別用於提取、轉換和載入操作。這樣可以提高ETL流程的可維護性和可擴充套件性。

專案目錄結構

project/
├── data/
│   ├── traffic_crashes.csv
│   ├── traffic_crash_vehicle.csv
│   └── traffic_crash_people.csv
├── etl/
│   ├── __init__.py
│   ├── extract.py
│   ├── transform.py
│   └── load.py
├── config.yaml
├── main.py
└── README.md

內容解密:

上述目錄結構展示瞭如何組織ETL流程的程式碼。其中,data目錄存放原始資料,etl目錄包含提取、轉換和載入操作的模組,config.yaml檔案用於存放組態引數,main.py是ETL流程的入口點,README.md檔案提供專案的說明資訊。