時間序列分析是資料科學領域中不可或缺的分析技術,它專注於研究隨時間推移而變化的資料模式,揭示資料中隱藏的趨勢、週期性和季節性特徵。無論是金融市場的股價波動、零售業的銷售預測,還是製造業的產能規劃,時間序列分析都能提供寶貴的洞察和預測能力。本文將從時間序列資料的基本特性出發,深入剖析自迴歸模型、移動平均模型以及它們的組合形式,並透過商店績效分析的實際案例,展示如何運用 Python 生態系統中的工具進行完整的時間序列分析與視覺化,幫助讀者建立從理論到實踐的完整知識體系。

時間序列資料特性與分析基礎

時間序列資料是按照時間順序排列的觀測值序列,這種資料結構在商業、科學和工程領域中極為普遍。理解時間序列資料的固有特性是進行有效分析的前提,因為不同的資料特性需要採用不同的分析方法和模型。時間序列資料具有幾個關鍵特徵,包括時間依賴性、趨勢、季節性和隨機波動,這些特徵共同構成了資料的複雜行為模式。

趨勢是時間序列資料在長期內呈現的系統性上升或下降模式。這種趨勢可能是線性的,也可能是非線性的,例如指數增長或對數增長。在商業環境中,趨勢通常反映了市場擴張、技術進步或人口變化等宏觀因素的影響。識別和量化趨勢是時間序列分析的首要任務,因為趨勢的存在會影響後續統計分析的有效性。

季節性是指資料在固定時間間隔內重複出現的週期性模式。這種模式通常與自然週期(如一年四季)或社會週期(如工作日和週末)相關。例如,零售業的銷售額通常在年終購物季達到高峰,而空調銷售在夏季明顯增加。季節性分析對於庫存管理、人力資源規劃和行銷策略制定都具有重要價值。

週期性與季節性相似,但週期長度不固定且通常較長。經濟週期就是一個典型的例子,它包括繁榮、衰退、蕭條和復甦四個階段,但每個週期的持續時間並不固定。週期性分析對於長期投資決策和策略規劃特別重要。

隨機波動或稱為白噪聲,是時間序列中無法用趨勢、季節性或週期性解釋的隨機變化。這些波動通常被認為是由眾多微小且相互獨立的因素共同造成的。在建模過程中,我們的目標是盡可能捕捉系統性的變化模式,讓殘差儘量接近白噪聲。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "時間序列組成成分" as components {
    rectangle "趨勢成分" as trend
    rectangle "季節性成分" as seasonal
    rectangle "週期性成分" as cyclical
    rectangle "隨機成分" as residual
}

rectangle "分解方法" as methods {
    rectangle "加法分解" as additive
    rectangle "乘法分解" as multiplicative
}

rectangle "分析應用" as applications {
    rectangle "預測建模" as forecasting
    rectangle "異常檢測" as anomaly
    rectangle "模式識別" as pattern
}

trend --> additive
seasonal --> additive
cyclical --> additive
residual --> additive

trend --> multiplicative
seasonal --> multiplicative
cyclical --> multiplicative
residual --> multiplicative

additive --> forecasting
additive --> anomaly
multiplicative --> forecasting
multiplicative --> pattern

@enduml

上圖展示了時間序列的組成成分及其分解方法。時間序列可以分解為趨勢、季節性、週期性和隨機四個成分。加法分解假設各成分相互獨立且具有相同的變異程度,而乘法分解則適用於各成分具有相互依賴關係的情況。分解後的結果可以應用於預測建模、異常檢測和模式識別等多種分析場景。

經典時間序列模型理論與數學基礎

時間序列分析擁有豐富的模型體系,從簡單的自迴歸模型到複雜的季節性整合模型,每種模型都有其特定的假設和適用場景。深入理解這些模型的數學基礎是正確應用它們的前提,也是診斷模型問題和改進預測效能的關鍵。

自迴歸模型是最基本的時間序列模型之一,其核心思想是當前值可以由過去若干個時間點的值線性組合來表示。p 階自迴歸模型假設當前觀測值是過去 p 個觀測值的加權和加上一個隨機誤差項。這種模型適用於具有自相關性的時間序列,即當前值與過去值存在統計相關的情況。自迴歸模型的參數估計通常使用最小平方法或最大似然估計法。

移動平均模型則從另一個角度建模時間序列,它假設當前值是過去若干個隨機誤差項的線性組合。q 階移動平均模型使用過去 q 個時間點的預測誤差來預測當前值。這種模型適用於受到短期隨機衝擊影響的時間序列,例如突發事件對市場的影響。移動平均模型能夠有效捕捉時間序列中的短期波動。

自迴歸整合移動平均模型結合了自迴歸和移動平均的優點,並引入了差分運算來處理非平穩時間序列。ARIMA(p,d,q) 模型中,p 是自迴歸項的階數,d 是差分次數,q 是移動平均項的階數。差分運算通過計算相鄰觀測值的差異來消除趨勢,使非平穩序列變得平穩。ARIMA 模型是處理非季節性時間序列的強大工具,廣泛應用於經濟預測和需求規劃。

