Moirai 模型採用先進的神經網路架構,能處理多頻率資料並提供多場景機率預測,適用於各種時序預測任務。使用 Moirai 進行預測,首先需準備時序資料,並將其分割成訓練集和測試集。接著,初始化 Moirai 模型並設定相關引數,例如預測長度、上下文長度、片段大小和樣本數等。然後,計算過去目標值並進行預測。最後,計算錯誤度量(例如 MSE、RMSE、MAPE、R² 和調整後的 R²)以評估模型效能,並將預測結果視覺化。TimesFM 模型則是一個零樣本學習的通用時序資料預測模型,它利用過去的時序資料作為上下文來預測未來的資料。TimesFM 模型的核心概念包括 Decoder-only 架構、Patching、Output patches 長度和 Patch masking 等技術。其架構包含 Token 化、區塊化、殘差塊、位置編碼和 Transformer 層堆積疊等模組。在實際應用中,TimesFM 模型的程式碼實作包含殘差塊的定義、TimesFM 模型的構建、前向傳播的執行以及損失計算和反向傳播的過程。
時序預測的全方位模型:Moirai
在時間序列預測領域,Moirai 模型因其強大的預測能力和靈活性,迅速成為學術界和產業界的焦點。Moirai 模型採用了先進的神經網路架構,能夠處理各種頻率的資料並提供多場景的機率預測。本文將探討 Moirai 模型的使用方法,並透過實際案例展示其在時間序列預測中的應用。
資料準備
首先,我們需要準備好時間序列資料。這裡我們使用了經典的 AirPassengers 資料集,該資料集包含了 12 年的月度乘客資料。以下是我們如何載入和處理這些資料:
import pandas as pd
from gluonts.dataset.pandas import PandasDataset
from gluonts.dataset.split import split
from uni2ts.eval_util.plot import plot_single, plot_next_multi
from uni2ts.model.moirai import MoiraiForecast, MoiraiModule
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, r2_score
import warnings
import torch
from einops import rearrange
warnings.filterwarnings('ignore')
# 載入 AirPassengers 資料集
df = pd.read_csv('AirPassengersDataset.csv')
df.rename(columns={'y': 'target'}, inplace=True)
df.drop(columns=['unique_id'], inplace=True)
df["ds"] = pd.to_datetime(df["ds"])
df.set_index("ds", inplace=True)
print(f"總長度: {df.shape[0]}")
print(f"時間頻率: {df.index.diff()[1]}")
內容解密:
在這段程式碼中,我們首先載入了 AirPassengers 資料集,並對其進行基本處理。具體步驟如下:
- 使用
pandas讀取 CSV 檔案並將目標欄位命名為target。 - 移除不必要的
unique_id欄位。 - 將日期欄位轉換為
datetime格式並設定為索引。 - 輸出資料集的總長度和時間頻率。
資料切割
接下來,我們需要將資料集切割為訓練集和測試集。這裡我們選擇前 120 個月的資料作為訓練集,後 24 個月作為測試集。
# 建立訓練和測試資料
inp = {
"target": df["target"].to_numpy()[:120],
"start": df.index[0].to_period(freq="M"),
}
label = {
"target": df["target"].to_numpy()[120:],
"start": df.index[120].to_period(freq="M"),
}
內容解密:
這段程式碼實作了資料集的切割:
- 將前 120 個月的乘客資料作為訓練輸入 (
inp)。 - 將後 24 個月的乘客資料作為標籤 (
label)。
初始化模型
現在我們初始化 Moirai 模型並定義其引數。這裡我們使用預訓練的 MoiraiModule 模型。
# 初始化 Moirai 模型
model = MoiraiForecast(
module=MoiraiModule.from_pretrained("Salesforce/moirai-1.1-R-small"),
prediction_length=24,
context_length=120,
patch_size=32,
num_samples=100,
target_dim=1,
feat_dynamic_real_dim=0,
past_feat_dynamic_real_dim=0,
)
內容解密:
這段程式碼初始化了 Moirai 模型並設定了相關引數:
- 使用預訓練的
MoiraiModule模型。 - 預測長度設為 24 個月。
- 上下文長度設為 120 個月。
- 其他引數如片段大小、樣本數等根據需求設定。
風險評估
接著,我們計算過去目標值並進行預測。
# 計算過去目標值
past_target = rearrange(
torch.as_tensor(inp["target"], dtype=torch.float32),
"t -> 1 t 1"
)
past_observed_target = torch.ones_like(past_target, dtype=torch.bool)
past_is_pad = torch.zeros_like(past_target, dtype=torch.bool).squeeze(-1)
# 執行預測
forecast = model(
past_target=past_target,
past_observed_target=past_observed_target,
past_is_pad=past_is_pad,
)
內容解密:
這段程式碼計算了過去目標值並進行預測:
- 使用
rearrange函式將訓練輸入轉換為適當的張量格式。 - 建立過去觀察到目標值和填充標誌。
- 呼叫模型進行預測。
錯誤度量計算
最後,我們計算錯誤度量以評估模型效能。
# 計算錯誤度量
def calculate_error_metrics(y_true, y_pred):
mse = mean_squared_error(y_true, y_pred)
rmse = mse ** 0.5
mape = mean_absolute_percentage_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)
adjusted_r2 = 1 - (1 - r2) * (len(y_true) - 1) / (len(y_true) - len(model.parameters()) - 1)
return mse, rmse, mape, r2, adjusted_r2
error_metrics = calculate_error_metrics(label["target"], np.round(np.median(forecast[0], axis=0), decimals=4))
print(f"MSE: {error_metrics[0]}")
print(f"RMSE: {error_metrics[1]}")
print(f"MAPE: {error_metrics[2]}")
print(f"R²: {error_metrics[3]}")
print(f"Adjusted R²: {error_metrics[4]}")
內容解密:
這段程式碼計算了錯誤度量以評估模型效能:
- 喚用自定義函式
calculate_error_metrics,計算均方誤差(MSE)、均方根誤差(RMSE)、平均絕對百分比誤差(MAPE)、R² 和調整後的 R²。 - 輸出各項錯誤度量。
預測視覺化
最終,我們可以將模型的預測結果視覺化以便更直觀地瞭解其效能。
# 資料準備
df_test = df["target"][120:]
df_train = df["target"][:120]
df_test = df_test.reset_index().rename(columns={"index":"ds"})
df_train = df_train.reset_index().rename(columns={"index":"ds"})
df_test['Predicted'] = pd.Series(np.round(np.median(forecast[0], axis=0), decimals=4))
df_train.set_index('ds', inplace=True)
df_test.set_index('ds', inplace=True)
# 基礎圖表繪製
import matplotlib.pyplot as plt
plt.figure(figsize=(20, 5))
y_past = df_train["target"]
y_pred = df_test['Predicted']
y_test = df_test["target"]
plt.plot(y_past, label="歷史時序值")
plt.plot(y_pred, label="預測")
plt.plot(y_test, label="實際時序值")
plt.title('AirPassengers 風險評估', fontsize=16)
plt.ylabel('月度乘客', fontsize=14)
plt.xlabel('時間戳 [t]', fontsize=14)
plt.legend()
plt.show()
接下來你會看到以下
此圖示展示了 Moirai 模型對於乘客數量的預測結果。可以看出,模型的預測結果與實際值相較之下是接近現實情況。
時間序列預測的新里程碑:TimesFM
TimesFM 的技術概述
時序資料預測模型的設計挑戰與自然語言處理(NLP)或電腦視覺領域的模型不同。語言有語法規則和字母限制,影像可以用有限的畫素來描述,然而時間無始無終。TimesFM 模型旨在解決這些挑戰,開發一個零複製(zero-shot)預測能力的通用時序資料預測模型。這個模型利用過去的時序資料作為上下文,來預測未來的資料。
TimesFM 的核心概念
TimesFM 的設計利用了以下幾個關鍵概念:
- Decoder-only 模型:只包含解碼器階段,專注於自迴歸任務,即根據先前生成的標記來預測下一個標記。
- Patching:將時序資料分割成多個小區塊(patches),這些區塊在訓練過程中被使用。
- Output patches 的長度:較長的輸出區塊有助於長期預測。
- Patch masking:在訓練過程中隨機隱藏一些區塊,以避免過擬合。
完整架構
以下是 TimesFM 在訓練過程中的整體架構:
@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle
title Moirai 與 TimesFM:時序預測模型深度解析
package "機器學習流程" {
package "資料處理" {
component [資料收集] as collect
component [資料清洗] as clean
component [特徵工程] as feature
}
package "模型訓練" {
component [模型選擇] as select
component [超參數調優] as tune
component [交叉驗證] as cv
}
package "評估部署" {
component [模型評估] as eval
component [模型部署] as deploy
component [監控維護] as monitor
}
}
collect --> clean : 原始資料
clean --> feature : 乾淨資料
feature --> select : 特徵向量
select --> tune : 基礎模型
tune --> cv : 最佳參數
cv --> eval : 訓練模型
eval --> deploy : 驗證模型
deploy --> monitor : 生產模型
note right of feature
特徵工程包含:
- 特徵選擇
- 特徵轉換
- 降維處理
end note
note right of eval
評估指標:
- 準確率/召回率
- F1 Score
- AUC-ROC
end note
@enduml此圖示展示了從輸入時序資料到生成預測區塊的整個過程。資料首先被分割成區塊(patches),然後轉換成向量,經過位置編碼後進入 Transformer 層堆積疊,最終生成輸出區塊。
內容解密:
- Token化:將時序資料轉換為標記(tokens),這些標記是模型處理和計算的基礎。
- 區塊化:將標記分割成小區塊(patches),這樣可以減少輸入給解碼器的標記數量,提高訓練和推理速度。
- 殘差塊:將區塊轉換成向量,這些向量是高維度空間中的數值表示。
- 位置編碼:為每個標記新增位置資訊,幫助模型理解標記的順序。
- Transformer 層堆積疊:利用多頭自注意力機制和前鋒網路來處理標記,確保模型只考慮之前的標記來預測下一個標記。
- Output patches:將輸出標記轉換為輸出區塊,這些區塊代表模型對未來時序資料的預測。
TimesFM 的實際應用
以下是 TimesFM 的具體實作範例:
import torch
import torch.nn as nn
import torch.optim as optim
class ResidualBlock(nn.Module):
def __init__(self, input_dim, output_dim):
super(ResidualBlock, self).__init__()
self.fc1 = nn.Linear(input_dim, input_dim)
self.fc2 = nn.Linear(input_dim, output_dim)
self.skip_connection = nn.Linear(input_dim, output_dim)
def forward(self, x):
residual = self.skip_connection(x)
x = torch.relu(self.fc1(x))
x = self.fc2(x)
return x + residual
class TimesFM(nn.Module):
def __init__(self, input_patch_len, output_patch_len, model_dim):
super(TimesFM, self).__init__()
self.input_patch_len = input_patch_len
self.output_patch_len = output_patch_len
self.model_dim = model_dim
self.residual_block = ResidualBlock(input_patch_len, model_dim)
self.transformer_layers = nn.TransformerEncoderLayer(d_model=model_dim, nhead=8)
self.transformer_stack = nn.TransformerEncoder(self.transformer_layers, num_layers=6)
def forward(self, x):
x = self.residual_block(x)
x = x.permute(1, 0) # Change sequence length to batch size
x = self.transformer_stack(x)
x = x.permute(1, 0) # Change back to original shape
return x
# 模型引數設定
input_patch_len = 64
output_patch_len = 128
model_dim = 512
# 建立模型、損失函式及最佳化器
model = TimesFM(input_patch_len, output_patch_len, model_dim)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 假設我們有一些訓練資料
input_data = torch.randn((32, input_patch_len)) # 假設 batch size 是 32
# 前向傳播
output_data = model(input_data)
# 損失計算及反向傳播
loss = criterion(output_data, target_data) # 假設 target_data 是目標值
optimizer.zero_grad()
loss.backward()
optimizer.step()
內容解密:
- ResidualBlock:實作殘差連線(skip connection)以保持深層網路中的資訊流通。這裡我們使用兩層全連線層來進行轉換。
- TimesFM 模型構建:TimesFM 模型包含一個殘差連線層、多頭自注意力層和 Transformer 編碼器層堆積疊。這些層次結合在一起來處理和預測時序資料。
- 前向傳播:我們首先使用殘差連線層將輸入轉換成高維度向量,然後透過 Transformer 編碼器層堆積疊進行處理。最後將結果再次轉換回原始形狀以進行後續操作。
- 損失計算與反向傳播:使用均方誤差(MSE)作為損失函式進行損失計算及反向傳播更新模型引數。