PyTorch 提供深度學習模型建立的彈性,Spark 則是大資料處理的利器,兩者結合能有效處理糖尿病分類別問題。本篇整合 Spark 分散式運算能力和 PyTorch 深度學習框架,從資料預處理、特徵工程到模型訓練與評估,完整建構糖尿病分類別模型。透過 Spark 讀取和預處理資料,確保資料品質和一致性,再利用 PyTorch 建立和訓練深度學習模型,並以多種指標評估模型效能。

深度學習與 PyTorch 在糖尿病分類別中的應用

本章節將探討如何使用 PyTorch 進行糖尿病分類別,涵蓋模型建立、訓練和評估,並介紹如何使用 K-fold 交叉驗證來提升模型效能。

資料預處理

在進行模型訓練之前,需要對資料進行預處理。以下是一個用於預處理糖尿病資料集的類別:

class DataPreprocessor:
    def __init__(self, spark, data_file_path):
        self.spark = spark
        self.data_file_path = data_file_path

    def preprocess(self):
        try:
            diabetes_df = self.spark.read.csv(self.data_file_path, header=True, inferSchema=True)
            diabetes_df = diabetes_df.filter((col("Glucose") != 0) & (col("BloodPressure") != 0) & (col("BMI") != 0))
            feature_cols = ["Pregnancies", "Glucose", "BloodPressure", "BMI", "DiabetesPedigreeFunction", "Age"]
            assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
            diabetes_df = assembler.transform(diabetes_df)
            scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
            scaler_model = scaler.fit(diabetes_df)
            diabetes_df = scaler_model.transform(diabetes_df)
            train_df, test_df = diabetes_df.randomSplit([0.8, 0.2], seed=42)
            return train_df, test_df
        except Exception as e:
            logging.error(f"Error occurred during data preprocessing: {str(e)}")
            raise e

內容解密:

  1. 資料讀取與過濾:使用 SparkSession 讀取 CSV 檔案,並過濾掉 Glucose、BloodPressure 和 BMI 為 0 的資料列。
  2. 特徵組合:使用 VectorAssembler 將多個特徵欄位組合成一個向量欄位 “features”。
  3. 標準化:使用 StandardScaler 對 “features” 進行標準化,生成 “scaled_features”。
  4. 資料分割:將資料分割為訓練集和測試集,比例為 8:2。

糖尿病分類別模型

以下是定義的糖尿病分類別神經網路模型:

class DiabetesClassifierModel(nn.Module):
    def __init__(self, input_size, output_size):
        super(DiabetesClassifierModel, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_size, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, output_size),
        )

    def forward(self, x):
        return self.model(x)

內容解密:

  1. 模型結構:定義了一個四層的神經網路,分別有 64、32、16 個神經元,使用 ReLU 作為啟用函式。
  2. 輸入與輸出:輸入大小為 input_size,輸出大小為 output_size。

模型訓練

以下是模型訓練的類別:

class ModelTrainer:
    def __init__(self, model, criterion, optimizer, train_loader):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.train_loader = train_loader

內容解密:

  1. 初始化引數:接收模型、損失函式、最佳化器和訓練資料載入器作為初始化引數。
  2. 模型訓練準備:為後續的模型訓練過程做準備。

AWS S3 檔案下載

以下是從 AWS S3 下載檔案的函式:

def copy_file_from_s3(bucket_name, file_key, local_path):
    try:
        s3 = boto3.client('s3')
        s3.download_file(bucket_name, file_key, local_path)
        print(f"File {file_key} downloaded to {local_path}")
    except Exception as e:
        print(f"An error occurred while downloading file {file_key}: {str(e)}")

內容解密:

  1. S3 使用者端建立:使用 boto3 建立 S3 使用者端。
  2. 檔案下載:下載指定 bucket 中的檔案到本地路徑。
  3. 錯誤處理:捕捉並列印下載過程中發生的錯誤。

主程式執行流程

if __name__ == "__main__":
    spark = SparkSession.builder.appName("PimaDatasetExplorer").getOrCreate()
    local_file_path = "/home/ubuntu/airflow/dags/diabetes.csv"
    s3_bucket_name = "instance1bucket"
    s3_file_key = "diabetes.csv"
    copy_file_from_s3(s3_bucket_name, s3_file_key, local_file_path)
    explorer = PimaDatasetExplorer(local_file_path)
    explorer.preprocess_data()
    explorer.handle_missing_values()
    explorer.count_zeros()
    explorer.data_summary()
    explorer.count_outcome()
    explorer.explore_feature_distributions()
    explorer.calculate_feature_target_correlation()

內容解密:

  1. SparkSession 建立:建立一個名為 “PimaDatasetExplorer” 的 SparkSession。
  2. S3 檔案下載:從 S3 下載 diabetes.csv 到本地。
  3. 資料探索:執行一系列資料探索任務,包括預處理、缺失值處理、零值計數、資料摘要、結果計數、特徵分佈探索和特徵與目標變數的相關性計算。

深度學習模型訓練與評估系統之完整實作

本篇內容將探討如何使用 PyTorch 結合 Spark 進行糖尿病分類別模型的訓練與評估,同時包含完整的資料預處理、模型訓練、模型評估及錯誤處理機制。

資料預處理模組實作