季節性自迴歸整合移動平均模型進一步擴展了 ARIMA 模型,增加了處理季節性模式的能力。SARIMA(p,d,q)(P,D,Q)m 模型包含兩組參數,分別處理非季節性和季節性成分。其中 m 是季節週期的長度,例如月度資料的 m=12,季度資料的 m=4。SARIMA 模型能夠同時捕捉短期動態和長期季節性模式,是處理具有季節性特徵的時間序列的首選模型。

import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, acf, pacf
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.seasonal import seasonal_decompose
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

class TimeSeriesAnalyzer:
    """
    時間序列分析器

    提供完整的時間序列分析功能,包括平穩性檢定、
    模型識別、參數估計和預測
    """

    def __init__(self, data, date_column=None, value_column=None):
        """
        初始化時間序列分析器

        Args:
            data: pandas DataFrame 或 Series
            date_column: 日期欄位名稱(如果是 DataFrame)
            value_column: 數值欄位名稱(如果是 DataFrame)
        """
        # 處理不同的輸入格式
        if isinstance(data, pd.DataFrame):
            if date_column and value_column:
                # 設定日期索引並提取數值欄位
                self.series = data.set_index(date_column)[value_column]
            else:
                # 假設第一欄是日期,第二欄是數值
                self.series = data.set_index(data.columns[0])[data.columns[1]]
        else:
            self.series = data

        # 確保索引是日期時間格式
        if not isinstance(self.series.index, pd.DatetimeIndex):
            self.series.index = pd.to_datetime(self.series.index)

        # 按時間排序
        self.series = self.series.sort_index()

        # 儲存分析結果
        self.decomposition = None
        self.model = None
        self.model_fit = None

    def check_stationarity(self, significance_level=0.05):
        """
        執行 Augmented Dickey-Fuller 檢定來檢查時間序列的平穩性

        平穩性是許多時間序列模型的基本假設
        非平穩序列需要進行差分處理才能建模

        Args:
            significance_level: 顯著性水準,預設 0.05

        Returns:
            dict: 包含檢定結果的字典
        """
        # 執行 ADF 檢定
        # ADF 檢定的虛無假設是序列存在單位根(非平穩)
        result = adfuller(self.series.dropna(), autolag='AIC')

        # 整理檢定結果
        adf_output = {
            'test_statistic': result[0],
            'p_value': result[1],
            'lags_used': result[2],
            'observations': result[3],
            'critical_values': result[4],
            'is_stationary': result[1] < significance_level
        }

        # 輸出分析報告
        print("=" * 50)
        print("ADF 平穩性檢定結果")
        print("=" * 50)
        print(f"檢定統計量: {adf_output['test_statistic']:.4f}")
        print(f"p 值: {adf_output['p_value']:.4f}")
        print(f"使用滯後數: {adf_output['lags_used']}")
        print(f"觀測值數量: {adf_output['observations']}")
        print("\n臨界值:")
        for key, value in adf_output['critical_values'].items():
            print(f"  {key}: {value:.4f}")

        if adf_output['is_stationary']:
            print(f"\n結論: 序列是平穩的 (p 值 < {significance_level})")
        else:
            print(f"\n結論: 序列是非平穩的 (p 值 >= {significance_level})")
            print("建議: 進行差分處理以達到平穩性")

        return adf_output

    def decompose_series(self, model='additive', period=None):
        """
        對時間序列進行季節性分解

        將序列分解為趨勢、季節性和殘差三個成分

        Args:
            model: 分解模型類型,'additive' 或 'multiplicative'
            period: 季節週期長度,若為 None 則自動推斷

        Returns:
            分解結果物件
        """
        # 如果未指定週期,嘗試從索引頻率推斷
        if period is None:
            if hasattr(self.series.index, 'freq') and self.series.index.freq:
                freq = self.series.index.freq
                if freq.name in ['M', 'MS']:
                    period = 12  # 月度資料
                elif freq.name in ['Q', 'QS']:
                    period = 4   # 季度資料
                elif freq.name in ['W']:
                    period = 52  # 週度資料
                else:
                    period = 12  # 預設值
            else:
                period = 12  # 預設假設月度資料

        # 執行季節性分解
        # additive: Y = Trend + Seasonal + Residual
        # multiplicative: Y = Trend * Seasonal * Residual
        self.decomposition = seasonal_decompose(
            self.series,
            model=model,
            period=period,
            extrapolate_trend='freq'  # 處理序列開頭和結尾的趨勢值
        )

        print("=" * 50)
        print(f"季節性分解完成 ({model} 模型)")
        print("=" * 50)
        print(f"季節週期: {period}")
        print(f"趨勢成分範圍: [{self.decomposition.trend.min():.2f}, "
              f"{self.decomposition.trend.max():.2f}]")
        print(f"季節性成分範圍: [{self.decomposition.seasonal.min():.2f}, "
              f"{self.decomposition.seasonal.max():.2f}]")
        print(f"殘差範圍: [{self.decomposition.resid.min():.2f}, "
              f"{self.decomposition.resid.max():.2f}]")

        return self.decomposition

    def identify_order(self, max_lag=20):
        """
        透過 ACF 和 PACF 圖幫助識別 ARIMA 模型的階數

        ACF (自相關函數): 幫助識別 MA 項的階數 q
        PACF (偏自相關函數): 幫助識別 AR 項的階數 p

        Args:
            max_lag: 計算的最大滯後數

        Returns:
            tuple: (acf_values, pacf_values)
        """
        # 計算自相關函數值
        # ACF 測量時間序列與其滯後版本之間的相關性
        acf_values = acf(self.series.dropna(), nlags=max_lag)

        # 計算偏自相關函數值
        # PACF 測量在控制中間滯後的情況下的相關性
        pacf_values = pacf(self.series.dropna(), nlags=max_lag)

        print("=" * 50)
        print("模型階數識別")
        print("=" * 50)
        print("\n前 10 個滯後的 ACF 值:")
        for i, val in enumerate(acf_values[:11]):
            print(f"  滯後 {i}: {val:.4f}")

        print("\n前 10 個滯後的 PACF 值:")
        for i, val in enumerate(pacf_values[:11]):
            print(f"  滯後 {i}: {val:.4f}")

        return acf_values, pacf_values

    def fit_arima(self, order=(1, 1, 1)):
        """
        擬合 ARIMA 模型

        Args:
            order: (p, d, q) 模型階數
                p: 自迴歸項數
                d: 差分次數
                q: 移動平均項數

        Returns:
            模型擬合結果
        """
        print(f"\n正在擬合 ARIMA{order} 模型...")

        # 建立 ARIMA 模型
        self.model = ARIMA(self.series, order=order)

        # 擬合模型
        # 使用最大似然估計法估計模型參數
        self.model_fit = self.model.fit()

        # 輸出模型摘要
        print("\n" + "=" * 50)
        print(f"ARIMA{order} 模型擬合結果")
        print("=" * 50)
        print(f"AIC: {self.model_fit.aic:.2f}")
        print(f"BIC: {self.model_fit.bic:.2f}")
        print(f"對數似然值: {self.model_fit.llf:.2f}")

        print("\n模型參數:")
        for param, value in self.model_fit.params.items():
            print(f"  {param}: {value:.4f}")

        return self.model_fit

    def fit_sarima(self, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12)):
        """
        擬合 SARIMA 模型

        Args:
            order: (p, d, q) 非季節性模型階數
            seasonal_order: (P, D, Q, m) 季節性模型階數
                P: 季節性自迴歸項數
                D: 季節性差分次數
                Q: 季節性移動平均項數
                m: 季節週期長度

        Returns:
            模型擬合結果
        """
        print(f"\n正在擬合 SARIMA{order}x{seasonal_order} 模型...")

        # 建立 SARIMAX 模型(不含外生變數即為 SARIMA)
        self.model = SARIMAX(
            self.series,
            order=order,
            seasonal_order=seasonal_order,
            enforce_stationarity=False,  # 允許非平穩參數
            enforce_invertibility=False   # 允許非可逆參數
        )

        # 擬合模型
        self.model_fit = self.model.fit(disp=False)

        # 輸出模型摘要
        print("\n" + "=" * 50)
        print(f"SARIMA{order}x{seasonal_order} 模型擬合結果")
        print("=" * 50)
        print(f"AIC: {self.model_fit.aic:.2f}")
        print(f"BIC: {self.model_fit.bic:.2f}")
        print(f"對數似然值: {self.model_fit.llf:.2f}")

        print("\n模型參數:")
        for param, value in self.model_fit.params.items():
            print(f"  {param}: {value:.4f}")

        return self.model_fit

    def forecast(self, steps=12, confidence_level=0.95):
        """
        使用擬合的模型進行預測

        Args:
            steps: 預測的時間步數
            confidence_level: 信賴區間的信賴水準

        Returns:
            DataFrame: 包含預測值和信賴區間
        """
        if self.model_fit is None:
            raise ValueError("請先擬合模型再進行預測")

        # 進行預測
        # get_forecast 返回包含預測值和信賴區間的物件
        forecast_result = self.model_fit.get_forecast(steps=steps)

        # 取得預測值
        predictions = forecast_result.predicted_mean

        # 取得信賴區間
        alpha = 1 - confidence_level
        conf_int = forecast_result.conf_int(alpha=alpha)

        # 整理預測結果
        forecast_df = pd.DataFrame({
            'forecast': predictions,
            'lower_ci': conf_int.iloc[:, 0],
            'upper_ci': conf_int.iloc[:, 1]
        })

        print("\n" + "=" * 50)
        print(f"預測結果 ({steps} 期)")
        print("=" * 50)
        print(f"信賴水準: {confidence_level * 100:.0f}%")
        print(f"\n預測摘要:")
        print(f"  平均預測值: {predictions.mean():.2f}")
        print(f"  預測值範圍: [{predictions.min():.2f}, {predictions.max():.2f}]")

        return forecast_df

    def diagnose_model(self):
        """
        診斷模型殘差

        檢查殘差是否滿足白噪聲假設,
        這是模型有效性的重要指標
        """
        if self.model_fit is None:
            raise ValueError("請先擬合模型再進行診斷")

        # 取得殘差
        residuals = self.model_fit.resid

        print("\n" + "=" * 50)
        print("模型診斷")
        print("=" * 50)

        # 計算殘差統計量
        print("\n殘差統計:")
        print(f"  平均值: {residuals.mean():.4f}")
        print(f"  標準差: {residuals.std():.4f}")
        print(f"  偏度: {residuals.skew():.4f}")
        print(f"  峰度: {residuals.kurtosis():.4f}")

        # 執行 Ljung-Box 檢定
        # 虛無假設:殘差是白噪聲(無自相關)
        from statsmodels.stats.diagnostic import acorr_ljungbox
        lb_result = acorr_ljungbox(residuals, lags=[10, 20], return_df=True)

        print("\nLjung-Box 檢定:")
        print(lb_result)

        if all(lb_result['lb_pvalue'] > 0.05):
            print("\n結論: 殘差無顯著自相關,模型適當")
        else:
            print("\n結論: 殘差存在自相關,建議調整模型")

        return residuals

