時間序列分析是統計學與機器學習領域的核心技術之一,廣泛應用於金融市場預測、氣象預報、需求預測、環境監測等領域。與傳統的橫斷面資料分析不同,時間序列資料具有時間相依性,即當前時刻的觀測值受到過去時刻的影響。這種時間相關性使得時間序列分析需要特殊的建模方法與統計技術,無法直接套用傳統的迴歸或分類演算法。

時間序列資料的核心特徵包含趨勢、季節性、週期性與隨機性四個組成要素。趨勢反映長期的上升或下降走勢,季節性體現固定週期的模式變化,週期性表現不固定週期的波動,隨機性則代表無法預測的雜訊成分。理解並分離這些組成要素,是建立有效預測模型的基礎。時間序列分解技術透過數學方法將觀測值拆解為這些組成要素,幫助分析師識別資料的內在結構。

經典的時間序列建模方法以 Box-Jenkins 方法論為代表,包含自迴歸模型、移動平均模型與其組合形式。這些模型假設時間序列滿足平穩性條件,即統計特性不隨時間改變。對於非平穩序列,需要透過差分或變換達到平穩性。模型識別透過自相關函數與偏自相關函數判斷適當的模型階數,參數估計採用最大概似法或最小平方法,診斷檢驗確保殘差符合白雜訊假設。

現代時間序列分析融合機器學習與深度學習技術。Prophet 模型採用可加性時間序列模型,自動處理趨勢與季節性,適合具有強季節模式與多個季節性週期的商業時間序列。長短期記憶網路作為遞迴神經網路的變體,能夠學習長期依賴關係,適合複雜的非線性時間序列。這些現代方法降低建模門檻,但需要更多的訓練資料與計算資源。

本文將建立完整的時間序列分析與預測建模框架。從理論基礎開始,介紹時間序列的基本概念與數學定義。接著詳細解析 ARIMA 模型族的建模流程,包含平穩性檢驗、模型識別、參數估計與診斷。實作 Prophet 與 LSTM 等現代方法,比較不同演算法的預測效能。透過環境汙染監測與房地產價格預測的實際案例,展示如何將理論應用於實務問題。整個流程涵蓋資料前處理、探索性分析、模型建立、效能評估與預測解讀。

時間序列基礎理論與分解技術

時間序列的數學定義為在一系列離散時間點觀測的隨機變數序列。形式化表示為 {Y_t, t = 1, 2, …, T},其中 Y_t 表示時刻 t 的觀測值,T 為序列長度。時間序列分析的目標是理解序列的生成機制,建立數學模型描述觀測值的時間依賴結構,並利用歷史資料預測未來值。

平穩性是時間序列分析的核心概念。嚴平穩要求序列的聯合機率分佈不隨時間平移改變,這個條件過於嚴格難以驗證。實務上採用弱平穩或寬平穩定義,要求序列的期望值、變異數為常數,且自相關函數只依賴時間差而非絕對時間。平穩序列的統計特性穩定,使得可以用有限樣本估計總體參數並進行可靠推論。

時間序列分解將觀測值分解為趨勢、季節性與殘差三個組成部分。加法模型假設 Y_t = T_t + S_t + R_t,其中 T_t 為趨勢成分,S_t 為季節成分,R_t 為殘差。乘法模型假設 Y_t = T_t × S_t × R_t,適用於季節波動幅度隨水準變化的情況。分解方法包含經典分解、X-11 分解與 STL 分解等。STL 分解採用局部加權迴歸,能夠處理複雜的季節模式且對異常值穩健。

自相關函數量化序列與其滯後值的線性相關性。定義為 ρ_k = Cov(Y_t, Y_{t-k}) / Var(Y_t),表示當前值與 k 期前值的相關係數。ACF 圖展示不同滯後期的自相關係數,幫助識別序列的時間依賴結構。偏自相關函數移除中間滯後期的影響,測量當前值與特定滯後期的直接相關性。PACF 在模型識別中扮演關鍵角色,與 ACF 共同決定 ARIMA 模型的階數。

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.stattools import adfuller, kpss
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

