深度學習技術已廣泛應用於金融領域,本文將以特斯拉股票價格預測為例,示範如何使用 TensorFlow 建構一個有效的預測模型。首先,我們利用 Spark 的 randomSplit 方法將資料集分割為訓練集和測試集,並設定隨機種子以確保結果可重現。接著,使用 convert_to_numpy 方法將 Spark DataFrame 轉換為 NumPy 陣列,以供 TensorFlow 使用。模型的建立採用 Keras API,設計一個包含多層感知器(MLP)的迴歸模型,並使用 Adam 最佳化器與均方誤差(MSE)損失函式進行訓練。最後,我們使用測試損失和 R-squared 分數評估模型的預測效能,並繪製實際值與預測值的散點圖,以視覺化模型的預測結果。程式碼中也包含了詳細的引數說明、傳回值定義以及流程解析,方便讀者理解和應用。

TSLARegressor 類別方法實作與深度解析

資料分割方法(split_data)深度剖析

TSLARegressor 類別中的 split_data 方法是資料預處理的關鍵步驟,負責將原始資料集分割為訓練集與測試集。此方法採用 Apache Spark 的 randomSplit 功能,能夠根據設定的比例精確地將 DataFrame 分割。

實作細節與技術考量

def split_data(self, df, train_ratio=0.8, seed=42):
    train_df, test_df = df.randomSplit([train_ratio, 1 - train_ratio], seed=seed)
    return train_df, test_df

內容解密:

  1. 資料分割比例控制train_ratio 引數允許使用者自訂訓練資料的比例,預設值為 0.8,確保模型有足夠的資料進行訓練。
  2. 隨機種子設定:透過 seed 引數設定隨機種子,確保實驗結果的可重現性,這是機器學習實驗中非常重要的環節。
  3. Spark 資料分割機制:利用 Spark 的 randomSplit 方法進行資料分割,這種方式能夠有效地處理大規模資料集。

資料轉換為 NumPy 陣列(convert_to_numpy)方法解析

convert_to_numpy 方法的主要功能是將 Spark DataFrame 轉換為 TensorFlow 相容的 NumPy 陣列,這是因為 TensorFlow 的運算核心是根據 NumPy 資料結構。

實作細節與技術考量

def convert_to_numpy(self, train_df, test_df):
    train_features = np.array(train_df.select('scaled_features').rdd.map(lambda x: x.scaled_features.toArray()).collect())
    train_labels = np.array(train_df.select('Close').rdd.map(lambda x: x.Close).collect())
    test_features = np.array(test_df.select('scaled_features').rdd.map(lambda x: x.scaled_features.toArray()).collect())
    test_labels = np.array(test_df.select('Close').rdd.map(lambda x: x.Close).collect())
    return train_features, train_labels, test_features, test_labels

內容解密:

  1. 欄位選擇與轉換:方法中使用 select 操作選擇特定的欄位,並透過 rdd.map 將資料轉換為 NumPy 陣列。
  2. RDD 操作最佳化:利用 Spark 的 RDD(Resilient Distributed Dataset)進行分散式資料處理,提升大資料集的處理效率。
  3. NumPy 陣列建立:最終將處理好的資料收集並轉換為 NumPy 陣列,以供 TensorFlow 使用。

建立與訓練模型(create_and_train_model)方法詳解

此方法負責建立神經網路模型並進行訓練,使用 TensorFlow 的 Keras API 建構多層感知器(MLP)模型。

實作細節與技術考量

def create_and_train_model(self, train_features, train_labels, epochs=100, batch_size=32):
    self.input_shape = (train_features.shape[1],)
    self.model = Sequential([
        Dense(64, activation='relu', input_shape=self.input_shape),
        Dense(32, activation='relu'),
        Dense(16, activation='relu'),
        Dense(1)
    ])
    self.model.compile(optimizer='adam', loss='mse')
    self.model.fit(train_features, train_labels, epochs=epochs, batch_size=batch_size)

