機器學習模型在部署到生產環境後,其表現往往會隨著時間推移而逐漸衰退。這種現象並非偶然,而是由多種因素共同造成的結果。輸入資料的分佈可能會改變,使用者行為可能會演變,甚至是商業環境的變化都可能影響模型的預測準確性。因此,建立一套完善的模型監控系統,不僅是維護模型效能的必要手段,更是確保機器學習專案長期成功的關鍵因素。

本文將深入探討機器學習模型監控系統的設計原則與實作方法,從 Data Drift 偵測到 Concept Drift 處理,從模型效能追蹤到 A/B Testing 框架,提供一套完整的解決方案。我們將透過實際的 Python 程式碼範例,展示如何建立一個可靠且可擴展的監控系統。

監控系統架構設計

在設計監控系統之前,我們必須先理解監控的核心目標。監控系統的主要職責是持續追蹤模型的健康狀態,及時發現潛在問題,並在問題惡化之前發出警報。一個完善的監控系統應該涵蓋四個層面:軟體健康、資料健康、模型健康以及商業指標。

軟體健康監控關注的是系統的基礎設施層面,包括服務的可用性、延遲時間、錯誤率等。資料健康監控則專注於輸入資料的品質,確保資料符合預期的格式和分佈。模型健康監控追蹤模型的預測效能,偵測可能的效能衰退。商業指標監控則將技術指標與商業目標連結,確保模型的表現能夠轉化為實際的商業價值。

以下是監控系統的整體架構圖:

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "資料來源" as source {
    rectangle "生產環境資料" as prod_data
    rectangle "模型預測結果" as predictions
    rectangle "真實標籤" as labels
}

rectangle "監控核心" as core {
    rectangle "Data Drift 偵測器" as drift_detector
    rectangle "Concept Drift 偵測器" as concept_detector
    rectangle "效能追蹤器" as performance_tracker
    rectangle "異常偵測器" as anomaly_detector
}

rectangle "警報系統" as alert {
    rectangle "閾值管理" as threshold
    rectangle "通知服務" as notification
}

rectangle "視覺化" as viz {
    rectangle "監控儀表板" as dashboard
    rectangle "報表生成" as report
}

prod_data --> drift_detector
predictions --> performance_tracker
labels --> performance_tracker
predictions --> concept_detector
labels --> concept_detector
prod_data --> anomaly_detector

drift_detector --> threshold
concept_detector --> threshold
performance_tracker --> threshold
anomaly_detector --> threshold

threshold --> notification
drift_detector --> dashboard
concept_detector --> dashboard
performance_tracker --> dashboard
anomaly_detector --> dashboard
dashboard --> report

@enduml

這個架構圖展示了監控系統的資料流向。生產環境中的資料首先進入各個偵測器進行分析,偵測結果會傳送到警報系統進行閾值判斷,同時也會輸出到視覺化層供團隊查看和分析。

Data Drift 偵測機制

Data Drift 是指模型輸入資料的統計分佈隨時間發生變化的現象。當訓練資料與生產環境資料的分佈出現顯著差異時,模型的預測效能通常會受到影響。偵測 Data Drift 的關鍵在於選擇適當的統計方法來量化分佈差異。

常用的 Data Drift 偵測方法包括 Kolmogorov-Smirnov 檢定、Population Stability Index 以及 Wasserstein Distance。Kolmogorov-Smirnov 檢定是一種無母數檢定方法,用於比較兩個樣本是否來自相同的分佈。Population Stability Index 則是金融業常用的指標,用於衡量分佈變化的程度。Wasserstein Distance 又稱為 Earth Mover’s Distance,能夠捕捉分佈之間的細微差異。

以下是一個完整的 DriftDetector 類別實作:

# 匯入必要的套件
# numpy 用於數值計算
# scipy.stats 提供統計檢定功能
# typing 用於型別註解
import numpy as np
from scipy import stats
from scipy.spatial.distance import jensenshannon
from typing import Dict, List, Tuple, Optional
import pandas as pd
from dataclasses import dataclass
from datetime import datetime

# 定義資料類別來儲存漂移偵測結果
# 這個類別包含了所有偵測方法的結果以及判斷資訊
@dataclass
class DriftResult:
    # 特徵名稱
    feature_name: str
    # Kolmogorov-Smirnov 檢定的統計量
    ks_statistic: float
    # Kolmogorov-Smirnov 檢定的 p 值
    ks_pvalue: float
    # Population Stability Index 值
    psi_value: float
    # Wasserstein 距離
    wasserstein_distance: float
    # 是否偵測到漂移
    drift_detected: bool
    # 偵測時間戳記
    detection_time: datetime
    # 嚴重程度等級
    severity: str

