在現代機器學習專案中,從資料獲取、預處理、模型訓練到最終評估,往往涉及多個複雜且相互依賴的步驟。手動執行這些步驟不僅效率低下,也容易出錯。Apache Airflow 作為業界領先的工作流程管理平台,能讓我們以程式化的方式定義、排程和監控這些流程。本文將以一個經典的「糖尿病預測」案例,從零開始,詳細闡述如何使用 Airflow 將一個深度學習專案的完整生命週期,構建成一個自動化、可重複執行的工作管線 (Pipeline)。

第一部分:為什麼需要 Airflow?

一個典型的深度學習專案包含以下階段:

  1. 資料預處理: 清理原始資料、處理缺失值、進行特徵工程與標準化。
  2. 模型訓練: 使用處理好的訓練資料集來訓練神經網路模型。
  3. 模型評估: 使用測試資料集來評估模型的效能。

這些階段之間存在著嚴格的依賴關係:必須先完成預處理才能進行訓練,必須先完成訓練才能進行評估。在開發過程中,任何一個階段的程式碼或參數變更,都可能需要重新執行整個流程。Airflow 讓我們能將這些階段定義為獨立的任務 (Tasks),並將它們之間的依賴關係組織成一個有向無環圖 (Directed Acyclic Graph, DAG),從而實現整個工作流程的自動化管理。

第二部分:定義工作流程的各個階段

在編寫 DAG 之前,我們首先將整個專案拆解為三個獨立的 Python 函式,每個函式對應一個核心任務。

1. 資料預處理 (preprocess_data)

此任務使用 PySpark 讀取原始 CSV 資料,進行簡單的清理和特徵工程,然後將處理好的訓練集和測試集以 Parquet 格式儲存,以供後續任務使用。

# in tasks/preprocessing.py
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler

def preprocess_data(data_path, train_output_path, test_output_path):
    spark = SparkSession.builder.appName("DiabetesPreprocessing").getOrCreate()
    df = spark.read.csv(data_path, header=True, inferSchema=True)
    
    # ... (資料清理與特徵工程) ...
    assembler = VectorAssembler(inputCols=[...], outputCol="features")
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
    # ...
    
    train_df, test_df = processed_df.randomSplit([0.8, 0.2], seed=42)
    train_df.write.parquet(train_output_path, mode="overwrite")
    test_df.write.parquet(test_output_path, mode="overwrite")

程式碼解析:

此函式實現了深度學習管線中的資料預處理階段。函式使用 PySpark 建立 Spark Session,讀取原始 CSV 資料並進行推斷型態。透過 VectorAssembler 將多個特徵欄位組合成單一特徵向量,再使用 StandardScaler 進行標準化處理,確保所有特徵都在相同的尺度範圍內。最後將資料隨機分割為訓練集(80%)和測試集(20%),並以 Parquet 格式儲存,這種列式儲存格式在大資料處理中具有更高的讀取效率。

2. 模型訓練 (train_model)

此任務讀取預處理好的訓練集 Parquet 檔案,使用 TensorFlow/Keras 建立一個簡單的神經網路,進行訓練,並將訓練好的模型儲存為 H5 檔案。

# in tasks/training.py
import pandas as pd
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

def train_model(train_input_path, model_output_path):
    train_df = pd.read_parquet(train_input_path)
    X_train = train_df['scaled_features'].tolist()
    y_train = train_df['Outcome'].tolist()

    model = Sequential([...]) # 建立模型架構
    model.compile(...)
    model.fit(X_train, y_train, epochs=100, verbose=0)
    
    model.save(model_output_path)

程式碼解析:

此函式負責深度學習模型的訓練階段。函式首先使用 pandas 讀取經過預處理的 Parquet 格式訓練資料,將特徵向量和目標變數分別提取出來。接著使用 TensorFlow/Keras 建立序列模型(Sequential),通過 Dense 層構建神經網路架構。模型編譯後使用 fit 方法進行訓練,設定 100 個 epochs 並關閉詳細輸出。訓練完成後,將模型以 H5 格式儲存,這是 Keras 標準的模型儲存格式,便於後續載入和部署。

3. 模型評估 (evaluate_model)

