在當今競爭激烈的商業環境中,資料科學已成為驅動企業決策的核心動力。企業不再僅依賴直覺或過往經驗來制定策略,而是透過系統化的資料分析方法來洞察市場趨勢、優化營運效率並創造競爭優勢。本文將從實務角度深入探討資料科學如何驅動商業決策,涵蓋 KPI 定義、資料管線建構、預測模型應用、ROI 分析以及決策支援系統的完整建構流程。

商業決策的資料科學框架

資料驅動決策的本質在於將複雜的商業問題轉化為可量化、可分析的資料問題。這個轉化過程需要一個完整的框架來確保分析結果能夠有效地支援決策制定。商業決策的資料科學框架包含五個核心階段:問題定義、資料準備、分析建模、結果驗證以及決策執行。每個階段都有其特定的方法論和技術工具,而這些階段之間的銜接則決定了整體分析專案的成敗。

問題定義階段是整個框架的基礎,企業需要清楚地界定商業目標,並將其轉化為具體的分析問題。這個階段最重要的工作是確認關鍵績效指標 KPI,因為 KPI 的選擇直接影響後續所有分析工作的方向和價值。資料準備階段則涉及資料的採集、清理、轉換和整合,這個階段通常佔據整體專案時間的百分之六十至八十,是最耗時但也最關鍵的環節。分析建模階段運用統計方法和機器學習技術來發現資料中的模式和關聯,並建構預測模型來支援決策。結果驗證階段確保分析結果的可靠性和業務相關性,避免因資料偏差或模型過擬合而導致錯誤決策。最後的決策執行階段則將分析洞察轉化為具體的商業行動,並建立持續監控機制來追蹤決策效果。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "問題定義" as A
rectangle "資料準備" as B
rectangle "分析建模" as C
rectangle "結果驗證" as D
rectangle "決策執行" as E

A --> B : KPI 定義
B --> C : 資料管線
C --> D : 模型驗證
D --> E : 決策支援
E --> A : 回饋優化

@enduml

這個框架的核心價值在於建立一個可重複、可擴展的分析流程。企業透過這個框架可以將資料科學的應用從單次專案提升為持續性的決策支援能力,從而在快速變化的市場環境中保持競爭優勢。

KPI 定義與商業指標設計

關鍵績效指標 KPI 的定義是資料驅動決策的起點,也是最容易被低估的環節。一個設計良好的 KPI 系統能夠精準地反映商業目標的達成程度,並提供明確的改善方向;而設計不當的 KPI 則可能導致資源錯配甚至適得其反的決策。KPI 定義需要遵循 SMART 原則:具體 Specific、可衡量 Measurable、可達成 Achievable、相關性 Relevant 以及時限性 Time-bound。

在實務上,KPI 的設計需要考慮指標之間的層級關係和因果連結。頂層 KPI 通常反映企業的整體策略目標,例如營收成長率、市場佔有率或客戶終身價值。這些頂層指標需要分解為可操作的次級指標,例如客戶獲取成本、轉換率、回購率等,以便各部門能夠明確自己的貢獻方向。此外,領先指標和落後指標的區分也相當重要。落後指標如營收和獲利反映的是過去的績效,而領先指標如網站流量、客戶滿意度則能夠預測未來的績效趨勢。一個完整的 KPI 系統應該同時包含這兩類指標,以便企業既能評估過去也能預測未來。

以下是一個完整的 KPI 管理類別實作,展示如何在 Python 中建構系統化的指標追蹤機制:

# 匯入必要的套件
# pandas 用於資料處理和分析
# numpy 用於數值運算
# datetime 用於日期時間處理
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum

# 定義 KPI 類型的列舉類別
# 區分領先指標和落後指標,便於後續分類處理
class KPIType(Enum):
    LEADING = "leading"      # 領先指標:可預測未來績效
    LAGGING = "lagging"      # 落後指標:反映過去績效

# 定義 KPI 資料類別
# 使用 dataclass 裝飾器簡化類別定義
# 包含 KPI 的所有必要屬性
@dataclass
class KPIDefinition:
    name: str                          # KPI 名稱
    description: str                   # KPI 描述說明
    kpi_type: KPIType                  # KPI 類型(領先或落後)
    target_value: float                # 目標值
    unit: str                          # 單位(如百分比、金額等)
    calculation_formula: str           # 計算公式說明
    data_sources: List[str]            # 資料來源列表
    update_frequency: str              # 更新頻率(如每日、每週)
    owner: str                         # 負責人或部門

# KPI 管理器類別
# 提供 KPI 的註冊、計算、追蹤和報告功能
class KPIManager:
    def __init__(self):
        # 初始化 KPI 定義字典,儲存所有已註冊的 KPI
        self.kpi_definitions: Dict[str, KPIDefinition] = {}
        # 初始化 KPI 歷史資料字典,儲存各 KPI 的時間序列資料
        self.kpi_history: Dict[str, pd.DataFrame] = {}

    def register_kpi(self, kpi: KPIDefinition) -> None:
        """
        註冊新的 KPI 定義
        將 KPI 加入管理系統並初始化其歷史資料儲存空間

        參數:
            kpi: KPIDefinition 物件,包含 KPI 的完整定義
        """
        # 將 KPI 定義加入字典
        self.kpi_definitions[kpi.name] = kpi
        # 初始化該 KPI 的歷史資料 DataFrame
        # 包含日期、實際值、目標值和達成率四個欄位
        self.kpi_history[kpi.name] = pd.DataFrame(
            columns=['date', 'actual_value', 'target_value', 'achievement_rate']
        )

    def record_kpi_value(self, kpi_name: str, date: datetime,
                         actual_value: float) -> None:
        """
        記錄 KPI 的實際值
        自動計算達成率並儲存至歷史資料

        參數:
            kpi_name: KPI 名稱
            date: 記錄日期
            actual_value: 實際測量值
        """
        # 檢查 KPI 是否已註冊
        if kpi_name not in self.kpi_definitions:
            raise ValueError(f"KPI '{kpi_name}' 尚未註冊")

        # 取得該 KPI 的目標值
        target = self.kpi_definitions[kpi_name].target_value
        # 計算達成率(實際值除以目標值的百分比)
        achievement_rate = (actual_value / target) * 100 if target != 0 else 0

        # 建立新的記錄資料
        new_record = pd.DataFrame([{
            'date': date,
            'actual_value': actual_value,
            'target_value': target,
            'achievement_rate': achievement_rate
        }])

        # 將新記錄加入歷史資料
        self.kpi_history[kpi_name] = pd.concat(
            [self.kpi_history[kpi_name], new_record],
            ignore_index=True
        )

    def calculate_trend(self, kpi_name: str, periods: int = 7) -> Dict:
        """
        計算 KPI 的趨勢分析
        包含移動平均、成長率和趨勢方向判斷

        參數:
            kpi_name: KPI 名稱
            periods: 計算趨勢的期數,預設為 7 期

        回傳:
            包含趨勢分析結果的字典
        """
        # 檢查是否有足夠的歷史資料
        if kpi_name not in self.kpi_history:
            return {"error": "無歷史資料"}

        # 取得歷史資料
        history = self.kpi_history[kpi_name]

        # 若資料筆數不足,回傳錯誤訊息
        if len(history) < periods:
            return {"error": f"資料不足,需要至少 {periods} 筆資料"}

        # 取得最近 periods 期的資料
        recent_data = history.tail(periods)

        # 計算移動平均值
        moving_average = recent_data['actual_value'].mean()

        # 計算期間成長率
        # 使用第一筆和最後一筆資料計算
        first_value = recent_data['actual_value'].iloc[0]
        last_value = recent_data['actual_value'].iloc[-1]
        growth_rate = ((last_value - first_value) / first_value * 100
                       if first_value != 0 else 0)

        # 判斷趨勢方向
        # 根據成長率正負來決定上升、下降或持平
        if growth_rate > 5:
            trend_direction = "上升"
        elif growth_rate < -5:
            trend_direction = "下降"
        else:
            trend_direction = "持平"

        # 計算標準差,用於評估資料的波動程度
        volatility = recent_data['actual_value'].std()

        # 回傳趨勢分析結果
        return {
            "kpi_name": kpi_name,
            "periods_analyzed": periods,
            "moving_average": round(moving_average, 2),
            "growth_rate": round(growth_rate, 2),
            "trend_direction": trend_direction,
            "volatility": round(volatility, 2),
            "latest_value": last_value,
            "latest_achievement_rate": round(
                recent_data['achievement_rate'].iloc[-1], 2
            )
        }

    def generate_kpi_report(self) -> pd.DataFrame:
        """
        產生所有 KPI 的彙總報告
        包含各 KPI 的最新狀態和達成情況

        回傳:
            包含所有 KPI 彙總資訊的 DataFrame
        """
        # 建立空的報告列表
        report_data = []

        # 遍歷所有已註冊的 KPI
        for kpi_name, kpi_def in self.kpi_definitions.items():
            # 取得該 KPI 的歷史資料
            history = self.kpi_history.get(kpi_name)

            # 若有歷史資料,取得最新值
            if history is not None and len(history) > 0:
                latest = history.iloc[-1]
                actual = latest['actual_value']
                achievement = latest['achievement_rate']
            else:
                actual = None
                achievement = None

            # 建立報告記錄
            report_data.append({
                'KPI 名稱': kpi_name,
                '類型': kpi_def.kpi_type.value,
                '目標值': kpi_def.target_value,
                '實際值': actual,
                '達成率': achievement,
                '單位': kpi_def.unit,
                '負責人': kpi_def.owner
            })

        # 轉換為 DataFrame 並回傳
        return pd.DataFrame(report_data)