class DriftDetector:
    """
    Data Drift 偵測器類別

    這個類別實作了多種統計方法來偵測資料分佈的變化,
    包括 KS 檢定、PSI 計算和 Wasserstein 距離。
    透過綜合多種方法的結果,可以更準確地判斷是否發生了 Data Drift。
    """

    def __init__(
        self,
        ks_threshold: float = 0.05,
        psi_threshold: float = 0.2,
        wasserstein_threshold: float = 0.1
    ):
        """
        初始化 DriftDetector

        參數說明:
        - ks_threshold: KS 檢定的顯著性水準,預設為 0.05
        - psi_threshold: PSI 的警告閾值,預設為 0.2
        - wasserstein_threshold: Wasserstein 距離的閾值,預設為 0.1
        """
        # 儲存各個檢定方法的閾值
        self.ks_threshold = ks_threshold
        self.psi_threshold = psi_threshold
        self.wasserstein_threshold = wasserstein_threshold
        # 用於儲存參考分佈的字典
        self.reference_distributions: Dict[str, np.ndarray] = {}
        # 用於儲存偵測歷史記錄的列表
        self.detection_history: List[DriftResult] = []

    def set_reference(self, feature_name: str, data: np.ndarray) -> None:
        """
        設定特徵的參考分佈

        這個方法用於儲存訓練資料的分佈作為基準,
        後續的生產資料將與這個基準進行比較。

        參數說明:
        - feature_name: 特徵名稱
        - data: 參考資料的 numpy 陣列
        """
        # 將參考資料儲存到字典中
        self.reference_distributions[feature_name] = np.array(data)

    def _calculate_ks_test(
        self,
        reference: np.ndarray,
        current: np.ndarray
    ) -> Tuple[float, float]:
        """
        執行 Kolmogorov-Smirnov 檢定

        KS 檢定是一種無母數檢定方法,用於判斷兩個樣本
        是否來自相同的連續分佈。統計量代表兩個累積分佈
        函數之間的最大距離。

        參數說明:
        - reference: 參考分佈的資料
        - current: 當前分佈的資料

        回傳值:
        - 統計量和 p 值的元組
        """
        # 使用 scipy 的 ks_2samp 函數執行雙樣本 KS 檢定
        statistic, pvalue = stats.ks_2samp(reference, current)
        return statistic, pvalue

    def _calculate_psi(
        self,
        reference: np.ndarray,
        current: np.ndarray,
        bins: int = 10
    ) -> float:
        """
        計算 Population Stability Index

        PSI 是衡量兩個分佈差異程度的指標,常用於金融風險模型。
        計算方式是將資料分成多個區間,然後計算各區間比例的變化。

        一般而言:
        - PSI < 0.1:分佈變化不顯著
        - 0.1 <= PSI < 0.2:分佈有中等程度變化
        - PSI >= 0.2:分佈變化顯著

        參數說明:
        - reference: 參考分佈的資料
        - current: 當前分佈的資料
        - bins: 分箱數量,預設為 10

        回傳值:
        - PSI 值
        """
        # 計算參考分佈的分位數作為分箱邊界
        # 使用分位數而非等距分箱可以避免空箱問題
        percentiles = np.percentile(reference, np.linspace(0, 100, bins + 1))

        # 計算參考分佈在各區間的比例
        reference_counts, _ = np.histogram(reference, bins=percentiles)
        reference_percents = reference_counts / len(reference)

        # 計算當前分佈在各區間的比例
        current_counts, _ = np.histogram(current, bins=percentiles)
        current_percents = current_counts / len(current)

        # 為了避免除以零,將零值替換為極小值
        reference_percents = np.where(
            reference_percents == 0,
            0.0001,
            reference_percents
        )
        current_percents = np.where(
            current_percents == 0,
            0.0001,
            current_percents
        )

        # 計算 PSI
        # PSI = sum((current% - reference%) * ln(current% / reference%))
        psi = np.sum(
            (current_percents - reference_percents) *
            np.log(current_percents / reference_percents)
        )

        return psi

    def _calculate_wasserstein(
        self,
        reference: np.ndarray,
        current: np.ndarray
    ) -> float:
        """
        計算 Wasserstein 距離

        Wasserstein 距離又稱為 Earth Mover's Distance,
        可以理解為將一個分佈轉換為另一個分佈所需的最小「工作量」。
        這個指標對於分佈的細微變化特別敏感。

        參數說明:
        - reference: 參考分佈的資料
        - current: 當前分佈的資料

        回傳值:
        - Wasserstein 距離
        """
        # 使用 scipy 的 wasserstein_distance 函數計算
        distance = stats.wasserstein_distance(reference, current)
        return distance

    def detect_drift(
        self,
        feature_name: str,
        current_data: np.ndarray
    ) -> DriftResult:
        """
        執行完整的 Data Drift 偵測

        這個方法會綜合運用多種統計方法來判斷是否發生 Data Drift,
        並根據結果計算嚴重程度等級。

        參數說明:
        - feature_name: 要偵測的特徵名稱
        - current_data: 當前的生產資料

        回傳值:
        - DriftResult 物件,包含完整的偵測結果
        """
        # 檢查是否已設定參考分佈
        if feature_name not in self.reference_distributions:
            raise ValueError(
                f"未找到特徵 '{feature_name}' 的參考分佈,"
                f"請先呼叫 set_reference 方法設定參考資料"
            )

        # 取得參考分佈
        reference = self.reference_distributions[feature_name]
        current = np.array(current_data)

        # 執行各種偵測方法
        ks_stat, ks_pvalue = self._calculate_ks_test(reference, current)
        psi_value = self._calculate_psi(reference, current)
        wasserstein_dist = self._calculate_wasserstein(reference, current)

        # 判斷是否偵測到漂移
        # 當任一指標超過閾值時,判定為發生漂移
        drift_detected = (
            ks_pvalue < self.ks_threshold or
            psi_value > self.psi_threshold or
            wasserstein_dist > self.wasserstein_threshold
        )

        # 計算嚴重程度等級
        # 根據超過閾值的指標數量來判定
        severity_score = sum([
            ks_pvalue < self.ks_threshold,
            psi_value > self.psi_threshold,
            wasserstein_dist > self.wasserstein_threshold
        ])

        if severity_score == 0:
            severity = "正常"
        elif severity_score == 1:
            severity = "輕微"
        elif severity_score == 2:
            severity = "中等"
        else:
            severity = "嚴重"

        # 建立偵測結果物件
        result = DriftResult(
            feature_name=feature_name,
            ks_statistic=ks_stat,
            ks_pvalue=ks_pvalue,
            psi_value=psi_value,
            wasserstein_distance=wasserstein_dist,
            drift_detected=drift_detected,
            detection_time=datetime.now(),
            severity=severity
        )

        # 將結果加入歷史記錄
        self.detection_history.append(result)

        return result

    def detect_all_features(
        self,
        current_data: pd.DataFrame
    ) -> Dict[str, DriftResult]:
        """
        對所有已設定參考分佈的特徵執行漂移偵測

        這個方法會遍歷所有已註冊的特徵,並逐一執行偵測。
        適合用於批次處理多個特徵的情況。

        參數說明:
        - current_data: 包含所有特徵的 DataFrame

        回傳值:
        - 字典,鍵為特徵名稱,值為對應的 DriftResult
        """
        results = {}

        # 遍歷所有已設定的參考分佈
        for feature_name in self.reference_distributions.keys():
            if feature_name in current_data.columns:
                # 執行偵測並儲存結果
                result = self.detect_drift(
                    feature_name,
                    current_data[feature_name].values
                )
                results[feature_name] = result

        return results

    def get_drift_summary(self) -> pd.DataFrame:
        """
        產生漂移偵測摘要報告

        這個方法會將所有偵測歷史整理成 DataFrame 格式,
        方便進行分析和視覺化。

        回傳值:
        - 包含所有偵測結果的 DataFrame
        """
        # 將偵測歷史轉換為字典列表
        records = []
        for result in self.detection_history:
            records.append({
                "特徵名稱": result.feature_name,
                "KS統計量": result.ks_statistic,
                "KS_p值": result.ks_pvalue,
                "PSI值": result.psi_value,
                "Wasserstein距離": result.wasserstein_distance,
                "偵測到漂移": result.drift_detected,
                "嚴重程度": result.severity,
                "偵測時間": result.detection_time
            })

        return pd.DataFrame(records)

這個 DriftDetector 類別提供了完整的 Data Drift 偵測功能。它綜合運用了三種統計方法,能夠從不同角度檢測分佈變化。透過設定適當的閾值,團隊可以根據業務需求調整偵測的敏感度。

Concept Drift 處理策略

Concept Drift 與 Data Drift 不同,它指的是輸入特徵與目標變數之間的關係發生了變化。即使輸入資料的分佈保持穩定,模型的預測效能仍可能因為 Concept Drift 而下降。這種情況在動態變化的環境中特別常見,例如使用者偏好的改變、市場趨勢的轉變等。

偵測 Concept Drift 需要觀察模型預測效能的變化趨勢。常用的方法包括 Page-Hinkley Test、ADWIN 演算法以及基於滑動視窗的效能監控。以下是 Concept Drift 偵測器的實作:

# 匯入必要的套件
from collections import deque
from typing import Optional, Tuple
import numpy as np
from dataclasses import dataclass
from datetime import datetime

# 定義 Concept Drift 偵測結果的資料類別
@dataclass
class ConceptDriftResult:
    # 是否偵測到漂移
    drift_detected: bool
    # 當前的累積統計量
    cumulative_sum: float
    # 偵測閾值
    threshold: float
    # 最近一次效能指標
    recent_performance: float
    # 偵測時間戳記
    detection_time: datetime
    # 漂移類型
    drift_type: str

