PyTorch 提供了靈活且高效的深度學習框架,結合 PySpark 的大資料處理能力,能有效處理 Tesla 股票價格預測任務。資料預處理階段,使用 Spark 的 VectorAssembler 與 StandardScaler 分別進行特徵組裝和標準化,確保模型輸入資料的一致性。模型訓練階段,採用多層感知器架構,並使用 ReLU 啟用函式提升模型非線性表達能力。模型評估使用測試集資料,計算測試損失和 R-squared 分數,以衡量模型的預測準確度和泛化能力。最後,將預測結果與實際股價繪製成圖表,直觀地展現模型的預測效果。
PyTorch 在 Tesla 股價預測中的應用:深入解析主函式與輔助函式
在 Tesla 股價預測的專案中,PyTorch 被用於建立和訓練深度學習模型,以預測未來的股價走勢。本文將探討主函式(main)和輔助函式(spark_df_to_tensor)在該專案中的作用和實作細節。
主函式的角色與流程
主函式是整個 Tesla 股價預測流程的核心,負責協調資料載入、預處理、模型初始化、訓練、儲存和評估等步驟。
資料載入與預處理
主函式首先使用 subprocess.run 方法將 AWS S3 儲存桶中的資料複製到 EC2 的本地目錄。接著,透過 load_data 函式載入 Tesla 股價資料,並進行必要的預處理。
模型初始化與訓練
模型初始化採用 PyTorch 框架,定義了一個包含三個隱藏層(分別具有 64、32 和 16 個單元)的神經網路模型,並使用 ReLU 啟用函式。模型訓練過程由 train_model 函式負責,指定了訓練引數如 epoch 數量、batch size、學習率、損失函式和最佳化器等。
模型儲存與評估
訓練完成後,主函式使用 torch.save 方法將訓練好的模型引數儲存到檔案中,以便後續佈署或進一步分析。同時,使用 evaluate_model 函式評估模型在測試資料上的表現,計算測試損失和 R-squared 分數等指標。
錯誤處理機制
主函式包含錯誤處理機制,例如捕捉 FileNotFoundError 以確保在資料載入過程中遇到問題時能夠提供有用的錯誤訊息。
輔助函式:spark_df_to_tensor
spark_df_to_tensor 函式負責將 Spark DataFrame 中的欄位轉換為 PyTorch 張量,以便於神經網路模型的訓練和評估。
實作細節
- 特徵和標籤提取:函式從輸入的 Spark DataFrame 中提取特徵向量和標籤,並將其轉換為 NumPy 陣列。
- 轉換為 PyTorch 張量:使用
torch.tensor建構子將 NumPy 陣列轉換為 PyTorch 張量。 - 傳回張量元組:函式傳回特徵張量和標籤張量的元組,供後續的神經網路模型訓練或評估使用。
程式碼解析
def spark_df_to_tensor(df: DataFrame) -> Tuple[torch.Tensor, torch.Tensor]:
"""
將 Spark DataFrame 欄位轉換為 PyTorch 張量。
"""
features = torch.tensor(
np.array(df.rdd.map(lambda x: x.scaled_features.toArray()).collect()),
dtype=torch.float32
)
labels = torch.tensor(
np.array(df.rdd.map(lambda x: x.Close).collect()),
dtype=torch.float32
)
return features, labels
內容解密:
spark_df_to_tensor函式定義:該函式接受一個 Spark DataFrame 作為輸入,並傳回一個包含特徵張量和標籤張量的元組。- 特徵提取與轉換:使用
df.rdd.map將 DataFrame 中的scaled_features欄位對映為 NumPy 陣列,再轉換為 PyTorch 張量。 - 標籤提取與轉換:同樣地,將
Close欄位對映並轉換為 PyTorch 張量。 - 傳回結果:最終傳回特徵張量和標籤張量的元組,為 PyTorch 模型訓練做好準備。
深度學習模型評估與視覺化
在前一步驟中,我們成功訓練了一個根據 PyTorch 的深度學習模型來預測 Tesla 的股票價格。現在,我們將進一步評估模型的表現並視覺化預測結果。
載入必要的模組與資料
首先,我們需要匯入必要的函式庫,包括 PyTorch、NumPy 和 Matplotlib。同時,我們也需要從 tesla_stock_price_prediction 模組中匯入 load_data 和 preprocess_data 函式。
import torch
import numpy as np
import matplotlib.pyplot as plt
from tesla_stock_price_prediction import (
load_data,
preprocess_data
)
定義主函式進行模型評估與視覺化
接下來,我們定義一個名為 main 的函式,該函式負責執行以下任務:
- 載入測試資料
- 對資料進行預處理
- 定義神經網路模型架構
- 載入預訓練的模型權重
- 對測試資料進行預測
- 列印前十個實際值與預測值的對比
- 繪製所有實際值與預測值的折線圖
def main():
"""
載入測試資料,進行預處理,並使用預訓練模型進行預測。
此函式載入測試資料,進行預處理,載入預訓練的 PyTorch 模型,
對測試資料進行預測,並視覺化實際與預測的股票價格。
"""
test_data_file_path = "/home/ubuntu/airflow/dags/TSLA_stock.csv"
test_data = load_data(test_data_file_path)
test_data = preprocess_data(test_data)
test_features = test_data.select('scaled_features').collect()
# ...(其他程式碼)
內容解密:
import torch:匯入 PyTorch 函式庫,用於深度學習模型的建立與訓練。import numpy as np:匯入 NumPy 函式庫,用於高效的數值運算。import matplotlib.pyplot as plt:匯入 Matplotlib 的 pyplot 模組,用於繪製圖表。from tesla_stock_price_prediction import (load_data, preprocess_data):從自定義的tesla_stock_price_prediction模組中匯入load_data和preprocess_data函式,用於載入和預處理資料。def main()::定義主函式,負責協調整個評估與視覺化的流程。
載入測試資料與預處理
在 main 函式中,首先載入測試資料並進行預處理。測試資料的路徑被指定為 /home/ubuntu/airflow/dags/TSLA_stock.csv。
test_data_file_path = "/home/ubuntu/airflow/dags/TSLA_stock.csv"
test_data = load_data(test_data_file_path)
test_data = preprocess_data(test_data)
test_features = test_data.select('scaled_features').collect()
內容解密:
test_data = load_data(test_data_file_path):呼叫load_data函式載入指定路徑下的測試資料。test_data = preprocess_data(test_data):對載入的測試資料進行預處理,包括特徵縮放等操作。test_features = test_data.select('scaled_features').collect():從預處理後的資料中提取縮放後的特徵。
模型評估指標分析
在之前的步驟中,我們已經獲得了模型的訓練損失、測試損失和 R-squared 評分。這些指標對於評估模型的表現至關重要。
- 訓練損失(Train Loss):隨著訓練的進行,損失值不斷下降,表明模型在訓練資料上的表現越來越好。
- 測試損失(Test Loss):測試損失略高於訓練損失,但仍然保持在合理的範圍內,表明模型具有良好的泛化能力。
- R-squared 評分:0.998 的 R-squared 評分表明模型能夠很好地解釋依賴變數的變異,表現出強大的預測能力。
深度學習於迴歸分析的應用:以PyTorch預測Tesla股票價格
簡介
本章節將探討如何結合PyTorch與PySpark進行Tesla股票價格的預測。我們將從資料載入、預處理、模型建立、訓練到最終的預測與視覺化呈現整個流程。
資料載入與預處理
首先,我們需要從CSV檔案中載入測試資料,並對其進行預處理。以下是實作該功能的程式碼:
import numpy as np
from pyspark.sql import SparkSession
def load_data(file_path: str):
"""
使用SparkSession從CSV檔案載入股票價格資料。
"""
spark = SparkSession.builder.appName("StockPricePrediction").getOrCreate()
df = spark.read.csv(file_path, header=True, inferSchema=True)
return df
def preprocess_data(df):
# 省略具體實作細節
pass
test_data = load_data("/home/ubuntu/airflow/dags/TSLA_stock.csv")
test_features = preprocess_data(test_data)
內容解密:
load_data函式使用SparkSession從指定的CSV檔案路徑載入資料。- 建立SparkSession例項,並設定應用名稱為"StockPricePrediction"。
- 使用
spark.read.csv方法讀取CSV檔案,啟用header選項以讀取列名,並使用inferSchema自動推斷資料型別。
模型定義與載入
接下來,我們定義一個根據PyTorch的深度學習模型,並載入預訓練的模型權重。
import torch
import torch.nn as nn
input_size = 4
output_size = 1
model = nn.Sequential(
nn.Linear(input_size, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 16),
nn.ReLU(),
nn.Linear(16, output_size)
)
model.load_state_dict(torch.load('/home/ubuntu/airflow/dags/trained_model.pth', map_location=torch.device('cpu')))
model.eval()
內容解密:
- 定義一個順序性的神經網路模型,包含多個線性層和ReLU啟用函式。
- 使用
torch.load載入預訓練模型權重,並透過model.load_state_dict載入模型引數。 - 將模型設為評估模式,以關閉dropout和batch normalization的訓練特定行為。
預測與視覺化
完成模型載入後,我們進行預測並視覺化結果。
test_features = np.array([row.scaled_features.toArray() for row in test_features], dtype=np.float64)
test_features_tensor = torch.tensor(test_features, dtype=torch.float64)
model = model.double()
with torch.no_grad():
predictions = model(test_features_tensor).squeeze().numpy()
actual_labels = test_data.select('Close').collect()
actual_labels = np.array([row.Close for row in actual_labels], dtype=np.float64)
actual_labels = actual_labels[::-1]
predictions = predictions[::-1]
print("Top 10 Actual vs Predicted Values:")
print("Actual\tPredicted")
for i in range(10):
print(f"{actual_labels[i]:.4f}\t{predictions[i]:.4f}")
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.plot(actual_labels, label='Actual')
plt.plot(predictions, label='Predicted')
plt.xlabel('Index')
plt.ylabel('Price')
plt.title('Actual vs Predicted Tesla Stock Prices (Feb 26, 2019 to Feb 23, 2024)')
plt.legend()
plt.grid(True)
plt.show()
內容解密:
- 將測試特徵轉換為PyTorch張量,並進行預測。
- 取得實際標籤值,並將實際和預測值反轉以符合時間順序。
- 列印前10個實際值與預測值,並進行視覺化比較。
使用 PySpark 和 PyTorch 預測特斯拉股票價格
本篇文章將介紹如何結合 PySpark 和 PyTorch 來建立一個深度學習模型,以預測特斯拉的股票價格。我們將逐步講解資料載入、預處理、模型訓練和評估的過程。
資料載入與預處理
首先,我們需要載入股票價格資料,並進行必要的預處理。這包括使用 VectorAssembler 組裝特徵向量,以及使用 StandardScaler 對特徵進行標準化。
載入資料
def load_data(file_path: str) -> DataFrame:
"""
從 CSV 檔案載入股票價格資料。
"""
try:
spark = SparkSession.builder.appName("StockPricePrediction").getOrCreate()
df = spark.read.csv(file_path, header=True, inferSchema=True)
return df
except Exception as e:
raise RuntimeError(f"載入資料失敗:{e}")
預處理資料
def preprocess_data(df: DataFrame) -> DataFrame:
"""
對資料進行預處理,包括組裝特徵向量和標準化。
"""
assembler = VectorAssembler(inputCols=['Open', 'High', 'Low', 'Volume'], outputCol='features')
df = assembler.transform(df)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)
df = df.select('scaled_features', 'Close')
return df
建立資料載入器
接下來,我們需要將預處理後的資料轉換成 PyTorch 的 DataLoader 物件,以便於批次處理資料。
def create_data_loader(features, labels, batch_size=32) -> DataLoader:
"""
將資料轉換成 PyTorch 的 DataLoader 物件。
"""
dataset = TensorDataset(features, labels)
return DataLoader(dataset, batch_size=batch_size, shuffle=True)
模型訓練與評估
我們定義了一個簡單的深度學習模型,使用均方誤差(MSE)作為損失函式,並使用 Adam 最佳化器進行訓練。
模型定義與訓練
def train_model(model, train_loader, criterion, optimizer, num_epochs):
"""
訓練模型。
"""
for epoch in range(num_epochs):
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels.unsqueeze(1))
loss.backward()
optimizer.step()
logger.info(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}")
模型評估
def evaluate_model(model, test_loader, criterion) -> Tuple[float, float]:
"""
評估模型的效能。
"""
with torch.no_grad():
model.eval()
predictions = []
targets = []
test_loss = 0.0
for inputs, labels in test_loader:
outputs = model(inputs)
loss = criterion(outputs, labels.unsqueeze(1))
test_loss += loss.item() * inputs.size(0)
predictions.extend(outputs.squeeze().tolist())
targets.extend(labels.tolist())
test_loss /= len(test_loader.dataset)
predictions = torch.tensor(predictions)
targets = torch.tensor(targets)
ss_res = torch.sum((targets - predictions) ** 2)
ss_tot = torch.sum((targets - torch.mean(targets)) ** 2)
r_squared = 1 - ss_res / ss_tot
logger.info(f"Test Loss: {test_loss:.4f}")
logger.info(f"R-squared Score: {r_squared:.4f}")
return test_loss, r_squared.item()
主函式
最後,我們定義了主函式來整合上述步驟,包括資料載入、預處理、模型訓練和評估。
def main(data_file_path: str, num_epochs: int = 100, batch_size: int = 32, learning_rate: float = 0.001):
"""
主函式。
"""
try:
# 載入資料
df = load_data(data_file_path)
# 預處理資料
df = preprocess_data(df)
# 分割資料集
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# 轉換成 PyTorch 張量
train_features, train_labels = spark_df_to_tensor(train_df)
test_features, test_labels = spark_df_to_tensor(test_df)
# 建立 DataLoader 物件
train_loader = create_data_loader(train_features, train_labels, batch_size=batch_size)
test_loader = create_data_loader(test_features, test_labels, batch_size=batch_size)
# 定義模型、損失函式和最佳化器
input_size = train_features.shape[1]
output_size = 1
model = nn.Sequential(
nn.Linear(input_size, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 16),
nn.ReLU(),
nn.Linear(16, output_size)
)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 訓練模型
train_model(model, train_loader, criterion, optimizer, num_epochs)
# 評估模型
evaluate_model(model, test_loader, criterion)
# 儲存模型
torch.save(model.state_dict(), 'trained_model.pth')
except FileNotFoundError:
logger.error(f"找不到資料檔案:{data_file_path}")