class StorePerformanceAnalyzer:
    """
    商店績效分析器

    專門用於分析商店的時間序列績效資料,
    包括收入、客戶數量和滿意度等指標
    """

    def __init__(self):
        """
        初始化商店績效分析器
        """
        self.data = None
        self.metrics = {}

    def load_data(self, data):
        """
        載入商店績效資料

        Args:
            data: pandas DataFrame,包含商店績效資料
        """
        self.data = data.copy()
        print(f"已載入 {len(self.data)} 筆商店績效資料")

    def generate_sample_data(self, n_stores=50, years=range(2020, 2024)):
        """
        生成範例商店績效資料

        Args:
            n_stores: 商店數量
            years: 年份範圍

        Returns:
            DataFrame: 範例資料
        """
        np.random.seed(42)

        records = []
        for store_id in range(1, n_stores + 1):
            for year in years:
                # 生成績效指標
                # 收入增長率:根據商店規模和年份調整
                base_growth = 0.05 + (store_id % 10) * 0.01
                yearly_factor = (year - 2020) * 0.02
                growth_rate = base_growth + yearly_factor + np.random.normal(0, 0.05)
                growth_rate = np.clip(growth_rate, -0.1, 0.3)

                # 客戶數量:與商店規模相關
                customer_count = int(np.random.poisson(5 + store_id % 10))

                # 客戶滿意度:根據服務品質和隨機因素
                satisfaction = 60 + 20 * np.random.beta(2, 2) + (store_id % 5)
                satisfaction = np.clip(satisfaction, 40, 100)

                # 生產力指數
                productivity = 70 + 15 * np.random.beta(3, 2)
                productivity = np.clip(productivity, 50, 95)

                records.append({
                    'store_id': store_id,
                    'year': year,
                    'revenue_growth': round(growth_rate, 3),
                    'customer_count': customer_count,
                    'satisfaction': round(satisfaction, 2),
                    'productivity': round(productivity, 2)
                })

        self.data = pd.DataFrame(records)
        print(f"已生成 {len(self.data)} 筆範例商店績效資料")

        return self.data

    def calculate_metrics(self):
        """
        計算各種績效指標

        Returns:
            dict: 績效指標摘要
        """
        if self.data is None:
            raise ValueError("請先載入資料")

        # 整體指標
        self.metrics['overall'] = {
            'total_stores': self.data['store_id'].nunique(),
            'years_covered': sorted(self.data['year'].unique().tolist()),
            'avg_growth': self.data['revenue_growth'].mean(),
            'avg_satisfaction': self.data['satisfaction'].mean(),
            'avg_productivity': self.data['productivity'].mean()
        }

        # 按年份統計
        yearly_stats = self.data.groupby('year').agg({
            'revenue_growth': ['mean', 'std', 'min', 'max'],
            'satisfaction': ['mean', 'std'],
            'productivity': ['mean', 'std']
        }).round(3)
        self.metrics['yearly'] = yearly_stats

        # 識別頂尖商店
        store_performance = self.data.groupby('store_id').agg({
            'revenue_growth': 'mean',
            'satisfaction': 'mean',
            'productivity': 'mean'
        })
        # 計算綜合得分
        store_performance['composite_score'] = (
            store_performance['revenue_growth'].rank(pct=True) * 0.4 +
            store_performance['satisfaction'].rank(pct=True) * 0.3 +
            store_performance['productivity'].rank(pct=True) * 0.3
        )
        top_stores = store_performance.nlargest(5, 'composite_score')
        self.metrics['top_stores'] = top_stores

        # 輸出摘要
        print("\n" + "=" * 50)
        print("商店績效指標摘要")
        print("=" * 50)
        print(f"\n整體統計:")
        print(f"  商店總數: {self.metrics['overall']['total_stores']}")
        print(f"  涵蓋年份: {self.metrics['overall']['years_covered']}")
        print(f"  平均收入增長率: {self.metrics['overall']['avg_growth']:.2%}")
        print(f"  平均客戶滿意度: {self.metrics['overall']['avg_satisfaction']:.1f}")
        print(f"  平均生產力指數: {self.metrics['overall']['avg_productivity']:.1f}")

        print(f"\n前 5 名商店:")
        print(top_stores.round(3))

        return self.metrics

    def analyze_trends(self):
        """
        分析績效趨勢

        Returns:
            DataFrame: 趨勢分析結果
        """
        if self.data is None:
            raise ValueError("請先載入資料")

        # 計算年度趨勢
        yearly_trend = self.data.groupby('year').agg({
            'revenue_growth': 'mean',
            'customer_count': 'sum',
            'satisfaction': 'mean',
            'productivity': 'mean'
        }).round(3)

        # 計算年度變化率
        yearly_trend['growth_change'] = yearly_trend['revenue_growth'].pct_change()
        yearly_trend['satisfaction_change'] = yearly_trend['satisfaction'].pct_change()
        yearly_trend['productivity_change'] = yearly_trend['productivity'].pct_change()

        print("\n" + "=" * 50)
        print("績效趨勢分析")
        print("=" * 50)
        print("\n年度趨勢:")
        print(yearly_trend)

        # 識別趨勢方向
        latest_year = yearly_trend.index[-1]
        if len(yearly_trend) > 1:
            second_latest = yearly_trend.index[-2]

            print(f"\n{second_latest}{latest_year} 的變化:")
            for col in ['revenue_growth', 'satisfaction', 'productivity']:
                change = yearly_trend.loc[latest_year, col] - yearly_trend.loc[second_latest, col]
                direction = "上升" if change > 0 else "下降"
                print(f"  {col}: {direction} {abs(change):.3f}")

        return yearly_trend

    def segment_stores(self, n_segments=3):
        """
        將商店分群

        根據績效指標將商店分為不同等級

        Args:
            n_segments: 分群數量

        Returns:
            DataFrame: 包含分群結果的商店資料
        """
        if self.data is None:
            raise ValueError("請先載入資料")

        # 計算每家商店的平均績效
        store_metrics = self.data.groupby('store_id').agg({
            'revenue_growth': 'mean',
            'satisfaction': 'mean',
            'productivity': 'mean'
        })

        # 計算綜合得分
        store_metrics['score'] = (
            (store_metrics['revenue_growth'] - store_metrics['revenue_growth'].min()) /
            (store_metrics['revenue_growth'].max() - store_metrics['revenue_growth'].min()) * 0.4 +
            (store_metrics['satisfaction'] - store_metrics['satisfaction'].min()) /
            (store_metrics['satisfaction'].max() - store_metrics['satisfaction'].min()) * 0.3 +
            (store_metrics['productivity'] - store_metrics['productivity'].min()) /
            (store_metrics['productivity'].max() - store_metrics['productivity'].min()) * 0.3
        )

        # 根據得分分群
        store_metrics['segment'] = pd.qcut(
            store_metrics['score'],
            q=n_segments,
            labels=[f'Tier {i}' for i in range(n_segments, 0, -1)]
        )

        # 輸出分群摘要
        segment_summary = store_metrics.groupby('segment').agg({
            'revenue_growth': 'mean',
            'satisfaction': 'mean',
            'productivity': 'mean',
            'score': ['count', 'mean']
        }).round(3)

        print("\n" + "=" * 50)
        print("商店分群結果")
        print("=" * 50)
        print("\n各群組摘要:")
        print(segment_summary)

        return store_metrics