class ConceptDriftDetector:
    """
    Concept Drift 偵測器類別

    這個類別實作了 Page-Hinkley Test 演算法來偵測 Concept Drift。
    Page-Hinkley Test 是一種序列分析方法,能夠偵測資料流中的
    突變點或漸進式變化。
    """

    def __init__(
        self,
        delta: float = 0.005,
        lambda_threshold: float = 50.0,
        alpha: float = 0.9999,
        window_size: int = 100
    ):
        """
        初始化 ConceptDriftDetector

        參數說明:
        - delta: 容許的變化幅度,用於調整敏感度
        - lambda_threshold: 偵測閾值,超過此值判定為漂移
        - alpha: 遺忘因子,用於加權歷史資料
        - window_size: 滑動視窗大小,用於計算移動平均
        """
        self.delta = delta
        self.lambda_threshold = lambda_threshold
        self.alpha = alpha
        self.window_size = window_size

        # 初始化內部狀態變數
        # 累積和用於追蹤效能變化
        self.cumulative_sum = 0.0
        # 最小累積和用於計算偵測統計量
        self.minimum_sum = 0.0
        # 樣本數量計數
        self.sample_count = 0
        # 平均值用於追蹤效能基準
        self.mean = 0.0

        # 使用 deque 實作固定大小的滑動視窗
        self.performance_window = deque(maxlen=window_size)
        # 儲存偵測歷史
        self.detection_history: list = []

    def _update_mean(self, value: float) -> None:
        """
        更新累積平均值

        使用增量式計算方法來更新平均值,
        避免儲存所有歷史資料。

        參數說明:
        - value: 新的效能指標值
        """
        self.sample_count += 1
        # 使用增量式公式更新平均值
        # mean = mean + (value - mean) / count
        self.mean = self.mean + (value - self.mean) / self.sample_count

    def update(self, performance_metric: float) -> ConceptDriftResult:
        """
        更新偵測器狀態並檢查是否發生 Concept Drift

        每次收到新的效能指標時呼叫此方法。
        它會更新內部狀態並判斷是否偵測到漂移。

        參數說明:
        - performance_metric: 最新的效能指標值(如準確率、誤差等)

        回傳值:
        - ConceptDriftResult 物件,包含偵測結果
        """
        # 將新值加入滑動視窗
        self.performance_window.append(performance_metric)

        # 更新平均值
        self._update_mean(performance_metric)

        # 計算與平均值的偏差
        deviation = performance_metric - self.mean - self.delta

        # 更新累積和
        self.cumulative_sum = self.cumulative_sum + deviation

        # 更新最小累積和
        self.minimum_sum = min(self.minimum_sum, self.cumulative_sum)

        # 計算 Page-Hinkley 統計量
        ph_statistic = self.cumulative_sum - self.minimum_sum

        # 判斷是否偵測到漂移
        drift_detected = ph_statistic > self.lambda_threshold

        # 判斷漂移類型
        if drift_detected:
            if len(self.performance_window) >= 2:
                # 計算最近效能的變化趨勢
                recent_trend = np.mean(
                    list(self.performance_window)[-10:]
                ) - np.mean(
                    list(self.performance_window)[:10]
                )
                drift_type = "效能下降" if recent_trend < 0 else "效能提升"
            else:
                drift_type = "未知"
        else:
            drift_type = "無漂移"

        # 建立偵測結果
        result = ConceptDriftResult(
            drift_detected=drift_detected,
            cumulative_sum=ph_statistic,
            threshold=self.lambda_threshold,
            recent_performance=performance_metric,
            detection_time=datetime.now(),
            drift_type=drift_type
        )

        # 儲存偵測結果
        self.detection_history.append(result)

        # 如果偵測到漂移,重置狀態
        if drift_detected:
            self.reset()

        return result

    def reset(self) -> None:
        """
        重置偵測器狀態

        在偵測到漂移後呼叫此方法,
        以便開始新一輪的偵測。
        """
        self.cumulative_sum = 0.0
        self.minimum_sum = 0.0
        self.sample_count = 0
        self.mean = 0.0

    def get_window_statistics(self) -> dict:
        """
        取得當前滑動視窗的統計資訊

        回傳值:
        - 包含視窗統計資訊的字典
        """
        if len(self.performance_window) == 0:
            return {
                "平均值": None,
                "標準差": None,
                "最小值": None,
                "最大值": None,
                "樣本數": 0
            }

        window_array = np.array(self.performance_window)

        return {
            "平均值": np.mean(window_array),
            "標準差": np.std(window_array),
            "最小值": np.min(window_array),
            "最大值": np.max(window_array),
            "樣本數": len(window_array)
        }

這個 ConceptDriftDetector 類別實作了 Page-Hinkley Test 演算法,能夠偵測效能指標的顯著變化。當模型效能出現持續性的下降或上升趨勢時,偵測器會發出警報。

模型效能追蹤系統

模型效能追蹤是監控系統的核心功能之一。它需要持續記錄模型的預測結果與真實標籤,計算各種效能指標,並分析效能變化趨勢。一個完善的效能追蹤系統應該能夠支援多種指標類型,包括分類指標和迴歸指標,同時也要能夠追蹤特定子群體的效能表現。

以下是模型監控器的完整實作:

# 匯入必要的套件
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Union
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    mean_squared_error,
    mean_absolute_error,
    r2_score
)
import json

# 定義效能記錄的資料類別
@dataclass
class PerformanceRecord:
    # 記錄時間戳記
    timestamp: datetime
    # 模型版本
    model_version: str
    # 效能指標字典
    metrics: Dict[str, float]
    # 樣本數量
    sample_size: int
    # 子群體標籤(選填)
    segment: Optional[str] = None
    # 額外的元資料
    metadata: Dict = field(default_factory=dict)