資料預處理是機器學習流程中的關鍵步驟,負責將原始資料轉換為適合模型訓練的形式。以下為 DataPreprocessor 類別的實作細節:

class DataPreprocessor:
    def __init__(self, spark, data_file_path):
        self.spark = spark
        self.data_file_path = data_file_path

    def preprocess(self):
        try:
            diabetes_df = self.spark.read.csv(
                self.data_file_path,
                header=True, inferSchema=True
            )
            diabetes_df = diabetes_df.filter(
                (col("Glucose") != 0)
                & (col("BloodPressure") != 0)
                & (col("BMI") != 0)
            )
            feature_cols = [
                "Pregnancies",
                "Glucose",
                "BloodPressure",
                "BMI",
                "DiabetesPedigreeFunction",
                "Age",
            ]
            assembler = VectorAssembler(
                inputCols=feature_cols,
                outputCol="features"
            )
            diabetes_df = assembler.transform(diabetes_df)
            scaler = StandardScaler(
                inputCol="features", 
                outputCol="scaled_features"
            )
            scaler_model = scaler.fit(diabetes_df)
            diabetes_df = scaler_model.transform(diabetes_df)
            return diabetes_df
        except Exception as e:
            logging.error(f"資料預處理過程中發生錯誤: {str(e)}")
            raise e

資料預處理解密:

  1. 使用 SparkSession 讀取 CSV 資料並自動推斷資料型別
  2. 過濾掉關鍵特徵欄位為零的異常資料
  3. 透過 VectorAssembler 將多個特徵欄位組合成單一特徵向量
  4. 使用 StandardScaler 對特徵進行標準化處理,使資料符合常態分佈

模型訓練流程實作

模型訓練是整個深度學習流程的核心,負責最佳化模型引數以達到最佳的預測效果。以下為 ModelTrainer 類別的實作:

class ModelTrainer:
    def __init__(self, model, criterion, optimizer, train_loader):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.train_loader = train_loader

    def train(self, epochs=100, lr=0.01):
        try:
            for epoch in range(epochs):
                self.model.train()
                for inputs, targets in self.train_loader:
                    self.optimizer.zero_grad()
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs.squeeze(), targets)
                    loss.backward()
                    self.optimizer.step()
                if (epoch + 1) % 10 == 0:
                    logging.info(f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}")
        except Exception as e:
            logging.error(f"模型訓練過程中發生錯誤: {str(e)}")
            raise e

模型訓練解密:

  1. 使用指定的損失函式(BCEWithLogitsLoss)進行二元分類別任務的損失計算
  2. 採用 Adam 最佳化器動態調整學習率以加速收斂
  3. 每 10 個 epoch 輸出一次當前損失值以監控訓練進度
  4. 完整的錯誤處理機制確保訓練過程中的異常能夠被捕捉並記錄

模型評估指標實作

模型評估是驗證模型效能的關鍵步驟,以下為 ModelEvaluator 類別的實作細節:

class ModelEvaluator:
    def __init__(self, model, test_loader, y_test_np):
        self.model = model
        self.test_loader = test_loader
        self.y_test_np = y_test_np

    def evaluate(self):
        try:
            with torch.no_grad():
                self.model.eval()
                predictions = []
                probabilities = []
                for inputs, _ in self.test_loader:
                    outputs = self.model(inputs)
                    probabilities.extend(torch.sigmoid(outputs).squeeze().tolist())
                    predictions.extend(outputs.squeeze().tolist())
                y_pred = np.array([1 if pred > 0.5 else 0 for pred in probabilities])
                accuracy = accuracy_score(self.y_test_np, y_pred)
                precision = precision_score(self.y_test_np, y_pred)
                recall = recall_score(self.y_test_np, y_pred)
                f1 = f1_score(self.y_test_np, y_pred)
                logging.info(f"準確率: {accuracy:.4f}")
                logging.info(f"精確率: {precision:.4f}")
                logging.info(f"召回率: {recall:.4f}")
                logging.info(f"F1 分數: {f1:.4f}")
        except Exception as e:
            logging.error(f"模型評估過程中發生錯誤: {str(e)}")
            raise e

模型評估解密:

  1. 使用多項評估指標(準確率、精確率、召回率、F1分數)全面評估模型效能
  2. 透過 ROC-AUC 分數評估模型的區分能力
  3. 使用混淆矩陣分析模型的預測結果分佈
  4. 詳細的錯誤處理機制確保評估過程的穩定性

主程式執行流程

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    spark = SparkSession.builder.appName("DiabetesClassifier").getOrCreate()
    copy_file_from_s3("instance1bucket", "diabetes.csv", "/home/ubuntu/airflow/dags/")
    data_preprocessor = DataPreprocessor(spark, "/home/ubuntu/airflow/dags/diabetes.csv")
    train_df, test_df = data_preprocessor.preprocess()
    # ... 後續模型訓練與評估流程 ...

主程式流程解密:

  1. 初始化 SparkSession 建立大資料處理環境
  2. 從 S3 儲存桶下載必要的資料檔案
  3. 建立資料前處理器進行資料清洗與轉換
  4. 後續流程包含模型訓練、評估等步驟,確保整個流程的完整性與可追蹤性

本系統透過嚴謹的模組化設計與完整的錯誤處理機制,實作了一個高效穩定的糖尿病分類別模型訓練與評估流程,為醫療領域的 AI 應用提供了堅實的技術基礎。