在現代機器學習專案中,從資料獲取、預處理、模型訓練到最終評估,往往涉及多個複雜且相互依賴的步驟。手動執行這些步驟不僅效率低下,也容易出錯。Apache Airflow 作為業界領先的工作流程管理平台,能讓我們以程式化的方式定義、排程和監控這些流程。本文將以一個經典的「糖尿病預測」案例,從零開始,詳細闡述如何使用 Airflow 將一個深度學習專案的完整生命週期,構建成一個自動化、可重複執行的工作管線 (Pipeline)。
第一部分:為什麼需要 Airflow?
一個典型的深度學習專案包含以下階段:
- 資料預處理: 清理原始資料、處理缺失值、進行特徵工程與標準化。
- 模型訓練: 使用處理好的訓練資料集來訓練神經網路模型。
- 模型評估: 使用測試資料集來評估模型的效能。
這些階段之間存在著嚴格的依賴關係:必須先完成預處理才能進行訓練,必須先完成訓練才能進行評估。在開發過程中,任何一個階段的程式碼或參數變更,都可能需要重新執行整個流程。Airflow 讓我們能將這些階段定義為獨立的任務 (Tasks),並將它們之間的依賴關係組織成一個有向無環圖 (Directed Acyclic Graph, DAG),從而實現整個工作流程的自動化管理。
第二部分:定義工作流程的各個階段
在編寫 DAG 之前,我們首先將整個專案拆解為三個獨立的 Python 函式,每個函式對應一個核心任務。
1. 資料預處理 (preprocess_data)
此任務使用 PySpark 讀取原始 CSV 資料,進行簡單的清理和特徵工程,然後將處理好的訓練集和測試集以 Parquet 格式儲存,以供後續任務使用。
# in tasks/preprocessing.py
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
def preprocess_data(data_path, train_output_path, test_output_path):
spark = SparkSession.builder.appName("DiabetesPreprocessing").getOrCreate()
df = spark.read.csv(data_path, header=True, inferSchema=True)
# ... (資料清理與特徵工程) ...
assembler = VectorAssembler(inputCols=[...], outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
# ...
train_df, test_df = processed_df.randomSplit([0.8, 0.2], seed=42)
train_df.write.parquet(train_output_path, mode="overwrite")
test_df.write.parquet(test_output_path, mode="overwrite")
程式碼解析:
此函式實現了深度學習管線中的資料預處理階段。函式使用 PySpark 建立 Spark Session,讀取原始 CSV 資料並進行推斷型態。透過 VectorAssembler 將多個特徵欄位組合成單一特徵向量,再使用 StandardScaler 進行標準化處理,確保所有特徵都在相同的尺度範圍內。最後將資料隨機分割為訓練集(80%)和測試集(20%),並以 Parquet 格式儲存,這種列式儲存格式在大資料處理中具有更高的讀取效率。
2. 模型訓練 (train_model)
此任務讀取預處理好的訓練集 Parquet 檔案,使用 TensorFlow/Keras 建立一個簡單的神經網路,進行訓練,並將訓練好的模型儲存為 H5 檔案。
# in tasks/training.py
import pandas as pd
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
def train_model(train_input_path, model_output_path):
train_df = pd.read_parquet(train_input_path)
X_train = train_df['scaled_features'].tolist()
y_train = train_df['Outcome'].tolist()
model = Sequential([...]) # 建立模型架構
model.compile(...)
model.fit(X_train, y_train, epochs=100, verbose=0)
model.save(model_output_path)
程式碼解析:
此函式負責深度學習模型的訓練階段。函式首先使用 pandas 讀取經過預處理的 Parquet 格式訓練資料,將特徵向量和目標變數分別提取出來。接著使用 TensorFlow/Keras 建立序列模型(Sequential),通過 Dense 層構建神經網路架構。模型編譯後使用 fit 方法進行訓練,設定 100 個 epochs 並關閉詳細輸出。訓練完成後,將模型以 H5 格式儲存,這是 Keras 標準的模型儲存格式,便於後續載入和部署。
3. 模型評估 (evaluate_model)
此任務讀取測試集 Parquet 檔案和已儲存的模型,進行預測並輸出模型的準確率等評估指標。
# in tasks/evaluation.py
from tensorflow.keras.models import load_model
def evaluate_model(test_input_path, model_input_path):
test_df = pd.read_parquet(test_input_path)
X_test = test_df['scaled_features'].tolist()
y_test = test_df['Outcome'].tolist()
model = load_model(model_input_path)
loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
print(f"模型在測試集上的準確率為: {accuracy:.2f}")
程式碼解析:
此函式執行模型評估階段,用於驗證已訓練模型的效能。函式從指定路徑讀取測試集 Parquet 檔案,提取特徵向量和真實標籤。接著使用 TensorFlow/Keras 的 load_model 函式載入先前儲存的 H5 格式模型檔案。透過 model.evaluate 方法在測試集上進行評估,該方法回傳損失值和準確率等指標。最後將準確率以格式化的方式輸出,提供模型效能的量化評估結果,這是機器學習管線中驗證模型品質的關鍵步驟。
第三部分:使用 Airflow DAG 串聯流程
在將各個階段的邏輯模組化後,我們現在可以編寫一個 Airflow DAG 來將它們串聯成一個自動化的管線。
# in dags/diabetes_prediction_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# 假設上面的三個函式已可被 import
from tasks.preprocessing import preprocess_data
from tasks.training import train_model
from tasks.evaluation import evaluate_model
with DAG(
dag_id='diabetes_prediction_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval=None,
catchup=False,
tags=['ml', 'deep-learning'],
) as dag:
task_preprocess = PythonOperator(
task_id='preprocess_data',
python_callable=preprocess_data,
op_kwargs={
'data_path': '/path/to/diabetes.csv',
'train_output_path': '/path/to/train.parquet',
'test_output_path': '/path/to/test.parquet',
}
)
task_train = PythonOperator(
task_id='train_model',
python_callable=train_model,
op_kwargs={
'train_input_path': '/path/to/train.parquet',
'model_output_path': '/path/to/diabetes_model.h5',
}
)
task_evaluate = PythonOperator(
task_id='evaluate_model',
python_callable=evaluate_model,
op_kwargs={
'test_input_path': '/path/to/test.parquet',
'model_input_path': '/path/to/diabetes_model.h5',
}
)
# 設定任務依賴關係
task_preprocess >> task_train >> task_evaluate
程式碼解析:
此 Airflow DAG 程式碼實現了糖尿病預測的完整 MLOps 管線。DAG 定義包含三個 PythonOperator 任務,每個任務對應一個機器學習階段。PythonOperator 是 Airflow 中最常用的 Operator,允許直接執行 Python 函式作為任務。op_kwargs 參數用於向函式傳遞關鍵字參數,如檔案路徑等設定。最關鍵的是依賴關係設定 task_preprocess >> task_train >> task_evaluate,這行程式碼定義了任務的執行順序,確保資料預處理完成後才進行模型訓練,訓練完成後才進行評估,形成一個完整的自動化工作流程。
圖表一:糖尿病預測 MLOps 流程:此活動圖展示了從資料獲取到模型評估的完整流程,並標示了每個階段如何對應到 Airflow DAG 中的一個具體任務。
@startuml
!theme _none_
skinparam dpi auto
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam minClassWidth 100
skinparam defaultFontSize 14
title 糖尿病預測 MLOps 活動圖
start
:<b>Task 1: 資料預處理</b>\n(preprocess_data);
note right
- 讀取原始 CSV
- 清理與特徵工程 (PySpark)
- 儲存為 Parquet 格式
end note
:<b>Task 2: 模型訓練</b>\n(train_model);
note right
- 讀取訓練集 Parquet
- 訓練 TensorFlow/Keras 模型
- 儲存模型檔案 (.h5)
end note
:<b>Task 3: 模型評估</b>\n(evaluate_model);
note right
- 讀取測試集 Parquet
- 載入已訓練的模型
- 評估模型準確率
end note
stop
@enduml透過這種方式,我們成功地將一個複雜的深度學習專案,轉化為一個可被 Airflow 自動調度、監控和管理的標準化工作流程,極大地提升了開發與維運的效率。