PyTorch 和 TensorFlow 是目前深度學習領域最常用的兩個框架,本文將分別使用這兩個框架來建構糖尿病分類別器。首先,我們會從 S3 下載資料集,並使用 Spark 進行預處理,包含資料清理、特徵縮放等步驟。接著,使用 PyTorch 建立深度學習模型,並使用 K-fold 跨驗證方法進行訓練和評估,確保模型的泛化能力。同時,我們也使用 TensorFlow 和 Keras API 構建相同的模型,並比較兩者的效能差異。過程中,我們會使用多種評估指標,例如準確率、精確率、召回率和 F1 分數,來評估模型的預測效能,並透過日誌記錄和自定義回撥函式監控訓練過程。

深度學習糖尿病分類別器實作與評估

在現代醫療保健領域中,利用機器學習技術進行疾病預測已成為重要的研究方向。本文將探討如何使用 PyTorch 實作一個糖尿病分類別器,並結合 K-fold 跨驗證技術進行模型訓練與評估。

資料預處理與載入

首先,我們需要從 S3 儲存桶中下載糖尿病資料集(diabetes.csv)至本地目錄。這個過程由 copy_file_from_s3 函式負責實作:

def copy_file_from_s3(bucket_name, file_name, local_dir):
    """
    從 S3 儲存桶複製檔案到本地目錄。
    
    Args:
        bucket_name (str): S3 儲存桶名稱。
        file_name (str): 要複製的檔案名稱。
        local_dir (str): 本地存放目錄。
    """
    try:
        s3 = boto3.client("s3")
        s3.download_file(bucket_name, file_name, local_dir + file_name)
    except Exception as e:
        logging.error(f"檔案複製過程中發生錯誤:{str(e)}")
        raise e

內容解密:

  1. 使用 boto3 函式庫與 S3 服務進行互動。
  2. s3.download_file 方法負責下載指設定檔案。
  3. 錯誤處理機制確保在發生問題時能夠正確記錄日誌並丟擲例外。

資料預處理流程

下載完成後,DataPreprocessor 類別負責進行資料預處理:

data_preprocessor = DataPreprocessor(spark, "/home/ubuntu/airflow/dags/diabetes.csv")
diabetes_df = data_preprocessor.preprocess()

內容解密:

  1. 使用 Spark 進行資料處理。
  2. preprocess 方法包含資料清理、特徵縮放等步驟。
  3. 處理後的資料儲存在 diabetes_df DataFrame 中。

K-fold 跨驗證實作

為了提升模型評估的可靠性,我們採用 K-fold 跨驗證技術:

k_folds = 5
kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
fold_results = []
for fold, (train_idx, test_idx) in enumerate(kfold.split(X_np)):
    # 分割訓練與測試資料
    X_train, X_test = X_np[train_idx], X_np[test_idx]
    y_train, y_test = y_np[train_idx], y_np[test_idx]
    # 建立 PyTorch 資料集
    train_dataset = TensorDataset(torch.tensor(X_train), torch.tensor(y_train))
    test_dataset = TensorDataset(torch.tensor(X_test), torch.tensor(y_test))
    # 建立 DataLoader
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

內容解密:

  1. 設定 K-fold 引數(K=5)。
  2. 使用 KFold 類別進行資料分割。
  3. 將 numpy array 轉換為 PyTorch TensorDataset。
  4. 建立 DataLoader 以批次方式處理資料。

模型訓練流程

模型訓練由 ModelTrainer 類別負責:

class ModelTrainer:
    def __init__(self, model, criterion, optimizer, train_loader):
        """
        初始化模型訓練器。
        
        Args:
            model (torch.nn.Module): 神經網路模型。
            criterion (torch.nn.Module): 損失函式。
            optimizer (torch.optim.Optimizer): 最佳化器。
            train_loader (torch.utils.data.DataLoader): 訓練資料載入器。
        """
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.train_loader = train_loader
    
    def train(self, epochs=100, lr=0.01):
        """
        訓練模型。
        
        Args:
            epochs (int): 訓練週期數。
            lr (float): 學習率。
        """
        try:
            for epoch in range(epochs):
                self.model.train()
                for inputs, targets in self.train_loader:
                    self.optimizer.zero_grad()
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs.squeeze(), targets)
                    loss.backward()
                    self.optimizer.step()
                if (epoch + 1) % 10 == 0:
                    logging.info(f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}")
        except Exception as e:
            logging.error(f"模型訓練過程中發生錯誤:{str(e)}")
            raise e