class ModelMonitor:
    """
    模型效能監控器類別

    這個類別提供完整的模型效能追蹤功能,
    支援分類和迴歸任務的多種效能指標,
    並能夠追蹤特定子群體的效能表現。
    """

    def __init__(
        self,
        model_name: str,
        model_version: str,
        task_type: str = "classification"
    ):
        """
        初始化 ModelMonitor

        參數說明:
        - model_name: 模型名稱
        - model_version: 模型版本號
        - task_type: 任務類型,可以是 "classification" 或 "regression"
        """
        self.model_name = model_name
        self.model_version = model_version
        self.task_type = task_type

        # 儲存效能記錄的列表
        self.performance_history: List[PerformanceRecord] = []
        # 儲存預測記錄的列表
        self.prediction_logs: List[Dict] = []
        # 效能警報閾值
        self.alert_thresholds: Dict[str, float] = {}
        # 基準效能(用於比較)
        self.baseline_metrics: Optional[Dict[str, float]] = None

    def set_baseline(self, metrics: Dict[str, float]) -> None:
        """
        設定基準效能指標

        基準效能用於比較當前效能是否有顯著下降。
        通常使用模型上線前的測試效能作為基準。

        參數說明:
        - metrics: 基準效能指標字典
        """
        self.baseline_metrics = metrics

    def set_alert_threshold(self, metric_name: str, threshold: float) -> None:
        """
        設定效能警報閾值

        當效能指標低於閾值時,系統會發出警報。

        參數說明:
        - metric_name: 指標名稱
        - threshold: 閾值
        """
        self.alert_thresholds[metric_name] = threshold

    def log_prediction(
        self,
        prediction: Union[float, int, np.ndarray],
        features: Dict,
        prediction_id: str,
        timestamp: Optional[datetime] = None
    ) -> None:
        """
        記錄單次預測

        這個方法用於記錄模型的每次預測,
        包括輸入特徵和預測結果。

        參數說明:
        - prediction: 預測結果
        - features: 輸入特徵字典
        - prediction_id: 預測唯一識別碼
        - timestamp: 預測時間戳記(選填)
        """
        # 如果沒有提供時間戳記,使用當前時間
        if timestamp is None:
            timestamp = datetime.now()

        # 建立預測記錄
        log_entry = {
            "prediction_id": prediction_id,
            "timestamp": timestamp,
            "model_version": self.model_version,
            "features": features,
            "prediction": (
                prediction.tolist()
                if isinstance(prediction, np.ndarray)
                else prediction
            ),
            "actual": None  # 真實標籤稍後更新
        }

        self.prediction_logs.append(log_entry)

    def update_actual(
        self,
        prediction_id: str,
        actual: Union[float, int]
    ) -> None:
        """
        更新預測記錄的真實標籤

        當真實標籤可用時,呼叫此方法更新對應的預測記錄。

        參數說明:
        - prediction_id: 預測唯一識別碼
        - actual: 真實標籤值
        """
        # 尋找對應的預測記錄並更新
        for log in self.prediction_logs:
            if log["prediction_id"] == prediction_id:
                log["actual"] = actual
                break

    def _calculate_classification_metrics(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        y_prob: Optional[np.ndarray] = None
    ) -> Dict[str, float]:
        """
        計算分類任務的效能指標

        參數說明:
        - y_true: 真實標籤陣列
        - y_pred: 預測標籤陣列
        - y_prob: 預測機率陣列(選填,用於計算 AUC)

        回傳值:
        - 效能指標字典
        """
        metrics = {
            "accuracy": accuracy_score(y_true, y_pred),
            "precision": precision_score(
                y_true, y_pred,
                average="weighted",
                zero_division=0
            ),
            "recall": recall_score(
                y_true, y_pred,
                average="weighted",
                zero_division=0
            ),
            "f1": f1_score(
                y_true, y_pred,
                average="weighted",
                zero_division=0
            )
        }

        # 如果提供了預測機率,計算 AUC
        if y_prob is not None:
            try:
                # 嘗試計算 AUC
                if len(np.unique(y_true)) == 2:
                    # 二元分類
                    metrics["auc"] = roc_auc_score(y_true, y_prob)
                else:
                    # 多類別分類
                    metrics["auc"] = roc_auc_score(
                        y_true, y_prob,
                        multi_class="ovr",
                        average="weighted"
                    )
            except ValueError:
                # 如果無法計算 AUC(例如只有一個類別),設為 None
                metrics["auc"] = None

        return metrics

    def _calculate_regression_metrics(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray
    ) -> Dict[str, float]:
        """
        計算迴歸任務的效能指標

        參數說明:
        - y_true: 真實值陣列
        - y_pred: 預測值陣列

        回傳值:
        - 效能指標字典
        """
        metrics = {
            "mse": mean_squared_error(y_true, y_pred),
            "rmse": np.sqrt(mean_squared_error(y_true, y_pred)),
            "mae": mean_absolute_error(y_true, y_pred),
            "r2": r2_score(y_true, y_pred),
            "mape": np.mean(
                np.abs((y_true - y_pred) / (y_true + 1e-8))
            ) * 100
        }

        return metrics

    def calculate_metrics(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        y_prob: Optional[np.ndarray] = None,
        segment: Optional[str] = None
    ) -> PerformanceRecord:
        """
        計算效能指標並建立記錄

        這個方法會根據任務類型計算相應的效能指標,
        並建立 PerformanceRecord 物件。

        參數說明:
        - y_true: 真實標籤陣列
        - y_pred: 預測結果陣列
        - y_prob: 預測機率陣列(選填)
        - segment: 子群體標籤(選填)

        回傳值:
        - PerformanceRecord 物件
        """
        # 根據任務類型選擇計算方法
        if self.task_type == "classification":
            metrics = self._calculate_classification_metrics(
                y_true, y_pred, y_prob
            )
        else:
            metrics = self._calculate_regression_metrics(y_true, y_pred)

        # 建立效能記錄
        record = PerformanceRecord(
            timestamp=datetime.now(),
            model_version=self.model_version,
            metrics=metrics,
            sample_size=len(y_true),
            segment=segment
        )

        # 儲存記錄
        self.performance_history.append(record)

        return record

    def check_alerts(
        self,
        metrics: Dict[str, float]
    ) -> List[Dict[str, str]]:
        """
        檢查效能指標是否觸發警報

        參數說明:
        - metrics: 當前效能指標字典

        回傳值:
        - 警報列表,每個警報包含指標名稱、當前值和閾值
        """
        alerts = []

        for metric_name, threshold in self.alert_thresholds.items():
            if metric_name in metrics:
                current_value = metrics[metric_name]

                # 對於誤差類指標(越小越好),當超過閾值時警報
                # 對於效能類指標(越大越好),當低於閾值時警報
                error_metrics = ["mse", "rmse", "mae", "mape"]

                if metric_name in error_metrics:
                    if current_value > threshold:
                        alerts.append({
                            "metric": metric_name,
                            "current_value": current_value,
                            "threshold": threshold,
                            "message": (
                                f"{metric_name} ({current_value:.4f}) "
                                f"超過閾值 ({threshold})"
                            )
                        })
                else:
                    if current_value < threshold:
                        alerts.append({
                            "metric": metric_name,
                            "current_value": current_value,
                            "threshold": threshold,
                            "message": (
                                f"{metric_name} ({current_value:.4f}) "
                                f"低於閾值 ({threshold})"
                            )
                        })

        return alerts

    def get_performance_trend(
        self,
        metric_name: str,
        days: int = 7
    ) -> pd.DataFrame:
        """
        取得效能指標的趨勢資料

        參數說明:
        - metric_name: 指標名稱
        - days: 回溯天數

        回傳值:
        - 包含時間和指標值的 DataFrame
        """
        # 計算起始時間
        start_time = datetime.now() - timedelta(days=days)

        # 篩選指定時間範圍內的記錄
        records = [
            r for r in self.performance_history
            if r.timestamp >= start_time
        ]

        # 建立趨勢資料
        trend_data = []
        for record in records:
            if metric_name in record.metrics:
                trend_data.append({
                    "timestamp": record.timestamp,
                    "value": record.metrics[metric_name],
                    "segment": record.segment
                })

        return pd.DataFrame(trend_data)

    def compare_with_baseline(
        self,
        current_metrics: Dict[str, float]
    ) -> Dict[str, Dict[str, float]]:
        """
        與基準效能比較

        參數說明:
        - current_metrics: 當前效能指標字典

        回傳值:
        - 比較結果字典,包含基準值、當前值和變化百分比
        """
        if self.baseline_metrics is None:
            raise ValueError("未設定基準效能,請先呼叫 set_baseline 方法")

        comparison = {}

        for metric_name, current_value in current_metrics.items():
            if metric_name in self.baseline_metrics:
                baseline_value = self.baseline_metrics[metric_name]

                # 計算變化百分比
                if baseline_value != 0:
                    change_percent = (
                        (current_value - baseline_value) / baseline_value
                    ) * 100
                else:
                    change_percent = 0 if current_value == 0 else float('inf')

                comparison[metric_name] = {
                    "baseline": baseline_value,
                    "current": current_value,
                    "change_percent": change_percent
                }

        return comparison

    def generate_report(self) -> Dict:
        """
        產生監控報告

        回傳值:
        - 包含完整監控資訊的報告字典
        """
        # 取得最近一次的效能記錄
        if not self.performance_history:
            return {"error": "沒有可用的效能記錄"}

        latest_record = self.performance_history[-1]

        report = {
            "report_time": datetime.now().isoformat(),
            "model_name": self.model_name,
            "model_version": self.model_version,
            "task_type": self.task_type,
            "latest_metrics": latest_record.metrics,
            "sample_size": latest_record.sample_size,
            "alerts": self.check_alerts(latest_record.metrics),
            "total_predictions": len(self.prediction_logs),
            "history_length": len(self.performance_history)
        }

        # 如果有基準效能,加入比較結果
        if self.baseline_metrics:
            report["baseline_comparison"] = self.compare_with_baseline(
                latest_record.metrics
            )

        return report

這個 ModelMonitor 類別提供了完整的效能追蹤功能,包括預測記錄、指標計算、警報檢查和報告生成。它支援分類和迴歸兩種任務類型,並能夠追蹤特定子群體的效能表現。

A/B 測試框架

A/B Testing 是評估模型改進效果的重要方法。透過將流量隨機分配給不同版本的模型,我們可以客觀地比較它們的效能差異。一個完善的 A/B Testing 框架需要支援流量分配、指標收集、統計分析以及實驗管理。

以下是 A/B 測試框架的流程圖:

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

start

:收到預測請求;

:計算使用者雜湊值;

if (雜湊值 < 分流比例?) then (是)
    :使用實驗組模型;
    :記錄為實驗組;
else (否)
    :使用對照組模型;
    :記錄為對照組;
endif

:執行預測;

:記錄預測結果;

:收集效能指標;