內容解密:

  1. 模型架構設計:採用四層的神經網路結構,使用 ReLU 作為隱藏層的啟用函式,最後一層輸出單一值。
  2. 模型編譯與訓練:使用 Adam 最佳化器和均方誤差(MSE)作為損失函式進行模型編譯,並根據設定的 epoch 和 batch size 進行訓練。
  3. 超引數選擇:預設的 epoch 為 100,batch size 為 32,這些引數根據常見的實踐經驗進行設定,以平衡訓練效率和模型效能。

模型評估(evaluate_model)方法解析

此方法用於評估訓練好的模型在測試資料上的表現。

實作細節與技術考量

def evaluate_model(self, test_features, test_labels):
    test_loss = self.model.evaluate(test_features, test_labels)
    return test_loss

內容解密:

  1. 模型評估指標:使用測試資料集對模型進行評估,回傳測試損失值(test loss),這是評估迴歸模型的重要指標。
  2. TensorFlow 模型評估機制:直接呼叫 TensorFlow 模型的 evaluate 方法,對測試資料進行損失計算。

使用 TensorFlow 進行迴歸分析的深度學習模型評估

在深度學習領域中,模型的評估是至關重要的步驟。本章節將探討如何使用 TensorFlow 建立迴歸模型,並透過各種指標評估其效能。

模型評估方法:evaluate_model

evaluate_model 方法的主要功能是評估訓練好的神經網路模型在測試資料上的表現。該方法透過計算測試資料的損失值來評估模型的效能。

引數說明

  • test_features (ndarray):包含測試特徵的 NumPy 陣列
  • test_labels (ndarray):包含對應測試標籤的 NumPy 陣列

傳回值

  • float:測試損失值,代表模型在測試資料上的表現

評估流程

  1. 使用訓練好的模型 (self.model) 的 evaluate 方法計算測試資料的損失值。
  2. 將測試特徵和標籤傳遞給 evaluate 方法,該方法根據模型的預測值和真實標籤計算損失值。

模型預測與評估:predict_and_evaluate

predict_and_evaluate 方法的主要功能是使用訓練好的神經網路模型進行預測,並透過 R-squared 分數評估模型的效能。

引數說明

  • test_features (ndarray):包含測試特徵的 NumPy 陣列
  • test_labels (ndarray):包含對應測試標籤的 NumPy 陣列

傳回值

  • float:R-squared 分數,代表模型對測試資料的預測能力

預測與評估流程

  1. 使用訓練好的模型 (self.model) 的 predict 方法對測試特徵進行預測。
  2. 將預測值與真實標籤進行比較,計算 R-squared 分數。
  3. R-squared 分數代表模型對測試資料的預測能力,分數越高代表模型越準確。

實際值與預測值視覺化:plot_actual_vs_predicted

plot_actual_vs_predicted 方法的主要功能是視覺化實際值與預測值的關係。

引數說明

  • tsla_regressor (TSLARegressor):TSLARegressor 類別的例項,包含訓練好的神經網路模型。
  • test_features (ndarray):包含測試特徵的 NumPy 陣列。
  • test_labels (ndarray):包含對應測試標籤的 NumPy 陣列。

視覺化流程

  1. 使用訓練好的模型進行預測,並將預測值儲存在 test_predictions 中。
  2. 列印實際值與預測值的對比。
  3. 使用 Matplotlib 繪製實際值與預測值的散點圖,並新增對角線以代表理想的預測結果。
def plot_actual_vs_predicted(tsla_regressor, test_features, test_labels):
    test_predictions = tsla_regressor.model.predict(test_features)
    print("Actual Predicted")
    for actual, predicted in zip(test_labels, test_predictions):
        print(f"{actual:.2f} {predicted[0]:.2f}")
    
    plt.figure(figsize=(10, 6))
    plt.scatter(test_labels, test_predictions, color='blue')
    plt.plot([test_labels.min(), test_labels.max()], [test_labels.min(), test_labels.max()], 'k--', lw=3)
    plt.xlabel('Actual')
    plt.ylabel('Predicted')
    plt.title('Actual vs. Predicted Values')
    plt.show()

