機器學習預測模型已成為現代企業數據驅動決策的核心工具。從電子商務的推薦系統到金融業的風險評估,從醫療診斷輔助到製造業的預測性維護,預測模型的應用無所不在。然而,建構一個真正有效的預測模型絕非僅是套用演算法這麼簡單,它需要對商業問題的深刻理解、對資料特性的細緻洞察、對演算法原理的掌握,以及對模型評估方法的嚴謹應用。更重要的是,隨著 AI 技術的普及,模型的公平性、透明度與倫理考量已成為不可忽視的議題。本文將系統性地探討預測模型建構的完整流程,從最初的目標定義到最終的模型部署,從技術實作到倫理實踐,為讀者提供全方位的指引。

預測模型建構的基礎框架

預測模型的建構是一個系統性的工程,需要經過多個階段的迭代優化。成功的預測模型專案始於清晰的目標定義,這不僅包括技術層面的指標,更重要的是要理解背後的商業價值與應用場景。一個預測客戶流失的模型,其成功標準不僅是預測準確率,更在於能否幫助企業及時採取挽留措施,最終降低流失率並提升客戶終身價值。

技術選型是建構預測模型的關鍵決策點。不同的問題類型需要不同的建模方法。迴歸分析適用於預測連續數值,如房價預測、銷售額預測等。分類模型則用於預測離散類別,如信用評等、疾病診斷等。時間序列分析專注於具有時間依賴性的資料,如股價預測、需求預測等。異常檢測則用於識別罕見事件或異常行為,如詐欺偵測、設備故障預警等。選擇合適的技術需要考慮問題的本質、資料的特性以及業務的需求。

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.metrics import (
    mean_squared_error, r2_score, mean_absolute_error,
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, classification_report, roc_auc_score, roc_curve
)
import matplotlib.pyplot as plt
import seaborn as sns
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PredictiveModelBuilder:
    """
    預測模型建構器
    提供完整的模型建構、訓練、評估與視覺化功能
    """
    
    def __init__(self, problem_type='regression'):
        """
        初始化模型建構器
        
        Parameters:
        -----------
        problem_type : str
            問題類型: 'regression' 或 'classification'
        """
        self.problem_type = problem_type
        self.model = None
        self.scaler = StandardScaler()
        self.is_fitted = False
        self.feature_importance = None
        
        logger.info(f"初始化預測模型建構器 - 問題類型: {problem_type}")
    
    def prepare_data(self, X, y, test_size=0.2, random_state=42):
        """
        準備訓練與測試資料
        
        Parameters:
        -----------
        X : pd.DataFrame or np.ndarray
            特徵資料
        y : pd.Series or np.ndarray
            目標變數
        test_size : float
            測試集比例
        random_state : int
            隨機種子
        
        Returns:
        --------
        tuple
            (X_train, X_test, y_train, y_test)
        """
        logger.info("開始資料準備流程")
        
        # 分割資料
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )
        
        # 特徵標準化
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        logger.info(f"訓練集大小: {X_train_scaled.shape}")
        logger.info(f"測試集大小: {X_test_scaled.shape}")
        
        return X_train_scaled, X_test_scaled, y_train, y_test
    
    def feature_engineering(self, df, target_col, 
                          numeric_features=None,
                          categorical_features=None):
        """
        特徵工程
        處理數值型與類別型特徵
        
        Parameters:
        -----------
        df : pd.DataFrame
            原始資料框架
        target_col : str
            目標變數欄位名稱
        numeric_features : list
            數值型特徵欄位列表
        categorical_features : list
            類別型特徵欄位列表
        
        Returns:
        --------
        tuple
            (X, y) 處理後的特徵與目標變數
        """
        logger.info("執行特徵工程")
        
        df_processed = df.copy()
        
        # 處理缺失值
        if numeric_features:
            for col in numeric_features:
                if col in df_processed.columns:
                    df_processed[col].fillna(
                        df_processed[col].median(),
                        inplace=True
                    )
        
        # 編碼類別型特徵
        if categorical_features:
            for col in categorical_features:
                if col in df_processed.columns:
                    le = LabelEncoder()
                    df_processed[col] = le.fit_transform(
                        df_processed[col].astype(str)
                    )
        
        # 分離特徵與目標變數
        X = df_processed.drop(columns=[target_col])
        y = df_processed[target_col]
        
        logger.info(f"特徵數量: {X.shape[1]}")
        logger.info(f"樣本數量: {X.shape[0]}")
        
        return X, y
    
    def train_model(self, X_train, y_train, model_type='random_forest'):
        """
        訓練預測模型
        
        Parameters:
        -----------
        X_train : np.ndarray
            訓練特徵
        y_train : np.ndarray
            訓練目標
        model_type : str
            模型類型: 'linear', 'random_forest', 'gradient_boosting'
        """
        logger.info(f"開始訓練 {model_type} 模型")
        
        if self.problem_type == 'regression':
            if model_type == 'linear':
                self.model = LinearRegression()
            elif model_type == 'random_forest':
                self.model = RandomForestRegressor(
                    n_estimators=100,
                    max_depth=10,
                    random_state=42,
                    n_jobs=-1
                )
        else:  # classification
            if model_type == 'logistic':
                self.model = LogisticRegression(
                    max_iter=1000,
                    random_state=42
                )
            elif model_type == 'random_forest':
                self.model = RandomForestClassifier(
                    n_estimators=100,
                    max_depth=10,
                    random_state=42,
                    n_jobs=-1
                )
        
        # 訓練模型
        self.model.fit(X_train, y_train)
        self.is_fitted = True
        
        # 提取特徵重要性(如果可用)
        if hasattr(self.model, 'feature_importances_'):
            self.feature_importance = self.model.feature_importances_
        
        logger.info("模型訓練完成")
    
    def evaluate_regression(self, X_test, y_test):
        """
        評估迴歸模型
        
        Parameters:
        -----------
        X_test : np.ndarray
            測試特徵
        y_test : np.ndarray
            測試目標
        
        Returns:
        --------
        dict
            評估指標字典
        """
        if not self.is_fitted:
            raise ValueError("模型尚未訓練")
        
        logger.info("開始評估迴歸模型")
        
        # 預測
        y_pred = self.model.predict(X_test)
        
        # 計算評估指標
        metrics = {
            'MSE': mean_squared_error(y_test, y_pred),
            'RMSE': np.sqrt(mean_squared_error(y_test, y_pred)),
            'MAE': mean_absolute_error(y_test, y_pred),
            'R2': r2_score(y_test, y_pred),
            'MAPE': np.mean(np.abs((y_test - y_pred) / y_test)) * 100
        }
        
        logger.info("迴歸模型評估完成")
        return metrics, y_pred
    
    def evaluate_classification(self, X_test, y_test):
        """
        評估分類模型
        
        Parameters:
        -----------
        X_test : np.ndarray
            測試特徵
        y_test : np.ndarray
            測試目標
        
        Returns:
        --------
        dict
            評估指標字典
        """
        if not self.is_fitted:
            raise ValueError("模型尚未訓練")
        
        logger.info("開始評估分類模型")
        
        # 預測
        y_pred = self.model.predict(X_test)
        y_pred_proba = None
        
        if hasattr(self.model, 'predict_proba'):
            y_pred_proba = self.model.predict_proba(X_test)
        
        # 計算評估指標
        metrics = {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred, average='weighted', zero_division=0),
            'recall': recall_score(y_test, y_pred, average='weighted', zero_division=0),
            'f1_score': f1_score(y_test, y_pred, average='weighted', zero_division=0)
        }
        
        # 混淆矩陣
        cm = confusion_matrix(y_test, y_pred)
        
        # ROC AUC(如果是二元分類)
        if len(np.unique(y_test)) == 2 and y_pred_proba is not None:
            metrics['roc_auc'] = roc_auc_score(y_test, y_pred_proba[:, 1])
        
        logger.info("分類模型評估完成")
        return metrics, y_pred, cm
    
    def cross_validate(self, X, y, cv=5):
        """
        交叉驗證
        
        Parameters:
        -----------
        X : np.ndarray
            特徵資料
        y : np.ndarray
            目標變數
        cv : int
            交叉驗證折數
        
        Returns:
        --------
        dict
            交叉驗證結果
        """
        if not self.is_fitted:
            raise ValueError("模型尚未訓練")
        
        logger.info(f"執行 {cv}-折交叉驗證")
        
        if self.problem_type == 'regression':
            scoring = 'neg_mean_squared_error'
        else:
            scoring = 'accuracy'
        
        scores = cross_val_score(
            self.model, X, y,
            cv=cv,
            scoring=scoring,
            n_jobs=-1
        )
        
        if self.problem_type == 'regression':
            scores = -scores  # 轉換為正值
        
        cv_results = {
            'scores': scores,
            'mean': scores.mean(),
            'std': scores.std(),
            'min': scores.min(),
            'max': scores.max()
        }
        
        logger.info(f"交叉驗證平均分數: {cv_results['mean']:.4f} (+/- {cv_results['std']:.4f})")
        
        return cv_results
    
    def hyperparameter_tuning(self, X_train, y_train, param_grid, cv=5):
        """
        超參數調整
        使用網格搜尋找到最佳參數組合
        
        Parameters:
        -----------
        X_train : np.ndarray
            訓練特徵
        y_train : np.ndarray
            訓練目標
        param_grid : dict
            參數網格
        cv : int
            交叉驗證折數
        
        Returns:
        --------
        dict
            最佳參數與分數
        """
        logger.info("開始超參數調整")
        
        if self.problem_type == 'regression':
            scoring = 'neg_mean_squared_error'
        else:
            scoring = 'accuracy'
        
        grid_search = GridSearchCV(
            self.model,
            param_grid,
            cv=cv,
            scoring=scoring,
            n_jobs=-1,
            verbose=1
        )
        
        grid_search.fit(X_train, y_train)
        
        self.model = grid_search.best_estimator_
        
        results = {
            'best_params': grid_search.best_params_,
            'best_score': grid_search.best_score_,
            'cv_results': grid_search.cv_results_
        }
        
        logger.info(f"最佳參數: {results['best_params']}")
        logger.info(f"最佳分數: {results['best_score']:.4f}")
        
        return results
    
    def plot_regression_results(self, y_test, y_pred):
        """
        視覺化迴歸結果
        """
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))
        
        # 實際值 vs 預測值
        axes[0].scatter(y_test, y_pred, alpha=0.5)
        axes[0].plot([y_test.min(), y_test.max()],
                    [y_test.min(), y_test.max()],
                    'r--', lw=2)
        axes[0].set_xlabel('實際值', fontsize=12, fontweight='bold')
        axes[0].set_ylabel('預測值', fontsize=12, fontweight='bold')
        axes[0].set_title('實際值 vs 預測值', fontsize=14, fontweight='bold')
        axes[0].grid(True, alpha=0.3)
        
        # 殘差分佈
        residuals = y_test - y_pred
        axes[1].hist(residuals, bins=30, edgecolor='black')
        axes[1].set_xlabel('殘差', fontsize=12, fontweight='bold')
        axes[1].set_ylabel('頻率', fontsize=12, fontweight='bold')
        axes[1].set_title('殘差分佈', fontsize=14, fontweight='bold')
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
    
    def plot_confusion_matrix(self, cm, class_names=None):
        """
        視覺化混淆矩陣
        """
        plt.figure(figsize=(8, 6))
        
        sns.heatmap(
            cm,
            annot=True,
            fmt='d',
            cmap='Blues',
            xticklabels=class_names if class_names else 'auto',
            yticklabels=class_names if class_names else 'auto'
        )
        
        plt.title('混淆矩陣', fontsize=14, fontweight='bold')
        plt.ylabel('實際類別', fontsize=12, fontweight='bold')
        plt.xlabel('預測類別', fontsize=12, fontweight='bold')
        plt.tight_layout()
        plt.show()
    
    def plot_feature_importance(self, feature_names, top_n=10):
        """
        視覺化特徵重要性
        """
        if self.feature_importance is None:
            logger.warning("模型不支援特徵重要性分析")
            return
        
        # 排序特徵
        indices = np.argsort(self.feature_importance)[::-1][:top_n]
        
        plt.figure(figsize=(10, 6))
        plt.barh(
            range(top_n),
            self.feature_importance[indices]
        )
        plt.yticks(
            range(top_n),
            [feature_names[i] for i in indices]
        )
        plt.xlabel('重要性分數', fontsize=12, fontweight='bold')
        plt.title(f'前 {top_n} 個重要特徵', fontsize=14, fontweight='bold')
        plt.gca().invert_yaxis()
        plt.tight_layout()
        plt.show()

