在現代機器學習專案中,從資料獲取、預處理、模型訓練到最終評估,往往涉及多個複雜且相互依賴的步驟。手動執行這些步驟不僅效率低下,也容易出錯。Apache Airflow 作為業界領先的工作流程管理平台,能讓我們以程式化的方式定義、排程和監控這些流程。本文將以一個經典的「糖尿病預測」分類任務為例,從零開始,詳細闡述如何使用 Airflow 將一個結合了 PySpark 和 TensorFlow 的深度學習專案,構建成一個自動化、可重複執行的工作管線 (Pipeline)。

第一部分:MLOps 流程設計與任務拆分

一個典型的機器學習專案流程,可以被拆解為一系列相互依賴的獨立任務。這種模組化的設計是使用 Airflow 的基礎。對於我們的糖尿病預測專案,我們將其拆分為三個核心任務:

  1. 資料預處理: 使用 PySpark 讀取原始 CSV 資料,進行資料清理(如過濾無效值)、特徵工程(使用 VectorAssembler)和特徵標準化(使用 StandardScaler),並將處理好的訓練集和測試集以 Parquet 格式儲存。
  2. 模型訓練: 讀取預處理好的訓練集 Parquet 檔案,使用 TensorFlow/Keras 建立一個用於二元分類的神經網路模型,進行訓練,並將訓練好的模型儲存為 H5 檔案。
  3. 模型評估: 讀取測試集 Parquet 檔案和已儲存的模型,在測試資料上評估模型效能,並輸出準確率、精確率、召回率和 F1 分數等分類指標。

這種拆分的好處在於,每個任務職責單一,並且可以獨立執行和重試。任務之間透過檔案系統(儲存 Parquet 檔案和模型檔案)來傳遞資料,實現了解耦。

caption="圖表一:糖尿病預測 MLOps 管線。此活動圖展示了從資料獲取到模型評估的完整流程,並標示了每個階段如何對應到 Airflow DAG 中的一個具體任務,以及任務之間傳遞的中間產物。"
alt="一個展示糖尿病預測 MLOps 流程的活動圖。流程包含三個任務:資料預處理(輸入 CSV,輸出 Parquet)、模型訓練(輸入 Parquet,輸出 .h5 模型檔案)和模型評估(輸入 Parquet 和 .h5,輸出評估報告)。"
@startuml
!theme _none_
skinparam dpi auto
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam minClassWidth 100
skinparam defaultFontSize 14
title 糖尿病預測 MLOps 活動圖

start
:<b>Task 1: 資料預處理</b>\n(preprocess_data);
note right
  <b>輸入</b>: diabetes.csv
  <b>輸出</b>: train.parquet, test.parquet
end note
:<b>Task 2: 模型訓練</b>\n(train_model);
note right
  <b>輸入</b>: train.parquet
  <b>輸出</b>: diabetes_model.h5
end note
:<b>Task 3: 模型評估</b>\n(evaluate_model);
note right
  <b>輸入</b>: test.parquet, diabetes_model.h5
  <b>輸出</b>: 評估報告 (準確率, F1-Score 等)
end note
stop
@enduml

第二部分:編寫 Airflow DAG

在將各階段的邏輯分別封裝成獨立的 Python 函式後(此處省略函式具體實現,重點在於 DAG 結構),我們就可以編寫一個 Airflow DAG 來將它們串聯起來。

# dags/diabetes_prediction_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# 假設以下函式已在其他模組中定義並可被匯入
from tasks.preprocessing import preprocess_data
from tasks.training import train_tensorflow_model
from tasks.evaluation import evaluate_model

# --- DAG 定義 
---
with DAG(
    dag_id='diabetes_prediction_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@weekly', # 設定為每週執行一次
    catchup=False,
    tags=['ml', 'tensorflow', 'pyspark'],
) as dag:
    
    # --- 任務定義 
---
    task_preprocess = PythonOperator(
        task_id='preprocess_data',
        python_callable=preprocess_data,
        op_kwargs={
            'data_file_path': '/path/to/diabetes.csv',
            'train_output_path': '/path/to/train.parquet',
            'test_output_path': '/path/to/test.parquet',
        }
    )

    task_train = PythonOperator(
        task_id='train_model',
        python_callable=train_tensorflow_model,
        op_kwargs={
            'train_input_path': '/path/to/train.parquet',
            'model_output_path': '/path/to/diabetes_model.h5',
        }
    )

    task_evaluate = PythonOperator(
        task_id='evaluate_model',
        python_callable=evaluate_model,
        op_kwargs={
            'test_input_path': '/path/to/test.parquet',
            'model_input_path': '/path/to/diabetes_model.h5',
        }
    )

    # --- 設定任務依賴關係 
---
    task_preprocess >> task_train >> task_evaluate

程式碼解說:

  • schedule_interval='@weekly': 我們將此 DAG 設定為每週執行一次,例如用於模型的定期再訓練。
  • PythonOperator: 每個 Operator 將一個 Python 函式實例化為一個 Airflow 任務。
  • op_kwargs: 用於向任務函式傳遞檔案路徑等參數,實現任務間的資料交換。
  • >>: 這個簡潔的語法清晰地定義了任務的執行順序,確保了資料處理的正確流程。

第三部分:執行與監控

將此 DAG 檔案部署到您的 Airflow 環境後,您就可以在 Airflow UI 中:

  • 視覺化工作流程: 在「Graph View」中查看任務的依賴關係圖。
  • 監控執行狀態: 追蹤每次 DAG Run 的進度,查看每個任務的成功、失敗或運行中狀態。
  • 查看日誌: 點擊任何一個任務實例,即可查看其詳細的執行日誌,方便除錯。
  • 手動觸發與重跑: 除了自動排程,您也可以手動觸發 DAG,或對失敗的任務進行單獨重跑。

透過將深度學習專案流程化並交由 Airflow 管理,我們不僅提升了開發效率,更為模型的持續整合、持續部署 (CI/CD) 和自動化再訓練奠定了堅實的 MLOps 基礎。