PyTorch 和 TensorFlow 是目前深度學習領域最常用的兩個框架,本文將分別使用這兩個框架來建構糖尿病分類別器。首先,我們會從 S3 下載資料集,並使用 Spark 進行預處理,包含資料清理、特徵縮放等步驟。接著,使用 PyTorch 建立深度學習模型,並使用 K-fold 跨驗證方法進行訓練和評估,確保模型的泛化能力。同時,我們也使用 TensorFlow 和 Keras API 構建相同的模型,並比較兩者的效能差異。過程中,我們會使用多種評估指標,例如準確率、精確率、召回率和 F1 分數,來評估模型的預測效能,並透過日誌記錄和自定義回撥函式監控訓練過程。
深度學習糖尿病分類別器實作與評估
在現代醫療保健領域中,利用機器學習技術進行疾病預測已成為重要的研究方向。本文將探討如何使用 PyTorch 實作一個糖尿病分類別器,並結合 K-fold 跨驗證技術進行模型訓練與評估。
資料預處理與載入
首先,我們需要從 S3 儲存桶中下載糖尿病資料集(diabetes.csv)至本地目錄。這個過程由 copy_file_from_s3 函式負責實作:
def copy_file_from_s3(bucket_name, file_name, local_dir):
"""
從 S3 儲存桶複製檔案到本地目錄。
Args:
bucket_name (str): S3 儲存桶名稱。
file_name (str): 要複製的檔案名稱。
local_dir (str): 本地存放目錄。
"""
try:
s3 = boto3.client("s3")
s3.download_file(bucket_name, file_name, local_dir + file_name)
except Exception as e:
logging.error(f"檔案複製過程中發生錯誤:{str(e)}")
raise e
內容解密:
- 使用
boto3函式庫與 S3 服務進行互動。 s3.download_file方法負責下載指設定檔案。- 錯誤處理機制確保在發生問題時能夠正確記錄日誌並丟擲例外。
資料預處理流程
下載完成後,DataPreprocessor 類別負責進行資料預處理:
data_preprocessor = DataPreprocessor(spark, "/home/ubuntu/airflow/dags/diabetes.csv")
diabetes_df = data_preprocessor.preprocess()
內容解密:
- 使用 Spark 進行資料處理。
preprocess方法包含資料清理、特徵縮放等步驟。- 處理後的資料儲存在
diabetes_dfDataFrame 中。
K-fold 跨驗證實作
為了提升模型評估的可靠性,我們採用 K-fold 跨驗證技術:
k_folds = 5
kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
fold_results = []
for fold, (train_idx, test_idx) in enumerate(kfold.split(X_np)):
# 分割訓練與測試資料
X_train, X_test = X_np[train_idx], X_np[test_idx]
y_train, y_test = y_np[train_idx], y_np[test_idx]
# 建立 PyTorch 資料集
train_dataset = TensorDataset(torch.tensor(X_train), torch.tensor(y_train))
test_dataset = TensorDataset(torch.tensor(X_test), torch.tensor(y_test))
# 建立 DataLoader
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
內容解密:
- 設定 K-fold 引數(K=5)。
- 使用
KFold類別進行資料分割。 - 將 numpy array 轉換為 PyTorch TensorDataset。
- 建立 DataLoader 以批次方式處理資料。
模型訓練流程
模型訓練由 ModelTrainer 類別負責:
class ModelTrainer:
def __init__(self, model, criterion, optimizer, train_loader):
"""
初始化模型訓練器。
Args:
model (torch.nn.Module): 神經網路模型。
criterion (torch.nn.Module): 損失函式。
optimizer (torch.optim.Optimizer): 最佳化器。
train_loader (torch.utils.data.DataLoader): 訓練資料載入器。
"""
self.model = model
self.criterion = criterion
self.optimizer = optimizer
self.train_loader = train_loader
def train(self, epochs=100, lr=0.01):
"""
訓練模型。
Args:
epochs (int): 訓練週期數。
lr (float): 學習率。
"""
try:
for epoch in range(epochs):
self.model.train()
for inputs, targets in self.train_loader:
self.optimizer.zero_grad()
outputs = self.model(inputs)
loss = self.criterion(outputs.squeeze(), targets)
loss.backward()
self.optimizer.step()
if (epoch + 1) % 10 == 0:
logging.info(f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}")
except Exception as e:
logging.error(f"模型訓練過程中發生錯誤:{str(e)}")
raise e
內容解密:
- 初始化模型、損失函式、最佳化器和訓練資料載入器。
- 在
train方法中執行實際的訓練迴圈。 - 使用 BCEWithLogitsLoss 作為二元分類別的損失函式。
- 每 10 個 epoch 紀錄一次損失值。
模型評估指標
模型評估由 ModelEvaluator 類別負責:
class ModelEvaluator:
def evaluate(self):
"""
評估模型效能。
"""
try:
with torch.no_grad():
self.model.eval()
predictions = []
probabilities = []
for inputs, _ in self.test_loader:
outputs = self.model(inputs)
probabilities.extend(torch.sigmoid(outputs).squeeze().tolist())
predictions.extend(outputs.squeeze().tolist())
# 計算評估指標
y_pred = np.array([1 if pred > 0.5 else 0 for pred in probabilities])
accuracy = accuracy_score(self.y_test_np, y_pred)
precision = precision_score(self.y_test_np, y_pred)
recall = recall_score(self.y_test_np, y_pred)
f1 = f1_score(self.y_test_np, y_pred)
# 紀錄評估結果
logging.info(f"準確率:{accuracy:.4f}")
logging.info(f"精確率:{precision:.4f}")
logging.info(f"召回率:{recall:.4f}")
logging.info(f"F1 分數:{f1:.4f}")
except Exception as e:
logging.error(f"模型評估過程中發生錯誤:{str(e)}")
raise e
內容解密:
- 在評估模式下進行模型推論。
- 計算並記錄多項評估指標(準確率、精確率、召回率、F1 分數)。
- 使用混淆矩陣和 ROC-AUC 分數進行進一步分析。
使用TensorFlow進行深度學習分類別
在前一章中,我們使用PyTorch構建、訓練並評估了一個多層深度學習模型,用於預測糖尿病診斷的機率。本章將繼續探索深度學習分類別的主題,但這次使用的是TensorFlow框架。我們將展示如何複製與PyTorch相同的步驟,以實作兩種模型的效能比較。
資料集介紹
我們在前一章已經詳細介紹了Pima糖尿病資料集。在本文中,我們將編寫程式碼,將CSV檔案從S3儲存桶複製到EC2本地目錄,然後列印預出前十條記錄,以重新整理我們對特徵和目標變數的記憶。
from pyspark.sql import SparkSession
import os
# 建立SparkSession
spark = SparkSession.builder \
.appName("Read CSV from S3") \
.getOrCreate()
# 定義S3儲存桶和檔案詳情
bucket_name = "instance1bucket"
file_key = "diabetes.csv"
local_file_path = "/home/ubuntu/airflow/dags/diabetes.csv"
# 使用AWS CLI命令將檔案從S3複製到本地目錄
os.system(f"aws s3 cp s3://{bucket_name}/{file_key} {local_file_path}")
# 讀取CSV檔案
df = spark.read.csv(local_file_path, header=True, inferSchema=True)
# 選擇特定的列
selected_columns = [
'Pregnancies',
'Glucose',
'BloodPressure',
'BMI',
'DiabetesPedigreeFunction',
'Age',
'Outcome'
]
df_selected = df.select(selected_columns)
# 顯示前10條記錄
df_selected.show(10)
# 停止SparkSession
spark.stop()
內容解密:
- 匯入必要的函式庫:我們匯入了
pyspark.sql中的SparkSession類別和內建的os模組。SparkSession是使用Dataset和DataFrame API程式設計Spark的入口點,而os模組提供了使用作業系統相關功能的便攜方式。 - 建立SparkSession:我們建立了一個新的SparkSession,並指定了應用程式名稱為“Read CSV from S3”。如果已經存在一個SparkSession,則會檢索現有的那一個。
- 定義S3儲存桶和檔案詳情:我們定義了S3儲存桶名稱、檔案鍵(儲存桶內檔案的路徑)和本地檔案路徑,用於存放從S3複製的檔案。
- 使用AWS CLI命令複製檔案:透過
os.system函式執行shell命令,將diabetes.csv檔案從S3儲存桶複製到本地目錄。 - 讀取CSV檔案:使用Spark讀取CSV檔案,並推斷其結構。
- 選擇特定的列:我們選擇了與糖尿病預測相關的列,包括懷孕次數、葡萄糖水平、血壓、BMI、糖尿病譜系功能、年齡和結果。
- 顯示前10條記錄:使用
show方法顯示資料框的前10條記錄,以驗證資料是否正確載入。 - 停止SparkSession:最後,停止SparkSession以釋放資源。
本章將繼續使用TensorFlow進行深度學習分類別模型的構建、訓練和評估,並與PyTorch模型的效能進行比較。此外,我們還將介紹使用keras_tuner進行超引數調優,以自動搜尋最佳模型架構和學習率,從而可能獲得比手動選擇的超引數更好的效能指標。
利用 TensorFlow 進行糖尿病預測的深度學習模型
本章節將介紹如何使用 TensorFlow 訓練深度學習模型,以預測皮馬印第安女性患有糖尿病的可能性。我們將遵循與前一章相似的步驟,但這次使用 TensorFlow 和 Keras API 來構建和訓練模型。
步驟 1:匯入必要的函式庫
首先,我們需要匯入必要的函式庫,包括 PySpark 用於資料預處理、TensorFlow 用於構建和訓練深度學習模型,以及 Scikit-Learn 用於評估模型效能。
import os
import logging
from pyspark.sql import SparkSession
import tensorflow as tf
from tensorflow import keras
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
內容解密:
os和logging用於處理作業系統相關任務和日誌記錄。SparkSession是 PySpark 的入口點,用於建立 DataFrame 和執行 Spark 作業。tensorflow和keras用於構建和訓練深度學習模型。sklearn.metrics中的函式用於計算模型的評估指標,如準確率、精確率、召回率和 F1 分數。
步驟 2:組態日誌記錄
組態日誌記錄對於除錯和監控模型訓練過程非常重要。
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
內容解密:
logging.basicConfig(level=logging.INFO)設定日誌記錄的級別為 INFO,意味著 INFO 級別及以上的日誌將被記錄。logger = logging.getLogger(__name__)建立一個日誌記錄器,用於記錄日誌。
步驟 3:自定義回撥函式
自定義回撥函式可以在模型訓練過程中列印 epoch 資訊。
class CustomCallback(keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None):
logger.info(f"Epoch {epoch+1}, Loss: {logs['loss']:.4f}, Accuracy: {logs['accuracy']:.4f}")
內容解密:
CustomCallback類別繼承自keras.callbacks.Callback,允許我們在訓練過程中執行自定義操作。on_epoch_end方法在每個 epoch 結束時被呼叫,用於列印當前的 epoch 號、損失值和準確率。
步驟 4:資料預處理
使用 PySpark 對糖尿病資料集進行預處理,包括讀取資料、過濾無效資料、特徵工程、資料標準化以及將資料分割為訓練集和測試集。
# 初始化 SparkSession
spark = SparkSession.builder.appName("DiabetesPrediction").getOrCreate()
# 從 S3 下載資料集到本地
bucket_name = "your-bucket-name"
file_key = "path/to/your/dataset.csv"
local_file_path = "/tmp/dataset.csv"
os.system(f"aws s3 cp s3://{bucket_name}/{file_key} {local_file_path}")
# 讀取 CSV 檔案到 DataFrame
df = spark.read.csv(local_file_path, header=True, inferSchema=True)
# 選擇相關列
selected_columns = ['Pregnancies', 'Glucose', 'BloodPressure', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome']
df_selected = df.select(selected_columns)
# 顯示前 10 筆記錄
df_selected.show(10)
# 停止 SparkSession
spark.stop()
內容解密:
- 這段程式碼首先初始化一個 SparkSession,然後從 S3 下載資料集到本地。
- 使用
spark.read.csv方法讀取 CSV 檔案到 DataFrame,並選擇相關列。 df_selected.show(10)用於顯示 DataFrame 的前 10 筆記錄。- 最後,停止 SparkSession 以釋放資源。
步驟 5:定義和訓練模型
定義 TensorFlow (Keras) 順序模型,並使用預處理後的資料進行訓練。
# 定義模型架構
model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(7,)),
keras.layers.Dense(32, activation='relu'),
keras.layers.Dense(1, activation='sigmoid')
])
# 編譯模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# 訓練模型
model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_test, y_test), callbacks=[CustomCallback()])
內容解密:
- 使用
keras.Sequential定義一個順序模型,包含多個全連線層(Dense layers)。 model.compile方法用於組態模型的最佳化器、損失函式和評估指標。model.fit方法用於訓練模型,傳入訓練資料、epoch 數、批次大小和驗證資料,並使用自定義回撥函式。