# 使用範例
if __name__ == "__main__":
    
    # 範例 1: 迴歸問題 - 房價預測
    print("=" * 60)
    print("範例 1: 迴歸問題 - 房價預測")
    print("=" * 60)
    
    # 生成模擬資料
    np.random.seed(42)
    n_samples = 1000
    
    house_data = pd.DataFrame({
        'size': np.random.uniform(50, 300, n_samples),
        'bedrooms': np.random.randint(1, 6, n_samples),
        'age': np.random.uniform(0, 50, n_samples),
        'location_score': np.random.uniform(1, 10, n_samples)
    })
    
    # 生成目標變數(房價)
    house_data['price'] = (
        house_data['size'] * 500 +
        house_data['bedrooms'] * 10000 +
        house_data['location_score'] * 20000 -
        house_data['age'] * 500 +
        np.random.normal(0, 20000, n_samples)
    )
    
    # 建構迴歸模型
    regression_builder = PredictiveModelBuilder(problem_type='regression')
    
    X, y = regression_builder.feature_engineering(
        house_data,
        target_col='price',
        numeric_features=['size', 'bedrooms', 'age', 'location_score']
    )
    
    X_train, X_test, y_train, y_test = regression_builder.prepare_data(X, y)
    
    regression_builder.train_model(X_train, y_train, model_type='random_forest')
    
    metrics, y_pred = regression_builder.evaluate_regression(X_test, y_test)
    
    print("\n迴歸模型評估指標:")
    for metric, value in metrics.items():
        print(f"  {metric}: {value:.2f}")
    
    # 交叉驗證
    cv_results = regression_builder.cross_validate(
        np.vstack([X_train, X_test]),
        np.concatenate([y_train, y_test]),
        cv=5
    )
    
    print(f"\n交叉驗證結果:")
    print(f"  平均分數: {cv_results['mean']:.2f}")
    print(f"  標準差: {cv_results['std']:.2f}")
    
    # 範例 2: 分類問題 - 客戶流失預測
    print("\n" + "=" * 60)
    print("範例 2: 分類問題 - 客戶流失預測")
    print("=" * 60)
    
    # 生成模擬資料
    customer_data = pd.DataFrame({
        'tenure_months': np.random.randint(1, 60, n_samples),
        'monthly_charges': np.random.uniform(20, 200, n_samples),
        'total_charges': np.random.uniform(100, 10000, n_samples),
        'contract_type': np.random.choice(['month', 'year', 'two_year'], n_samples),
        'payment_method': np.random.choice(['auto', 'manual'], n_samples)
    })
    
    # 生成目標變數(是否流失)
    churn_probability = (
        1 / (1 + np.exp(
            0.1 * customer_data['tenure_months'] -
            0.01 * customer_data['monthly_charges'] +
            np.random.normal(0, 1, n_samples)
        ))
    )
    customer_data['churn'] = (churn_probability > 0.5).astype(int)
    
    # 建構分類模型
    classification_builder = PredictiveModelBuilder(problem_type='classification')
    
    X, y = classification_builder.feature_engineering(
        customer_data,
        target_col='churn',
        numeric_features=['tenure_months', 'monthly_charges', 'total_charges'],
        categorical_features=['contract_type', 'payment_method']
    )
    
    X_train, X_test, y_train, y_test = classification_builder.prepare_data(X, y)
    
    classification_builder.train_model(X_train, y_train, model_type='random_forest')
    
    metrics, y_pred, cm = classification_builder.evaluate_classification(X_test, y_test)
    
    print("\n分類模型評估指標:")
    for metric, value in metrics.items():
        print(f"  {metric}: {value:.4f}")
    
    print("\n混淆矩陣:")
    print(cm)

