將深度學習模型從實驗階段推向生產環境,需要一個穩健、可重複且自動化的工作流程。Apache Airflow 作為業界領先的工作流程管理平台,正是實現此目標的關鍵工具。本文將以一個「Tesla 股價預測」的實際案例,從零開始,詳細闡述如何將一個深度學習專案的完整生命週期,構建成一個由 Airflow 調度的、模組化的自動化管線 (Pipeline)。

第一部分:MLOps 流程設計與任務拆分

一個典型的機器學習專案流程,可以被拆解為一系列相互依賴的獨立任務。這種模組化的設計是使用 Airflow 的基礎。對於我們的股價預測專案,我們將其拆分為三個核心任務:

  1. 資料預處理: 讀取原始股價 CSV 檔案,使用 PySpark 進行資料清理、特徵工程(如計算移動平均線),並將處理好的訓練集與測試集儲存為 Parquet 檔案。
  2. 模型訓練: 讀取預處理好的訓練集,使用 PyTorch 建立一個神經網路模型,進行訓練,並將訓練好的模型權重 (state_dict) 儲存到檔案。
  3. 模型評估: 讀取測試集和已儲存的模型權重,在測試資料上評估模型效能,並輸出 R-squared 等指標。

這種拆分的好處在於,每個任務職責單一,並且可以獨立執行和重試。任務之間透過檔案系統(儲存 Parquet 檔案和模型檔案)來傳遞資料,實現了解耦。

caption="圖表一:Tesla 股價預測 MLOps 管線。此活動圖展示了從資料獲取到模型評估的完整流程,並標示了每個階段如何對應到 Airflow DAG 中的一個具體任務,以及任務之間傳遞的中間產物。"
alt="一個展示 Tesla 股價預測 MLOps 流程的活動圖。流程包含三個任務:資料預處理(輸入 CSV,輸出 Parquet)、模型訓練(輸入 Parquet,輸出 .pth 模型檔案)和模型評估(輸入 Parquet 和 .pth,輸出評估報告)。"
@startuml
!theme _none_
skinparam dpi auto
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam minClassWidth 100
skinparam defaultFontSize 14
title Tesla 股價預測 MLOps 活動圖

start
:<b>Task 1: 資料預處理</b>\n(preprocess_data);
note right
  <b>輸入</b>: TSLA_stock.csv
  <b>輸出</b>: train_data.parquet, test_data.parquet
end note
:<b>Task 2: 模型訓練</b>\n(train_model);
note right
  <b>輸入</b>: train_data.parquet
  <b>輸出</b>: trained_model.pth
end note
:<b>Task 3: 模型評估</b>\n(evaluate_model);
note right
  <b>輸入</b>: test_data.parquet, trained_model.pth
  <b>輸出</b>: 評估報告 (R-squared)
end note
stop
@enduml

第二部分:編寫 Airflow DAG

在將各階段的邏輯分別封裝成獨立的 Python 函式後,我們就可以編寫一個 Airflow DAG 來將它們串聯起來。

# dags/tesla_prediction_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# 假設以下函式已在其他模組中定義並可被匯入
from tasks.data_processing import preprocess_and_save_data
from tasks.model_training import train_and_save_model
from tasks.model_evaluation import evaluate_saved_model

# --- DAG 定義 
---
with DAG(
    dag_id='tesla_stock_prediction_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily', # 設定為每日執行
    catchup=False,
    tags=['ml', 'stock-prediction'],
) as dag:
    
    # --- 任務定義 
---
    task_preprocess = PythonOperator(
        task_id='preprocess_data',
        python_callable=preprocess_and_save_data,
        op_kwargs={
            'input_path': '/path/to/TSLA_stock.csv',
            'train_output_path': '/path/to/train.parquet',
            'test_output_path': '/path/to/test.parquet',
        }
    )

    task_train = PythonOperator(
        task_id='train_model',
        python_callable=train_and_save_model,
        op_kwargs={
            'train_input_path': '/path/to/train.parquet',
            'model_output_path': '/path/to/tesla_model.pth',
        }
    )

    task_evaluate = PythonOperator(
        task_id='evaluate_model',
        python_callable=evaluate_saved_model,
        op_kwargs={
            'test_input_path': '/path/to/test.parquet',
            'model_input_path': '/path/to/tesla_model.pth',
        }
    )

    # --- 設定任務依賴關係 
---
    task_preprocess >> task_train >> task_evaluate

程式碼解說:

  • schedule_interval='@daily': 我們將此 DAG 設定為每日執行一次,Airflow 會自動處理排程。
  • PythonOperator: 每個 Operator 實例化一個任務,python_callable 指向我們之前定義的模組化函式。
  • op_kwargs: 用於向任務函式傳遞檔案路徑等參數,實現任務間的資料交換。
  • >>: 這個簡潔的語法清晰地定義了任務的執行順序,確保了資料處理的正確流程。

第三部分:執行與監控

將此 DAG 檔案部署到您的 Airflow 環境後,您就可以在 Airflow UI 中:

  • 視覺化工作流程: 在「Graph View」中查看任務的依賴關係圖。
  • 監控執行狀態: 追蹤每次 DAG Run 的進度,查看每個任務的成功、失敗或運行中狀態。
  • 查看日誌: 點擊任何一個任務實例,即可查看其詳細的執行日誌,方便除錯。
  • 手動觸發與重跑: 除了自動排程,您也可以手動觸發 DAG,或對失敗的任務進行單獨重跑。

透過將深度學習專案流程化並交由 Airflow 管理,我們不僅提升了開發效率,更為模型的持續整合、持續部署 (CI/CD) 和自動化再訓練奠定了堅實的 MLOps 基礎。