內容解密:

  1. 初始化模型、損失函式、最佳化器和訓練資料載入器。
  2. train 方法中執行實際的訓練迴圈。
  3. 使用 BCEWithLogitsLoss 作為二元分類別的損失函式。
  4. 每 10 個 epoch 紀錄一次損失值。

模型評估指標

模型評估由 ModelEvaluator 類別負責:

class ModelEvaluator:
    def evaluate(self):
        """
        評估模型效能。
        """
        try:
            with torch.no_grad():
                self.model.eval()
                predictions = []
                probabilities = []
                for inputs, _ in self.test_loader:
                    outputs = self.model(inputs)
                    probabilities.extend(torch.sigmoid(outputs).squeeze().tolist())
                    predictions.extend(outputs.squeeze().tolist())
                # 計算評估指標
                y_pred = np.array([1 if pred > 0.5 else 0 for pred in probabilities])
                accuracy = accuracy_score(self.y_test_np, y_pred)
                precision = precision_score(self.y_test_np, y_pred)
                recall = recall_score(self.y_test_np, y_pred)
                f1 = f1_score(self.y_test_np, y_pred)
                # 紀錄評估結果
                logging.info(f"準確率:{accuracy:.4f}")
                logging.info(f"精確率:{precision:.4f}")
                logging.info(f"召回率:{recall:.4f}")
                logging.info(f"F1 分數:{f1:.4f}")
        except Exception as e:
            logging.error(f"模型評估過程中發生錯誤:{str(e)}")
            raise e

內容解密:

  1. 在評估模式下進行模型推論。
  2. 計算並記錄多項評估指標(準確率、精確率、召回率、F1 分數)。
  3. 使用混淆矩陣和 ROC-AUC 分數進行進一步分析。

使用TensorFlow進行深度學習分類別

在前一章中,我們使用PyTorch構建、訓練並評估了一個多層深度學習模型,用於預測糖尿病診斷的機率。本章將繼續探索深度學習分類別的主題,但這次使用的是TensorFlow框架。我們將展示如何複製與PyTorch相同的步驟,以實作兩種模型的效能比較。

資料集介紹

我們在前一章已經詳細介紹了Pima糖尿病資料集。在本文中,我們將編寫程式碼,將CSV檔案從S3儲存桶複製到EC2本地目錄,然後列印預出前十條記錄,以重新整理我們對特徵和目標變數的記憶。

from pyspark.sql import SparkSession
import os

# 建立SparkSession
spark = SparkSession.builder \
    .appName("Read CSV from S3") \
    .getOrCreate()

# 定義S3儲存桶和檔案詳情
bucket_name = "instance1bucket"
file_key = "diabetes.csv"
local_file_path = "/home/ubuntu/airflow/dags/diabetes.csv"

# 使用AWS CLI命令將檔案從S3複製到本地目錄
os.system(f"aws s3 cp s3://{bucket_name}/{file_key} {local_file_path}")

# 讀取CSV檔案
df = spark.read.csv(local_file_path, header=True, inferSchema=True)

# 選擇特定的列
selected_columns = [
    'Pregnancies',
    'Glucose',
    'BloodPressure',
    'BMI',
    'DiabetesPedigreeFunction',
    'Age',
    'Outcome'
]

df_selected = df.select(selected_columns)

# 顯示前10條記錄
df_selected.show(10)

# 停止SparkSession
spark.stop()

內容解密:

  1. 匯入必要的函式庫:我們匯入了pyspark.sql中的SparkSession類別和內建的os模組。SparkSession是使用Dataset和DataFrame API程式設計Spark的入口點,而os模組提供了使用作業系統相關功能的便攜方式。
  2. 建立SparkSession:我們建立了一個新的SparkSession,並指定了應用程式名稱為“Read CSV from S3”。如果已經存在一個SparkSession,則會檢索現有的那一個。
  3. 定義S3儲存桶和檔案詳情:我們定義了S3儲存桶名稱、檔案鍵(儲存桶內檔案的路徑)和本地檔案路徑,用於存放從S3複製的檔案。
  4. 使用AWS CLI命令複製檔案:透過os.system函式執行shell命令,將diabetes.csv檔案從S3儲存桶複製到本地目錄。
  5. 讀取CSV檔案:使用Spark讀取CSV檔案,並推斷其結構。
  6. 選擇特定的列:我們選擇了與糖尿病預測相關的列,包括懷孕次數、葡萄糖水平、血壓、BMI、糖尿病譜系功能、年齡和結果。
  7. 顯示前10條記錄:使用show方法顯示資料框的前10條記錄,以驗證資料是否正確載入。
  8. 停止SparkSession:最後,停止SparkSession以釋放資源。