這個完整的預測模型建構系統展示了從資料準備到模型評估的全流程。特徵工程模組處理資料清理、缺失值填補與類別編碼,標準化模組確保不同尺度的特徵能被模型公平對待。模型訓練支援多種演算法,從簡單的線性模型到複雜的集成學習方法。評估模組針對迴歸與分類問題提供了完整的指標體系,交叉驗證確保模型的泛化能力,超參數調整則追求最佳的模型效能。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 140

start
:定義商業目標;
note right
  - 預測問題定義
  - 成功指標設定
  - 商業價值評估
end note
:資料收集與探索;
:資料清理與前處理;
fork
  :處理缺失值;
fork again
  :處理異常值;
fork again
  :特徵編碼;
end fork
:特徵工程;
note right
  - 特徵選擇
  - 特徵轉換
  - 特徵生成
end note
:資料分割;
fork
  :訓練集 (70-80%);
fork again
  :驗證集 (10-15%);
fork again
  :測試集 (10-15%);
end fork
:選擇演算法;
if (問題類型?) then (迴歸)
  :線性迴歸;
  :決策樹迴歸;
  :隨機森林迴歸;
else (分類)
  :邏輯迴歸;
  :決策樹分類;
  :隨機森林分類;
endif
:模型訓練;
:模型評估;
if (效能滿意?) then (否)
  :超參數調整;
  :特徵優化;
