將深度學習模型從實驗階段推向生產環境,需要一個穩健、可重複且自動化的工作流程。Apache Airflow 作為業界領先的工作流程管理平台,正是實現此目標的關鍵工具。本文將以一個「Tesla 股價預測」的實際案例,從零開始,詳細闡述如何將一個深度學習專案的完整生命週期,構建成一個由 Airflow 調度的、模組化的自動化管線 (Pipeline)。
第一部分:MLOps 流程設計與任務拆分
一個典型的機器學習專案流程,可以被拆解為一系列相互依賴的獨立任務。這種模組化的設計是使用 Airflow 的基礎。對於我們的股價預測專案,我們將其拆分為三個核心任務:
- 資料預處理: 讀取原始股價 CSV 檔案,使用 PySpark 進行資料清理、特徵工程(如計算移動平均線),並將處理好的訓練集與測試集儲存為 Parquet 檔案。
- 模型訓練: 讀取預處理好的訓練集,使用 PyTorch 建立一個神經網路模型,進行訓練,並將訓練好的模型權重 (
state_dict) 儲存到檔案。 - 模型評估: 讀取測試集和已儲存的模型權重,在測試資料上評估模型效能,並輸出 R-squared 等指標。
這種拆分的好處在於,每個任務職責單一,並且可以獨立執行和重試。任務之間透過檔案系統(儲存 Parquet 檔案和模型檔案)來傳遞資料,實現了解耦。
caption="圖表一:Tesla 股價預測 MLOps 管線。此活動圖展示了從資料獲取到模型評估的完整流程,並標示了每個階段如何對應到 Airflow DAG 中的一個具體任務,以及任務之間傳遞的中間產物。"
alt="一個展示 Tesla 股價預測 MLOps 流程的活動圖。流程包含三個任務:資料預處理(輸入 CSV,輸出 Parquet)、模型訓練(輸入 Parquet,輸出 .pth 模型檔案)和模型評估(輸入 Parquet 和 .pth,輸出評估報告)。"
@startuml
!theme _none_
skinparam dpi auto
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam minClassWidth 100
skinparam defaultFontSize 14
title Tesla 股價預測 MLOps 活動圖
start
:<b>Task 1: 資料預處理</b>\n(preprocess_data);
note right
<b>輸入</b>: TSLA_stock.csv
<b>輸出</b>: train_data.parquet, test_data.parquet
end note
:<b>Task 2: 模型訓練</b>\n(train_model);
note right
<b>輸入</b>: train_data.parquet
<b>輸出</b>: trained_model.pth
end note
:<b>Task 3: 模型評估</b>\n(evaluate_model);
note right
<b>輸入</b>: test_data.parquet, trained_model.pth
<b>輸出</b>: 評估報告 (R-squared)
end note
stop
@enduml第二部分:編寫 Airflow DAG
在將各階段的邏輯分別封裝成獨立的 Python 函式後,我們就可以編寫一個 Airflow DAG 來將它們串聯起來。
# dags/tesla_prediction_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# 假設以下函式已在其他模組中定義並可被匯入
from tasks.data_processing import preprocess_and_save_data
from tasks.model_training import train_and_save_model
from tasks.model_evaluation import evaluate_saved_model
# --- DAG 定義
---
with DAG(
dag_id='tesla_stock_prediction_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily', # 設定為每日執行
catchup=False,
tags=['ml', 'stock-prediction'],
) as dag:
# --- 任務定義
---
task_preprocess = PythonOperator(
task_id='preprocess_data',
python_callable=preprocess_and_save_data,
op_kwargs={
'input_path': '/path/to/TSLA_stock.csv',
'train_output_path': '/path/to/train.parquet',
'test_output_path': '/path/to/test.parquet',
}
)
task_train = PythonOperator(
task_id='train_model',
python_callable=train_and_save_model,
op_kwargs={
'train_input_path': '/path/to/train.parquet',
'model_output_path': '/path/to/tesla_model.pth',
}
)
task_evaluate = PythonOperator(
task_id='evaluate_model',
python_callable=evaluate_saved_model,
op_kwargs={
'test_input_path': '/path/to/test.parquet',
'model_input_path': '/path/to/tesla_model.pth',
}
)
# --- 設定任務依賴關係
---
task_preprocess >> task_train >> task_evaluate
程式碼解說:
schedule_interval='@daily': 我們將此 DAG 設定為每日執行一次,Airflow 會自動處理排程。PythonOperator: 每個 Operator 實例化一個任務,python_callable指向我們之前定義的模組化函式。op_kwargs: 用於向任務函式傳遞檔案路徑等參數,實現任務間的資料交換。>>: 這個簡潔的語法清晰地定義了任務的執行順序,確保了資料處理的正確流程。
第三部分:執行與監控
將此 DAG 檔案部署到您的 Airflow 環境後,您就可以在 Airflow UI 中:
- 視覺化工作流程: 在「Graph View」中查看任務的依賴關係圖。
- 監控執行狀態: 追蹤每次 DAG Run 的進度,查看每個任務的成功、失敗或運行中狀態。
- 查看日誌: 點擊任何一個任務實例,即可查看其詳細的執行日誌,方便除錯。
- 手動觸發與重跑: 除了自動排程,您也可以手動觸發 DAG,或對失敗的任務進行單獨重跑。
透過將深度學習專案流程化並交由 Airflow 管理,我們不僅提升了開發效率,更為模型的持續整合、持續部署 (CI/CD) 和自動化再訓練奠定了堅實的 MLOps 基礎。