這個 KPI 管理器提供了完整的指標生命週期管理功能。從註冊新指標、記錄測量值,到計算趨勢分析和產生報告,所有功能都經過良好的封裝和文件說明。在實際應用中,企業可以根據自身需求擴展這個類別,例如增加告警機制、與資料庫整合或建立視覺化儀表板。

資料管線建構與自動化

資料管線 Data Pipeline 是資料科學專案的基礎設施,負責將原始資料轉化為可供分析的格式。一個設計良好的資料管線能夠確保資料的品質、一致性和時效性,同時降低人工處理的錯誤風險和時間成本。資料管線的建構需要考慮資料的來源多樣性、處理邏輯的複雜度以及下游應用的需求。

現代資料管線通常採用 ETL(Extract、Transform、Load)或 ELT(Extract、Load、Transform)的架構模式。ETL 模式先進行資料轉換再載入目標系統,適合資料量較小或轉換邏輯固定的場景;ELT 模式則先載入原始資料再進行轉換,適合大數據環境和需要靈活分析的場景。無論採用哪種模式,資料管線都需要具備錯誤處理、日誌記錄和監控告警等功能,以確保系統的可靠性和可維護性。

以下是一個完整的資料管線建構類別實作:

# 匯入必要的套件
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict, List, Callable, Any, Optional
from abc import ABC, abstractmethod
import logging
import json
import hashlib

# 設定日誌記錄器
# 使用模組名稱作為記錄器名稱,便於追蹤問題來源
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 定義資料來源抽象基礎類別
# 所有具體的資料來源類別都必須繼承此類別並實作抽象方法
class DataSource(ABC):
    @abstractmethod
    def extract(self) -> pd.DataFrame:
        """
        從資料來源擷取資料的抽象方法
        子類別必須實作此方法以定義具體的擷取邏輯

        回傳:
            擷取的資料 DataFrame
        """
        pass

    @abstractmethod
    def get_source_name(self) -> str:
        """
        取得資料來源名稱的抽象方法
        用於識別和記錄資料來源

        回傳:
            資料來源的名稱字串
        """
        pass

# CSV 檔案資料來源類別
# 實作從 CSV 檔案讀取資料的功能
class CSVDataSource(DataSource):
    def __init__(self, file_path: str, encoding: str = 'utf-8'):
        """
        初始化 CSV 資料來源

        參數:
            file_path: CSV 檔案路徑
            encoding: 檔案編碼,預設為 utf-8
        """
        self.file_path = file_path
        self.encoding = encoding

    def extract(self) -> pd.DataFrame:
        """
        從 CSV 檔案擷取資料

        回傳:
            讀取的資料 DataFrame
        """
        # 記錄擷取開始
        logger.info(f"開始從 {self.file_path} 擷取資料")

        # 讀取 CSV 檔案
        df = pd.read_csv(self.file_path, encoding=self.encoding)

        # 記錄擷取完成和資料筆數
        logger.info(f"成功擷取 {len(df)} 筆資料")
        return df

    def get_source_name(self) -> str:
        return f"CSV: {self.file_path}"

# 資料庫資料來源類別
# 實作從資料庫查詢資料的功能
class DatabaseDataSource(DataSource):
    def __init__(self, connection_string: str, query: str):
        """
        初始化資料庫資料來源

        參數:
            connection_string: 資料庫連線字串
            query: SQL 查詢語句
        """
        self.connection_string = connection_string
        self.query = query

    def extract(self) -> pd.DataFrame:
        """
        從資料庫擷取資料
        此處為示範用途,實際應使用資料庫連線

        回傳:
            查詢結果的 DataFrame
        """
        logger.info(f"執行資料庫查詢: {self.query[:50]}...")
        # 實際應用中應使用 pd.read_sql 配合資料庫連線
        # 此處回傳空 DataFrame 作為示範
        return pd.DataFrame()

    def get_source_name(self) -> str:
        return f"Database Query"