else (是)
  :交叉驗證;
  if (泛化能力良好?) then (是)
    :模型部署;
    :持續監控;
  else (否)
    :重新訓練;
  endif
endif
stop

@enduml

監督式學習深度實踐

監督式學習是機器學習中應用最廣泛的範式,其核心在於從標記資料中學習輸入與輸出之間的映射關係。迴歸與分類是監督式學習的兩大主要任務,前者預測連續數值,後者預測離散類別。演算法的選擇需要考慮資料的規模、特徵的維度、問題的複雜度以及可解釋性的需求。

線性迴歸是最基礎的迴歸演算法,假設目標變數與特徵之間存在線性關係。雖然簡單,但在許多實務場景中仍然有效,特別是當資料關係確實接近線性時。決策樹與隨機森林能夠捕捉非線性關係,不需要特徵標準化,且能提供特徵重要性資訊。梯度提升樹如 XGBoost、LightGBM 在許多競賽中表現優異,但需要仔細調整超參數以避免過擬合。

分類問題的演算法選擇同樣豐富。邏輯迴歸適合二元分類且需要可解釋性的場景。支援向量機在高維度資料上表現良好。神經網路能夠學習極其複雜的決策邊界,但需要大量的訓練資料與計算資源。實務中,隨機森林與梯度提升樹往往是效能與複雜度平衡的較佳選擇。