if (達到樣本數量?) then (是)
    :執行統計檢定;

    if (差異顯著?) then (是)
        :判定實驗結論;
    else (否)
        :繼續收集資料;
    endif
else (否)
    :繼續實驗;
endif

stop

@enduml

以下是 A/B 測試管理器的實作:

# 匯入必要的套件
import numpy as np
import hashlib
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from dataclasses import dataclass, field
from scipy import stats
import pandas as pd

# 定義實驗配置的資料類別
@dataclass
class ExperimentConfig:
    # 實驗名稱
    name: str
    # 實驗組模型識別碼
    treatment_model_id: str
    # 對照組模型識別碼
    control_model_id: str
    # 實驗組流量比例(0 到 1 之間)
    traffic_ratio: float
    # 主要評估指標
    primary_metric: str
    # 最小樣本數量
    min_sample_size: int
    # 顯著性水準
    significance_level: float = 0.05
    # 實驗開始時間
    start_time: datetime = field(default_factory=datetime.now)
    # 實驗狀態
    status: str = "running"

# 定義實驗結果的資料類別
@dataclass
class ExperimentResult:
    # 實驗名稱
    experiment_name: str
    # 對照組指標
    control_metrics: Dict[str, float]
    # 實驗組指標
    treatment_metrics: Dict[str, float]
    # 對照組樣本數
    control_sample_size: int
    # 實驗組樣本數
    treatment_sample_size: int
    # p 值
    p_value: float
    # 效果量(提升百分比)
    effect_size: float
    # 是否顯著
    is_significant: bool
    # 信賴區間
    confidence_interval: Tuple[float, float]
    # 分析時間
    analysis_time: datetime

class ABTestManager:
    """
    A/B 測試管理器類別

    這個類別提供完整的 A/B 測試功能,
    包括流量分配、指標收集和統計分析。
    """

    def __init__(self):
        """
        初始化 ABTestManager
        """
        # 儲存實驗配置的字典
        self.experiments: Dict[str, ExperimentConfig] = {}
        # 儲存實驗資料的字典
        self.experiment_data: Dict[str, Dict[str, List]] = {}

    def create_experiment(
        self,
        name: str,
        treatment_model_id: str,
        control_model_id: str,
        traffic_ratio: float = 0.5,
        primary_metric: str = "accuracy",
        min_sample_size: int = 1000,
        significance_level: float = 0.05
    ) -> ExperimentConfig:
        """
        建立新的 A/B 測試實驗

        參數說明:
        - name: 實驗名稱
        - treatment_model_id: 實驗組模型識別碼
        - control_model_id: 對照組模型識別碼
        - traffic_ratio: 實驗組流量比例
        - primary_metric: 主要評估指標
        - min_sample_size: 最小樣本數量
        - significance_level: 顯著性水準

        回傳值:
        - ExperimentConfig 物件
        """
        # 建立實驗配置
        config = ExperimentConfig(
            name=name,
            treatment_model_id=treatment_model_id,
            control_model_id=control_model_id,
            traffic_ratio=traffic_ratio,
            primary_metric=primary_metric,
            min_sample_size=min_sample_size,
            significance_level=significance_level
        )

        # 儲存實驗配置
        self.experiments[name] = config

        # 初始化實驗資料儲存
        self.experiment_data[name] = {
            "control": [],
            "treatment": []
        }

        return config

    def assign_variant(
        self,
        experiment_name: str,
        user_id: str
    ) -> str:
        """
        根據使用者 ID 分配實驗變體

        使用雜湊函數確保同一使用者始終分配到相同的變體,
        這對於一致性體驗很重要。

        參數說明:
        - experiment_name: 實驗名稱
        - user_id: 使用者識別碼

        回傳值:
        - 變體名稱("control" 或 "treatment")
        """
        # 檢查實驗是否存在
        if experiment_name not in self.experiments:
            raise ValueError(f"實驗 '{experiment_name}' 不存在")

        config = self.experiments[experiment_name]

        # 使用 MD5 雜湊函數計算使用者的分配值
        # 將使用者 ID 和實驗名稱組合以確保跨實驗的獨立性
        hash_input = f"{user_id}_{experiment_name}"
        hash_value = hashlib.md5(hash_input.encode()).hexdigest()

        # 將雜湊值轉換為 0 到 1 之間的數值
        hash_int = int(hash_value, 16)
        normalized_value = hash_int / (16 ** 32)

        # 根據分流比例分配變體
        if normalized_value < config.traffic_ratio:
            return "treatment"
        else:
            return "control"

    def get_model_id(
        self,
        experiment_name: str,
        variant: str
    ) -> str:
        """
        取得指定變體的模型識別碼

        參數說明:
        - experiment_name: 實驗名稱
        - variant: 變體名稱

        回傳值:
        - 模型識別碼
        """
        config = self.experiments[experiment_name]

        if variant == "treatment":
            return config.treatment_model_id
        else:
            return config.control_model_id

    def record_outcome(
        self,
        experiment_name: str,
        variant: str,
        metric_value: float,
        metadata: Optional[Dict] = None
    ) -> None:
        """
        記錄實驗結果

        參數說明:
        - experiment_name: 實驗名稱
        - variant: 變體名稱
        - metric_value: 指標值
        - metadata: 額外的元資料(選填)
        """
        # 檢查實驗是否存在
        if experiment_name not in self.experiment_data:
            raise ValueError(f"實驗 '{experiment_name}' 不存在")

        # 建立記錄
        record = {
            "metric_value": metric_value,
            "timestamp": datetime.now(),
            "metadata": metadata or {}
        }

        # 儲存記錄
        self.experiment_data[experiment_name][variant].append(record)

    def _calculate_statistics(
        self,
        control_values: List[float],
        treatment_values: List[float],
        significance_level: float
    ) -> Tuple[float, float, bool, Tuple[float, float]]:
        """
        計算統計檢定結果

        使用 Welch's t-test 比較兩組樣本的平均值差異。

        參數說明:
        - control_values: 對照組指標值列表
        - treatment_values: 實驗組指標值列表
        - significance_level: 顯著性水準

        回傳值:
        - p 值、效果量、是否顯著、信賴區間
        """
        control_array = np.array(control_values)
        treatment_array = np.array(treatment_values)

        # 執行 Welch's t-test
        # 這種檢定方法不假設兩組樣本的變異數相等
        t_stat, p_value = stats.ttest_ind(
            treatment_array,
            control_array,
            equal_var=False
        )

        # 計算效果量(相對於對照組的提升百分比)
        control_mean = np.mean(control_array)
        treatment_mean = np.mean(treatment_array)

        if control_mean != 0:
            effect_size = (
                (treatment_mean - control_mean) / control_mean
            ) * 100
        else:
            effect_size = 0 if treatment_mean == 0 else float('inf')

        # 判斷是否顯著
        is_significant = p_value < significance_level

        # 計算效果量的信賴區間
        # 使用 bootstrap 方法估計
        n_bootstrap = 1000
        bootstrap_effects = []

        for _ in range(n_bootstrap):
            # 重複抽樣
            boot_control = np.random.choice(
                control_array,
                size=len(control_array),
                replace=True
            )
            boot_treatment = np.random.choice(
                treatment_array,
                size=len(treatment_array),
                replace=True
            )

            # 計算效果量
            boot_control_mean = np.mean(boot_control)
            boot_treatment_mean = np.mean(boot_treatment)

            if boot_control_mean != 0:
                boot_effect = (
                    (boot_treatment_mean - boot_control_mean) /
                    boot_control_mean
                ) * 100
                bootstrap_effects.append(boot_effect)

        # 計算信賴區間
        ci_lower = np.percentile(bootstrap_effects, 2.5)
        ci_upper = np.percentile(bootstrap_effects, 97.5)

        return p_value, effect_size, is_significant, (ci_lower, ci_upper)

    def analyze_experiment(
        self,
        experiment_name: str
    ) -> ExperimentResult:
        """
        分析實驗結果

        參數說明:
        - experiment_name: 實驗名稱

        回傳值:
        - ExperimentResult 物件
        """
        # 檢查實驗是否存在
        if experiment_name not in self.experiments:
            raise ValueError(f"實驗 '{experiment_name}' 不存在")

        config = self.experiments[experiment_name]
        data = self.experiment_data[experiment_name]

        # 提取指標值
        control_values = [r["metric_value"] for r in data["control"]]
        treatment_values = [r["metric_value"] for r in data["treatment"]]

        # 檢查樣本數量
        if len(control_values) < 2 or len(treatment_values) < 2:
            raise ValueError("樣本數量不足,無法進行統計分析")

        # 計算統計結果
        p_value, effect_size, is_significant, ci = self._calculate_statistics(
            control_values,
            treatment_values,
            config.significance_level
        )

        # 計算各組的平均指標
        control_metrics = {
            config.primary_metric: np.mean(control_values),
            f"{config.primary_metric}_std": np.std(control_values)
        }

        treatment_metrics = {
            config.primary_metric: np.mean(treatment_values),
            f"{config.primary_metric}_std": np.std(treatment_values)
        }

        # 建立結果物件
        result = ExperimentResult(
            experiment_name=experiment_name,
            control_metrics=control_metrics,
            treatment_metrics=treatment_metrics,
            control_sample_size=len(control_values),
            treatment_sample_size=len(treatment_values),
            p_value=p_value,
            effect_size=effect_size,
            is_significant=is_significant,
            confidence_interval=ci,
            analysis_time=datetime.now()
        )

        return result

    def should_stop_experiment(
        self,
        experiment_name: str
    ) -> Tuple[bool, str]:
        """
        判斷是否應該停止實驗

        根據樣本數量和統計顯著性來判斷。

        參數說明:
        - experiment_name: 實驗名稱

        回傳值:
        - 是否應該停止、原因說明
        """
        config = self.experiments[experiment_name]
        data = self.experiment_data[experiment_name]

        control_size = len(data["control"])
        treatment_size = len(data["treatment"])

        # 檢查是否達到最小樣本數量
        if (control_size < config.min_sample_size or
            treatment_size < config.min_sample_size):
            return False, (
                f"樣本數量不足:對照組 {control_size},"
                f"實驗組 {treatment_size},"
                f"目標 {config.min_sample_size}"
            )

        # 執行分析
        try:
            result = self.analyze_experiment(experiment_name)

            if result.is_significant:
                return True, (
                    f"達到統計顯著性:p 值 = {result.p_value:.4f},"
                    f"效果量 = {result.effect_size:.2f}%"
                )
            else:
                return False, (
                    f"尚未達到統計顯著性:p 值 = {result.p_value:.4f}"
                )
        except Exception as e:
            return False, f"分析失敗:{str(e)}"

    def get_experiment_summary(
        self,
        experiment_name: str
    ) -> Dict:
        """
        取得實驗摘要

        參數說明:
        - experiment_name: 實驗名稱

        回傳值:
        - 實驗摘要字典
        """
        config = self.experiments[experiment_name]
        data = self.experiment_data[experiment_name]

        summary = {
            "experiment_name": experiment_name,
            "status": config.status,
            "start_time": config.start_time.isoformat(),
            "traffic_ratio": config.traffic_ratio,
            "primary_metric": config.primary_metric,
            "control_sample_size": len(data["control"]),
            "treatment_sample_size": len(data["treatment"]),
            "min_sample_size": config.min_sample_size
        }

        # 如果有足夠的資料,加入分析結果
        if (len(data["control"]) >= 2 and
            len(data["treatment"]) >= 2):
            try:
                result = self.analyze_experiment(experiment_name)
                summary["analysis"] = {
                    "p_value": result.p_value,
                    "effect_size": result.effect_size,
                    "is_significant": result.is_significant,
                    "confidence_interval": result.confidence_interval
                }
            except Exception as e:
                summary["analysis_error"] = str(e)

        return summary