# 資料轉換器類別
# 提供常用的資料轉換功能
class DataTransformer:
    def __init__(self):
        # 儲存轉換步驟的列表
        # 每個步驟是一個包含名稱和函數的元組
        self.transformation_steps: List[Tuple[str, Callable]] = []

    def add_step(self, name: str, transform_func: Callable) -> 'DataTransformer':
        """
        新增轉換步驟
        使用 Fluent Interface 模式,可以鏈式呼叫

        參數:
            name: 步驟名稱
            transform_func: 轉換函數,接受 DataFrame 回傳 DataFrame

        回傳:
            自身實例,支援鏈式呼叫
        """
        self.transformation_steps.append((name, transform_func))
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        執行所有轉換步驟
        依序套用所有已註冊的轉換函數

        參數:
            df: 輸入的 DataFrame

        回傳:
            轉換後的 DataFrame
        """
        # 複製輸入資料,避免修改原始資料
        result = df.copy()

        # 依序執行每個轉換步驟
        for step_name, transform_func in self.transformation_steps:
            logger.info(f"執行轉換步驟: {step_name}")
            # 記錄轉換前的資料筆數
            before_count = len(result)

            # 執行轉換
            result = transform_func(result)

            # 記錄轉換後的資料筆數
            after_count = len(result)
            logger.info(f"  轉換完成: {before_count} -> {after_count} 筆")

        return result

# 資料品質檢查器類別
# 提供資料品質驗證功能
class DataQualityChecker:
    def __init__(self):
        # 儲存檢查規則的列表
        self.rules: List[Dict] = []

    def add_rule(self, name: str, check_func: Callable[[pd.DataFrame], bool],
                 severity: str = "error") -> 'DataQualityChecker':
        """
        新增資料品質檢查規則

        參數:
            name: 規則名稱
            check_func: 檢查函數,回傳 True 表示通過
            severity: 嚴重程度,可為 error 或 warning

        回傳:
            自身實例,支援鏈式呼叫
        """
        self.rules.append({
            'name': name,
            'check_func': check_func,
            'severity': severity
        })
        return self

    def check(self, df: pd.DataFrame) -> Dict:
        """
        執行所有品質檢查規則

        參數:
            df: 要檢查的 DataFrame

        回傳:
            檢查結果字典,包含通過與否和詳細結果
        """
        results = {
            'passed': True,
            'errors': [],
            'warnings': [],
            'details': []
        }

        # 執行每個檢查規則
        for rule in self.rules:
            rule_name = rule['name']
            check_func = rule['check_func']
            severity = rule['severity']

            # 執行檢查
            passed = check_func(df)

            # 記錄結果
            result_detail = {
                'rule': rule_name,
                'passed': passed,
                'severity': severity
            }
            results['details'].append(result_detail)

            # 若未通過,根據嚴重程度分類
            if not passed:
                if severity == 'error':
                    results['errors'].append(rule_name)
                    results['passed'] = False
                else:
                    results['warnings'].append(rule_name)

        return results

# 資料管線類別
# 整合資料擷取、轉換、品質檢查和載入功能
class DataPipeline:
    def __init__(self, name: str):
        """
        初始化資料管線

        參數:
            name: 管線名稱
        """
        self.name = name
        self.data_sources: List[DataSource] = []
        self.transformer = DataTransformer()
        self.quality_checker = DataQualityChecker()
        self.execution_log: List[Dict] = []

    def add_source(self, source: DataSource) -> 'DataPipeline':
        """
        新增資料來源

        參數:
            source: DataSource 物件

        回傳:
            自身實例,支援鏈式呼叫
        """
        self.data_sources.append(source)
        return self

    def add_transformation(self, name: str,
                           transform_func: Callable) -> 'DataPipeline':
        """
        新增轉換步驟

        參數:
            name: 步驟名稱
            transform_func: 轉換函數

        回傳:
            自身實例,支援鏈式呼叫
        """
        self.transformer.add_step(name, transform_func)
        return self

    def add_quality_rule(self, name: str, check_func: Callable,
                         severity: str = "error") -> 'DataPipeline':
        """
        新增品質檢查規則

        參數:
            name: 規則名稱
            check_func: 檢查函數
            severity: 嚴重程度

        回傳:
            自身實例,支援鏈式呼叫
        """
        self.quality_checker.add_rule(name, check_func, severity)
        return self

    def run(self) -> Tuple[pd.DataFrame, Dict]:
        """
        執行資料管線
        依序執行擷取、合併、轉換和品質檢查

        回傳:
            元組,包含處理後的 DataFrame 和執行記錄
        """
        # 記錄執行開始時間
        start_time = datetime.now()
        logger.info(f"開始執行管線: {self.name}")

        execution_record = {
            'pipeline_name': self.name,
            'start_time': start_time.isoformat(),
            'status': 'running',
            'steps': []
        }

        try:
            # 步驟一:從所有來源擷取資料
            extracted_data = []
            for source in self.data_sources:
                df = source.extract()
                extracted_data.append(df)
                execution_record['steps'].append({
                    'step': 'extract',
                    'source': source.get_source_name(),
                    'rows': len(df)
                })

            # 步驟二:合併所有資料來源
            if len(extracted_data) == 1:
                combined_df = extracted_data[0]
            else:
                # 使用 concat 合併多個資料來源
                combined_df = pd.concat(extracted_data, ignore_index=True)

            logger.info(f"合併後資料筆數: {len(combined_df)}")

            # 步驟三:執行轉換
            transformed_df = self.transformer.transform(combined_df)
            execution_record['steps'].append({
                'step': 'transform',
                'input_rows': len(combined_df),
                'output_rows': len(transformed_df)
            })

            # 步驟四:執行品質檢查
            quality_results = self.quality_checker.check(transformed_df)
            execution_record['steps'].append({
                'step': 'quality_check',
                'results': quality_results
            })

            # 若品質檢查失敗,記錄錯誤但仍回傳資料
            if not quality_results['passed']:
                logger.warning(f"品質檢查未通過: {quality_results['errors']}")
                execution_record['status'] = 'completed_with_errors'
            else:
                execution_record['status'] = 'success'

            # 記錄執行結束時間
            end_time = datetime.now()
            execution_record['end_time'] = end_time.isoformat()
            execution_record['duration_seconds'] = (
                end_time - start_time
            ).total_seconds()

            # 儲存執行記錄
            self.execution_log.append(execution_record)

            logger.info(f"管線執行完成,耗時: {execution_record['duration_seconds']} 秒")

            return transformed_df, execution_record

        except Exception as e:
            # 捕獲並記錄錯誤
            execution_record['status'] = 'failed'
            execution_record['error'] = str(e)
            execution_record['end_time'] = datetime.now().isoformat()
            self.execution_log.append(execution_record)
            logger.error(f"管線執行失敗: {e}")
            raise

這個資料管線框架提供了模組化和可擴展的設計。透過抽象基礎類別和 Fluent Interface 模式,開發者可以輕鬆地添加新的資料來源類型、轉換邏輯和品質檢查規則。日誌記錄和執行追蹤功能則確保了系統的可觀測性和問題排查能力。

預測模型在商業決策中的應用

預測模型是資料科學驅動商業決策的核心工具。透過分析歷史資料中的模式和關聯,預測模型能夠對未來的事件或結果做出估計,從而支援企業的策略規劃和營運決策。常見的商業預測應用包括銷售預測、客戶流失預測、需求預測和風險評估等。預測模型的價值不僅在於提供準確的數字預測,更在於量化不確定性和揭示影響因素的重要性。

預測模型的建構流程包含特徵工程、模型選擇、訓練驗證和效能評估等步驟。特徵工程是將原始資料轉換為模型可用的輸入變數的過程,這個過程需要結合領域知識和統計分析來識別和創造有預測力的特徵。模型選擇則需要考慮問題的性質、資料的特性以及模型的可解釋性需求。在商業應用中,模型的可解釋性往往與預測準確度同樣重要,因為決策者需要理解預測結果背後的邏輯才能做出有信心的決策。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "歷史資料" as A
rectangle "特徵工程" as B
rectangle "模型訓練" as C
rectangle "模型驗證" as D
rectangle "預測輸出" as E
rectangle "決策應用" as F

A --> B
B --> C
C --> D
D --> E
E --> F
D --> C : 調整參數

@enduml

以下是一個完整的商業預測模型類別實作,展示如何建構銷售預測和客戶流失預測模型:

# 匯入必要的套件
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.metrics import (mean_squared_error, mean_absolute_error,
                              r2_score, accuracy_score, precision_score,
                              recall_score, f1_score, roc_auc_score)
from typing import Dict, List, Tuple, Optional, Any
import warnings
warnings.filterwarnings('ignore')

# 商業分析類別
# 提供預測模型建構和特徵重要性分析功能
class BusinessAnalytics:
    def __init__(self):
        """
        初始化商業分析器
        設定模型和標準化器的儲存空間
        """
        # 儲存已訓練的模型
        self.models: Dict[str, Any] = {}
        # 儲存特徵標準化器
        self.scalers: Dict[str, StandardScaler] = {}
        # 儲存模型效能指標
        self.model_metrics: Dict[str, Dict] = {}
        # 儲存特徵重要性
        self.feature_importance: Dict[str, pd.DataFrame] = {}

    def prepare_features(self, df: pd.DataFrame,
                         target_column: str,
                         feature_columns: List[str],
                         scale: bool = True) -> Tuple[np.ndarray, np.ndarray]:
        """
        準備模型訓練所需的特徵和目標變數

        參數:
            df: 輸入資料 DataFrame
            target_column: 目標變數欄位名稱
            feature_columns: 特徵欄位名稱列表
            scale: 是否進行特徵標準化

        回傳:
            元組,包含特徵矩陣 X 和目標向量 y
        """
        # 擷取特徵和目標變數
        X = df[feature_columns].values
        y = df[target_column].values

        # 若需要標準化,建立並套用 StandardScaler
        if scale:
            scaler = StandardScaler()
            X = scaler.fit_transform(X)
            # 儲存標準化器以供後續預測使用
            self.scalers[target_column] = scaler

        return X, y

    def train_sales_forecast_model(self, df: pd.DataFrame,
                                    target_column: str,
                                    feature_columns: List[str],
                                    model_type: str = 'random_forest',
                                    test_size: float = 0.2) -> Dict:
        """
        訓練銷售預測模型
        使用迴歸方法預測連續型銷售數值

        參數:
            df: 訓練資料
            target_column: 目標變數欄位(銷售額)
            feature_columns: 特徵欄位列表
            model_type: 模型類型,random_forest 或 linear
            test_size: 測試集比例

        回傳:
            包含模型效能指標的字典
        """
        # 準備特徵和目標變數
        X, y = self.prepare_features(df, target_column, feature_columns)

        # 分割訓練集和測試集
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42
        )

        # 根據指定類型建立模型
        if model_type == 'random_forest':
            # Random Forest 適合捕捉非線性關係
            model = RandomForestRegressor(
                n_estimators=100,      # 決策樹數量
                max_depth=10,          # 最大深度
                min_samples_split=5,   # 分割所需最小樣本數
                random_state=42
            )
        else:
            # Linear Regression 適合線性關係,易於解釋
            model = LinearRegression()

        # 訓練模型
        model.fit(X_train, y_train)

        # 進行預測
        y_pred = model.predict(X_test)

        # 計算效能指標
        metrics = {
            'model_type': model_type,
            'target': target_column,
            # RMSE:均方根誤差,越小越好
            'rmse': np.sqrt(mean_squared_error(y_test, y_pred)),
            # MAE:平均絕對誤差,越小越好
            'mae': mean_absolute_error(y_test, y_pred),
            # R2:決定係數,越接近 1 越好
            'r2_score': r2_score(y_test, y_pred),
            # MAPE:平均絕對百分比誤差
            'mape': np.mean(np.abs((y_test - y_pred) / y_test)) * 100,
            'test_samples': len(y_test),
            'train_samples': len(y_train)
        }

        # 儲存模型和指標
        model_name = f"sales_forecast_{target_column}"
        self.models[model_name] = model
        self.model_metrics[model_name] = metrics

        # 計算並儲存特徵重要性
        if model_type == 'random_forest':
            importance_df = pd.DataFrame({
                'feature': feature_columns,
                'importance': model.feature_importances_
            }).sort_values('importance', ascending=False)
            self.feature_importance[model_name] = importance_df

        return metrics

    def train_churn_prediction_model(self, df: pd.DataFrame,
                                      target_column: str,
                                      feature_columns: List[str],
                                      model_type: str = 'random_forest',
                                      test_size: float = 0.2) -> Dict:
        """
        訓練客戶流失預測模型
        使用分類方法預測客戶是否會流失

        參數:
            df: 訓練資料
            target_column: 目標變數欄位(流失標記,0/1)
            feature_columns: 特徵欄位列表
            model_type: 模型類型
            test_size: 測試集比例

        回傳:
            包含模型效能指標的字典
        """
        # 準備特徵和目標變數
        X, y = self.prepare_features(df, target_column, feature_columns)

        # 分割訓練集和測試集
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42
        )

        # 根據指定類型建立分類模型
        if model_type == 'random_forest':
            model = RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                min_samples_split=5,
                random_state=42,
                class_weight='balanced'  # 處理類別不平衡
            )
        else:
            model = LogisticRegression(
                random_state=42,
                class_weight='balanced',
                max_iter=1000
            )

        # 訓練模型
        model.fit(X_train, y_train)

        # 進行預測
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test)[:, 1]

        # 計算分類效能指標
        metrics = {
            'model_type': model_type,
            'target': target_column,
            # 準確率:正確預測的比例
            'accuracy': accuracy_score(y_test, y_pred),
            # 精確率:預測為正例中實際為正例的比例
            'precision': precision_score(y_test, y_pred),
            # 召回率:實際正例中被正確預測的比例
            'recall': recall_score(y_test, y_pred),
            # F1 分數:精確率和召回率的調和平均
            'f1_score': f1_score(y_test, y_pred),
            # AUC:ROC 曲線下面積
            'auc_roc': roc_auc_score(y_test, y_pred_proba),
            'test_samples': len(y_test),
            'train_samples': len(y_train)
        }

        # 儲存模型和指標
        model_name = f"churn_prediction_{target_column}"
        self.models[model_name] = model
        self.model_metrics[model_name] = metrics

        # 計算並儲存特徵重要性
        if model_type == 'random_forest':
            importance_df = pd.DataFrame({
                'feature': feature_columns,
                'importance': model.feature_importances_
            }).sort_values('importance', ascending=False)
            self.feature_importance[model_name] = importance_df

        return metrics

    def predict(self, model_name: str, X: pd.DataFrame) -> np.ndarray:
        """
        使用已訓練的模型進行預測

        參數:
            model_name: 模型名稱
            X: 輸入特徵 DataFrame

        回傳:
            預測結果陣列
        """
        # 檢查模型是否存在
        if model_name not in self.models:
            raise ValueError(f"模型 '{model_name}' 不存在")

        # 取得模型
        model = self.models[model_name]

        # 取得特徵值
        X_values = X.values

        # 若有對應的標準化器,套用標準化
        # 從模型名稱中提取目標欄位名稱
        target_key = model_name.split('_')[-1]
        if target_key in self.scalers:
            X_values = self.scalers[target_key].transform(X_values)

        # 回傳預測結果
        return model.predict(X_values)

    def get_feature_importance(self, model_name: str) -> pd.DataFrame:
        """
        取得指定模型的特徵重要性

        參數:
            model_name: 模型名稱

        回傳:
            特徵重要性 DataFrame
        """
        if model_name in self.feature_importance:
            return self.feature_importance[model_name]
        else:
            return pd.DataFrame({'message': ['此模型無特徵重要性資訊']})

    def generate_model_report(self) -> pd.DataFrame:
        """
        產生所有模型的效能報告

        回傳:
            模型效能彙總 DataFrame
        """
        # 將所有模型指標轉換為 DataFrame
        if not self.model_metrics:
            return pd.DataFrame({'message': ['尚無已訓練的模型']})

        report_data = []
        for model_name, metrics in self.model_metrics.items():
            record = {'model_name': model_name}
            record.update(metrics)
            report_data.append(record)

        return pd.DataFrame(report_data)

這個商業分析類別提供了完整的預測模型建構功能。透過封裝常用的機器學習流程,企業可以快速建立銷售預測和客戶流失預測模型。特徵重要性分析功能則幫助決策者理解哪些因素對預測結果影響最大,從而制定更有針對性的商業策略。

ROI 分析與價值量化

ROI(Return on Investment)分析是評估商業決策價值的核心方法。透過系統化的 ROI 分析,企業可以比較不同方案的經濟效益、優先排序投資專案,並為決策提供量化的依據。資料科學專案的 ROI 分析尤其重要,因為這類專案通常需要前期投資但效益延遲顯現,必須透過嚴謹的分析來論證其價值。

ROI 分析的基本公式為 ROI 等於淨利益除以成本再乘以百分之百。然而在實務應用中,計算的複雜度遠超這個簡單公式。首先,成本的估算需要考慮直接成本(如軟體、硬體、人力)和間接成本(如機會成本、培訓成本)。其次,效益的量化需要區分有形效益(如營收增加、成本節省)和無形效益(如客戶滿意度提升、品牌價值增加)。此外,時間價值也是重要考量,未來的現金流需要折現到現值才能進行比較。

以下是一個完整的 ROI 分析類別實作:

# 匯入必要的套件
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from dataclasses import dataclass

# 定義成本項目資料類別
@dataclass
class CostItem:
    name: str              # 成本項目名稱
    amount: float          # 金額
    category: str          # 類別(直接/間接)
    is_recurring: bool     # 是否為經常性成本
    frequency: str         # 發生頻率(一次性/月/年)

# 定義效益項目資料類別
@dataclass
class BenefitItem:
    name: str              # 效益項目名稱
    amount: float          # 金額
    category: str          # 類別(有形/無形)
    probability: float     # 實現機率(0-1)
    time_to_realize: int   # 實現所需月數

# ROI 分析器類別
class ROIAnalyzer:
    def __init__(self, project_name: str, analysis_period_months: int = 36,
                 discount_rate: float = 0.10):
        """
        初始化 ROI 分析器

        參數:
            project_name: 專案名稱
            analysis_period_months: 分析期間(月數)
            discount_rate: 年折現率
        """
        self.project_name = project_name
        self.analysis_period_months = analysis_period_months
        # 將年折現率轉換為月折現率
        self.monthly_discount_rate = (1 + discount_rate) ** (1/12) - 1

        # 儲存成本和效益項目
        self.costs: List[CostItem] = []
        self.benefits: List[BenefitItem] = []

    def add_cost(self, cost: CostItem) -> 'ROIAnalyzer':
        """
        新增成本項目

        參數:
            cost: CostItem 物件

        回傳:
            自身實例,支援鏈式呼叫
        """
        self.costs.append(cost)
        return self

    def add_benefit(self, benefit: BenefitItem) -> 'ROIAnalyzer':
        """
        新增效益項目

        參數:
            benefit: BenefitItem 物件

        回傳:
            自身實例,支援鏈式呼叫
        """
        self.benefits.append(benefit)
        return self

    def calculate_total_cost(self) -> Tuple[float, pd.DataFrame]:
        """
        計算總成本
        考慮一次性成本和經常性成本

        回傳:
            元組,包含總成本金額和成本明細 DataFrame
        """
        cost_details = []
        total_cost = 0

        for cost in self.costs:
            # 根據成本頻率計算總金額
            if cost.is_recurring:
                if cost.frequency == '月':
                    # 月度成本乘以分析期間月數
                    period_cost = cost.amount * self.analysis_period_months
                elif cost.frequency == '年':
                    # 年度成本乘以分析期間年數
                    years = self.analysis_period_months / 12
                    period_cost = cost.amount * years
                else:
                    period_cost = cost.amount
            else:
                # 一次性成本
                period_cost = cost.amount

            # 累加總成本
            total_cost += period_cost

            # 記錄成本明細
            cost_details.append({
                '項目名稱': cost.name,
                '單位金額': cost.amount,
                '類別': cost.category,
                '頻率': cost.frequency if cost.is_recurring else '一次性',
                '期間總成本': period_cost
            })

        return total_cost, pd.DataFrame(cost_details)

    def calculate_total_benefit(self) -> Tuple[float, pd.DataFrame]:
        """
        計算總效益
        考慮實現機率和折現值

        回傳:
            元組,包含總效益金額和效益明細 DataFrame
        """
        benefit_details = []
        total_benefit = 0

        for benefit in self.benefits:
            # 計算效益開始實現後的月數
            remaining_months = max(
                0,
                self.analysis_period_months - benefit.time_to_realize
            )

            # 若效益尚未開始實現,跳過
            if remaining_months <= 0:
                period_benefit = 0
            else:
                # 假設效益每月均勻實現
                monthly_benefit = benefit.amount / 12

                # 計算折現後的總效益
                discounted_benefit = 0
                for month in range(int(remaining_months)):
                    # 從效益開始實現的月份計算折現
                    actual_month = benefit.time_to_realize + month
                    # 套用折現因子
                    discount_factor = 1 / (
                        (1 + self.monthly_discount_rate) ** actual_month
                    )
                    discounted_benefit += monthly_benefit * discount_factor

                # 乘以實現機率得到期望效益
                period_benefit = discounted_benefit * benefit.probability

            # 累加總效益
            total_benefit += period_benefit

            # 記錄效益明細
            benefit_details.append({
                '項目名稱': benefit.name,
                '年度金額': benefit.amount,
                '類別': benefit.category,
                '實現機率': f"{benefit.probability * 100:.0f}%",
                '實現延遲(月)': benefit.time_to_realize,
                '期間總效益(折現)': round(period_benefit, 2)
            })

        return total_benefit, pd.DataFrame(benefit_details)

    def calculate_roi(self) -> Dict:
        """
        計算 ROI 和相關財務指標

        回傳:
            包含所有財務分析結果的字典
        """
        # 計算總成本和總效益
        total_cost, cost_df = self.calculate_total_cost()
        total_benefit, benefit_df = self.calculate_total_benefit()

        # 計算淨現值 NPV
        npv = total_benefit - total_cost

        # 計算 ROI 百分比
        roi_percentage = (npv / total_cost * 100) if total_cost > 0 else 0

        # 計算效益成本比 BCR
        bcr = total_benefit / total_cost if total_cost > 0 else 0

        # 估算回本期間
        if total_benefit > 0:
            # 假設效益均勻分布,計算回本所需月數
            monthly_benefit = total_benefit / self.analysis_period_months
            payback_months = (total_cost / monthly_benefit
                              if monthly_benefit > 0 else float('inf'))
        else:
            payback_months = float('inf')

        # 建立分析結果
        results = {
            'project_name': self.project_name,
            'analysis_period_months': self.analysis_period_months,
            'total_cost': round(total_cost, 2),
            'total_benefit': round(total_benefit, 2),
            'npv': round(npv, 2),
            'roi_percentage': round(roi_percentage, 2),
            'bcr': round(bcr, 2),
            'payback_months': round(payback_months, 1),
            'recommendation': self._generate_recommendation(
                roi_percentage, bcr, payback_months
            ),
            'cost_details': cost_df,
            'benefit_details': benefit_df
        }

        return results

    def _generate_recommendation(self, roi: float, bcr: float,
                                  payback: float) -> str:
        """
        根據財務指標產生投資建議

        參數:
            roi: ROI 百分比
            bcr: 效益成本比
            payback: 回本期間(月)

        回傳:
            投資建議文字
        """
        # 根據多個指標綜合評估
        if roi > 100 and bcr > 2 and payback < 12:
            return "強烈建議投資:高報酬、快速回本"
        elif roi > 50 and bcr > 1.5 and payback < 24:
            return "建議投資:良好的財務表現"
        elif roi > 0 and bcr > 1:
            return "可考慮投資:正向但需謹慎評估"
        else:
            return "不建議投資:財務效益不足"

    def sensitivity_analysis(self, variable: str,
                              range_percent: float = 0.3,
                              steps: int = 5) -> pd.DataFrame:
        """
        敏感度分析
        評估關鍵變數變化對 ROI 的影響

        參數:
            variable: 變數類型(cost 或 benefit)
            range_percent: 變化範圍百分比
            steps: 分析步驟數

        回傳:
            敏感度分析結果 DataFrame
        """
        # 計算基準值
        base_results = self.calculate_roi()
        base_roi = base_results['roi_percentage']

        # 產生變化比例序列
        multipliers = np.linspace(
            1 - range_percent,
            1 + range_percent,
            steps
        )

        results = []
        for mult in multipliers:
            # 根據變數類型調整計算
            if variable == 'cost':
                # 暫時調整成本
                adjusted_cost = base_results['total_cost'] * mult
                adjusted_benefit = base_results['total_benefit']
            else:
                adjusted_cost = base_results['total_cost']
                adjusted_benefit = base_results['total_benefit'] * mult

            # 計算調整後的 ROI
            npv = adjusted_benefit - adjusted_cost
            adjusted_roi = (npv / adjusted_cost * 100
                            if adjusted_cost > 0 else 0)

            results.append({
                '變化比例': f"{(mult - 1) * 100:+.0f}%",
                f'調整後{variable}': round(
                    adjusted_cost if variable == 'cost'
                    else adjusted_benefit, 2
                ),
                'ROI (%)': round(adjusted_roi, 2),
                'ROI 變化': f"{adjusted_roi - base_roi:+.2f}%"
            })

        return pd.DataFrame(results)

這個 ROI 分析器提供了完整的財務分析功能,包含成本效益計算、NPV 分析、回本期間估算以及敏感度分析。透過這些功能,企業可以對資料科學專案進行嚴謹的財務評估,並向決策者提供清晰的投資建議。

決策支援系統的建構

決策支援系統 Decision Support System 是整合資料、模型和介面的完整解決方案,目的是協助決策者在複雜的商業環境中做出更好的決策。一個有效的決策支援系統需要具備資料整合能力、分析建模能力、情境模擬能力以及結果呈現能力。系統的設計應該以使用者為中心,確保決策者能夠輕鬆地存取資訊、理解分析結果並採取行動。

決策支援系統的核心功能包括資料彙整與視覺化、預測模型整合、情境分析與模擬,以及建議產生與追蹤。資料彙整功能將來自不同來源的資料整合為一致的格式,並透過視覺化呈現關鍵指標的狀態和趨勢。預測模型整合功能將已訓練的模型嵌入系統,提供即時的預測服務。情境分析功能允許決策者調整假設參數,模擬不同決策的可能結果。建議產生功能則根據分析結果自動產生行動建議,並追蹤建議的執行狀況和效果。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

rectangle "資料來源" as DS {
    rectangle "內部系統" as IS
    rectangle "外部資料" as ED
}

rectangle "資料處理層" as DPL {
    rectangle "資料整合" as DI
    rectangle "資料品質" as DQ
}

rectangle "分析引擎" as AE {
    rectangle "預測模型" as PM
    rectangle "情境模擬" as SS
}

rectangle "決策介面" as UI {
    rectangle "儀表板" as DB
    rectangle "報告產生" as RG
}

IS --> DI
ED --> DI
DI --> DQ
DQ --> PM
DQ --> SS
PM --> DB
SS --> DB
DB --> RG

@enduml

以下是一個完整的決策支援系統類別實作:

# 匯入必要的套件
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum

# 定義決策類型列舉
class DecisionType(Enum):
    STRATEGIC = "strategic"      # 策略性決策
    TACTICAL = "tactical"        # 戰術性決策
    OPERATIONAL = "operational"  # 營運性決策

# 定義建議優先級列舉
class Priority(Enum):
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

# 定義決策建議資料類別
@dataclass
class Recommendation:
    id: str                     # 建議編號
    title: str                  # 建議標題
    description: str            # 詳細描述
    decision_type: DecisionType # 決策類型
    priority: Priority          # 優先級
    expected_impact: float      # 預期影響(金額)
    confidence: float           # 信心度(0-1)
    action_items: List[str]     # 行動項目
    deadline: datetime          # 建議執行期限

# 決策支援系統類別
class DecisionSupport:
    def __init__(self):
        """
        初始化決策支援系統
        設定各項功能模組的儲存空間
        """
        # 儲存 KPI 資料
        self.kpi_data: Dict[str, pd.DataFrame] = {}
        # 儲存預測模型
        self.models: Dict[str, Any] = {}
        # 儲存產生的建議
        self.recommendations: List[Recommendation] = []
        # 儲存情境模擬結果
        self.scenarios: Dict[str, Dict] = {}
        # 儲存決策追蹤記錄
        self.decision_log: List[Dict] = []

    def load_kpi_data(self, kpi_name: str, data: pd.DataFrame) -> None:
        """
        載入 KPI 資料

        參數:
            kpi_name: KPI 名稱
            data: KPI 資料 DataFrame
        """
        # 確保資料包含必要欄位
        required_columns = ['date', 'value']
        for col in required_columns:
            if col not in data.columns:
                raise ValueError(f"資料缺少必要欄位: {col}")

        # 儲存資料
        self.kpi_data[kpi_name] = data.copy()

    def register_model(self, model_name: str, model: Any,
                       model_type: str) -> None:
        """
        註冊預測模型

        參數:
            model_name: 模型名稱
            model: 模型物件
            model_type: 模型類型描述
        """
        self.models[model_name] = {
            'model': model,
            'type': model_type,
            'registered_at': datetime.now()
        }

    def analyze_kpi_status(self, kpi_name: str) -> Dict:
        """
        分析 KPI 狀態
        包含趨勢分析和異常偵測

        參數:
            kpi_name: KPI 名稱

        回傳:
            KPI 分析結果字典
        """
        # 檢查資料是否存在
        if kpi_name not in self.kpi_data:
            return {'error': f'找不到 KPI: {kpi_name}'}

        # 取得資料
        data = self.kpi_data[kpi_name]

        # 計算基本統計量
        latest_value = data['value'].iloc[-1]
        mean_value = data['value'].mean()
        std_value = data['value'].std()

        # 計算趨勢
        # 使用最近 7 筆資料計算簡單移動平均
        if len(data) >= 7:
            recent_mean = data['value'].tail(7).mean()
            older_mean = data['value'].tail(14).head(7).mean()
            trend = 'upward' if recent_mean > older_mean else 'downward'
        else:
            trend = 'insufficient_data'

        # 異常偵測
        # 使用 3 個標準差作為閾值
        z_score = (latest_value - mean_value) / std_value if std_value > 0 else 0
        is_anomaly = abs(z_score) > 3

        # 建立分析結果
        return {
            'kpi_name': kpi_name,
            'latest_value': round(latest_value, 2),
            'mean': round(mean_value, 2),
            'std': round(std_value, 2),
            'trend': trend,
            'z_score': round(z_score, 2),
            'is_anomaly': is_anomaly,
            'data_points': len(data)
        }

    def run_scenario(self, scenario_name: str,
                     base_assumptions: Dict,
                     variations: List[Dict]) -> Dict:
        """
        執行情境模擬
        分析不同假設下的結果

        參數:
            scenario_name: 情境名稱
            base_assumptions: 基礎假設參數
            variations: 變化情境列表

        回傳:
            情境模擬結果字典
        """
        results = {
            'scenario_name': scenario_name,
            'base_case': self._calculate_scenario_outcome(base_assumptions),
            'variations': []
        }

        # 計算各變化情境的結果
        for i, variation in enumerate(variations):
            # 合併基礎假設和變化參數
            scenario_params = {**base_assumptions, **variation}
            outcome = self._calculate_scenario_outcome(scenario_params)
            outcome['variation_id'] = i + 1
            outcome['changed_params'] = list(variation.keys())
            results['variations'].append(outcome)

        # 儲存情境結果
        self.scenarios[scenario_name] = results

        return results

    def _calculate_scenario_outcome(self, params: Dict) -> Dict:
        """
        計算情境結果
        根據參數計算預期結果

        參數:
            params: 情境參數

        回傳:
            計算結果字典
        """
        # 這是一個簡化的計算邏輯
        # 實際應用中應根據具體業務模型計算

        # 假設有營收、成本和成長率三個參數
        revenue = params.get('revenue', 1000000)
        cost = params.get('cost', 800000)
        growth_rate = params.get('growth_rate', 0.1)

        # 計算淨利
        profit = revenue - cost
        # 計算利潤率
        margin = profit / revenue if revenue > 0 else 0
        # 計算預期成長後的營收
        projected_revenue = revenue * (1 + growth_rate)

        return {
            'revenue': revenue,
            'cost': cost,
            'profit': profit,
            'margin': round(margin * 100, 2),
            'projected_revenue': round(projected_revenue, 2),
            'growth_rate': growth_rate
        }

    def generate_recommendations(self) -> List[Recommendation]:
        """
        根據分析結果產生決策建議

        回傳:
            建議列表
        """
        recommendations = []

        # 分析所有 KPI 並產生建議
        for kpi_name in self.kpi_data.keys():
            analysis = self.analyze_kpi_status(kpi_name)

            # 若偵測到異常,產生高優先級建議
            if analysis.get('is_anomaly', False):
                rec = Recommendation(
                    id=f"REC-{len(recommendations)+1:03d}",
                    title=f"{kpi_name} 異常警示",
                    description=f"{kpi_name} 出現異常值,需要立即關注。"
                                f"Z-score: {analysis['z_score']}",
                    decision_type=DecisionType.OPERATIONAL,
                    priority=Priority.HIGH,
                    expected_impact=0,  # 需根據具體情況估算
                    confidence=0.9,
                    action_items=[
                        "立即調查異常原因",
                        "確認資料正確性",
                        "評估對業務的影響"
                    ],
                    deadline=datetime.now() + timedelta(days=1)
                )
                recommendations.append(rec)

            # 若趨勢向下,產生中優先級建議
            if analysis.get('trend') == 'downward':
                rec = Recommendation(
                    id=f"REC-{len(recommendations)+1:03d}",
                    title=f"{kpi_name} 趨勢下降",
                    description=f"{kpi_name} 呈現下降趨勢,建議採取改善措施。",
                    decision_type=DecisionType.TACTICAL,
                    priority=Priority.MEDIUM,
                    expected_impact=0,
                    confidence=0.7,
                    action_items=[
                        "分析下降原因",
                        "制定改善計畫",
                        "設定改善目標"
                    ],
                    deadline=datetime.now() + timedelta(days=7)
                )
                recommendations.append(rec)

        # 儲存建議
        self.recommendations = recommendations

        return recommendations

    def log_decision(self, recommendation_id: str, decision: str,
                     decision_maker: str, notes: str = "") -> None:
        """
        記錄決策執行情況

        參數:
            recommendation_id: 建議編號
            decision: 決策內容
            decision_maker: 決策者
            notes: 備註
        """
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'recommendation_id': recommendation_id,
            'decision': decision,
            'decision_maker': decision_maker,
            'notes': notes
        }

        self.decision_log.append(log_entry)

    def get_dashboard_data(self) -> Dict:
        """
        取得儀表板資料
        彙整所有關鍵資訊供視覺化呈現

        回傳:
            儀表板資料字典
        """
        # 彙整 KPI 分析結果
        kpi_summaries = []
        for kpi_name in self.kpi_data.keys():
            analysis = self.analyze_kpi_status(kpi_name)
            kpi_summaries.append(analysis)

        # 彙整建議統計
        high_priority = len([r for r in self.recommendations
                             if r.priority == Priority.HIGH])
        medium_priority = len([r for r in self.recommendations
                               if r.priority == Priority.MEDIUM])
        low_priority = len([r for r in self.recommendations
                            if r.priority == Priority.LOW])

        # 建立儀表板資料
        dashboard = {
            'generated_at': datetime.now().isoformat(),
            'kpi_count': len(self.kpi_data),
            'kpi_summaries': kpi_summaries,
            'model_count': len(self.models),
            'recommendation_count': len(self.recommendations),
            'recommendations_by_priority': {
                'high': high_priority,
                'medium': medium_priority,
                'low': low_priority
            },
            'scenario_count': len(self.scenarios),
            'decision_log_count': len(self.decision_log)
        }

        return dashboard

    def export_report(self) -> str:
        """
        匯出決策支援報告
        產生結構化的文字報告

        回傳:
            報告文字內容
        """
        # 取得儀表板資料
        dashboard = self.get_dashboard_data()

        # 建立報告內容
        report_lines = [
            "=" * 50,
            "決策支援系統報告",
            f"產生時間: {dashboard['generated_at']}",
            "=" * 50,
            "",
            "一、KPI 概覽",
            f"   監控中的 KPI 數量: {dashboard['kpi_count']}",
            ""
        ]

        # 加入 KPI 詳細資訊
        for summary in dashboard['kpi_summaries']:
            report_lines.append(f"   - {summary['kpi_name']}")
            report_lines.append(f"     最新值: {summary['latest_value']}")
            report_lines.append(f"     趨勢: {summary['trend']}")
            if summary['is_anomaly']:
                report_lines.append("     [警告] 偵測到異常值")
            report_lines.append("")

        # 加入建議資訊
        report_lines.extend([
            "二、決策建議",
            f"   總建議數: {dashboard['recommendation_count']}",
            f"   高優先級: {dashboard['recommendations_by_priority']['high']}",
            f"   中優先級: {dashboard['recommendations_by_priority']['medium']}",
            f"   低優先級: {dashboard['recommendations_by_priority']['low']}",
            ""
        ])

        # 列出各項建議
        for rec in self.recommendations:
            report_lines.append(f"   [{rec.id}] {rec.title}")
            report_lines.append(f"      優先級: {rec.priority.value}")
            report_lines.append(f"      期限: {rec.deadline.strftime('%Y-%m-%d')}")
            report_lines.append("")

        report_lines.append("=" * 50)

        return "\n".join(report_lines)

這個決策支援系統整合了 KPI 監控、預測模型、情境模擬和建議產生等功能。透過結構化的設計,系統能夠為決策者提供全面的資訊支援,並自動產生可行的決策建議。決策追蹤功能則確保每個建議的執行情況都有完整的記錄,便於後續的效果評估和持續改善。

實務案例:電商平台決策優化

為了展示上述框架的實際應用,我們以一個電商平台的決策優化案例來說明完整的實作流程。這個案例涵蓋了從 KPI 定義到決策執行的完整週期,展示資料科學如何在實務中驅動商業決策。

電商平台面臨的核心挑戰是提升客戶終身價值並降低客戶流失率。為達成這個目標,平台需要建立一套完整的分析系統來追蹤關鍵指標、預測客戶行為並產生最佳化建議。以下是這個案例的完整實作示範:

# 匯入先前定義的類別和必要套件
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# 模擬產生電商平台的樣本資料
# 用於示範決策支援系統的完整應用
def generate_sample_data(n_customers: int = 1000,
                          n_days: int = 90) -> Dict[str, pd.DataFrame]:
    """
    產生電商平台的模擬資料

    參數:
        n_customers: 客戶數量
        n_days: 資料天數

    回傳:
        包含各類資料的字典
    """
    np.random.seed(42)

    # 產生客戶資料
    customer_data = pd.DataFrame({
        'customer_id': range(1, n_customers + 1),
        # 會員等級影響消費行為
        'membership_tier': np.random.choice(
            ['bronze', 'silver', 'gold'],
            n_customers,
            p=[0.6, 0.3, 0.1]
        ),
        # 註冊天數
        'days_since_registration': np.random.randint(30, 730, n_customers),
        # 總消費金額
        'total_spend': np.random.exponential(5000, n_customers),
        # 訂單數量
        'order_count': np.random.poisson(5, n_customers),
        # 最近購買天數
        'days_since_last_purchase': np.random.exponential(30, n_customers),
        # 客服聯繫次數
        'support_tickets': np.random.poisson(1, n_customers),
        # 流失標記(作為預測目標)
        'churned': np.random.choice([0, 1], n_customers, p=[0.85, 0.15])
    })

    # 產生每日 KPI 資料
    dates = pd.date_range(
        end=datetime.now(),
        periods=n_days,
        freq='D'
    )

    # 每日營收資料
    daily_revenue = pd.DataFrame({
        'date': dates,
        'value': np.random.normal(100000, 15000, n_days).cumsum() / n_days
    })

    # 每日轉換率資料
    daily_conversion = pd.DataFrame({
        'date': dates,
        'value': np.random.normal(3.5, 0.5, n_days)  # 轉換率百分比
    })

    # 每日客戶滿意度資料
    daily_satisfaction = pd.DataFrame({
        'date': dates,
        'value': np.random.normal(4.2, 0.3, n_days)  # 1-5 分
    })

    return {
        'customers': customer_data,
        'daily_revenue': daily_revenue,
        'daily_conversion': daily_conversion,
        'daily_satisfaction': daily_satisfaction
    }

# 主要執行函數
# 示範完整的決策支援系統應用流程
def run_ecommerce_decision_support():
    """
    執行電商平台決策支援系統示範
    涵蓋完整的分析和決策流程
    """
    # 步驟一:產生樣本資料
    print("步驟一:載入資料...")
    sample_data = generate_sample_data()

    # 步驟二:初始化並設定 KPI 管理器
    print("步驟二:設定 KPI...")
    kpi_manager = KPIManager()

    # 註冊營收 KPI
    revenue_kpi = KPIDefinition(
        name="每日營收",
        description="每日總營業收入",
        kpi_type=KPIType.LAGGING,
        target_value=100000,
        unit="TWD",
        calculation_formula="SUM(order_amount)",
        data_sources=["orders_table"],
        update_frequency="每日",
        owner="財務部"
    )
    kpi_manager.register_kpi(revenue_kpi)

    # 註冊轉換率 KPI
    conversion_kpi = KPIDefinition(
        name="網站轉換率",
        description="訪客轉換為購買者的比率",
        kpi_type=KPIType.LEADING,
        target_value=4.0,
        unit="%",
        calculation_formula="(orders / visitors) * 100",
        data_sources=["web_analytics", "orders_table"],
        update_frequency="每日",
        owner="行銷部"
    )
    kpi_manager.register_kpi(conversion_kpi)

    # 載入歷史 KPI 數據
    for _, row in sample_data['daily_revenue'].iterrows():
        kpi_manager.record_kpi_value("每日營收", row['date'], row['value'])

    for _, row in sample_data['daily_conversion'].iterrows():
        kpi_manager.record_kpi_value("網站轉換率", row['date'], row['value'])

    # 產生 KPI 報告
    kpi_report = kpi_manager.generate_kpi_report()
    print("\nKPI 報告:")
    print(kpi_report.to_string())

    # 步驟三:建立預測模型
    print("\n步驟三:訓練預測模型...")
    analytics = BusinessAnalytics()

    # 訓練客戶流失預測模型
    churn_metrics = analytics.train_churn_prediction_model(
        df=sample_data['customers'],
        target_column='churned',
        feature_columns=[
            'days_since_registration',
            'total_spend',
            'order_count',
            'days_since_last_purchase',
            'support_tickets'
        ],
        model_type='random_forest'
    )

    print("\n客戶流失預測模型效能:")
    for key, value in churn_metrics.items():
        print(f"  {key}: {value}")

    # 取得特徵重要性
    importance = analytics.get_feature_importance("churn_prediction_churned")
    print("\n特徵重要性排序:")
    print(importance.to_string())

    # 步驟四:執行 ROI 分析
    print("\n步驟四:ROI 分析...")
    roi_analyzer = ROIAnalyzer(
        project_name="客戶流失預防系統",
        analysis_period_months=24,
        discount_rate=0.08
    )

    # 新增成本項目
    roi_analyzer.add_cost(CostItem(
        name="系統開發",
        amount=500000,
        category="直接成本",
        is_recurring=False,
        frequency="一次性"
    ))

    roi_analyzer.add_cost(CostItem(
        name="雲端運算資源",
        amount=15000,
        category="直接成本",
        is_recurring=True,
        frequency="月"
    ))

    roi_analyzer.add_cost(CostItem(
        name="資料團隊人力",
        amount=80000,
        category="直接成本",
        is_recurring=True,
        frequency="月"
    ))

    # 新增效益項目
    roi_analyzer.add_benefit(BenefitItem(
        name="減少客戶流失",
        amount=1200000,
        category="有形效益",
        probability=0.7,
        time_to_realize=3
    ))

    roi_analyzer.add_benefit(BenefitItem(
        name="提升客戶終身價值",
        amount=800000,
        category="有形效益",
        probability=0.6,
        time_to_realize=6
    ))

    # 計算 ROI
    roi_results = roi_analyzer.calculate_roi()
    print("\nROI 分析結果:")
    print(f"  總成本: {roi_results['total_cost']:,.0f} TWD")
    print(f"  總效益: {roi_results['total_benefit']:,.0f} TWD")
    print(f"  NPV: {roi_results['npv']:,.0f} TWD")
    print(f"  ROI: {roi_results['roi_percentage']:.1f}%")
    print(f"  建議: {roi_results['recommendation']}")

    # 步驟五:建立決策支援系統
    print("\n步驟五:啟動決策支援系統...")
    dss = DecisionSupport()

    # 載入 KPI 資料
    dss.load_kpi_data("每日營收", sample_data['daily_revenue'])
    dss.load_kpi_data("網站轉換率", sample_data['daily_conversion'])
    dss.load_kpi_data("客戶滿意度", sample_data['daily_satisfaction'])

    # 註冊預測模型
    dss.register_model(
        "churn_prediction",
        analytics.models.get("churn_prediction_churned"),
        "Random Forest Classifier"
    )

    # 執行情境模擬
    scenario_results = dss.run_scenario(
        scenario_name="營收成長情境",
        base_assumptions={
            'revenue': 100000,
            'cost': 75000,
            'growth_rate': 0.10
        },
        variations=[
            {'growth_rate': 0.05},   # 保守情境
            {'growth_rate': 0.15},   # 樂觀情境
            {'cost': 85000},         # 成本上升情境
            {'revenue': 120000}      # 營收提升情境
        ]
    )

    print("\n情境模擬結果:")
    print(f"  基礎情境利潤率: {scenario_results['base_case']['margin']}%")
    for var in scenario_results['variations']:
        print(f"  變化情境 {var['variation_id']}: "
              f"利潤率 {var['margin']}%")

    # 產生決策建議
    recommendations = dss.generate_recommendations()
    print(f"\n產生 {len(recommendations)} 項決策建議")

    # 輸出決策支援報告
    report = dss.export_report()
    print("\n" + report)

    return {
        'kpi_manager': kpi_manager,
        'analytics': analytics,
        'roi_analyzer': roi_analyzer,
        'decision_support': dss
    }

# 執行示範
if __name__ == "__main__":
    results = run_ecommerce_decision_support()

這個完整的案例示範展示了資料科學驅動商業決策的實際應用流程。從資料載入、KPI 監控、預測模型訓練、ROI 分析到決策支援系統的整合,每個環節都有清晰的實作和說明。透過這樣的系統化方法,企業可以建立可靠的資料驅動決策能力,並持續優化商業成果。

結論與未來展望

資料科學驅動商業決策已從概念願景轉變為實務標準。透過本文介紹的框架和工具,企業可以建立系統化的分析能力,將資料轉化為具體的商業價值。KPI 管理確保企業專注於正確的目標,資料管線保障分析的品質和效率,預測模型提供前瞻性的洞察,ROI 分析量化投資價值,而決策支援系統則將所有元素整合為完整的解決方案。

然而,技術工具只是成功的一部分。企業還需要培養資料素養文化,確保決策者能夠正確理解和使用分析結果。同時,資料治理和倫理考量也不容忽視,企業必須確保資料的使用符合法規要求並尊重個人隱私。展望未來,隨著人工智慧技術的持續進步,自動化決策支援將更加普及,即時分析和預測將成為常態。企業若能在現階段建立堅實的資料科學基礎,將在未來的競爭中佔據有利位置。

資料科學驅動商業決策的旅程是持續演進的過程。企業應該從小規模的專案開始,逐步累積經驗和能力,並根據回饋不斷改善。透過本文提供的框架和程式碼範例,讀者可以立即開始實作自己的決策支援系統,將資料的潛力轉化為商業成功的動力。