from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score, davies_bouldin_score
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

class UnsupervisedLearningAnalyzer:
    """
    非監督式學習分析器
    提供聚類分析與降維功能
    """
    
    def __init__(self):
        self.model = None
        self.labels = None
        self.pca = None
    
    def kmeans_clustering(self, X, n_clusters=3, random_state=42):
        """
        K-Means 聚類分析
        
        Parameters:
        -----------
        X : np.ndarray
            特徵資料
        n_clusters : int
            聚類數量
        random_state : int
            隨機種子
        
        Returns:
        --------
        dict
            聚類結果與評估指標
        """
        logger.info(f"執行 K-Means 聚類分析 (k={n_clusters})")
        
        self.model = KMeans(
            n_clusters=n_clusters,
            random_state=random_state,
            n_init=10
        )
        
        self.labels = self.model.fit_predict(X)
        
        # 評估指標
        silhouette = silhouette_score(X, self.labels)
        davies_bouldin = davies_bouldin_score(X, self.labels)
        inertia = self.model.inertia_
        
        results = {
            'labels': self.labels,
            'centroids': self.model.cluster_centers_,
            'silhouette_score': silhouette,
            'davies_bouldin_score': davies_bouldin,
            'inertia': inertia
        }
        
        logger.info(f"輪廓係數: {silhouette:.4f}")
        logger.info(f"Davies-Bouldin 指數: {davies_bouldin:.4f}")
        
        return results
    
    def find_optimal_clusters(self, X, max_clusters=10):
        """
        尋找最佳聚類數量
        使用手肘法與輪廓係數
        
        Parameters:
        -----------
        X : np.ndarray
            特徵資料
        max_clusters : int
            測試的最大聚類數
        
        Returns:
        --------
        dict
            不同聚類數的評估結果
        """
        logger.info(f"尋找最佳聚類數 (測試 2 到 {max_clusters})")
        
        inertias = []
        silhouettes = []
        k_range = range(2, max_clusters + 1)
        
        for k in k_range:
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            labels = kmeans.fit_predict(X)
            
            inertias.append(kmeans.inertia_)
            silhouettes.append(silhouette_score(X, labels))
        
        # 視覺化
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))
        
        # 手肘圖
        axes[0].plot(k_range, inertias, 'bo-')
        axes[0].set_xlabel('聚類數量 (k)', fontsize=12, fontweight='bold')
        axes[0].set_ylabel('慣性 (Inertia)', fontsize=12, fontweight='bold')
        axes[0].set_title('手肘法', fontsize=14, fontweight='bold')
        axes[0].grid(True, alpha=0.3)
        
        # 輪廓係數圖
        axes[1].plot(k_range, silhouettes, 'ro-')
        axes[1].set_xlabel('聚類數量 (k)', fontsize=12, fontweight='bold')
        axes[1].set_ylabel('輪廓係數', fontsize=12, fontweight='bold')
        axes[1].set_title('輪廓係數分析', fontsize=14, fontweight='bold')
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        return {
            'k_range': list(k_range),
            'inertias': inertias,
            'silhouettes': silhouettes,
            'optimal_k': k_range[np.argmax(silhouettes)]
        }
    
    def hierarchical_clustering(self, X, n_clusters=3, linkage='ward'):
        """
        階層式聚類分析
        
        Parameters:
        -----------
        X : np.ndarray
            特徵資料
        n_clusters : int
            聚類數量
        linkage : str
            連結方法: 'ward', 'complete', 'average', 'single'
        
        Returns:
        --------
        dict
            聚類結果
        """
        logger.info(f"執行階層式聚類 (n_clusters={n_clusters}, linkage={linkage})")
        
        self.model = AgglomerativeClustering(
            n_clusters=n_clusters,
            linkage=linkage
        )
        
        self.labels = self.model.fit_predict(X)
        
        silhouette = silhouette_score(X, self.labels)
        
        results = {
            'labels': self.labels,
            'silhouette_score': silhouette
        }
        
        logger.info(f"輪廓係數: {silhouette:.4f}")
        
        return results
    
    def dbscan_clustering(self, X, eps=0.5, min_samples=5):
        """
        DBSCAN 密度聚類
        能夠發現任意形狀的聚類並識別雜訊點
        
        Parameters:
        -----------
        X : np.ndarray
            特徵資料
        eps : float
            鄰域半徑
        min_samples : int
            核心點的最小鄰居數
        
        Returns:
        --------
        dict
            聚類結果
        """
        logger.info(f"執行 DBSCAN 聚類 (eps={eps}, min_samples={min_samples})")
        
        self.model = DBSCAN(eps=eps, min_samples=min_samples)
        self.labels = self.model.fit_predict(X)
        
        n_clusters = len(set(self.labels)) - (1 if -1 in self.labels else 0)
        n_noise = list(self.labels).count(-1)
        
        results = {
            'labels': self.labels,
            'n_clusters': n_clusters,
            'n_noise': n_noise
        }
        
        if n_clusters > 1:
            # 計算輪廓係數(排除雜訊點)
            mask = self.labels != -1
            if np.sum(mask) > 0:
                silhouette = silhouette_score(X[mask], self.labels[mask])
                results['silhouette_score'] = silhouette
                logger.info(f"輪廓係數: {silhouette:.4f}")
        
        logger.info(f"發現 {n_clusters} 個聚類, {n_noise} 個雜訊點")
        
        return results
    
    def dimensionality_reduction(self, X, n_components=2):
        """
        主成分分析降維
        
        Parameters:
        -----------
        X : np.ndarray
            特徵資料
        n_components : int
            降維後的維度數
        
        Returns:
        --------
        tuple
            (降維後的資料, 解釋方差比例)
        """
        logger.info(f"執行 PCA 降維 (n_components={n_components})")
        
        self.pca = PCA(n_components=n_components)
        X_reduced = self.pca.fit_transform(X)
        
        explained_variance = self.pca.explained_variance_ratio_
        cumulative_variance = np.cumsum(explained_variance)
        
        logger.info(f"解釋方差比例: {explained_variance}")
        logger.info(f"累積解釋方差: {cumulative_variance[-1]:.4f}")
        
        return X_reduced, explained_variance
    
    def visualize_clusters_2d(self, X, labels, title="聚類結果視覺化"):
        """
        2D 聚類視覺化
        """
        plt.figure(figsize=(10, 7))
        
        unique_labels = np.unique(labels)
        colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
        
        for label, color in zip(unique_labels, colors):
            if label == -1:
                # 雜訊點使用黑色
                color = 'black'
                marker = 'x'
            else:
                marker = 'o'
            
            mask = labels == label
            plt.scatter(
                X[mask, 0],
                X[mask, 1],
                c=[color],
                label=f'聚類 {label}' if label != -1 else '雜訊',
                marker=marker,
                s=50,
                alpha=0.6,
                edgecolors='black',
                linewidth=0.5
            )
        
        plt.xlabel('特徵 1', fontsize=12, fontweight='bold')
        plt.ylabel('特徵 2', fontsize=12, fontweight='bold')
        plt.title(title, fontsize=14, fontweight='bold')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

