PyTorch 提供了靈活且高效的深度學習框架,結合 PySpark 的大資料處理能力,能有效處理 Tesla 股票價格預測任務。資料預處理階段,使用 Spark 的 VectorAssemblerStandardScaler 分別進行特徵組裝和標準化,確保模型輸入資料的一致性。模型訓練階段,採用多層感知器架構,並使用 ReLU 啟用函式提升模型非線性表達能力。模型評估使用測試集資料,計算測試損失和 R-squared 分數,以衡量模型的預測準確度和泛化能力。最後,將預測結果與實際股價繪製成圖表,直觀地展現模型的預測效果。

PyTorch 在 Tesla 股價預測中的應用:深入解析主函式與輔助函式

在 Tesla 股價預測的專案中,PyTorch 被用於建立和訓練深度學習模型,以預測未來的股價走勢。本文將探討主函式(main)和輔助函式(spark_df_to_tensor)在該專案中的作用和實作細節。

主函式的角色與流程

主函式是整個 Tesla 股價預測流程的核心,負責協調資料載入、預處理、模型初始化、訓練、儲存和評估等步驟。

資料載入與預處理

主函式首先使用 subprocess.run 方法將 AWS S3 儲存桶中的資料複製到 EC2 的本地目錄。接著,透過 load_data 函式載入 Tesla 股價資料,並進行必要的預處理。

模型初始化與訓練

模型初始化採用 PyTorch 框架,定義了一個包含三個隱藏層(分別具有 64、32 和 16 個單元)的神經網路模型,並使用 ReLU 啟用函式。模型訓練過程由 train_model 函式負責,指定了訓練引數如 epoch 數量、batch size、學習率、損失函式和最佳化器等。

模型儲存與評估

訓練完成後,主函式使用 torch.save 方法將訓練好的模型引數儲存到檔案中,以便後續佈署或進一步分析。同時,使用 evaluate_model 函式評估模型在測試資料上的表現,計算測試損失和 R-squared 分數等指標。

錯誤處理機制

主函式包含錯誤處理機制,例如捕捉 FileNotFoundError 以確保在資料載入過程中遇到問題時能夠提供有用的錯誤訊息。

輔助函式:spark_df_to_tensor

spark_df_to_tensor 函式負責將 Spark DataFrame 中的欄位轉換為 PyTorch 張量,以便於神經網路模型的訓練和評估。

實作細節

  1. 特徵和標籤提取:函式從輸入的 Spark DataFrame 中提取特徵向量和標籤,並將其轉換為 NumPy 陣列。
  2. 轉換為 PyTorch 張量:使用 torch.tensor 建構子將 NumPy 陣列轉換為 PyTorch 張量。
  3. 傳回張量元組:函式傳回特徵張量和標籤張量的元組,供後續的神經網路模型訓練或評估使用。

程式碼解析