# 使用範例
if __name__ == "__main__":
    # 範例 1:時間序列分析
    print("\n" + "=" * 60)
    print("範例 1:時間序列分析")
    print("=" * 60)

    # 生成範例時間序列資料
    np.random.seed(42)
    dates = pd.date_range(start='2020-01-01', periods=48, freq='M')

    # 建立具有趨勢和季節性的時間序列
    trend = np.linspace(100, 150, 48)
    seasonal = 10 * np.sin(np.arange(48) * 2 * np.pi / 12)
    noise = np.random.normal(0, 5, 48)
    values = trend + seasonal + noise

    ts_data = pd.Series(values, index=dates, name='sales')

    # 建立時間序列分析器
    analyzer = TimeSeriesAnalyzer(ts_data)

    # 檢查平穩性
    stationarity_result = analyzer.check_stationarity()

    # 季節性分解
    decomposition = analyzer.decompose_series(model='additive', period=12)

    # 識別模型階數
    acf_vals, pacf_vals = analyzer.identify_order()

    # 擬合 SARIMA 模型
    model_fit = analyzer.fit_sarima(
        order=(1, 1, 1),
        seasonal_order=(1, 1, 1, 12)
    )

    # 進行預測
    forecast = analyzer.forecast(steps=12)
    print("\n預測結果:")
    print(forecast)

    # 模型診斷
    residuals = analyzer.diagnose_model()

    # 範例 2:商店績效分析
    print("\n" + "=" * 60)
    print("範例 2:商店績效分析")
    print("=" * 60)

    # 建立商店績效分析器
    store_analyzer = StorePerformanceAnalyzer()

    # 生成範例資料
    sample_data = store_analyzer.generate_sample_data(n_stores=30)

    # 計算績效指標
    metrics = store_analyzer.calculate_metrics()

    # 分析趨勢
    trends = store_analyzer.analyze_trends()

    # 商店分群
    segments = store_analyzer.segment_stores(n_segments=3)