# 使用範例
if __name__ == "__main__":
    
    print("=" * 60)
    print("非監督式學習範例 - 客戶分群分析")
    print("=" * 60)
    
    # 生成模擬客戶資料
    np.random.seed(42)
    n_customers = 500
    
    # 三個客戶群體
    # 群體 1: 高價值客戶
    group1 = np.random.multivariate_normal(
        [80, 200], [[100, 50], [50, 400]], size=150
    )
    
    # 群體 2: 中價值客戶
    group2 = np.random.multivariate_normal(
        [50, 100], [[80, 30], [30, 200]], size=200
    )
    
    # 群體 3: 低價值客戶
    group3 = np.random.multivariate_normal(
        [20, 50], [[50, 20], [20, 100]], size=150
    )
    
    X = np.vstack([group1, group2, group3])
    
    # 初始化分析器
    analyzer = UnsupervisedLearningAnalyzer()
    
    # 尋找最佳聚類數
    optimal_results = analyzer.find_optimal_clusters(X, max_clusters=8)
    print(f"\n建議的最佳聚類數: {optimal_results['optimal_k']}")
    
    # K-Means 聚類
    kmeans_results = analyzer.kmeans_clustering(X, n_clusters=3)
    print(f"\nK-Means 聚類結果:")
    print(f"  輪廓係數: {kmeans_results['silhouette_score']:.4f}")
    print(f"  Davies-Bouldin 指數: {kmeans_results['davies_bouldin_score']:.4f}")
    
    # 視覺化
    analyzer.visualize_clusters_2d(X, kmeans_results['labels'], "K-Means 客戶分群")
    
    # DBSCAN 聚類
    dbscan_results = analyzer.dbscan_clustering(X, eps=15, min_samples=5)
    print(f"\nDBSCAN 聚類結果:")
    print(f"  發現聚類數: {dbscan_results['n_clusters']}")
    print(f"  雜訊點數: {dbscan_results['n_noise']}")
    
    analyzer.visualize_clusters_2d(X, dbscan_results['labels'], "DBSCAN 客戶分群")

