深度學習技術已廣泛應用於金融領域,本文將以特斯拉股票價格預測為例,示範如何使用 TensorFlow 建構一個有效的預測模型。首先,我們利用 Spark 的 randomSplit 方法將資料集分割為訓練集和測試集,並設定隨機種子以確保結果可重現。接著,使用 convert_to_numpy 方法將 Spark DataFrame 轉換為 NumPy 陣列,以供 TensorFlow 使用。模型的建立採用 Keras API,設計一個包含多層感知器(MLP)的迴歸模型,並使用 Adam 最佳化器與均方誤差(MSE)損失函式進行訓練。最後,我們使用測試損失和 R-squared 分數評估模型的預測效能,並繪製實際值與預測值的散點圖,以視覺化模型的預測結果。程式碼中也包含了詳細的引數說明、傳回值定義以及流程解析,方便讀者理解和應用。
TSLARegressor 類別方法實作與深度解析
資料分割方法(split_data)深度剖析
TSLARegressor 類別中的 split_data 方法是資料預處理的關鍵步驟,負責將原始資料集分割為訓練集與測試集。此方法採用 Apache Spark 的 randomSplit 功能,能夠根據設定的比例精確地將 DataFrame 分割。
實作細節與技術考量
def split_data(self, df, train_ratio=0.8, seed=42):
train_df, test_df = df.randomSplit([train_ratio, 1 - train_ratio], seed=seed)
return train_df, test_df
內容解密:
- 資料分割比例控制:
train_ratio引數允許使用者自訂訓練資料的比例,預設值為 0.8,確保模型有足夠的資料進行訓練。 - 隨機種子設定:透過
seed引數設定隨機種子,確保實驗結果的可重現性,這是機器學習實驗中非常重要的環節。 - Spark 資料分割機制:利用 Spark 的
randomSplit方法進行資料分割,這種方式能夠有效地處理大規模資料集。
資料轉換為 NumPy 陣列(convert_to_numpy)方法解析
convert_to_numpy 方法的主要功能是將 Spark DataFrame 轉換為 TensorFlow 相容的 NumPy 陣列,這是因為 TensorFlow 的運算核心是根據 NumPy 資料結構。
實作細節與技術考量
def convert_to_numpy(self, train_df, test_df):
train_features = np.array(train_df.select('scaled_features').rdd.map(lambda x: x.scaled_features.toArray()).collect())
train_labels = np.array(train_df.select('Close').rdd.map(lambda x: x.Close).collect())
test_features = np.array(test_df.select('scaled_features').rdd.map(lambda x: x.scaled_features.toArray()).collect())
test_labels = np.array(test_df.select('Close').rdd.map(lambda x: x.Close).collect())
return train_features, train_labels, test_features, test_labels
內容解密:
- 欄位選擇與轉換:方法中使用
select操作選擇特定的欄位,並透過rdd.map將資料轉換為 NumPy 陣列。 - RDD 操作最佳化:利用 Spark 的 RDD(Resilient Distributed Dataset)進行分散式資料處理,提升大資料集的處理效率。
- NumPy 陣列建立:最終將處理好的資料收集並轉換為 NumPy 陣列,以供 TensorFlow 使用。
建立與訓練模型(create_and_train_model)方法詳解
此方法負責建立神經網路模型並進行訓練,使用 TensorFlow 的 Keras API 建構多層感知器(MLP)模型。
實作細節與技術考量
def create_and_train_model(self, train_features, train_labels, epochs=100, batch_size=32):
self.input_shape = (train_features.shape[1],)
self.model = Sequential([
Dense(64, activation='relu', input_shape=self.input_shape),
Dense(32, activation='relu'),
Dense(16, activation='relu'),
Dense(1)
])
self.model.compile(optimizer='adam', loss='mse')
self.model.fit(train_features, train_labels, epochs=epochs, batch_size=batch_size)
內容解密:
- 模型架構設計:採用四層的神經網路結構,使用 ReLU 作為隱藏層的啟用函式,最後一層輸出單一值。
- 模型編譯與訓練:使用 Adam 最佳化器和均方誤差(MSE)作為損失函式進行模型編譯,並根據設定的 epoch 和 batch size 進行訓練。
- 超引數選擇:預設的 epoch 為 100,batch size 為 32,這些引數根據常見的實踐經驗進行設定,以平衡訓練效率和模型效能。
模型評估(evaluate_model)方法解析
此方法用於評估訓練好的模型在測試資料上的表現。
實作細節與技術考量
def evaluate_model(self, test_features, test_labels):
test_loss = self.model.evaluate(test_features, test_labels)
return test_loss
內容解密:
- 模型評估指標:使用測試資料集對模型進行評估,回傳測試損失值(test loss),這是評估迴歸模型的重要指標。
- TensorFlow 模型評估機制:直接呼叫 TensorFlow 模型的
evaluate方法,對測試資料進行損失計算。
使用 TensorFlow 進行迴歸分析的深度學習模型評估
在深度學習領域中,模型的評估是至關重要的步驟。本章節將探討如何使用 TensorFlow 建立迴歸模型,並透過各種指標評估其效能。
模型評估方法:evaluate_model
evaluate_model 方法的主要功能是評估訓練好的神經網路模型在測試資料上的表現。該方法透過計算測試資料的損失值來評估模型的效能。
引數說明
test_features(ndarray):包含測試特徵的 NumPy 陣列test_labels(ndarray):包含對應測試標籤的 NumPy 陣列
傳回值
- float:測試損失值,代表模型在測試資料上的表現
評估流程
- 使用訓練好的模型 (
self.model) 的evaluate方法計算測試資料的損失值。 - 將測試特徵和標籤傳遞給
evaluate方法,該方法根據模型的預測值和真實標籤計算損失值。
模型預測與評估:predict_and_evaluate
predict_and_evaluate 方法的主要功能是使用訓練好的神經網路模型進行預測,並透過 R-squared 分數評估模型的效能。
引數說明
test_features(ndarray):包含測試特徵的 NumPy 陣列test_labels(ndarray):包含對應測試標籤的 NumPy 陣列
傳回值
- float:R-squared 分數,代表模型對測試資料的預測能力
預測與評估流程
- 使用訓練好的模型 (
self.model) 的predict方法對測試特徵進行預測。 - 將預測值與真實標籤進行比較,計算 R-squared 分數。
- R-squared 分數代表模型對測試資料的預測能力,分數越高代表模型越準確。
實際值與預測值視覺化:plot_actual_vs_predicted
plot_actual_vs_predicted 方法的主要功能是視覺化實際值與預測值的關係。
引數說明
tsla_regressor(TSLARegressor):TSLARegressor 類別的例項,包含訓練好的神經網路模型。test_features(ndarray):包含測試特徵的 NumPy 陣列。test_labels(ndarray):包含對應測試標籤的 NumPy 陣列。
視覺化流程
- 使用訓練好的模型進行預測,並將預測值儲存在
test_predictions中。 - 列印實際值與預測值的對比。
- 使用 Matplotlib 繪製實際值與預測值的散點圖,並新增對角線以代表理想的預測結果。
def plot_actual_vs_predicted(tsla_regressor, test_features, test_labels):
test_predictions = tsla_regressor.model.predict(test_features)
print("Actual Predicted")
for actual, predicted in zip(test_labels, test_predictions):
print(f"{actual:.2f} {predicted[0]:.2f}")
plt.figure(figsize=(10, 6))
plt.scatter(test_labels, test_predictions, color='blue')
plt.plot([test_labels.min(), test_labels.max()], [test_labels.min(), test_labels.max()], 'k--', lw=3)
plt.xlabel('Actual')
plt.ylabel('Predicted')
plt.title('Actual vs. Predicted Values')
plt.show()
程式碼解析:
- 該函式首先使用訓練好的模型進行預測,並將結果儲存在
test_predictions。 - 接著,列印實際值與預測值的對比,以便進行詳細比較。
- 使用 Matplotlib 繪製散點圖,將實際值與預測值進行視覺化比較。
- 對角線代表理想的預測結果,點越接近對角線,表示模型的預測越準確。
TensorFlow深度學習應用於特斯拉股票價格預測
主函式定義與工作流程協調
在這個步驟中,我們定義了主函式main,它作為指令碼的主要進入點,負責協調整個工作流程。該函式接受一個引數file_path,代表包含待處理資料的CSV檔案路徑。
def main(file_path):
"""
主函式,協調整個工作流程。
引數:
- file_path (str):CSV檔案的路徑。
傳回:
- None
"""
try:
tsla_regressor = TSLARegressor()
train_df, test_df = tsla_regressor.preprocess_data(file_path)
train_features, train_labels, test_features, test_labels = tsla_regressor.convert_to_numpy(train_df, test_df)
tsla_regressor.create_and_train_model(train_features, train_labels)
test_loss = tsla_regressor.evaluate_model(test_features, test_labels)
r2_score_value = tsla_regressor.predict_and_evaluate(test_features, test_labels)
logging.info("測試損失:{}".format(test_loss))
logging.info("R2評分:{}".format(r2_score_value))
plot_actual_vs_predicted(tsla_regressor, test_features, test_labels)
except Exception as e:
logging.error(f"發生錯誤:{e}")
內容解密:
- 主函式定義:定義了
main函式作為程式的主要進入點,接受一個CSV檔案路徑作為引數。 - TSLARegressor例項化:建立了一個
TSLARegressor類別的例項tsla_regressor,用於處理資料和模型訓練。 - 資料預處理:呼叫了
preprocess_data方法對指定路徑的CSV檔案進行資料預處理,將資料分為訓練集和測試集。 - 轉換為NumPy陣列:使用
convert_to_numpy方法將Spark DataFrames轉換為NumPy陣列,以便於後續的模型訓練和評估。 - 模型建立與訓練:透過
create_and_train_model方法建立並訓練神經網路模型,使用訓練特徵和標籤進行訓練。 - 模型評估:使用
evaluate_model方法評估模型在測試資料上的表現,計算測試損失。 - 預測與評估:呼叫
predict_and_evaluate方法對測試資料進行預測,並計算R2評分。 - 結果記錄:將測試損失和R2評分透過logging模組記錄下來。
- 錯誤處理:捕捉並記錄執行過程中可能發生的任何例外。
條件區塊與程式執行
最後,我們使用一個條件區塊來檢查指令碼是否被直接執行。
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
file_path = "/home/ubuntu/airflow/dags/TSLA_stock.csv"
main(file_path)
內容解密:
__name__ == "__main__"檢查:判斷指令碼是否被直接執行,而不是被當作模組匯入到其他指令碼中。- 日誌組態:設定日誌系統以顯示INFO級別或更高階別的日誌訊息。
- 檔案路徑指定:定義了包含TSLA股票資料的CSV檔案路徑。
- 主函式呼叫:呼叫
main函式並傳入指定的檔案路徑,開始整個工作流程。
程式執行與輸出結果
執行程式後,我們得到了模型的訓練損失、測試損失和R2評分等輸出結果。
訓練損失
顯示了每個訓練迭代(epoch)的損失值,隨著訓練的進行,損失值逐漸降低,表明模型正在學習並更好地擬合訓練資料。
測試損失
代表模型在測試資料集上的平均損失值(均方誤差),數值越低表示模型的預測越準確。
R2評分
用於衡量模型對目標變數(股票價格)的解釋能力,越接近1表示模型的預測能力越強。