上述程式碼實作了完整的時間序列分析框架,包含 TimeSeriesAnalyzer 和 StorePerformanceAnalyzer 兩個核心類別。TimeSeriesAnalyzer 類別提供了平穩性檢定、季節性分解、模型擬合和預測等功能,而 StorePerformanceAnalyzer 類別則專門用於商店績效資料的分析和分群。這種模組化的設計讓程式碼易於維護和擴展。

時間序列預測模型選擇與調參策略

選擇合適的時間序列模型需要綜合考量資料特性、預測目標和運算資源等因素。模型選擇不當可能導致預測偏差或過度擬合,因此系統性的模型選擇流程對於獲得可靠的預測結果至關重要。

模型選擇的第一步是對資料進行詳盡的探索性分析。透過視覺化觀察時間序列的形態,可以初步判斷是否存在趨勢、季節性和異常值。自相關函數和偏自相關函數圖則提供了模型階數選擇的線索。如果 ACF 呈現緩慢衰減的模式,通常表示需要進行差分處理。如果 ACF 在特定滯後處顯示峰值,則暗示可能存在季節性。

資訊準則是模型選擇的重要工具,最常用的是 AIC 和 BIC。AIC 傾向於選擇較複雜的模型,因為它對模型複雜度的懲罰較輕。BIC 則對複雜度施加更重的懲罰,傾向於選擇較簡單的模型。在實務中,通常會比較多個候選模型的資訊準則值,選擇最小者。然而,資訊準則不是唯一的考量因素,還需要檢查模型的殘差診斷結果。