class TimeSeriesAnalyzer:
    """
    時間序列分析系統
    提供時間序列分解、平穩性檢驗、ACF/PACF 分析等功能
    """
    
    def __init__(self):
        """
        初始化時間序列分析器
        """
        self.ts_data = None
        self.decomposition_result = None
        
        # 設定視覺化樣式
        plt.style.use('seaborn-v0_8-whitegrid')
        sns.set_palette("husl")
    
    def load_time_series(
        self, 
        data: pd.Series, 
        freq: str = None
    ) -> pd.Series:
        """
        載入時間序列資料
        
        參數:
            data: 時間序列資料(Pandas Series)
            freq: 時間頻率('D'=日、'M'=月、'Y'=年等)
            
        回傳:
            處理後的時間序列
        """
        # 確保索引為 DatetimeIndex
        if not isinstance(data.index, pd.DatetimeIndex):
            print("警告: 索引不是 DatetimeIndex,嘗試轉換...")
            try:
                data.index = pd.to_datetime(data.index)
            except:
                print("錯誤: 無法轉換索引為日期時間格式")
                return None
        
        # 設定頻率
        if freq:
            data = data.asfreq(freq)
        
        # 處理缺失值(線性插補)
        if data.isnull().any():
            missing_count = data.isnull().sum()
            print(f"發現 {missing_count} 個缺失值,使用線性插補處理")
            data = data.interpolate(method='linear')
        
        self.ts_data = data
        
        print(f"成功載入時間序列資料")
        print(f"  期間: {data.index[0]}{data.index[-1]}")
        print(f"  觀測數: {len(data)}")
        print(f"  頻率: {data.index.freq}")
        
        return data
    
    def descriptive_analysis(self) -> dict:
        """
        時間序列描述統計分析
        
        回傳:
            統計量字典
        """
        if self.ts_data is None:
            print("錯誤: 尚未載入時間序列資料")
            return {}
        
        stats_dict = {
            '平均值': self.ts_data.mean(),
            '中位數': self.ts_data.median(),
            '標準差': self.ts_data.std(),
            '最小值': self.ts_data.min(),
            '最大值': self.ts_data.max(),
            '偏度': self.ts_data.skew(),
            '峰度': self.ts_data.kurtosis(),
            '變異係數': self.ts_data.std() / self.ts_data.mean()
        }
        
        print("\n時間序列描述統計")
        print("=" * 60)
        for key, value in stats_dict.items():
            print(f"{key}: {value:.4f}")
        
        return stats_dict
    
    def stationarity_test(self) -> dict:
        """
        平穩性檢驗(ADF 檢驗與 KPSS 檢驗)
        
        回傳:
            檢驗結果字典
        """
        if self.ts_data is None:
            print("錯誤: 尚未載入時間序列資料")
            return {}
        
        print("\n平穩性檢驗")
        print("=" * 60)
        
        # Augmented Dickey-Fuller 檢驗
        # H0: 序列具有單位根(非平穩)
        # H1: 序列無單位根(平穩)
        adf_result = adfuller(self.ts_data.dropna())
        
        print("\nADF 檢驗結果:")
        print(f"  統計量: {adf_result[0]:.4f}")
        print(f"  p 值: {adf_result[1]:.4f}")
        print(f"  臨界值:")
        for key, value in adf_result[4].items():
            print(f"    {key}: {value:.4f}")
        
        adf_conclusion = "平穩" if adf_result[1] < 0.05 else "非平穩"
        print(f"  結論: 序列為{adf_conclusion} (α=0.05)")
        
        # KPSS 檢驗
        # H0: 序列為平穩
        # H1: 序列為非平穩
        kpss_result = kpss(self.ts_data.dropna(), regression='c')
        
        print("\nKPSS 檢驗結果:")
        print(f"  統計量: {kpss_result[0]:.4f}")
        print(f"  p 值: {kpss_result[1]:.4f}")
        print(f"  臨界值:")
        for key, value in kpss_result[3].items():
            print(f"    {key}: {value:.4f}")
        
        kpss_conclusion = "非平穩" if kpss_result[1] < 0.05 else "平穩"
        print(f"  結論: 序列為{kpss_conclusion} (α=0.05)")
        
        # 綜合判斷
        print("\n綜合判斷:")
        if adf_conclusion == "平穩" and kpss_conclusion == "平穩":
            final_conclusion = "序列為平穩"
        elif adf_conclusion == "非平穩" and kpss_conclusion == "非平穩":
            final_conclusion = "序列為非平穩"
        else:
            final_conclusion = "檢驗結果不一致,需進一步分析"
        print(f"  {final_conclusion}")
        
        return {
            'adf_statistic': adf_result[0],
            'adf_pvalue': adf_result[1],
            'adf_conclusion': adf_conclusion,
            'kpss_statistic': kpss_result[0],
            'kpss_pvalue': kpss_result[1],
            'kpss_conclusion': kpss_conclusion,
            'final_conclusion': final_conclusion
        }
    
    def time_series_decomposition(
        self, 
        model: str = 'additive',
        period: int = None
    ):
        """
        時間序列分解
        
        參數:
            model: 分解模型 ('additive' 或 'multiplicative')
            period: 季節週期(若為 None 則自動偵測)
        """
        if self.ts_data is None:
            print("錯誤: 尚未載入時間序列資料")
            return
        
        # 自動偵測季節週期
        if period is None:
            freq = self.ts_data.index.freq
            if freq == 'D' or freq == 'B':
                period = 7  # 週
            elif freq == 'M':
                period = 12  # 月
            elif freq == 'Q':
                period = 4  # 季
            else:
                period = 12  # 預設值
        
        print(f"\n執行時間序列分解 ({model} 模型, period={period})")
        
        # 執行分解
        self.decomposition_result = seasonal_decompose(
            self.ts_data,
            model=model,
            period=period,
            extrapolate_trend='freq'
        )
        
        # 視覺化分解結果
        fig, axes = plt.subplots(4, 1, figsize=(14, 10))
        
        # 原始序列
        self.ts_data.plot(ax=axes[0], title='原始時間序列')
        axes[0].set_ylabel('值')
        axes[0].grid(True, alpha=0.3)
        
        # 趨勢成分
        self.decomposition_result.trend.plot(ax=axes[1], title='趨勢成分')
        axes[1].set_ylabel('趨勢')
        axes[1].grid(True, alpha=0.3)
        
        # 季節成分
        self.decomposition_result.seasonal.plot(ax=axes[2], title='季節成分')
        axes[2].set_ylabel('季節性')
        axes[2].grid(True, alpha=0.3)
        
        # 殘差成分
        self.decomposition_result.resid.plot(ax=axes[3], title='殘差成分')
        axes[3].set_ylabel('殘差')
        axes[3].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig('/mnt/user-data/outputs/ts_decomposition.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        print("分解圖表已儲存至 ts_decomposition.png")
        
        # 計算分解後各成分的統計量
        print("\n各成分統計量:")
        print(f"  趨勢變異數: {self.decomposition_result.trend.var():.4f}")
        print(f"  季節變異數: {self.decomposition_result.seasonal.var():.4f}")
        print(f"  殘差變異數: {self.decomposition_result.resid.var():.4f}")
    
    def plot_acf_pacf(self, lags: int = 40):
        """
        繪製自相關函數與偏自相關函數圖
        
        參數:
            lags: 滯後期數
        """
        if self.ts_data is None:
            print("錯誤: 尚未載入時間序列資料")
            return
        
        fig, axes = plt.subplots(2, 1, figsize=(14, 8))
        
        # ACF 圖
        plot_acf(self.ts_data.dropna(), lags=lags, ax=axes[0])
        axes[0].set_title('自相關函數 (ACF)', fontsize=14)
        axes[0].set_xlabel('滯後期')
        axes[0].set_ylabel('自相關係數')
        
        # PACF 圖
        plot_pacf(self.ts_data.dropna(), lags=lags, ax=axes[1])
        axes[1].set_title('偏自相關函數 (PACF)', fontsize=14)
        axes[1].set_xlabel('滯後期')
        axes[1].set_ylabel('偏自相關係數')
        
        plt.tight_layout()
        plt.savefig('/mnt/user-data/outputs/acf_pacf.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        print("\nACF/PACF 圖表已儲存至 acf_pacf.png")
        
        # 計算前幾期的 ACF 值
        from statsmodels.tsa.stattools import acf
        acf_values = acf(self.ts_data.dropna(), nlags=10)
        
        print("\n前 10 期 ACF 值:")
        for i, val in enumerate(acf_values):
            print(f"  lag {i}: {val:.4f}")
    
    def differencing(self, order: int = 1) -> pd.Series:
        """
        執行差分轉換
        
        參數:
            order: 差分階數
            
        回傳:
            差分後的序列
        """
        if self.ts_data is None:
            print("錯誤: 尚未載入時間序列資料")
            return None
        
        print(f"\n執行 {order} 階差分")
        
        diff_data = self.ts_data.diff(periods=order).dropna()
        
        print(f"  差分前觀測數: {len(self.ts_data)}")
        print(f"  差分後觀測數: {len(diff_data)}")
        
        # 視覺化原始序列與差分後序列
        fig, axes = plt.subplots(2, 1, figsize=(14, 8))
        
        self.ts_data.plot(ax=axes[0], title='原始時間序列')
        axes[0].set_ylabel('值')
        axes[0].grid(True, alpha=0.3)
        
        diff_data.plot(ax=axes[1], title=f'{order} 階差分後序列')
        axes[1].set_ylabel('差分值')
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig('/mnt/user-data/outputs/differencing.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        print("差分圖表已儲存至 differencing.png")
        
        return diff_data
    
    def generate_report(self) -> str:
        """
        產生時間序列分析報告
        
        回傳:
            格式化的報告內容
        """
        report = "時間序列分析報告\n"
        report += "=" * 60 + "\n\n"
        
        if self.ts_data is None:
            report += "錯誤: 尚未載入時間序列資料\n"
            return report
        
        report += "資料概況\n"
        report += "-" * 60 + "\n"
        report += f"期間: {self.ts_data.index[0]}{self.ts_data.index[-1]}\n"
        report += f"觀測數: {len(self.ts_data)}\n"
        report += f"頻率: {self.ts_data.index.freq}\n\n"
        
        # 描述統計
        stats = self.descriptive_analysis()
        report += "\n描述統計\n"
        report += "-" * 60 + "\n"
        for key, value in stats.items():
            report += f"{key}: {value:.4f}\n"
        
        # 平穩性檢驗結果
        stationarity = self.stationarity_test()
        report += "\n平穩性檢驗\n"
        report += "-" * 60 + "\n"
        report += f"ADF 檢驗: {stationarity.get('adf_conclusion', 'N/A')}\n"
        report += f"KPSS 檢驗: {stationarity.get('kpss_conclusion', 'N/A')}\n"
        report += f"綜合結論: {stationarity.get('final_conclusion', 'N/A')}\n"
        
        report += "\n建模建議\n"
        report += "-" * 60 + "\n"
        if stationarity.get('final_conclusion') == "序列為平穩":
            report += "1. 序列已平穩,可直接建立 ARMA 模型\n"
            report += "2. 透過 ACF/PACF 圖識別模型階數\n"
        else:
            report += "1. 序列非平穩,建議進行差分轉換\n"
            report += "2. 差分後重新檢驗平穩性\n"
            report += "3. 考慮使用 ARIMA 或 SARIMA 模型\n"
        
        return report

# 建立範例時間序列資料
np.random.seed(42)
dates = pd.date_range(start='2020-01-01', periods=365, freq='D')

# 模擬具有趨勢、季節性與隨機性的時間序列
trend = np.linspace(100, 150, 365)
seasonal = 10 * np.sin(2 * np.pi * np.arange(365) / 365)
noise = np.random.normal(0, 5, 365)
values = trend + seasonal + noise

ts_example = pd.Series(values, index=dates, name='範例時間序列')

# 使用範例
if __name__ == "__main__":
    # 建立分析器
    analyzer = TimeSeriesAnalyzer()
    
    # 載入資料
    analyzer.load_time_series(ts_example, freq='D')
    
    # 描述統計
    analyzer.descriptive_analysis()
    
    # 平穩性檢驗
    analyzer.stationarity_test()
    
    # 時間序列分解
    analyzer.time_series_decomposition(model='additive', period=365)
    
    # ACF/PACF 分析
    analyzer.plot_acf_pacf(lags=40)
    
    # 差分轉換
    diff_data = analyzer.differencing(order=1)
    
    # 產生報告
    print("\n" + analyzer.generate_report())

這個時間序列分析系統提供完整的基礎分析功能。系統首先載入時間序列資料並確保索引格式正確,處理缺失值以維持序列完整性。描述統計分析計算多種統計量,包含偏度與峰度,評估序列的分佈特徵。平穩性檢驗同時執行 ADF 與 KPSS 檢驗,從不同角度驗證序列的平穩性假設。

時間序列分解採用經典分解方法,將序列拆解為趨勢、季節與殘差成分。視覺化輸出清楚呈現各成分的時間演變,幫助分析師理解序列的內在結構。ACF 與 PACF 圖提供模型識別的重要資訊,自相關模式指引 ARIMA 模型參數的選擇。差分轉換用於消除趨勢與達到平穩性,是 ARIMA 建模的關鍵前置步驟。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

start

:載入時間序列資料;

partition "探索性分析" {
    :時間序列視覺化\n觀察整體趨勢;
    :描述統計分析\n計算統計量;
    :時間序列分解\nSTL 或經典分解;
}

partition "平穩性分析" {
    :ADF 檢驗\n檢驗單位根;
    :KPSS 檢驗\n檢驗平穩性;
    
    if (序列平穩?) then (是)
        :保持原始序列;
    else (否)
        :執行差分轉換;
        :重新檢驗平穩性;
    endif
}

partition "模型識別" {
    :繪製 ACF 圖\n識別 MA 階數;
    :繪製 PACF 圖\n識別 AR 階數;
    :確定 ARIMA(p,d,q) 參數;
}

:準備建立預測模型;

stop

@enduml

這個流程圖描述時間序列分析的系統化流程。從載入資料開始,透過探索性分析理解序列的基本特徵與結構。時間序列分解揭示趨勢、季節與隨機成分。平穩性分析是關鍵環節,透過統計檢驗判斷序列是否滿足平穩性假設。非平穩序列需要透過差分達到平穩性。模型識別階段分析 ACF 與 PACF 圖,確定 ARIMA 模型的參數。整個流程為後續的模型建立與預測奠定基礎。

ARIMA 模型建立與預測實作

ARIMA 模型是時間序列預測的經典方法,全名為自迴歸整合移動平均模型。ARIMA(p,d,q) 包含三個組成部分:AR(p) 自迴歸項捕捉序列與其滯後值的線性關係,I(d) 整合項透過 d 階差分達到平穩性,MA(q) 移動平均項模擬當前值與過去預測誤差的關係。三個參數 p、d、q 的選擇決定模型的複雜度與預測能力。

ARIMA 模型的數學表示為 (1-Σφ_iL^i)(1-L)^d Y_t = (1+Σθ_jL^j)ε_t,其中 L 為滯後算子,φ_i 為自迴歸係數,θ_j 為移動平均係數,ε_t 為白雜訊。模型假設殘差項為獨立同分佈的常態隨機變數,期望值為零。這些假設可透過殘差診斷檢驗,包含 Ljung-Box 檢驗評估殘差的獨立性,常態性檢驗評估分佈假設。

參數估計採用最大概似法或條件最小平方法。最大概似估計在常態假設下等價於最小化殘差平方和。參數的標準誤可用於建構信賴區間與執行假設檢驗。模型選擇透過資訊準則如 AIC 與 BIC 比較不同參數組合的模型,選擇資訊準則最小的模型。這些準則在模型配適度與參數數量之間取得平衡,懲罰過度複雜的模型。

SARIMA 模型擴展 ARIMA 以處理季節性時間序列。SARIMA(p,d,q)(P,D,Q)_s 包含非季節與季節兩組參數,其中 s 為季節週期,P、D、Q 分別為季節自迴歸階數、季節差分階數與季節移動平均階數。季節性模式在商業與經濟資料中普遍存在,例如月度零售銷售展現年度季節模式,日度電力需求展現週度季節模式。SARIMA 透過季節項捕捉這些週期性波動。

import pandas as pd
import numpy as np
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.graphics.tsaplots import plot_predict
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt
import itertools
import warnings
warnings.filterwarnings('ignore')

class ARIMAForecaster:
    """
    ARIMA/SARIMA 預測系統
    提供模型建立、參數最佳化、預測與評估功能
    """
    
    def __init__(self):
        """
        初始化 ARIMA 預測器
        """
        self.train_data = None
        self.test_data = None
        self.model = None
        self.model_fit = None
        self.predictions = None
        self.best_params = None
    
    def split_data(
        self, 
        ts_data: pd.Series, 
        test_size: float = 0.2
    ) -> tuple:
        """
        分割訓練與測試集
        
        參數:
            ts_data: 時間序列資料
            test_size: 測試集比例
            
        回傳:
            訓練集與測試集
        """
        split_point = int(len(ts_data) * (1 - test_size))
        
        self.train_data = ts_data[:split_point]
        self.test_data = ts_data[split_point:]
        
        print(f"資料分割完成:")
        print(f"  訓練集: {len(self.train_data)} 筆 ({self.train_data.index[0]}{self.train_data.index[-1]})")
        print(f"  測試集: {len(self.test_data)} 筆 ({self.test_data.index[0]}{self.test_data.index[-1]})")
        
        return self.train_data, self.test_data
    
    def grid_search_arima(
        self,
        p_range: range = range(0, 3),
        d_range: range = range(0, 2),
        q_range: range = range(0, 3)
    ) -> dict:
        """
        網格搜尋最佳 ARIMA 參數
        
        參數:
            p_range: AR 階數搜尋範圍
            d_range: 差分階數搜尋範圍
            q_range: MA 階數搜尋範圍
            
        回傳:
            最佳參數與 AIC 值
        """
        if self.train_data is None:
            print("錯誤: 尚未分割資料")
            return {}
        
        print("\n執行 ARIMA 參數網格搜尋...")
        print(f"  p 範圍: {list(p_range)}")
        print(f"  d 範圍: {list(d_range)}")
        print(f"  q 範圍: {list(q_range)}")
        
        best_aic = np.inf
        best_params = None
        results_list = []
        
        # 搜尋所有參數組合
        for p, d, q in itertools.product(p_range, d_range, q_range):
            try:
                # 建立並訓練模型
                model = ARIMA(self.train_data, order=(p, d, q))
                model_fit = model.fit()
                
                aic = model_fit.aic
                results_list.append({
                    'order': (p, d, q),
                    'aic': aic,
                    'bic': model_fit.bic
                })
                
                # 更新最佳參數
                if aic < best_aic:
                    best_aic = aic
                    best_params = (p, d, q)
                
                print(f"  ARIMA{(p,d,q)}: AIC={aic:.2f}, BIC={model_fit.bic:.2f}")
                
            except Exception as e:
                print(f"  ARIMA{(p,d,q)}: 模型訓練失敗 - {str(e)}")
                continue
        
        # 輸出結果
        print(f"\n最佳參數: ARIMA{best_params}")
        print(f"最佳 AIC: {best_aic:.2f}")
        
        self.best_params = best_params
        
        # 建立結果 DataFrame
        results_df = pd.DataFrame(results_list)
        results_df = results_df.sort_values('aic')
        
        print("\n前 5 個最佳模型:")
        print(results_df.head())
        
        return {
            'best_order': best_params,
            'best_aic': best_aic,
            'results': results_df
        }
    
    def fit_arima(self, order: tuple = None):
        """
        訓練 ARIMA 模型
        
        參數:
            order: ARIMA 參數 (p,d,q),若為 None 則使用網格搜尋結果
        """
        if self.train_data is None:
            print("錯誤: 尚未分割資料")
            return
        
        # 使用指定參數或網格搜尋結果
        if order is None:
            if self.best_params is None:
                print("錯誤: 尚未執行網格搜尋,請指定參數")
                return
            order = self.best_params
        
        print(f"\n訓練 ARIMA{order} 模型...")
        
        # 建立並訓練模型
        self.model = ARIMA(self.train_data, order=order)
        self.model_fit = self.model.fit()
        
        # 顯示模型摘要
        print("\n模型摘要:")
        print(self.model_fit.summary())
        
        # 診斷檢驗
        self._diagnostic_tests()
    
    def _diagnostic_tests(self):
        """
        執行模型診斷檢驗
        """
        if self.model_fit is None:
            return
        
        print("\n模型診斷檢驗:")
        
        # 殘差統計
        residuals = self.model_fit.resid
        print(f"  殘差平均值: {residuals.mean():.6f}")
        print(f"  殘差標準差: {residuals.std():.4f}")
        
        # Ljung-Box 檢驗(檢驗殘差的自相關性)
        from statsmodels.stats.diagnostic import acorr_ljungbox
        lb_test = acorr_ljungbox(residuals, lags=10)
        
        print("\n  Ljung-Box 檢驗 (前 10 期):")
        print(f"    p 值最小值: {lb_test['lb_pvalue'].min():.4f}")
        
        if lb_test['lb_pvalue'].min() > 0.05:
            print("    結論: 殘差為白雜訊(模型適配良好)")
        else:
            print("    結論: 殘差存在自相關(模型可能需要調整)")
        
        # 殘差常態性檢驗
        from scipy.stats import shapiro
        stat, p_value = shapiro(residuals)
        print(f"\n  Shapiro-Wilk 常態性檢驗:")
        print(f"    統計量: {stat:.4f}")
        print(f"    p 值: {p_value:.4f}")
        
        if p_value > 0.05:
            print("    結論: 殘差符合常態分佈")
        else:
            print("    結論: 殘差不符合常態分佈")
        
        # 視覺化診斷
        self.model_fit.plot_diagnostics(figsize=(14, 10))
        plt.tight_layout()
        plt.savefig('/mnt/user-data/outputs/arima_diagnostics.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        print("\n診斷圖表已儲存至 arima_diagnostics.png")
    
    def forecast(self, steps: int = None) -> pd.Series:
        """
        執行預測
        
        參數:
            steps: 預測步數,若為 None 則預測整個測試集
            
        回傳:
            預測序列
        """
        if self.model_fit is None:
            print("錯誤: 尚未訓練模型")
            return None
        
        if steps is None:
            steps = len(self.test_data)
        
        print(f"\n執行 {steps} 步預測...")
        
        # 執行預測
        forecast_result = self.model_fit.forecast(steps=steps)
        
        # 建立預測序列(使用測試集的索引)
        if steps <= len(self.test_data):
            forecast_index = self.test_data.index[:steps]
        else:
            # 如果預測步數超過測試集,延伸索引
            last_date = self.test_data.index[-1]
            freq = self.test_data.index.freq
            forecast_index = pd.date_range(
                start=self.test_data.index[0],
                periods=steps,
                freq=freq
            )
        
        self.predictions = pd.Series(forecast_result, index=forecast_index)
        
        print(f"預測完成")
        print(f"  預測期間: {self.predictions.index[0]}{self.predictions.index[-1]}")
        
        return self.predictions
    
    def evaluate(self) -> dict:
        """
        評估預測效能
        
        回傳:
            評估指標字典
        """
        if self.predictions is None:
            print("錯誤: 尚未執行預測")
            return {}
        
        # 確保測試集與預測有相同的長度
        test_subset = self.test_data[:len(self.predictions)]
        
        # 計算評估指標
        mse = mean_squared_error(test_subset, self.predictions)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(test_subset, self.predictions)
        mape = np.mean(np.abs((test_subset - self.predictions) / test_subset)) * 100
        
        metrics = {
            'MSE': mse,
            'RMSE': rmse,
            'MAE': mae,
            'MAPE': mape
        }
        
        print("\n預測效能評估:")
        print("-" * 60)
        print(f"MSE:  {mse:.4f}")
        print(f"RMSE: {rmse:.4f}")
        print(f"MAE:  {mae:.4f}")
        print(f"MAPE: {mape:.2f}%")
        
        return metrics
    
    def plot_forecast(self):
        """
        視覺化預測結果
        """
        if self.predictions is None:
            print("錯誤: 尚未執行預測")
            return
        
        fig, ax = plt.subplots(figsize=(14, 6))
        
        # 繪製訓練集
        self.train_data.plot(ax=ax, label='訓練集', linewidth=2)
        
        # 繪製測試集
        test_subset = self.test_data[:len(self.predictions)]
        test_subset.plot(ax=ax, label='測試集(實際值)', linewidth=2)
        
        # 繪製預測值
        self.predictions.plot(ax=ax, label='預測值', linewidth=2, linestyle='--')
        
        ax.set_xlabel('時間')
        ax.set_ylabel('值')
        ax.set_title('ARIMA 模型預測結果')
        ax.legend(loc='best')
        ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig('/mnt/user-data/outputs/arima_forecast.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        print("\n預測圖表已儲存至 arima_forecast.png")
    
    def fit_sarima(
        self,
        order: tuple,
        seasonal_order: tuple
    ):
        """
        訓練 SARIMA 模型
        
        參數:
            order: 非季節參數 (p,d,q)
            seasonal_order: 季節參數 (P,D,Q,s)
        """
        if self.train_data is None:
            print("錯誤: 尚未分割資料")
            return
        
        print(f"\n訓練 SARIMA{order}x{seasonal_order} 模型...")
        
        # 建立並訓練模型
        self.model = SARIMAX(
            self.train_data,
            order=order,
            seasonal_order=seasonal_order,
            enforce_stationarity=False,
            enforce_invertibility=False
        )
        self.model_fit = self.model.fit()
        
        # 顯示模型摘要
        print("\n模型摘要:")
        print(self.model_fit.summary())
        
        # 診斷檢驗
        self._diagnostic_tests()

# 使用範例
if __name__ == "__main__":
    # 使用前面建立的範例時間序列
    forecaster = ARIMAForecaster()
    
    # 分割資料
    forecaster.split_data(ts_example, test_size=0.2)
    
    # 網格搜尋最佳參數
    grid_results = forecaster.grid_search_arima(
        p_range=range(0, 3),
        d_range=range(0, 2),
        q_range=range(0, 3)
    )
    
    # 訓練模型
    forecaster.fit_arima()
    
    # 執行預測
    forecaster.forecast()
    
    # 評估效能
    forecaster.evaluate()
    
    # 視覺化結果
    forecaster.plot_forecast()

這個 ARIMA 預測系統實作完整的時間序列預測流程。系統首先分割訓練與測試集,保留測試集用於評估模型的樣本外預測能力。網格搜尋功能自動化參數選擇過程,透過 AIC 準則比較不同參數組合的模型,選擇最優配置。這種系統化的參數選擇避免主觀判斷造成的偏差。

模型訓練後執行全面的診斷檢驗。Ljung-Box 檢驗評估殘差的自相關性,確保模型已充分擷取時間依賴結構。常態性檢驗驗證殘差分佈假設,這對於預測區間的建構很重要。診斷圖表包含殘差時間序列圖、ACF 圖、Q-Q 圖與直方圖,從多個角度檢視模型適配品質。預測評估計算多個誤差指標,RMSE 與 MAE 量化絕對誤差,MAPE 反映相對誤差,全面評估預測準確度。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

start

:分割訓練與測試集;

partition "模型識別" {
    :網格搜尋\n遍歷 (p,d,q) 組合;
    :計算 AIC/BIC\n比較模型;
    :選擇最佳參數;
}

partition "模型估計" {
    :最大概似估計\n估計參數值;
    :計算標準誤\n建構信賴區間;
}

partition "模型診斷" {
    :殘差分析\n檢查自相關;
    :Ljung-Box 檢驗\n白雜訊檢驗;
    :常態性檢驗\n分佈假設驗證;
}

if (模型通過診斷?) then (是)
    :執行預測\n產生未來值;
else (否)
    :調整模型參數\n重新訓練;
endif

:計算評估指標\nRMSE、MAE、MAPE;

:視覺化預測結果;

stop

@enduml

這個流程圖描述 ARIMA 建模的完整 Box-Jenkins 方法論。從資料分割開始,模型識別階段透過網格搜尋與資訊準則選擇最佳參數。模型估計階段使用最大概似法估計參數並計算標準誤。診斷檢驗階段確保模型符合假設條件,包含殘差的獨立性與常態性。如果模型未通過診斷,需要重新調整參數或考慮其他模型形式。通過診斷後執行預測並計算評估指標,視覺化結果展示預測與實際值的比較。

結語

時間序列分析是資料科學的重要分支,為理解與預測動態系統提供強大工具。本文系統性介紹時間序列分析的理論基礎與實務應用,從基本概念到進階建模技術的完整流程。透過 Python 生態系統的工具,展示如何將理論轉化為實際的分析與預測系統。

時間序列的核心特徵包含趨勢、季節性、週期性與隨機性。理解這些組成要素是有效建模的基礎。時間序列分解技術將複雜的序列拆解為可解釋的成分,平穩性檢驗確保序列滿足建模假設。ACF 與 PACF 分析提供模型識別的重要資訊,指引參數選擇。這些基礎分析技術雖然傳統,但在現代時間序列分析中仍然不可或缺。

ARIMA 模型族代表時間序列預測的經典方法論。Box-Jenkins 方法提供系統化的建模流程,從模型識別、參數估計到診斷檢驗,每個步驟都有明確的統計準則。雖然 ARIMA 假設線性關係且需要手動參數選擇,但其數學基礎紮實、解釋性強,在許多實務應用中仍然表現優異。SARIMA 擴展處理季節性,使 ARIMA 方法適用於更廣泛的商業與經濟時間序列。

未來的時間序列分析將更加多元化與智慧化。機器學習與深度學習方法如 LSTM、GRU、Transformer 能夠捕捉複雜的非線性模式與長期依賴。自動化預測平台如 Prophet 降低建模門檻,使非專業人員也能建立高品質預測。整合多源資料與外部變數的多變量時間序列模型,提供更全面的預測框架。隨著計算能力提升與演算法創新,時間序列分析將在更多領域發揮價值,為資料驅動的決策提供堅實支撐。