這個 ABTestManager 類別提供了完整的 A/B 測試管理功能。它使用雜湊函數來確保使用者分配的一致性,並使用 Welch’s t-test 進行統計分析。Bootstrap 方法用於計算效果量的信賴區間,提供更可靠的結果評估。

監控儀表板建置

監控儀表板是將所有監控資訊視覺化呈現的介面。一個好的儀表板應該能夠清楚地展示模型的健康狀態,並在問題發生時提供足夠的資訊來協助診斷。以下是儀表板資料整合器的實作:

# 匯入必要的套件
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
import json

# 定義儀表板面板的資料類別
@dataclass
class DashboardPanel:
    # 面板標題
    title: str
    # 面板類型(chart, metric, table, alert)
    panel_type: str
    # 面板資料
    data: Dict
    # 更新時間
    updated_at: datetime

class MonitoringDashboard:
    """
    監控儀表板類別

    這個類別整合各個監控元件的資料,
    並提供統一的介面來產生儀表板所需的資訊。
    """

    def __init__(
        self,
        model_monitor: 'ModelMonitor',
        drift_detector: 'DriftDetector',
        concept_drift_detector: 'ConceptDriftDetector',
        ab_test_manager: Optional['ABTestManager'] = None
    ):
        """
        初始化 MonitoringDashboard

        參數說明:
        - model_monitor: 模型監控器實例
        - drift_detector: Data Drift 偵測器實例
        - concept_drift_detector: Concept Drift 偵測器實例
        - ab_test_manager: A/B 測試管理器實例(選填)
        """
        self.model_monitor = model_monitor
        self.drift_detector = drift_detector
        self.concept_drift_detector = concept_drift_detector
        self.ab_test_manager = ab_test_manager

        # 儲存儀表板面板
        self.panels: List[DashboardPanel] = []

    def _create_health_summary_panel(self) -> DashboardPanel:
        """
        建立健康狀態摘要面板

        這個面板顯示模型的整體健康狀態。

        回傳值:
        - DashboardPanel 物件
        """
        # 取得最新的效能記錄
        if self.model_monitor.performance_history:
            latest_record = self.model_monitor.performance_history[-1]

            # 檢查警報
            alerts = self.model_monitor.check_alerts(latest_record.metrics)

            # 判斷健康狀態
            if len(alerts) == 0:
                health_status = "健康"
            elif len(alerts) <= 2:
                health_status = "警告"
            else:
                health_status = "危險"

            data = {
                "health_status": health_status,
                "active_alerts": len(alerts),
                "latest_metrics": latest_record.metrics,
                "sample_size": latest_record.sample_size,
                "model_version": self.model_monitor.model_version
            }
        else:
            data = {
                "health_status": "未知",
                "active_alerts": 0,
                "latest_metrics": {},
                "sample_size": 0,
                "model_version": self.model_monitor.model_version
            }

        return DashboardPanel(
            title="模型健康狀態",
            panel_type="metric",
            data=data,
            updated_at=datetime.now()
        )

    def _create_drift_status_panel(self) -> DashboardPanel:
        """
        建立漂移狀態面板

        這個面板顯示 Data Drift 和 Concept Drift 的偵測結果。

        回傳值:
        - DashboardPanel 物件
        """
        # 取得 Data Drift 摘要
        drift_summary = self.drift_detector.get_drift_summary()

        # 計算漂移統計
        if len(drift_summary) > 0:
            drift_count = drift_summary["偵測到漂移"].sum()
            total_features = len(drift_summary)
            drift_percentage = (drift_count / total_features) * 100
        else:
            drift_count = 0
            total_features = 0
            drift_percentage = 0

        # 取得 Concept Drift 統計
        concept_drift_stats = self.concept_drift_detector.get_window_statistics()

        # 取得最近的 Concept Drift 偵測結果
        if self.concept_drift_detector.detection_history:
            latest_concept = self.concept_drift_detector.detection_history[-1]
            concept_drift_detected = latest_concept.drift_detected
        else:
            concept_drift_detected = False

        data = {
            "data_drift": {
                "features_with_drift": drift_count,
                "total_features": total_features,
                "drift_percentage": drift_percentage
            },
            "concept_drift": {
                "detected": concept_drift_detected,
                "window_stats": concept_drift_stats
            }
        }

        return DashboardPanel(
            title="漂移偵測狀態",
            panel_type="metric",
            data=data,
            updated_at=datetime.now()
        )

    def _create_performance_trend_panel(
        self,
        metric_name: str,
        days: int = 7
    ) -> DashboardPanel:
        """
        建立效能趨勢面板

        這個面板顯示指定指標在一段時間內的變化趨勢。

        參數說明:
        - metric_name: 指標名稱
        - days: 回溯天數

        回傳值:
        - DashboardPanel 物件
        """
        # 取得趨勢資料
        trend_df = self.model_monitor.get_performance_trend(
            metric_name,
            days
        )

        # 轉換為儀表板所需的格式
        if len(trend_df) > 0:
            trend_data = {
                "timestamps": trend_df["timestamp"].dt.strftime(
                    "%Y-%m-%d %H:%M"
                ).tolist(),
                "values": trend_df["value"].tolist(),
                "metric_name": metric_name
            }
        else:
            trend_data = {
                "timestamps": [],
                "values": [],
                "metric_name": metric_name
            }

        return DashboardPanel(
            title=f"{metric_name} 趨勢",
            panel_type="chart",
            data=trend_data,
            updated_at=datetime.now()
        )

    def _create_alerts_panel(self) -> DashboardPanel:
        """
        建立警報面板

        這個面板列出所有目前觸發的警報。

        回傳值:
        - DashboardPanel 物件
        """
        # 收集所有警報
        all_alerts = []

        # 效能警報
        if self.model_monitor.performance_history:
            latest_record = self.model_monitor.performance_history[-1]
            performance_alerts = self.model_monitor.check_alerts(
                latest_record.metrics
            )
            for alert in performance_alerts:
                alert["type"] = "效能"
                all_alerts.append(alert)

        # Data Drift 警報
        if self.drift_detector.detection_history:
            for result in self.drift_detector.detection_history[-10:]:
                if result.drift_detected:
                    all_alerts.append({
                        "type": "Data Drift",
                        "metric": result.feature_name,
                        "message": (
                            f"特徵 {result.feature_name} "
                            f"偵測到漂移,嚴重程度:{result.severity}"
                        )
                    })

        # Concept Drift 警報
        if self.concept_drift_detector.detection_history:
            latest_concept = self.concept_drift_detector.detection_history[-1]
            if latest_concept.drift_detected:
                all_alerts.append({
                    "type": "Concept Drift",
                    "metric": "模型效能",
                    "message": (
                        f"偵測到 Concept Drift,"
                        f"類型:{latest_concept.drift_type}"
                    )
                })

        return DashboardPanel(
            title="警報列表",
            panel_type="alert",
            data={"alerts": all_alerts},
            updated_at=datetime.now()
        )

    def _create_ab_test_panel(self) -> Optional[DashboardPanel]:
        """
        建立 A/B 測試面板

        這個面板顯示所有進行中的 A/B 測試狀態。

        回傳值:
        - DashboardPanel 物件,如果沒有 A/B 測試管理器則返回 None
        """
        if self.ab_test_manager is None:
            return None

        experiments_summary = []

        for experiment_name in self.ab_test_manager.experiments:
            summary = self.ab_test_manager.get_experiment_summary(
                experiment_name
            )
            experiments_summary.append(summary)

        return DashboardPanel(
            title="A/B 測試狀態",
            panel_type="table",
            data={"experiments": experiments_summary},
            updated_at=datetime.now()
        )

    def generate_dashboard(
        self,
        metric_name: str = "accuracy"
    ) -> Dict:
        """
        產生完整的儀表板資料

        參數說明:
        - metric_name: 要追蹤的主要指標名稱

        回傳值:
        - 包含所有面板資料的字典
        """
        # 建立所有面板
        self.panels = []

        # 健康狀態摘要
        self.panels.append(self._create_health_summary_panel())

        # 漂移狀態
        self.panels.append(self._create_drift_status_panel())

        # 效能趨勢
        self.panels.append(
            self._create_performance_trend_panel(metric_name)
        )

        # 警報列表
        self.panels.append(self._create_alerts_panel())

        # A/B 測試(如果有)
        ab_panel = self._create_ab_test_panel()
        if ab_panel:
            self.panels.append(ab_panel)

        # 組裝儀表板
        dashboard = {
            "generated_at": datetime.now().isoformat(),
            "model_name": self.model_monitor.model_name,
            "model_version": self.model_monitor.model_version,
            "panels": [
                {
                    "title": panel.title,
                    "type": panel.panel_type,
                    "data": panel.data,
                    "updated_at": panel.updated_at.isoformat()
                }
                for panel in self.panels
            ]
        }

        return dashboard

    def export_to_json(self, filepath: str) -> None:
        """
        將儀表板資料匯出為 JSON 檔案

        參數說明:
        - filepath: 輸出檔案路徑
        """
        dashboard = self.generate_dashboard()

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(dashboard, f, ensure_ascii=False, indent=2)