本章將繼續使用TensorFlow進行深度學習分類別模型的構建、訓練和評估,並與PyTorch模型的效能進行比較。此外,我們還將介紹使用keras_tuner進行超引數調優,以自動搜尋最佳模型架構和學習率,從而可能獲得比手動選擇的超引數更好的效能指標。

利用 TensorFlow 進行糖尿病預測的深度學習模型

本章節將介紹如何使用 TensorFlow 訓練深度學習模型,以預測皮馬印第安女性患有糖尿病的可能性。我們將遵循與前一章相似的步驟,但這次使用 TensorFlow 和 Keras API 來構建和訓練模型。

步驟 1:匯入必要的函式庫

首先,我們需要匯入必要的函式庫,包括 PySpark 用於資料預處理、TensorFlow 用於構建和訓練深度學習模型,以及 Scikit-Learn 用於評估模型效能。

import os
import logging
from pyspark.sql import SparkSession
import tensorflow as tf
from tensorflow import keras
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

內容解密:

  • oslogging 用於處理作業系統相關任務和日誌記錄。
  • SparkSession 是 PySpark 的入口點,用於建立 DataFrame 和執行 Spark 作業。
  • tensorflowkeras 用於構建和訓練深度學習模型。
  • sklearn.metrics 中的函式用於計算模型的評估指標,如準確率、精確率、召回率和 F1 分數。

步驟 2:組態日誌記錄

組態日誌記錄對於除錯和監控模型訓練過程非常重要。

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

內容解密:

  • logging.basicConfig(level=logging.INFO) 設定日誌記錄的級別為 INFO,意味著 INFO 級別及以上的日誌將被記錄。
  • logger = logging.getLogger(__name__) 建立一個日誌記錄器,用於記錄日誌。

步驟 3:自定義回撥函式

自定義回撥函式可以在模型訓練過程中列印 epoch 資訊。

class CustomCallback(keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        logger.info(f"Epoch {epoch+1}, Loss: {logs['loss']:.4f}, Accuracy: {logs['accuracy']:.4f}")

內容解密:

  • CustomCallback 類別繼承自 keras.callbacks.Callback,允許我們在訓練過程中執行自定義操作。
  • on_epoch_end 方法在每個 epoch 結束時被呼叫,用於列印當前的 epoch 號、損失值和準確率。

步驟 4:資料預處理

使用 PySpark 對糖尿病資料集進行預處理,包括讀取資料、過濾無效資料、特徵工程、資料標準化以及將資料分割為訓練集和測試集。

# 初始化 SparkSession
spark = SparkSession.builder.appName("DiabetesPrediction").getOrCreate()

# 從 S3 下載資料集到本地
bucket_name = "your-bucket-name"
file_key = "path/to/your/dataset.csv"
local_file_path = "/tmp/dataset.csv"
os.system(f"aws s3 cp s3://{bucket_name}/{file_key} {local_file_path}")

# 讀取 CSV 檔案到 DataFrame
df = spark.read.csv(local_file_path, header=True, inferSchema=True)

# 選擇相關列
selected_columns = ['Pregnancies', 'Glucose', 'BloodPressure', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome']
df_selected = df.select(selected_columns)

# 顯示前 10 筆記錄
df_selected.show(10)

# 停止 SparkSession
spark.stop()

內容解密:

  • 這段程式碼首先初始化一個 SparkSession,然後從 S3 下載資料集到本地。
  • 使用 spark.read.csv 方法讀取 CSV 檔案到 DataFrame,並選擇相關列。
  • df_selected.show(10) 用於顯示 DataFrame 的前 10 筆記錄。
  • 最後,停止 SparkSession 以釋放資源。

步驟 5:定義和訓練模型

定義 TensorFlow (Keras) 順序模型,並使用預處理後的資料進行訓練。

# 定義模型架構
model = keras.Sequential([
    keras.layers.Dense(64, activation='relu', input_shape=(7,)),
    keras.layers.Dense(32, activation='relu'),
    keras.layers.Dense(1, activation='sigmoid')
])

# 編譯模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 訓練模型
model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_test, y_test), callbacks=[CustomCallback()])

內容解密:

  • 使用 keras.Sequential 定義一個順序模型,包含多個全連線層(Dense layers)。
  • model.compile 方法用於組態模型的最佳化器、損失函式和評估指標。
  • model.fit 方法用於訓練模型,傳入訓練資料、epoch 數、批次大小和驗證資料,並使用自定義回撥函式。