交叉驗證是評估模型預測能力的可靠方法。對於時間序列資料,需要使用保持時間順序的交叉驗證方法,例如滾動視窗驗證或擴展視窗驗證。在滾動視窗驗證中,使用固定長度的歷史資料訓練模型,然後預測下一個時間點,接著將視窗向前移動重複這個過程。這種方法可以評估模型在不同時間段的預測穩定性。

from sklearn.metrics import mean_absolute_error, mean_squared_error
import itertools

class ModelSelector:
    """
    時間序列模型選擇器

    提供自動化的模型選擇和超參數調整功能
    """

    def __init__(self, series):
        """
        初始化模型選擇器

        Args:
            series: pandas Series,時間序列資料
        """
        self.series = series
        self.results = []
        self.best_model = None

    def grid_search_arima(self, p_range=(0, 3), d_range=(0, 2), q_range=(0, 3)):
        """
        網格搜尋 ARIMA 模型最佳參數

        遍歷所有參數組合,根據 AIC 選擇最佳模型

        Args:
            p_range: AR 階數範圍 (min, max)
            d_range: 差分次數範圍 (min, max)
            q_range: MA 階數範圍 (min, max)

        Returns:
            DataFrame: 所有模型的評估結果
        """
        # 生成所有參數組合
        p_values = range(p_range[0], p_range[1] + 1)
        d_values = range(d_range[0], d_range[1] + 1)
        q_values = range(q_range[0], q_range[1] + 1)

        pdq_combinations = list(itertools.product(p_values, d_values, q_values))

        print(f"正在評估 {len(pdq_combinations)} 個 ARIMA 模型...")
        print("=" * 50)

        results = []

        for pdq in pdq_combinations:
            try:
                # 擬合模型
                model = ARIMA(self.series, order=pdq)
                model_fit = model.fit()

                # 記錄結果
                results.append({
                    'order': pdq,
                    'aic': model_fit.aic,
                    'bic': model_fit.bic,
                    'llf': model_fit.llf
                })

            except Exception as e:
                # 記錄失敗的模型
                results.append({
                    'order': pdq,
                    'aic': np.inf,
                    'bic': np.inf,
                    'llf': np.nan,
                    'error': str(e)
                })

        # 轉換為 DataFrame 並排序
        results_df = pd.DataFrame(results)
        results_df = results_df.sort_values('aic')
        self.results = results_df

        # 輸出最佳結果
        best = results_df.iloc[0]
        print(f"\n最佳 ARIMA 模型: {best['order']}")
        print(f"AIC: {best['aic']:.2f}")
        print(f"BIC: {best['bic']:.2f}")

        # 輸出前 5 名模型
        print("\n前 5 名模型:")
        print(results_df.head()[['order', 'aic', 'bic']])

        return results_df

    def grid_search_sarima(self, order_ranges, seasonal_ranges, m=12):
        """
        網格搜尋 SARIMA 模型最佳參數

        Args:
            order_ranges: 非季節性參數範圍字典
            seasonal_ranges: 季節性參數範圍字典
            m: 季節週期長度

        Returns:
            DataFrame: 所有模型的評估結果
        """
        # 生成所有參數組合
        p = range(order_ranges.get('p', (0, 1))[0], order_ranges.get('p', (0, 1))[1] + 1)
        d = range(order_ranges.get('d', (0, 1))[0], order_ranges.get('d', (0, 1))[1] + 1)
        q = range(order_ranges.get('q', (0, 1))[0], order_ranges.get('q', (0, 1))[1] + 1)

        P = range(seasonal_ranges.get('P', (0, 1))[0], seasonal_ranges.get('P', (0, 1))[1] + 1)
        D = range(seasonal_ranges.get('D', (0, 1))[0], seasonal_ranges.get('D', (0, 1))[1] + 1)
        Q = range(seasonal_ranges.get('Q', (0, 1))[0], seasonal_ranges.get('Q', (0, 1))[1] + 1)

        pdq_combinations = list(itertools.product(p, d, q))
        PDQ_combinations = list(itertools.product(P, D, Q))

        total_models = len(pdq_combinations) * len(PDQ_combinations)
        print(f"正在評估 {total_models} 個 SARIMA 模型...")
        print("=" * 50)

        results = []
        count = 0

        for pdq in pdq_combinations:
            for PDQ in PDQ_combinations:
                count += 1
                seasonal_order = PDQ + (m,)

                try:
                    # 擬合模型
                    model = SARIMAX(
                        self.series,
                        order=pdq,
                        seasonal_order=seasonal_order,
                        enforce_stationarity=False,
                        enforce_invertibility=False
                    )
                    model_fit = model.fit(disp=False)

                    # 記錄結果
                    results.append({
                        'order': pdq,
                        'seasonal_order': seasonal_order,
                        'aic': model_fit.aic,
                        'bic': model_fit.bic,
                        'llf': model_fit.llf
                    })

                except Exception as e:
                    results.append({
                        'order': pdq,
                        'seasonal_order': seasonal_order,
                        'aic': np.inf,
                        'bic': np.inf,
                        'llf': np.nan,
                        'error': str(e)
                    })

                # 進度報告
                if count % 20 == 0:
                    print(f"已完成 {count}/{total_models} 個模型...")

        # 轉換為 DataFrame 並排序
        results_df = pd.DataFrame(results)
        results_df = results_df.sort_values('aic')
        self.results = results_df

        # 輸出最佳結果
        best = results_df.iloc[0]
        print(f"\n最佳 SARIMA 模型: {best['order']} x {best['seasonal_order']}")
        print(f"AIC: {best['aic']:.2f}")
        print(f"BIC: {best['bic']:.2f}")

        return results_df

    def time_series_cv(self, model_order, n_splits=5, horizon=1):
        """
        時間序列交叉驗證

        使用擴展視窗方法評估模型的預測能力

        Args:
            model_order: 模型階數 (p, d, q)
            n_splits: 交叉驗證的折數
            horizon: 預測的時間步數

        Returns:
            dict: 包含各折和平均誤差的結果
        """
        n = len(self.series)

        # 計算每折的大小
        # 確保有足夠的初始訓練資料
        min_train_size = int(n * 0.5)
        fold_size = (n - min_train_size - horizon) // n_splits

        if fold_size < 1:
            raise ValueError("資料量不足以進行指定的交叉驗證")

        print(f"\n正在進行 {n_splits} 折時間序列交叉驗證...")
        print(f"模型: ARIMA{model_order}")
        print("=" * 50)

        mae_scores = []
        rmse_scores = []

        for i in range(n_splits):
            # 定義訓練和測試範圍
            train_end = min_train_size + i * fold_size
            test_start = train_end
            test_end = test_start + horizon

            if test_end > n:
                break

            # 分割資料
            train_data = self.series.iloc[:train_end]
            test_data = self.series.iloc[test_start:test_end]

            try:
                # 擬合模型
                model = ARIMA(train_data, order=model_order)
                model_fit = model.fit()

                # 預測
                predictions = model_fit.forecast(steps=horizon)

                # 計算誤差
                mae = mean_absolute_error(test_data, predictions)
                rmse = np.sqrt(mean_squared_error(test_data, predictions))

                mae_scores.append(mae)
                rmse_scores.append(rmse)

                print(f"折 {i+1}: MAE = {mae:.4f}, RMSE = {rmse:.4f}")

            except Exception as e:
                print(f"折 {i+1}: 失敗 - {str(e)}")

        # 計算平均誤差
        if mae_scores:
            avg_mae = np.mean(mae_scores)
            avg_rmse = np.mean(rmse_scores)

            print("\n" + "-" * 50)
            print(f"平均 MAE: {avg_mae:.4f}")
            print(f"平均 RMSE: {avg_rmse:.4f}")

            return {
                'mae_scores': mae_scores,
                'rmse_scores': rmse_scores,
                'avg_mae': avg_mae,
                'avg_rmse': avg_rmse
            }
        else:
            print("交叉驗證失敗")
            return None