這個 MonitoringDashboard 類別整合了所有監控元件的資料,並提供統一的介面來產生儀表板所需的資訊。它支援匯出 JSON 格式,方便與前端視覺化工具整合。

實務案例分析

讓我們透過一個實際的案例來展示如何使用這些監控工具。假設我們有一個電商平台的商品推薦模型,需要持續監控其效能並偵測可能的漂移。

以下是完整的使用範例:

# 匯入所有必要的模組
import numpy as np
import pandas as pd
from datetime import datetime, timedelta

# 假設我們已經定義了上述所有類別

# 模擬訓練資料和生產資料
# 在實際應用中,這些資料會來自資料庫或資料管道
np.random.seed(42)

# 建立參考資料(訓練時的資料分佈)
reference_data = {
    "user_age": np.random.normal(35, 10, 10000),
    "purchase_frequency": np.random.exponential(5, 10000),
    "session_duration": np.random.gamma(2, 50, 10000)
}

# 建立當前生產資料(可能已經發生漂移)
# 模擬使用者年齡分佈改變的情況
current_data = {
    "user_age": np.random.normal(38, 12, 1000),  # 平均年齡上升,變異增加
    "purchase_frequency": np.random.exponential(4.5, 1000),  # 購買頻率下降
    "session_duration": np.random.gamma(2, 50, 1000)  # 維持穩定
}

# 步驟 1:初始化 Data Drift 偵測器
print("初始化 Data Drift 偵測器...")
drift_detector = DriftDetector(
    ks_threshold=0.05,
    psi_threshold=0.2,
    wasserstein_threshold=0.1
)

# 設定各特徵的參考分佈
for feature_name, data in reference_data.items():
    drift_detector.set_reference(feature_name, data)
    print(f"  已設定特徵 '{feature_name}' 的參考分佈")

# 步驟 2:執行 Data Drift 偵測
print("\n執行 Data Drift 偵測...")
current_df = pd.DataFrame(current_data)
drift_results = drift_detector.detect_all_features(current_df)

# 輸出偵測結果
for feature_name, result in drift_results.items():
    print(f"\n特徵: {feature_name}")
    print(f"  KS 統計量: {result.ks_statistic:.4f}")
    print(f"  KS p 值: {result.ks_pvalue:.4f}")
    print(f"  PSI 值: {result.psi_value:.4f}")
    print(f"  Wasserstein 距離: {result.wasserstein_distance:.4f}")
    print(f"  偵測到漂移: {result.drift_detected}")
    print(f"  嚴重程度: {result.severity}")