def spark_df_to_tensor(df: DataFrame) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    將 Spark DataFrame 欄位轉換為 PyTorch 張量。
    """
    features = torch.tensor(
        np.array(df.rdd.map(lambda x: x.scaled_features.toArray()).collect()),
        dtype=torch.float32
    )
    labels = torch.tensor(
        np.array(df.rdd.map(lambda x: x.Close).collect()),
        dtype=torch.float32
    )
    return features, labels

內容解密:

  1. spark_df_to_tensor 函式定義:該函式接受一個 Spark DataFrame 作為輸入,並傳回一個包含特徵張量和標籤張量的元組。
  2. 特徵提取與轉換:使用 df.rdd.map 將 DataFrame 中的 scaled_features 欄位對映為 NumPy 陣列,再轉換為 PyTorch 張量。
  3. 標籤提取與轉換:同樣地,將 Close 欄位對映並轉換為 PyTorch 張量。
  4. 傳回結果:最終傳回特徵張量和標籤張量的元組,為 PyTorch 模型訓練做好準備。

深度學習模型評估與視覺化

在前一步驟中,我們成功訓練了一個根據 PyTorch 的深度學習模型來預測 Tesla 的股票價格。現在,我們將進一步評估模型的表現並視覺化預測結果。

載入必要的模組與資料

首先,我們需要匯入必要的函式庫,包括 PyTorch、NumPy 和 Matplotlib。同時,我們也需要從 tesla_stock_price_prediction 模組中匯入 load_datapreprocess_data 函式。

import torch
import numpy as np
import matplotlib.pyplot as plt
from tesla_stock_price_prediction import (
    load_data,
    preprocess_data
)

定義主函式進行模型評估與視覺化

接下來,我們定義一個名為 main 的函式,該函式負責執行以下任務:

  1. 載入測試資料
  2. 對資料進行預處理
  3. 定義神經網路模型架構
  4. 載入預訓練的模型權重
  5. 對測試資料進行預測
  6. 列印前十個實際值與預測值的對比
  7. 繪製所有實際值與預測值的折線圖
def main():
    """
    載入測試資料,進行預處理,並使用預訓練模型進行預測。
    此函式載入測試資料,進行預處理,載入預訓練的 PyTorch 模型,
    對測試資料進行預測,並視覺化實際與預測的股票價格。
    """
    test_data_file_path = "/home/ubuntu/airflow/dags/TSLA_stock.csv"
    test_data = load_data(test_data_file_path)
    test_data = preprocess_data(test_data)
    test_features = test_data.select('scaled_features').collect()
    # ...(其他程式碼)

內容解密:

  1. import torch:匯入 PyTorch 函式庫,用於深度學習模型的建立與訓練。
  2. import numpy as np:匯入 NumPy 函式庫,用於高效的數值運算。
  3. import matplotlib.pyplot as plt:匯入 Matplotlib 的 pyplot 模組,用於繪製圖表。
  4. from tesla_stock_price_prediction import (load_data, preprocess_data):從自定義的 tesla_stock_price_prediction 模組中匯入 load_datapreprocess_data 函式,用於載入和預處理資料。
  5. def main()::定義主函式,負責協調整個評估與視覺化的流程。

載入測試資料與預處理

main 函式中,首先載入測試資料並進行預處理。測試資料的路徑被指定為 /home/ubuntu/airflow/dags/TSLA_stock.csv

test_data_file_path = "/home/ubuntu/airflow/dags/TSLA_stock.csv"
test_data = load_data(test_data_file_path)
test_data = preprocess_data(test_data)
test_features = test_data.select('scaled_features').collect()

內容解密:

  1. test_data = load_data(test_data_file_path):呼叫 load_data 函式載入指定路徑下的測試資料。
  2. test_data = preprocess_data(test_data):對載入的測試資料進行預處理,包括特徵縮放等操作。
  3. test_features = test_data.select('scaled_features').collect():從預處理後的資料中提取縮放後的特徵。

模型評估指標分析

在之前的步驟中,我們已經獲得了模型的訓練損失、測試損失和 R-squared 評分。這些指標對於評估模型的表現至關重要。

  • 訓練損失(Train Loss):隨著訓練的進行,損失值不斷下降,表明模型在訓練資料上的表現越來越好。
  • 測試損失(Test Loss):測試損失略高於訓練損失,但仍然保持在合理的範圍內,表明模型具有良好的泛化能力。
  • R-squared 評分:0.998 的 R-squared 評分表明模型能夠很好地解釋依賴變數的變異,表現出強大的預測能力。

深度學習於迴歸分析的應用:以PyTorch預測Tesla股票價格

簡介

本章節將探討如何結合PyTorch與PySpark進行Tesla股票價格的預測。我們將從資料載入、預處理、模型建立、訓練到最終的預測與視覺化呈現整個流程。

資料載入與預處理

首先,我們需要從CSV檔案中載入測試資料,並對其進行預處理。以下是實作該功能的程式碼:

import numpy as np
from pyspark.sql import SparkSession

def load_data(file_path: str):
    """
    使用SparkSession從CSV檔案載入股票價格資料。
    """
    spark = SparkSession.builder.appName("StockPricePrediction").getOrCreate()
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    return df

def preprocess_data(df):
    # 省略具體實作細節
    pass

test_data = load_data("/home/ubuntu/airflow/dags/TSLA_stock.csv")
test_features = preprocess_data(test_data)

內容解密:

  1. load_data函式使用SparkSession從指定的CSV檔案路徑載入資料。
  2. 建立SparkSession例項,並設定應用名稱為"StockPricePrediction"。
  3. 使用spark.read.csv方法讀取CSV檔案,啟用header選項以讀取列名,並使用inferSchema自動推斷資料型別。

模型定義與載入

接下來,我們定義一個根據PyTorch的深度學習模型,並載入預訓練的模型權重。

import torch
import torch.nn as nn

input_size = 4
output_size = 1
model = nn.Sequential(
    nn.Linear(input_size, 64),
    nn.ReLU(),
    nn.Linear(64, 32),
    nn.ReLU(),
    nn.Linear(32, 16),
    nn.ReLU(),
    nn.Linear(16, output_size)
)

model.load_state_dict(torch.load('/home/ubuntu/airflow/dags/trained_model.pth', map_location=torch.device('cpu')))
model.eval()

內容解密:

  1. 定義一個順序性的神經網路模型,包含多個線性層和ReLU啟用函式。
  2. 使用torch.load載入預訓練模型權重,並透過model.load_state_dict載入模型引數。
  3. 將模型設為評估模式,以關閉dropout和batch normalization的訓練特定行為。

預測與視覺化

完成模型載入後,我們進行預測並視覺化結果。

test_features = np.array([row.scaled_features.toArray() for row in test_features], dtype=np.float64)
test_features_tensor = torch.tensor(test_features, dtype=torch.float64)
model = model.double()
with torch.no_grad():
    predictions = model(test_features_tensor).squeeze().numpy()

actual_labels = test_data.select('Close').collect()
actual_labels = np.array([row.Close for row in actual_labels], dtype=np.float64)
actual_labels = actual_labels[::-1]
predictions = predictions[::-1]

print("Top 10 Actual vs Predicted Values:")
print("Actual\tPredicted")
for i in range(10):
    print(f"{actual_labels[i]:.4f}\t{predictions[i]:.4f}")

import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.plot(actual_labels, label='Actual')
plt.plot(predictions, label='Predicted')
plt.xlabel('Index')
plt.ylabel('Price')
plt.title('Actual vs Predicted Tesla Stock Prices (Feb 26, 2019 to Feb 23, 2024)')
plt.legend()
plt.grid(True)
plt.show()

內容解密:

  1. 將測試特徵轉換為PyTorch張量,並進行預測。
  2. 取得實際標籤值,並將實際和預測值反轉以符合時間順序。
  3. 列印前10個實際值與預測值,並進行視覺化比較。

使用 PySpark 和 PyTorch 預測特斯拉股票價格

本篇文章將介紹如何結合 PySpark 和 PyTorch 來建立一個深度學習模型,以預測特斯拉的股票價格。我們將逐步講解資料載入、預處理、模型訓練和評估的過程。

資料載入與預處理

首先,我們需要載入股票價格資料,並進行必要的預處理。這包括使用 VectorAssembler 組裝特徵向量,以及使用 StandardScaler 對特徵進行標準化。

載入資料

def load_data(file_path: str) -> DataFrame:
    """
    從 CSV 檔案載入股票價格資料。
    """
    try:
        spark = SparkSession.builder.appName("StockPricePrediction").getOrCreate()
        df = spark.read.csv(file_path, header=True, inferSchema=True)
        return df
    except Exception as e:
        raise RuntimeError(f"載入資料失敗:{e}")

預處理資料

def preprocess_data(df: DataFrame) -> DataFrame:
    """
    對資料進行預處理,包括組裝特徵向量和標準化。
    """
    assembler = VectorAssembler(inputCols=['Open', 'High', 'Low', 'Volume'], outputCol='features')
    df = assembler.transform(df)
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
    scaler_model = scaler.fit(df)
    df = scaler_model.transform(df)
    df = df.select('scaled_features', 'Close')
    return df

建立資料載入器

接下來,我們需要將預處理後的資料轉換成 PyTorch 的 DataLoader 物件,以便於批次處理資料。

def create_data_loader(features, labels, batch_size=32) -> DataLoader:
    """
    將資料轉換成 PyTorch 的 DataLoader 物件。
    """
    dataset = TensorDataset(features, labels)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

模型訓練與評估

我們定義了一個簡單的深度學習模型,使用均方誤差(MSE)作為損失函式,並使用 Adam 最佳化器進行訓練。

模型定義與訓練

def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """
    訓練模型。
    """
    for epoch in range(num_epochs):
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels.unsqueeze(1))
            loss.backward()
            optimizer.step()
        logger.info(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}")

模型評估

def evaluate_model(model, test_loader, criterion) -> Tuple[float, float]:
    """
    評估模型的效能。
    """
    with torch.no_grad():
        model.eval()
        predictions = []
        targets = []
        test_loss = 0.0
        for inputs, labels in test_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels.unsqueeze(1))
            test_loss += loss.item() * inputs.size(0)
            predictions.extend(outputs.squeeze().tolist())
            targets.extend(labels.tolist())
        test_loss /= len(test_loader.dataset)
        predictions = torch.tensor(predictions)
        targets = torch.tensor(targets)
        ss_res = torch.sum((targets - predictions) ** 2)
        ss_tot = torch.sum((targets - torch.mean(targets)) ** 2)
        r_squared = 1 - ss_res / ss_tot
        logger.info(f"Test Loss: {test_loss:.4f}")
        logger.info(f"R-squared Score: {r_squared:.4f}")
        return test_loss, r_squared.item()

主函式

最後,我們定義了主函式來整合上述步驟,包括資料載入、預處理、模型訓練和評估。

def main(data_file_path: str, num_epochs: int = 100, batch_size: int = 32, learning_rate: float = 0.001):
    """
    主函式。
    """
    try:
        # 載入資料
        df = load_data(data_file_path)
        # 預處理資料
        df = preprocess_data(df)
        # 分割資料集
        train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
        # 轉換成 PyTorch 張量
        train_features, train_labels = spark_df_to_tensor(train_df)
        test_features, test_labels = spark_df_to_tensor(test_df)
        # 建立 DataLoader 物件
        train_loader = create_data_loader(train_features, train_labels, batch_size=batch_size)
        test_loader = create_data_loader(test_features, test_labels, batch_size=batch_size)
        # 定義模型、損失函式和最佳化器
        input_size = train_features.shape[1]
        output_size = 1
        model = nn.Sequential(
            nn.Linear(input_size, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, output_size)
        )
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        # 訓練模型
        train_model(model, train_loader, criterion, optimizer, num_epochs)
        # 評估模型
        evaluate_model(model, test_loader, criterion)
        # 儲存模型
        torch.save(model.state_dict(), 'trained_model.pth')
    except FileNotFoundError:
        logger.error(f"找不到資料檔案:{data_file_path}")