# 使用範例
if __name__ == "__main__":
    # 生成範例資料
    np.random.seed(42)
    dates = pd.date_range(start='2019-01-01', periods=60, freq='M')

    # 建立具有趨勢和季節性的時間序列
    trend = np.linspace(100, 160, 60)
    seasonal = 15 * np.sin(np.arange(60) * 2 * np.pi / 12)
    noise = np.random.normal(0, 5, 60)
    values = trend + seasonal + noise

    ts_data = pd.Series(values, index=dates, name='sales')

    # 建立模型選擇器
    selector = ModelSelector(ts_data)

    # 網格搜尋 ARIMA
    arima_results = selector.grid_search_arima(
        p_range=(0, 2),
        d_range=(0, 1),
        q_range=(0, 2)
    )

    # 交叉驗證最佳模型
    best_order = arima_results.iloc[0]['order']
    cv_results = selector.time_series_cv(
        model_order=best_order,
        n_splits=4,
        horizon=3
    )

上述程式碼實作了自動化的模型選擇流程,包括網格搜尋和時間序列交叉驗證。ModelSelector 類別提供了系統性地評估多個候選模型的能力,幫助分析人員選擇最適合資料特性的模型。網格搜尋遍歷所有參數組合並根據 AIC 排序,而交叉驗證則評估模型的實際預測能力。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "模型選擇流程" as process {
    rectangle "資料探索" as explore {
        rectangle "視覺化分析" as viz
        rectangle "ACF/PACF 分析" as acf
    }

    rectangle "候選模型生成" as generate {
        rectangle "參數範圍設定" as params
        rectangle "網格搜尋" as grid
    }

    rectangle "模型評估" as evaluate {
        rectangle "資訊準則比較" as aic
        rectangle "交叉驗證" as cv
        rectangle "殘差診斷" as residual
    }

    rectangle "最終選擇" as select {
        rectangle "綜合評估" as integrate
        rectangle "業務考量" as business
    }
}