# 步驟 3:初始化 Concept Drift 偵測器
print("\n\n初始化 Concept Drift 偵測器...")
concept_detector = ConceptDriftDetector(
    delta=0.005,
    lambda_threshold=50.0,
    window_size=100
)

# 模擬一段時間的效能指標
# 前 50 個樣本效能穩定,後 50 個樣本效能下降
stable_performance = np.random.normal(0.85, 0.02, 50)
degraded_performance = np.random.normal(0.78, 0.03, 50)
performance_stream = np.concatenate([stable_performance, degraded_performance])

# 步驟 4:逐一更新 Concept Drift 偵測器
print("監控效能變化...")
for i, perf in enumerate(performance_stream):
    result = concept_detector.update(perf)

    if result.drift_detected:
        print(f"\n在第 {i+1} 個樣本偵測到 Concept Drift!")
        print(f"  漂移類型: {result.drift_type}")
        print(f"  累積統計量: {result.cumulative_sum:.4f}")

# 輸出視窗統計
window_stats = concept_detector.get_window_statistics()
print(f"\n滑動視窗統計:")
print(f"  平均值: {window_stats['平均值']:.4f}")
print(f"  標準差: {window_stats['標準差']:.4f}")

# 步驟 5:初始化模型監控器
print("\n\n初始化模型監控器...")
model_monitor = ModelMonitor(
    model_name="推薦模型",
    model_version="1.2.0",
    task_type="classification"
)

# 設定基準效能和警報閾值
model_monitor.set_baseline({
    "accuracy": 0.85,
    "precision": 0.82,
    "recall": 0.88,
    "f1": 0.85
})

model_monitor.set_alert_threshold("accuracy", 0.80)
model_monitor.set_alert_threshold("f1", 0.78)

# 模擬預測結果
y_true = np.random.randint(0, 2, 500)
y_pred = y_true.copy()
# 模擬一些錯誤預測
error_indices = np.random.choice(500, 60, replace=False)
y_pred[error_indices] = 1 - y_pred[error_indices]

# 計算效能指標
record = model_monitor.calculate_metrics(y_true, y_pred)

print("\n效能指標:")
for metric_name, value in record.metrics.items():
    print(f"  {metric_name}: {value:.4f}")

# 檢查警報
alerts = model_monitor.check_alerts(record.metrics)
if alerts:
    print("\n觸發的警報:")
    for alert in alerts:
        print(f"  {alert['message']}")

# 與基準比較
comparison = model_monitor.compare_with_baseline(record.metrics)
print("\n與基準效能比較:")
for metric_name, values in comparison.items():
    print(
        f"  {metric_name}: "
        f"基準 {values['baseline']:.4f} -> "
        f"當前 {values['current']:.4f} "
        f"({values['change_percent']:+.2f}%)"
    )

# 步驟 6:設定 A/B 測試
print("\n\n設定 A/B 測試...")
ab_manager = ABTestManager()

# 建立實驗
experiment = ab_manager.create_experiment(
    name="新推薦演算法測試",
    treatment_model_id="model_v2.0",
    control_model_id="model_v1.2",
    traffic_ratio=0.5,
    primary_metric="conversion_rate",
    min_sample_size=1000,
    significance_level=0.05
)

print(f"實驗 '{experiment.name}' 已建立")
print(f"  實驗組模型: {experiment.treatment_model_id}")
print(f"  對照組模型: {experiment.control_model_id}")
print(f"  流量比例: {experiment.traffic_ratio}")

# 模擬實驗資料
# 對照組轉換率約 3%
control_conversions = np.random.binomial(1, 0.03, 1200)
# 實驗組轉換率約 3.5%(提升約 17%)
treatment_conversions = np.random.binomial(1, 0.035, 1200)

# 記錄實驗結果
for conversion in control_conversions:
    ab_manager.record_outcome(
        "新推薦演算法測試",
        "control",
        conversion
    )

for conversion in treatment_conversions:
    ab_manager.record_outcome(
        "新推薦演算法測試",
        "treatment",
        conversion
    )

# 分析實驗結果
result = ab_manager.analyze_experiment("新推薦演算法測試")

print(f"\n實驗分析結果:")
print(f"  對照組轉換率: {result.control_metrics['conversion_rate']:.4f}")
print(f"  實驗組轉換率: {result.treatment_metrics['conversion_rate']:.4f}")
print(f"  效果量: {result.effect_size:.2f}%")
print(f"  p 值: {result.p_value:.4f}")
print(f"  是否顯著: {result.is_significant}")
print(
    f"  95% 信賴區間: "
    f"[{result.confidence_interval[0]:.2f}%, "
    f"{result.confidence_interval[1]:.2f}%]"
)

# 步驟 7:產生監控儀表板
print("\n\n產生監控儀表板...")
dashboard = MonitoringDashboard(
    model_monitor=model_monitor,
    drift_detector=drift_detector,
    concept_drift_detector=concept_detector,
    ab_test_manager=ab_manager
)

# 產生儀表板資料
dashboard_data = dashboard.generate_dashboard(metric_name="accuracy")

print(f"儀表板產生時間: {dashboard_data['generated_at']}")
print(f"面板數量: {len(dashboard_data['panels'])}")

for panel in dashboard_data['panels']:
    print(f"\n面板: {panel['title']}")
    print(f"  類型: {panel['type']}")

# 產生監控報告
report = model_monitor.generate_report()
print("\n\n監控報告:")
print(f"  報告時間: {report['report_time']}")
print(f"  模型名稱: {report['model_name']}")
print(f"  模型版本: {report['model_version']}")
print(f"  警報數量: {len(report['alerts'])}")
print(f"  總預測數: {report['total_predictions']}")

這個範例展示了如何整合所有監控元件來建立一個完整的監控系統。從 Data Drift 偵測到 Concept Drift 處理,從效能追蹤到 A/B 測試,每個元件都扮演著重要的角色。

最佳實務建議

在實施機器學習模型監控系統時,有幾個重要的最佳實務需要注意。首先,監控策略應該與業務目標緊密結合。不同的應用場景需要不同的監控重點,例如金融風險模型可能更關注 False Negative Rate,而推薦系統可能更關注 Click-Through Rate。

其次,警報閾值的設定需要謹慎考量。過於敏感的閾值會導致大量誤報,使團隊對警報產生疲勞;而過於寬鬆的閾值則可能錯過重要的問題。建議先使用較寬鬆的閾值開始,然後根據實際情況逐步調整。

第三,監控系統本身也需要監控。確保監控服務的可用性和可靠性,避免在關鍵時刻監控系統失效而無法發現問題。可以設定監控服務的健康檢查和備援機制。

第四,建立完善的應變流程。當監控系統發出警報時,團隊應該知道如何回應。這包括問題的分級、負責人的指派、處理的時限等。定期演練應變流程可以確保團隊在實際問題發生時能夠迅速有效地處理。

第五,持續改進監控系統。隨著模型的演進和業務的變化,監控系統也需要不斷調整。定期檢視監控指標的有效性,移除不再需要的指標,並添加新的重要指標。

機器學習模型監控是一個持續的過程,而非一次性的工作。透過建立完善的監控系統,團隊可以更有信心地將模型部署到生產環境,並在問題發生時迅速回應。本文提供的程式碼範例可以作為建立監控系統的起點,團隊可以根據實際需求進行調整和擴展。

監控系統的價值不僅在於發現問題,更在於提供洞察。透過分析監控資料,團隊可以更深入地理解模型的行為模式,發現改進的機會,並做出更好的決策。這是確保機器學習專案長期成功的關鍵因素。