在 AWS 上建構大規模深度學習管線需要整合多項服務與框架。本文介紹的管線以 Amazon S3 作為核心儲存,利用其高用性和可擴充套件性儲存原始資料、預處理資料和模型構件。PySpark 則負責高效的資料預處理,其分散式處理能力能有效處理大規模資料集。模型訓練階段則採用 PyTorch 和 TensorFlow 兩種主流深度學習框架,並比較兩者的特性與差異,提供更全面的模型開發選擇。後續章節將探討各個階段的實作細節,包含程式碼範例和實際應用案例。
大規模深度學習管線在AWS上的架構概述
本章節將介紹如何在Amazon Web Services(AWS)上建立一個可擴充套件的深度學習管線,涵蓋從使用S3進行資料擷取到使用Apache Airflow排程模型執行的完整流程。後續章節將探討該管線的每個階段,提供清晰的解釋、實作範例以及來自實際應用的見解。
AWS上的深度學習工作流程元件
圖1-1展示了一個全面的深度學習管線,利用AWS的功能處理大規模資料處理、模型訓練、評估和佈署任務。該管線的核心是Amazon S3,作為集中式資料來源,用於儲存原始資料集、預處理資料和模型構件。
資料來源(S3)
這代表了資料從Amazon S3擷取的初始點,S3是AWS提供的根據雲端的儲存服務。S3通常用於儲存各種型別的資料,包括原始資料集、預處理資料和模型檢查點(如權重)。儲存在S3中的資料可以被管線中的其他元件存取和使用,包括資料預處理、模型訓練、模型評估、模型佈署以及監控和維護。
儲存在S3中的資料可以被複製到EC2上的本地目錄,然後輸入到深度學習模型中,或者作為管線的一部分進行複製,這意味著從S3檢索和整合資料到機器學習(ML)管線的各個階段是自動化的,以提高效率和可擴充套件性。
以下是兩個範例(使用AWS命令列介面(CLI)與subprocess和Boto3),展示如何將名為“TSLA_stock.csv”的CSV檔案(將在第4章和第5章中使用PyTorch和TensorFlow預測特斯拉股票價格)從名為“instance1bucket”的S3儲存桶複製到名為“/home/ubuntu/airflow/dags”的EC2本地目錄。
使用AWS CLI與subprocess
import subprocess
try:
subprocess.run([
"aws", "s3", "cp",
"s3://instance1bucket/TSLA_stock.csv",
"/home/ubuntu/airflow/dags/"
], check=True)
print("下載成功。")
except subprocess.CalledProcessError as e:
print(f"發生錯誤:{e}")
內容解密:
此程式碼利用Python中的subprocess模組透過AWS命令列介面(CLI)複製檔案“TSLA_stock.csv”從S3儲存桶“instance1bucket”到EC2例項上的本地目錄“/home/ubuntu/airflow/dags/”。subprocess.run()函式執行AWS CLI命令“aws s3 cp”,並指定源和目標路徑。 try/except區塊用於處理命令執行期間可能發生的潛在錯誤。如果命令成功,try區塊列印“下載成功。”如果命令失敗並引發subprocess.CalledProcessError,except區塊捕捉錯誤並列印一條訊息,指示發生了錯誤以及錯誤詳細資訊。 subprocess.run()中的check=True引數確保如果命令傳回非零離開狀態,則引發CalledProcessError,觸發except區塊。在此範例中,輸出指示檔案已成功從S3儲存桶下載到本地目錄。
使用Boto3
import boto3
try:
s3_client = boto3.client('s3')
bucket_name = 'instance1bucket'
key = 'TSLA_stock.csv'
local_path = '/home/ubuntu/airflow/dags/TSLA_stock.csv'
s3_client.download_file(bucket_name, key, local_path)
print("下載成功。")
except Exception as e:
print(f"發生錯誤:{e}")
內容解密:
在此程式碼中,Boto3(AWS的官方Python SDK)用於與Amazon S3服務互動。s3_client.download_file()方法用於下載檔案“TSLA_stock.csv”從S3儲存桶“instance1bucket”到EC2例項上的本地目錄“/home/ubuntu/airflow/dags/”。執行此程式碼後,檔案“TSLA_stock.csv”應該存在於指定的本地目錄中。
將檔案從目錄複製到S3儲存桶的過程本質上是從S3複製檔案到目錄的逆過程。
使用AWS CLI與subprocess上傳檔案
import subprocess
try:
subprocess.run([
"aws", "s3", "cp",
"/home/ubuntu/airflow/dags/TSLA_stock.csv",
"s3://instance1bucket/"
], check=True)
print("檔案上傳成功。")
except Exception as e:
print(f"發生錯誤:{e}")
內容解密:
在此程式碼中,我們顛倒了與從S3下載的程式碼相比的源和目標路徑的順序。
使用Boto3上傳檔案
import boto3
try:
s3_client = boto3.client('s3')
bucket_name = 'instance1bucket'
key = 'TSLA_stock.csv'
# 省略了本地路徑的指定,實際使用時需補充完整
except Exception as e:
print(f"發生錯誤:{e}")
內容解密:
這段程式碼同樣使用Boto3進行檔案上傳,但未完整展示上傳所需的全部引數,特別是本地檔案路徑。在實際應用中,需要指定完整的本地檔案路徑以完成上傳操作。
上傳資料至 Amazon S3 與 PySpark 資料預處理
在建構可擴充套件的深度學習管線時,資料的存取與預處理為首要步驟。本文將介紹如何使用 AWS 的 S3 服務進行資料的上傳與下載,以及如何利用 PySpark 進行高效的資料預處理。
將檔案上傳至 Amazon S3
在前一步驟中,我們已說明如何從 S3 下載資料。現在,我們將探討如何將本地檔案上傳至 S3。以下是範例程式碼:
import boto3
try:
s3_client = boto3.client('s3')
bucket_name = 'your_bucket_name'
key = 'TSLA_stock.csv'
local_path = '/home/ubuntu/airflow/dags/TSLA_stock.csv'
s3_client.upload_file(local_path, bucket_name, key)
print("檔案上傳成功。")
except Exception as e:
print(f"發生錯誤:{e}")
程式碼解析:
• s3_client = boto3.client('s3'):建立一個 S3 的客戶端物件,用於與 S3 服務進行互動。
• s3_client.upload_file(local_path, bucket_name, key):將本地檔案上傳至指定的 S3 儲存桶。其中,local_path 為本地檔案路徑,bucket_name 為 S3 儲存桶名稱,key 為上傳後在 S3 中的檔案名稱。
• 注意:若 S3 中已存在同名的檔案,則會被新上傳的檔案覆寫。
使用 PySpark 進行資料預處理
當資料從 S3 成功檢索後,將使用 PySpark 進行預處理。PySpark 是一個分散式資料處理框架,能夠高效地進行大規模資料的清理、轉換和特徵工程。
為何選擇 PySpark?
- 分散式處理能力:相較於傳統的單節點處理工具(如 Pandas 和 Scikit-Learn),PySpark 能夠在多個節點上平行處理資料,大幅提升處理效率。
- 無縫整合:PySpark 能夠與機器學習管線的其他元件無縫整合,從資料擷取到模型佈署形成一體化的工作流程。
以下是一個簡單的範例,展示如何使用 PySpark 建立 DataFrame 並顯示其內容:
from pyspark.sql import SparkSession
try:
spark = (SparkSession.builder
.appName("TacosDataFrame")
.getOrCreate())
data = [("Taco 1", "Beef", 2.50),
("Taco 2", "Chicken", 2.00),
("Taco 3", "Vegetarian", 2.00),
("Taco 4", "Fish", 3.00)]
schema = ["Taco Name", "Type", "Price"]
tacos_df = spark.createDataFrame(data, schema)
# 將 DataFrame 快取以最佳化效能
tacos_df.cache()
tacos_df.show()
except Exception as e:
print(f"發生錯誤:{e}")
finally:
spark.stop()
程式碼解析:
• from pyspark.sql import SparkSession:匯入 SparkSession 類別,它是使用 DataFrame API 程式設計 Spark 的主要介面。
• spark = SparkSession.builder.appName("TacosDataFrame").getOrCreate():建立或取得一個名為 “TacosDataFrame” 的 SparkSession。
• tacos_df = spark.createDataFrame(data, schema):根據提供的資料和結構描述建立 DataFrame。
• tacos_df.cache():將 DataFrame 快取於記憶體中,以提高後續操作的效能。
• tacos_df.show():顯示 DataFrame 的內容,預設顯示前 20 行。
大規模深度學習管線在AWS上的概述
執行PySpark程式碼
要執行此程式碼,需要正確安裝和組態Python 3的PySpark(我們將在下一章節介紹如何進行此操作)。將PySpark程式碼儲存在一個檔案中,例如在EC2例項上的「tacos_dataframe.py」,然後使用以下兩種選項之一執行指令碼:
以Python指令碼執行: 可以SSH進入EC2例項,並使用Python直譯器執行指令碼,如下所示:
python3 tacos_dataframe.py這種方法利用EC2例項的本地資源和Spark安裝來建立和處理DataFrame。
使用spark-submit執行: 或者,可以透過將指令碼提交給本地Spark例項,使用「spark-submit」來利用Spark的分散式計算能力,如下所示:
spark-submit tacos_dataframe.py此方法允許指令碼以分散式方式執行,利用Spark提供的計算資源。
模型建立(PyTorch和TensorFlow)
在此管線步驟中,來自PySpark的預處理資料用於使用PyTorch和TensorFlow框架建立深度學習模型。這些是流行的深度學習框架,以其在構建和訓練神經網路模型方面的靈活性和可擴充套件性而聞名。
PyTorch
PyTorch是由Facebook的AI研究實驗室(FAIR)開發的開源機器學習框架。它於2016年10月發布,建立在Torch函式庫之上,提供更具Python風格的介面。該框架的建立著重於靈活性、易用性和動態計算圖。它的關鍵特性之一是動態計算,允許直觀的模型構建和除錯。由於其使用者友好的API、豐富的檔案和對自動微分等高階特性的支援,PyTorch在研究人員和開發人員中獲得了人氣。隨著時間的推移,PyTorch已成為深度學習社群中使用最廣泛的框架之一,為研究專案、生產系統和教育計劃提供動力。
TensorFlow
TensorFlow是由Google Brain開發的,首先於2015年11月作為開源機器學習函式庫發布。該框架設計為可擴充套件、靈活和高效,能夠處理研究和生產工作負載。它引入了計算圖的概念,允許使用者定義複雜的神經網路架構,並在CPU、GPU或TPU上高效執行。TensorFlow的初始版本為構建和訓練深度學習模型提供了低階API。然而,為了使深度學習更容易被接受,Google於2017年3月將Keras作為TensorFlow的高階API引入。Keras簡化了構建和訓練神經網路的過程,提供了一個更友好的介面,同時保留了TensorFlow的效能和可擴充套件性。Keras與TensorFlow的整合促進了其在各個行業和學術機構中的廣泛採用,使其成為世界上最流行的深度學習框架之一。
共同步驟
在本文中,我們概述了兩個框架之間的共同步驟,而在第4-7章中,我們將更深入地探討具體的實作細節,並演示如何使用兩個框架執行每個步驟:
- 資料載入和預處理:PyTorch和TensorFlow都需要資料載入和預處理步驟來準備資料進行訓練。這可能涉及從檔案中讀取資料、正規化和將資料分成訓練集和驗證集等任務。
- 模型建立:在兩個框架中,我們定義神經網路模型的架構。這包括指定層數、層型別(例如密集、卷積、遞迴)、啟用函式和其他相關引數。
- 定義損失函式和最佳化器:PyTorch和TensorFlow允許我們根據任務選擇損失函式(例如,迴歸的均方誤差、分類別的類別交叉熵)和最佳化器來最佳化模型的引數(例如,隨機梯度下降(SGD)、Adam)。
- 模型訓練:兩個框架中的訓練迴圈都涉及遍歷資料批次,將它們傳遞給模型以獲得預測、計算損失、反向傳播梯度和使用最佳化器更新模型引數。
- 模型評估:訓練後,我們使用適當的指標(如準確率、精確率、召回率或均方誤差,取決於任務)在單獨的驗證或測試資料集上評估模型的效能。
- 推理:一旦模型經過訓練和評估,我們就可以用它對新的、未見過的資料進行預測。這涉及將新資料傳遞給訓練好的模型並獲得預測結果。
PyTorch與TensorFlow的比較
雖然PyTorch和TensorFlow之間的一般工作流程相似,但它們的API、語法和具體實作細節存在差異。例如:
- 在PyTorch中,我們使用
nn.Module類別定義模型,並在forward方法中建立前向傳遞,而在TensorFlow中,我們使用tf.keras.Model類別並在call方法中定義前向傳遞。 - PyTorch強調動態計算圖,允許更大的靈活性和更簡單的除錯,而TensorFlow最初使用靜態計算圖,但隨著TensorFlow 2.0及更高版本的發布,採用了更動態的方法。
- TensorFlow具有高階API Keras,它為構建和訓練神經網路模型提供了更簡單、更友好的介面,而PyTorch的API更底層,提供更大的靈活性。
範例:定義神經網路模型
以下是使用PyTorch和TensorFlow/Keras定義神經網路模型的範例,以說明它們的API和語法的差異。首先,我們使用PyTorch中的nn.Module定義一個神經網路模型,該模型指定了一個具有一個隱藏層的前饋網路(我們將在書的其他章節中探討如何使用更多層):
import torch
import torch.nn as nn
class PyTorchModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(PyTorchModel, self).__init__()
# 初始化模型的層
self.fc1 = nn.Linear(input_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, output_size)
def forward(self, x):
# 定義前向傳遞
x = torch.relu(self.fc1(x))
x = self.fc2(x)
return x
內容解密:
nn.Module類別:PyTorch的神經網路模型都是nn.Module的子類別,這提供了構建神經網路的基本結構。__init__方法:初始化模型的層和其他必要的元件。在這個例子中,我們定義了兩個全連線層(fc1和fc2)。forward方法:定義了資料透過網路的前向傳遞過程。在這個例子中,輸入x首先透過第一個全連線層,然後經過ReLU啟用函式,最後透過第二個全連線層輸出。
同樣地,在TensorFlow/Keras中定義神經網路模型的範例如下:
import tensorflow as tf
class TensorFlowModel(tf.keras.Model):
def __init__(self, input_size, hidden_size, output_size):
super(TensorFlowModel, self).__init__()
# 初始化模型的層
self.fc1 = tf.keras.layers.Dense(hidden_size, activation='relu', input_shape=(input_size,))
self.fc2 = tf.keras.layers.Dense(output_size)
def call(self, x):
# 定義前向傳遞
x = self.fc1(x)
x = self.fc2(x)
return x
內容解密:
tf.keras.Model類別:TensorFlow/Keras的神經網路模型都是tf.keras.Model的子類別,這提供了構建神經網路的基本結構。__init__方法:初始化模型的層。在這個例子中,我們定義了兩個密集層(fc1和fc2),其中第一個層具有ReLU啟用函式。call方法:定義了資料透過網路的前向傳遞過程。在這個例子中,輸入x首先透過第一個密集層,然後透過第二個密集層輸出。
這些範例展示瞭如何在PyTorch和TensorFlow/Keras中定義簡單的神經網路模型,並突出了它們在API和語法上的差異。