viz --> acf
acf --> params
params --> grid
grid --> aic
aic --> cv
cv --> residual
residual --> integrate
integrate --> business

@enduml

上圖展示了完整的時間序列模型選擇流程。從資料探索開始,透過視覺化和 ACF/PACF 分析了解資料特性,然後設定參數範圍進行網格搜尋。候選模型透過資訊準則、交叉驗證和殘差診斷進行評估,最後結合業務考量做出最終選擇。

商店績效資料視覺化最佳實踐

資料視覺化是溝通分析結果的關鍵環節,良好的視覺化設計可以讓複雜的時間序列模式變得清晰易懂。在商店績效分析中,視覺化不僅用於呈現歷史趨勢,還用於展示預測結果和績效比較。

時間序列圖是最基本也是最重要的視覺化形式。一張好的時間序列圖應該清楚地展示資料的長期趨勢和短期波動,並標註重要的時間點和事件。使用適當的時間尺度和數值範圍可以避免誤導性的視覺效果。當需要比較多個時間序列時,可以使用多軸圖或小倍數圖來保持清晰度。

季節性分解圖將時間序列拆分為趨勢、季節性和殘差三個成分,分別展示在不同的子圖中。這種視覺化方式幫助分析人員理解各個成分對整體資料的貢獻程度。趨勢圖顯示長期走勢,季節性圖顯示週期性模式,殘差圖則用於診斷模型的適當性。

預測視覺化需要同時呈現歷史資料、預測值和信賴區間。使用不同的顏色或線型區分歷史和預測部分,信賴區間通常用半透明的色帶表示。良好的預測視覺化讓決策者能夠直觀地理解預測的不確定性程度,從而做出更明智的決策。

績效儀表板整合多個關鍵績效指標的視覺化,提供商店績效的全面概覽。儀表板設計應該考慮使用者的需求和閱讀習慣,將最重要的資訊放在最顯眼的位置。互動式儀表板允許使用者鑽取詳細資料或篩選特定的商店和時間段,提供更靈活的分析體驗。

在設計商店績效視覺化時,需要考慮目標受眾的專業背景。對於高階管理者,簡潔的摘要圖表和關鍵數字更為合適。對於分析人員,則需要提供更詳細的視覺化和原始資料的存取權限。無論目標受眾是誰,視覺化設計都應該遵循一致的風格和配色方案,確保專業性和可讀性。

時間序列分析與商店績效視覺化的結合為商業決策提供了強大的支援工具。透過理解資料的時間模式,企業可以更準確地預測未來需求,及時調整庫存和人力配置。透過直觀的視覺化呈現,分析結果可以有效地傳達給各層級的決策者,促進資料驅動的決策文化。隨著資料量的持續增長和分析技術的不斷進步,時間序列分析的應用範圍將會更加廣泛,視覺化工具也將變得更加智能和互動。企業應該持續投資於分析能力的建設,培養具備時間序列分析和資料視覺化技能的人才,以在競爭激烈的市場中保持優勢。