非監督式學習在沒有標記資料的情況下發現資料的內在結構。K-Means 是最常用的聚類演算法,透過最小化聚類內的方差來分組資料點。然而,它需要預先指定聚類數量,且對初始質心的選擇敏感。手肘法與輪廓係數分析是確定最佳聚類數的常用方法。階層式聚類建立資料點的樹狀結構,不需要預先指定聚類數,但計算複雜度較高。DBSCAN 能夠發現任意形狀的聚類並自動識別雜訊點,但需要仔細調整鄰域半徑與最小點數參數。

AI 倫理與負責任的機器學習

隨著機器學習模型在關鍵決策中的廣泛應用,模型的公平性、透明度與可解釋性已成為不可忽視的議題。一個在銀行貸款審批中使用的模型,若存在性別或種族偏見,將直接影響人們的生活。醫療診斷輔助系統的錯誤預測可能危及患者生命。司法系統中的風險評估工具若不公平,將違反司法正義的基本原則。因此,建構負責任的機器學習系統需要在技術與倫理兩個維度都進行深入考量。

資料偏見是機器學習公平性問題的主要根源。訓練資料可能反映了歷史上的不公平現象,如某些職業中的性別失衡。模型會學習並延續這些偏見,甚至可能放大它們。解決資料偏見需要在資料收集階段就建立公平性意識,確保資料的代表性。同時,需要使用技術手段檢測與緩解偏見,如重新加權樣本、對抗性去偏見等方法。