程式碼解析:

  • 該函式首先使用訓練好的模型進行預測,並將結果儲存在 test_predictions
  • 接著,列印實際值與預測值的對比,以便進行詳細比較。
  • 使用 Matplotlib 繪製散點圖,將實際值與預測值進行視覺化比較。
  • 對角線代表理想的預測結果,點越接近對角線,表示模型的預測越準確。

TensorFlow深度學習應用於特斯拉股票價格預測

主函式定義與工作流程協調

在這個步驟中,我們定義了主函式main,它作為指令碼的主要進入點,負責協調整個工作流程。該函式接受一個引數file_path,代表包含待處理資料的CSV檔案路徑。

def main(file_path):
    """
    主函式,協調整個工作流程。
    引數:
    - file_path (str):CSV檔案的路徑。
    傳回:
    - None
    """
    try:
        tsla_regressor = TSLARegressor()
        train_df, test_df = tsla_regressor.preprocess_data(file_path)
        train_features, train_labels, test_features, test_labels = tsla_regressor.convert_to_numpy(train_df, test_df)
        tsla_regressor.create_and_train_model(train_features, train_labels)
        test_loss = tsla_regressor.evaluate_model(test_features, test_labels)
        r2_score_value = tsla_regressor.predict_and_evaluate(test_features, test_labels)
        logging.info("測試損失:{}".format(test_loss))
        logging.info("R2評分:{}".format(r2_score_value))
        plot_actual_vs_predicted(tsla_regressor, test_features, test_labels)
    except Exception as e:
        logging.error(f"發生錯誤:{e}")

內容解密:

  1. 主函式定義:定義了main函式作為程式的主要進入點,接受一個CSV檔案路徑作為引數。
  2. TSLARegressor例項化:建立了一個TSLARegressor類別的例項tsla_regressor,用於處理資料和模型訓練。
  3. 資料預處理:呼叫了preprocess_data方法對指定路徑的CSV檔案進行資料預處理,將資料分為訓練集和測試集。
  4. 轉換為NumPy陣列:使用convert_to_numpy方法將Spark DataFrames轉換為NumPy陣列,以便於後續的模型訓練和評估。
  5. 模型建立與訓練:透過create_and_train_model方法建立並訓練神經網路模型,使用訓練特徵和標籤進行訓練。
  6. 模型評估:使用evaluate_model方法評估模型在測試資料上的表現,計算測試損失。
  7. 預測與評估:呼叫predict_and_evaluate方法對測試資料進行預測,並計算R2評分。
  8. 結果記錄:將測試損失和R2評分透過logging模組記錄下來。
  9. 錯誤處理:捕捉並記錄執行過程中可能發生的任何例外。

條件區塊與程式執行

最後,我們使用一個條件區塊來檢查指令碼是否被直接執行。

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    file_path = "/home/ubuntu/airflow/dags/TSLA_stock.csv"
    main(file_path)

內容解密:

  1. __name__ == "__main__"檢查:判斷指令碼是否被直接執行,而不是被當作模組匯入到其他指令碼中。
  2. 日誌組態:設定日誌系統以顯示INFO級別或更高階別的日誌訊息。
  3. 檔案路徑指定:定義了包含TSLA股票資料的CSV檔案路徑。
  4. 主函式呼叫:呼叫main函式並傳入指定的檔案路徑,開始整個工作流程。

程式執行與輸出結果

執行程式後,我們得到了模型的訓練損失、測試損失和R2評分等輸出結果。

訓練損失

顯示了每個訓練迭代(epoch)的損失值,隨著訓練的進行,損失值逐漸降低,表明模型正在學習並更好地擬合訓練資料。

測試損失

代表模型在測試資料集上的平均損失值(均方誤差),數值越低表示模型的預測越準確。

R2評分

用於衡量模型對目標變數(股票價格)的解釋能力,越接近1表示模型的預測能力越強。