此任務讀取測試集 Parquet 檔案和已儲存的模型,進行預測並輸出模型的準確率等評估指標。

# in tasks/evaluation.py
from tensorflow.keras.models import load_model

def evaluate_model(test_input_path, model_input_path):
    test_df = pd.read_parquet(test_input_path)
    X_test = test_df['scaled_features'].tolist()
    y_test = test_df['Outcome'].tolist()
    
    model = load_model(model_input_path)
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f"模型在測試集上的準確率為: {accuracy:.2f}")

程式碼解析:

此函式執行模型評估階段,用於驗證已訓練模型的效能。函式從指定路徑讀取測試集 Parquet 檔案,提取特徵向量和真實標籤。接著使用 TensorFlow/Keras 的 load_model 函式載入先前儲存的 H5 格式模型檔案。透過 model.evaluate 方法在測試集上進行評估,該方法回傳損失值和準確率等指標。最後將準確率以格式化的方式輸出,提供模型效能的量化評估結果,這是機器學習管線中驗證模型品質的關鍵步驟。

第三部分:使用 Airflow DAG 串聯流程

在將各個階段的邏輯模組化後,我們現在可以編寫一個 Airflow DAG 來將它們串聯成一個自動化的管線。

# in dags/diabetes_prediction_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# 假設上面的三個函式已可被 import
from tasks.preprocessing import preprocess_data
from tasks.training import train_model
from tasks.evaluation import evaluate_model

with DAG(
    dag_id='diabetes_prediction_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=['ml', 'deep-learning'],
) as dag:
    
    task_preprocess = PythonOperator(
        task_id='preprocess_data',
        python_callable=preprocess_data,
        op_kwargs={
            'data_path': '/path/to/diabetes.csv',
            'train_output_path': '/path/to/train.parquet',
            'test_output_path': '/path/to/test.parquet',
        }
    )

    task_train = PythonOperator(
        task_id='train_model',
        python_callable=train_model,
        op_kwargs={
            'train_input_path': '/path/to/train.parquet',
            'model_output_path': '/path/to/diabetes_model.h5',
        }
    )

    task_evaluate = PythonOperator(
        task_id='evaluate_model',
        python_callable=evaluate_model,
        op_kwargs={
            'test_input_path': '/path/to/test.parquet',
            'model_input_path': '/path/to/diabetes_model.h5',
        }
    )

    # 設定任務依賴關係
    task_preprocess >> task_train >> task_evaluate

程式碼解析:

此 Airflow DAG 程式碼實現了糖尿病預測的完整 MLOps 管線。DAG 定義包含三個 PythonOperator 任務,每個任務對應一個機器學習階段。PythonOperator 是 Airflow 中最常用的 Operator,允許直接執行 Python 函式作為任務。op_kwargs 參數用於向函式傳遞關鍵字參數,如檔案路徑等設定。最關鍵的是依賴關係設定 task_preprocess >> task_train >> task_evaluate,這行程式碼定義了任務的執行順序,確保資料預處理完成後才進行模型訓練,訓練完成後才進行評估,形成一個完整的自動化工作流程。

圖表一:糖尿病預測 MLOps 流程:此活動圖展示了從資料獲取到模型評估的完整流程,並標示了每個階段如何對應到 Airflow DAG 中的一個具體任務。

@startuml
!theme _none_
skinparam dpi auto
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam minClassWidth 100
skinparam defaultFontSize 14
title 糖尿病預測 MLOps 活動圖

start
:<b>Task 1: 資料預處理</b>\n(preprocess_data);
note right
  - 讀取原始 CSV
  - 清理與特徵工程 (PySpark)
  - 儲存為 Parquet 格式
end note
:<b>Task 2: 模型訓練</b>\n(train_model);
note right
  - 讀取訓練集 Parquet
  - 訓練 TensorFlow/Keras 模型
  - 儲存模型檔案 (.h5)
end note
:<b>Task 3: 模型評估</b>\n(evaluate_model);
note right
  - 讀取測試集 Parquet
  - 載入已訓練的模型
  - 評估模型準確率
end note
stop
@enduml

透過這種方式,我們成功地將一個複雜的深度學習專案,轉化為一個可被 Airflow 自動調度、監控和管理的標準化工作流程,極大地提升了開發與維運的效率。