PyTorch 提供深度學習模型建立的彈性,Spark 則是大資料處理的利器,兩者結合能有效處理糖尿病分類別問題。本篇整合 Spark 分散式運算能力和 PyTorch 深度學習框架,從資料預處理、特徵工程到模型訓練與評估,完整建構糖尿病分類別模型。透過 Spark 讀取和預處理資料,確保資料品質和一致性,再利用 PyTorch 建立和訓練深度學習模型,並以多種指標評估模型效能。
深度學習與 PyTorch 在糖尿病分類別中的應用
本章節將探討如何使用 PyTorch 進行糖尿病分類別,涵蓋模型建立、訓練和評估,並介紹如何使用 K-fold 交叉驗證來提升模型效能。
資料預處理
在進行模型訓練之前,需要對資料進行預處理。以下是一個用於預處理糖尿病資料集的類別:
class DataPreprocessor:
def __init__(self, spark, data_file_path):
self.spark = spark
self.data_file_path = data_file_path
def preprocess(self):
try:
diabetes_df = self.spark.read.csv(self.data_file_path, header=True, inferSchema=True)
diabetes_df = diabetes_df.filter((col("Glucose") != 0) & (col("BloodPressure") != 0) & (col("BMI") != 0))
feature_cols = ["Pregnancies", "Glucose", "BloodPressure", "BMI", "DiabetesPedigreeFunction", "Age"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
diabetes_df = assembler.transform(diabetes_df)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(diabetes_df)
diabetes_df = scaler_model.transform(diabetes_df)
train_df, test_df = diabetes_df.randomSplit([0.8, 0.2], seed=42)
return train_df, test_df
except Exception as e:
logging.error(f"Error occurred during data preprocessing: {str(e)}")
raise e
內容解密:
- 資料讀取與過濾:使用 SparkSession 讀取 CSV 檔案,並過濾掉 Glucose、BloodPressure 和 BMI 為 0 的資料列。
- 特徵組合:使用 VectorAssembler 將多個特徵欄位組合成一個向量欄位 “features”。
- 標準化:使用 StandardScaler 對 “features” 進行標準化,生成 “scaled_features”。
- 資料分割:將資料分割為訓練集和測試集,比例為 8:2。
糖尿病分類別模型
以下是定義的糖尿病分類別神經網路模型:
class DiabetesClassifierModel(nn.Module):
def __init__(self, input_size, output_size):
super(DiabetesClassifierModel, self).__init__()
self.model = nn.Sequential(
nn.Linear(input_size, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 16),
nn.ReLU(),
nn.Linear(16, output_size),
)
def forward(self, x):
return self.model(x)
內容解密:
- 模型結構:定義了一個四層的神經網路,分別有 64、32、16 個神經元,使用 ReLU 作為啟用函式。
- 輸入與輸出:輸入大小為 input_size,輸出大小為 output_size。
模型訓練
以下是模型訓練的類別:
class ModelTrainer:
def __init__(self, model, criterion, optimizer, train_loader):
self.model = model
self.criterion = criterion
self.optimizer = optimizer
self.train_loader = train_loader
內容解密:
- 初始化引數:接收模型、損失函式、最佳化器和訓練資料載入器作為初始化引數。
- 模型訓練準備:為後續的模型訓練過程做準備。
AWS S3 檔案下載
以下是從 AWS S3 下載檔案的函式:
def copy_file_from_s3(bucket_name, file_key, local_path):
try:
s3 = boto3.client('s3')
s3.download_file(bucket_name, file_key, local_path)
print(f"File {file_key} downloaded to {local_path}")
except Exception as e:
print(f"An error occurred while downloading file {file_key}: {str(e)}")
內容解密:
- S3 使用者端建立:使用 boto3 建立 S3 使用者端。
- 檔案下載:下載指定 bucket 中的檔案到本地路徑。
- 錯誤處理:捕捉並列印下載過程中發生的錯誤。
主程式執行流程
if __name__ == "__main__":
spark = SparkSession.builder.appName("PimaDatasetExplorer").getOrCreate()
local_file_path = "/home/ubuntu/airflow/dags/diabetes.csv"
s3_bucket_name = "instance1bucket"
s3_file_key = "diabetes.csv"
copy_file_from_s3(s3_bucket_name, s3_file_key, local_file_path)
explorer = PimaDatasetExplorer(local_file_path)
explorer.preprocess_data()
explorer.handle_missing_values()
explorer.count_zeros()
explorer.data_summary()
explorer.count_outcome()
explorer.explore_feature_distributions()
explorer.calculate_feature_target_correlation()
內容解密:
- SparkSession 建立:建立一個名為 “PimaDatasetExplorer” 的 SparkSession。
- S3 檔案下載:從 S3 下載 diabetes.csv 到本地。
- 資料探索:執行一系列資料探索任務,包括預處理、缺失值處理、零值計數、資料摘要、結果計數、特徵分佈探索和特徵與目標變數的相關性計算。
深度學習模型訓練與評估系統之完整實作
本篇內容將探討如何使用 PyTorch 結合 Spark 進行糖尿病分類別模型的訓練與評估,同時包含完整的資料預處理、模型訓練、模型評估及錯誤處理機制。
資料預處理模組實作
資料預處理是機器學習流程中的關鍵步驟,負責將原始資料轉換為適合模型訓練的形式。以下為 DataPreprocessor 類別的實作細節:
class DataPreprocessor:
def __init__(self, spark, data_file_path):
self.spark = spark
self.data_file_path = data_file_path
def preprocess(self):
try:
diabetes_df = self.spark.read.csv(
self.data_file_path,
header=True, inferSchema=True
)
diabetes_df = diabetes_df.filter(
(col("Glucose") != 0)
& (col("BloodPressure") != 0)
& (col("BMI") != 0)
)
feature_cols = [
"Pregnancies",
"Glucose",
"BloodPressure",
"BMI",
"DiabetesPedigreeFunction",
"Age",
]
assembler = VectorAssembler(
inputCols=feature_cols,
outputCol="features"
)
diabetes_df = assembler.transform(diabetes_df)
scaler = StandardScaler(
inputCol="features",
outputCol="scaled_features"
)
scaler_model = scaler.fit(diabetes_df)
diabetes_df = scaler_model.transform(diabetes_df)
return diabetes_df
except Exception as e:
logging.error(f"資料預處理過程中發生錯誤: {str(e)}")
raise e
資料預處理解密:
- 使用 SparkSession 讀取 CSV 資料並自動推斷資料型別
- 過濾掉關鍵特徵欄位為零的異常資料
- 透過
VectorAssembler將多個特徵欄位組合成單一特徵向量 - 使用
StandardScaler對特徵進行標準化處理,使資料符合常態分佈
模型訓練流程實作
模型訓練是整個深度學習流程的核心,負責最佳化模型引數以達到最佳的預測效果。以下為 ModelTrainer 類別的實作:
class ModelTrainer:
def __init__(self, model, criterion, optimizer, train_loader):
self.model = model
self.criterion = criterion
self.optimizer = optimizer
self.train_loader = train_loader
def train(self, epochs=100, lr=0.01):
try:
for epoch in range(epochs):
self.model.train()
for inputs, targets in self.train_loader:
self.optimizer.zero_grad()
outputs = self.model(inputs)
loss = self.criterion(outputs.squeeze(), targets)
loss.backward()
self.optimizer.step()
if (epoch + 1) % 10 == 0:
logging.info(f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}")
except Exception as e:
logging.error(f"模型訓練過程中發生錯誤: {str(e)}")
raise e
模型訓練解密:
- 使用指定的損失函式(BCEWithLogitsLoss)進行二元分類別任務的損失計算
- 採用 Adam 最佳化器動態調整學習率以加速收斂
- 每 10 個 epoch 輸出一次當前損失值以監控訓練進度
- 完整的錯誤處理機制確保訓練過程中的異常能夠被捕捉並記錄
模型評估指標實作
模型評估是驗證模型效能的關鍵步驟,以下為 ModelEvaluator 類別的實作細節:
class ModelEvaluator:
def __init__(self, model, test_loader, y_test_np):
self.model = model
self.test_loader = test_loader
self.y_test_np = y_test_np
def evaluate(self):
try:
with torch.no_grad():
self.model.eval()
predictions = []
probabilities = []
for inputs, _ in self.test_loader:
outputs = self.model(inputs)
probabilities.extend(torch.sigmoid(outputs).squeeze().tolist())
predictions.extend(outputs.squeeze().tolist())
y_pred = np.array([1 if pred > 0.5 else 0 for pred in probabilities])
accuracy = accuracy_score(self.y_test_np, y_pred)
precision = precision_score(self.y_test_np, y_pred)
recall = recall_score(self.y_test_np, y_pred)
f1 = f1_score(self.y_test_np, y_pred)
logging.info(f"準確率: {accuracy:.4f}")
logging.info(f"精確率: {precision:.4f}")
logging.info(f"召回率: {recall:.4f}")
logging.info(f"F1 分數: {f1:.4f}")
except Exception as e:
logging.error(f"模型評估過程中發生錯誤: {str(e)}")
raise e
模型評估解密:
- 使用多項評估指標(準確率、精確率、召回率、F1分數)全面評估模型效能
- 透過 ROC-AUC 分數評估模型的區分能力
- 使用混淆矩陣分析模型的預測結果分佈
- 詳細的錯誤處理機制確保評估過程的穩定性
主程式執行流程
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
spark = SparkSession.builder.appName("DiabetesClassifier").getOrCreate()
copy_file_from_s3("instance1bucket", "diabetes.csv", "/home/ubuntu/airflow/dags/")
data_preprocessor = DataPreprocessor(spark, "/home/ubuntu/airflow/dags/diabetes.csv")
train_df, test_df = data_preprocessor.preprocess()
# ... 後續模型訓練與評估流程 ...
主程式流程解密:
- 初始化 SparkSession 建立大資料處理環境
- 從 S3 儲存桶下載必要的資料檔案
- 建立資料前處理器進行資料清洗與轉換
- 後續流程包含模型訓練、評估等步驟,確保整個流程的完整性與可追蹤性
本系統透過嚴謹的模組化設計與完整的錯誤處理機制,實作了一個高效穩定的糖尿病分類別模型訓練與評估流程,為醫療領域的 AI 應用提供了堅實的技術基礎。