模型的可解釋性對於建立信任與問責機制至關重要。黑箱模型雖然可能有更高的預測準確率,但缺乏可解釋性使得我們無法理解模型的決策依據,更難以發現與修正模型的問題。LIME、SHAP 等可解釋性技術能夠提供局部或全局的模型解釋,協助理解模型的行為。在高風險應用中,應該優先選擇本質上更可解釋的模型,如決策樹、線性模型等。

資料隱私保護是另一個重要的倫理議題。模型訓練需要大量的個人資料,如何在利用資料價值的同時保護個人隱私是一個挑戰。差分隱私技術透過在資料或模型中加入雜訊,在數學上保證單一個體資料的洩露風險受到控制。聯邦學習允許在不集中資料的情況下訓練模型,讓資料保留在原始位置。這些技術的發展為隱私保護的機器學習提供了新的可能性。

模型的持續監控與審計機制是確保負責任 AI 的重要保障。模型部署後,其行為可能因資料分佈的變化而改變,需要持續監控模型的效能與公平性指標。定期的模型審計能夠發現潛在的問題,及時採取修正措施。建立清晰的問責機制,明確模型開發者、部署者與使用者的責任,是負責任 AI 治理的基礎。

玄貓認為,技術的進步不應以犧牲倫理為代價。機器學習從業者需要培養倫理意識,在模型開發的每個階段都考慮公平性、透明度與隱私保護。組織應該建立 AI 倫理委員會,審查高風險 AI 應用的開發與部署。政府與產業需要共同制定 AI 倫理準則與監管框架,在促進創新與保護公眾利益之間取得平衡。只有當技術與倫理並重,機器學習才能真正成為造福人類的工具,而非製造新問題的根源。

預測模型的建構是科學與藝術的結合,需要紮實的數學基礎、豐富的實務經驗,以及對倫理議題的深刻認識。從資料準備到模型部署,從演算法選擇到效能評估,每個環節都可能影響最終模型的品質。持續學習新的技術與方法,關注領域內的最新發展,同時保持對倫理議題的敏感度,是每個機器學習從業者應該追求的目標。在資料驅動決策已成為常態的今天,建構負責任、高效能且符合倫理規範的預測模型,不僅是技術挑戰,更是對社會的責任。