房地產市場是複雜的經濟系統,受到地理位置、經濟發展、人口結構、政策法規等多重因素交互影響。傳統的房地產分析依賴專家經驗與簡單的統計方法,難以捕捉市場的非線性動態與區域性差異。資料科學與機器學習技術的成熟,為房地產市場分析帶來革命性的轉變。透過大規模資料收集、系統化的特徵工程與先進的預測模型,分析師能夠更精確地量化影響房價的關鍵因素,預測未來市場走勢。

房價預測是房地產分析的核心問題。影響房價的因素眾多且複雜,包含可量化的結構特徵如房屋面積、房間數量、浴室數量,以及難以直接量化的區位因素如學區品質、交通便利性、社區安全性。這些因素之間存在複雜的交互作用,例如在優質學區內,較小的房屋面積可能仍然擁有較高的單位面積價格。傳統的線性模型難以捕捉這些非線性關係,而機器學習演算法能夠自動發現資料中的模式。

區域發展模式是另一個重要的分析維度。城市、郊區與農村地區在房地產市場中展現截然不同的特徵。城市地區通常具有較高的房價與人口密度,但提供更好的就業機會與公共服務。郊區地區在房價與生活品質之間取得平衡,吸引追求居住環境的家庭。農村地區房價較低但可能面臨基礎建設不足的挑戰。理解這些區域差異對於制定差異化的投資策略至關重要。

本文將建立完整的房地產資料科學分析框架。從資料收集與前置處理開始,透過探索性分析揭示資料特徵與分佈。接著進行特徵工程,建立能夠有效預測房價的特徵集合。運用多種機器學習演算法包含多元線性迴歸、隨機森林與梯度提升模型,建立並比較不同的預測模型。最後分析區域發展模式,提供基於資料的投資建議。整個流程遵循嚴謹的資料科學方法論,確保結論的可靠性與實用性。

房地產資料收集與探索性分析

房地產資料分析的第一步是系統化的資料收集與品質評估。高品質的資料是準確分析的基礎,而資料品質問題如缺失值、異常值、資料不一致等,會直接影響後續分析的可信度。探索性資料分析透過統計方法與視覺化技術,幫助分析師理解資料的基本特徵、分佈型態與潛在問題。

房地產資料通常包含結構化的數值特徵與類別特徵。數值特徵如房屋面積、房間數量、浴室數量、建築年份等,可以直接用於數學運算與統計分析。類別特徵如地區類型、房屋型態、建築材料等,需要透過編碼技術轉換為機器學習演算法可處理的格式。此外,地理資訊如經緯度座標、郵遞區號等,可以透過地理資訊系統技術轉換為有用的空間特徵。

資料分佈分析是探索性分析的核心。透過直方圖、箱型圖與密度圖,分析師能夠理解每個特徵的分佈型態。房價通常呈現右偏分佈,少數高價豪宅拉高平均值但不代表市場主流。在這種情況下,中位數比平均值更能反映典型房價水準。房屋面積、房間數量等特徵的分佈也提供重要洞察,例如市場是否以小坪數或大坪數為主。

相關性分析揭示特徵之間的線性關係。房價與房屋面積通常呈現正相關,但相關強度在不同區域可能有顯著差異。房間數量與浴室數量往往高度相關,這種共線性在建立預測模型時需要特別處理。地區類型作為類別變數,可以透過分組統計觀察其與房價的關係。這些初步的相關性分析為特徵選擇與模型設計提供指引。

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from typing import Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')

class RealEstateDataAnalyzer:
    """
    房地產資料分析系統
    提供資料載入、清理、探索性分析與視覺化功能
    """
    
    def __init__(self):
        """
        初始化房地產資料分析器
        """
        self.df = None
        self.numeric_features = []
        self.categorical_features = []
        self.analysis_results = {}
        
        # 設定視覺化樣式
        plt.style.use('seaborn-v0_8-darkgrid')
        sns.set_palette("husl")
    
    def load_data(self, data_dict: Dict[str, List]) -> pd.DataFrame:
        """
        載入房地產資料
        
        參數:
            data_dict: 包含各欄位資料的字典
            
        回傳:
            處理後的 DataFrame
        """
        self.df = pd.DataFrame(data_dict)
        
        # 識別數值型與類別型特徵
        self.numeric_features = self.df.select_dtypes(
            include=[np.number]
        ).columns.tolist()
        
        self.categorical_features = self.df.select_dtypes(
            include=['object', 'category']
        ).columns.tolist()
        
        print(f"成功載入 {len(self.df)} 筆房地產資料")
        print(f"數值型特徵: {self.numeric_features}")
        print(f"類別型特徵: {self.categorical_features}")
        
        return self.df
    
    def data_quality_check(self) -> Dict:
        """
        執行資料品質檢查
        
        回傳:
            品質檢查結果
        """
        quality_report = {}
        
        print("\n資料品質檢查")
        print("=" * 60)
        
        # 檢查缺失值
        missing_counts = self.df.isnull().sum()
        missing_percentages = (missing_counts / len(self.df)) * 100
        
        print("\n缺失值統計:")
        for col in self.df.columns:
            if missing_counts[col] > 0:
                print(f"  {col}: {missing_counts[col]} ({missing_percentages[col]:.2f}%)")
        
        quality_report['missing_values'] = missing_counts.to_dict()
        
        # 檢查重複記錄
        duplicates = self.df.duplicated().sum()
        print(f"\n重複記錄: {duplicates} 筆")
        quality_report['duplicates'] = duplicates
        
        # 檢查數值範圍
        print("\n數值特徵範圍檢查:")
        for col in self.numeric_features:
            min_val = self.df[col].min()
            max_val = self.df[col].max()
            print(f"  {col}: [{min_val:.2f}, {max_val:.2f}]")
            
            # 檢查是否有不合理的負值
            if min_val < 0 and col in ['房屋面積', '房間數', '浴室數', '售價']:
                print(f"    警告: {col} 包含負值")
        
        return quality_report
    
    def descriptive_statistics(self) -> pd.DataFrame:
        """
        計算描述統計量
        
        回傳:
            包含統計資訊的 DataFrame
        """
        print("\n描述統計分析")
        print("=" * 60)
        
        # 基本統計量
        stats_df = self.df[self.numeric_features].describe()
        
        # 加入額外的統計指標
        stats_df.loc['中位數'] = self.df[self.numeric_features].median()
        stats_df.loc['變異數'] = self.df[self.numeric_features].var()
        stats_df.loc['偏度'] = self.df[self.numeric_features].skew()
        stats_df.loc['峰度'] = self.df[self.numeric_features].kurtosis()
        
        print(stats_df)
        
        self.analysis_results['descriptive_stats'] = stats_df
        return stats_df
    
    def distribution_analysis(self, feature: str):
        """
        分析特徵的分佈型態
        
        參數:
            feature: 要分析的特徵名稱
        """
        if feature not in self.numeric_features:
            print(f"錯誤: {feature} 不是數值型特徵")
            return
        
        data = self.df[feature].dropna()
        
        print(f"\n{feature} 分佈分析")
        print("-" * 60)
        
        # 計算分佈統計量
        mean_val = data.mean()
        median_val = data.median()
        std_val = data.std()
        skewness = data.skew()
        kurtosis_val = data.kurtosis()
        
        print(f"平均值: {mean_val:.2f}")
        print(f"中位數: {median_val:.2f}")
        print(f"標準差: {std_val:.2f}")
        print(f"偏度: {skewness:.2f} ({'右偏' if skewness > 0 else '左偏' if skewness < 0 else '對稱'})")
        print(f"峰度: {kurtosis_val:.2f}")
        
        # 常態分佈檢驗
        if len(data) >= 3:
            statistic, p_value = stats.shapiro(data)
            print(f"\nShapiro-Wilk 常態檢驗:")
            print(f"  統計量: {statistic:.4f}")
            print(f"  p 值: {p_value:.4f}")
            print(f"  結論: {'不符合' if p_value < 0.05 else '符合'}常態分佈")
        
        # 視覺化分佈
        fig, axes = plt.subplots(1, 3, figsize=(15, 4))
        
        # 直方圖
        axes[0].hist(data, bins=30, edgecolor='black', alpha=0.7)
        axes[0].axvline(mean_val, color='red', linestyle='--', label=f'平均值: {mean_val:.2f}')
        axes[0].axvline(median_val, color='green', linestyle='--', label=f'中位數: {median_val:.2f}')
        axes[0].set_xlabel(feature)
        axes[0].set_ylabel('頻率')
        axes[0].set_title(f'{feature} 分佈直方圖')
        axes[0].legend()
        
        # 箱型圖
        axes[1].boxplot(data, vert=True)
        axes[1].set_ylabel(feature)
        axes[1].set_title(f'{feature} 箱型圖')
        axes[1].grid(True, alpha=0.3)
        
        # Q-Q 圖
        stats.probplot(data, dist="norm", plot=axes[2])
        axes[2].set_title(f'{feature} Q-Q 圖')
        
        plt.tight_layout()
        plt.savefig(f'/mnt/user-data/outputs/{feature}_distribution.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        print(f"\n分佈圖已儲存至 {feature}_distribution.png")
    
    def correlation_analysis(self) -> pd.DataFrame:
        """
        計算特徵間的相關係數
        
        回傳:
            相關係數矩陣
        """
        print("\n相關性分析")
        print("=" * 60)
        
        # 計算 Pearson 相關係數
        corr_matrix = self.df[self.numeric_features].corr()
        
        print("\n相關係數矩陣:")
        print(corr_matrix)
        
        # 找出高度相關的特徵對
        print("\n高度相關的特徵對 (|r| > 0.7):")
        for i in range(len(corr_matrix.columns)):
            for j in range(i+1, len(corr_matrix.columns)):
                if abs(corr_matrix.iloc[i, j]) > 0.7:
                    print(f"  {corr_matrix.columns[i]} vs {corr_matrix.columns[j]}: {corr_matrix.iloc[i, j]:.3f}")
        
        # 視覺化相關係數矩陣
        plt.figure(figsize=(10, 8))
        sns.heatmap(
            corr_matrix, 
            annot=True, 
            fmt='.2f', 
            cmap='coolwarm',
            center=0,
            square=True,
            linewidths=1
        )
        plt.title('特徵相關係數熱力圖', fontsize=14, pad=20)
        plt.tight_layout()
        plt.savefig('/mnt/user-data/outputs/correlation_heatmap.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        print("\n相關係數熱力圖已儲存至 correlation_heatmap.png")
        
        self.analysis_results['correlation_matrix'] = corr_matrix
        return corr_matrix
    
    def categorical_analysis(self, feature: str, target: str = '售價'):
        """
        分析類別特徵與目標變數的關係
        
        參數:
            feature: 類別特徵名稱
            target: 目標變數名稱
        """
        if feature not in self.categorical_features:
            print(f"錯誤: {feature} 不是類別型特徵")
            return
        
        print(f"\n{feature}{target} 的關係分析")
        print("-" * 60)
        
        # 計算各類別的統計量
        grouped = self.df.groupby(feature)[target].agg([
            'count', 'mean', 'median', 'std', 'min', 'max'
        ])
        
        print("\n各類別統計:")
        print(grouped)
        
        # 執行 ANOVA 檢驗(檢驗不同類別間是否有顯著差異)
        categories = self.df[feature].unique()
        groups = [self.df[self.df[feature] == cat][target].dropna() for cat in categories]
        
        if len(groups) >= 2:
            f_stat, p_value = stats.f_oneway(*groups)
            print(f"\nANOVA 檢驗結果:")
            print(f"  F 統計量: {f_stat:.4f}")
            print(f"  p 值: {p_value:.4f}")
            print(f"  結論: {'存在' if p_value < 0.05 else '不存在'}顯著差異")
        
        # 視覺化
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))
        
        # 箱型圖
        self.df.boxplot(column=target, by=feature, ax=axes[0])
        axes[0].set_xlabel(feature)
        axes[0].set_ylabel(target)
        axes[0].set_title(f'{feature} 各類別的 {target} 分佈')
        plt.sca(axes[0])
        plt.xticks(rotation=45)
        
        # 長條圖
        grouped['mean'].plot(kind='bar', ax=axes[1], color='skyblue', edgecolor='black')
        axes[1].set_xlabel(feature)
        axes[1].set_ylabel(f'{target} 平均值')
        axes[1].set_title(f'{feature} 各類別的平均 {target}')
        axes[1].grid(True, alpha=0.3)
        plt.sca(axes[1])
        plt.xticks(rotation=45)
        
        plt.tight_layout()
        plt.savefig(f'/mnt/user-data/outputs/{feature}_analysis.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        print(f"\n分析圖表已儲存至 {feature}_analysis.png")
    
    def outlier_detection(self, feature: str, method: str = 'iqr') -> List[int]:
        """
        偵測異常值
        
        參數:
            feature: 要檢查的特徵
            method: 偵測方法 ('iqr' 或 'zscore')
            
        回傳:
            異常值的索引列表
        """
        if feature not in self.numeric_features:
            print(f"錯誤: {feature} 不是數值型特徵")
            return []
        
        data = self.df[feature].dropna()
        outliers = []
        
        if method == 'iqr':
            # 使用四分位距法
            Q1 = data.quantile(0.25)
            Q3 = data.quantile(0.75)
            IQR = Q3 - Q1
            
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            
            outliers = self.df[
                (self.df[feature] < lower_bound) | 
                (self.df[feature] > upper_bound)
            ].index.tolist()
            
        elif method == 'zscore':
            # 使用 Z-score 法
            z_scores = np.abs(stats.zscore(data))
            outliers = self.df[z_scores > 3].index.tolist()
        
        print(f"\n{feature} 異常值偵測 ({method} 方法)")
        print("-" * 60)
        print(f"發現 {len(outliers)} 個異常值")
        
        if outliers:
            print(f"異常值索引: {outliers[:10]}{'...' if len(outliers) > 10 else ''}")
        
        return outliers
    
    def generate_summary_report(self) -> str:
        """
        產生綜合分析報告
        
        回傳:
            格式化的報告內容
        """
        report = "房地產市場資料分析報告\n"
        report += "=" * 60 + "\n\n"
        
        report += f"資料概況\n"
        report += "-" * 60 + "\n"
        report += f"總記錄數: {len(self.df)}\n"
        report += f"特徵數量: {len(self.df.columns)}\n"
        report += f"數值型特徵: {len(self.numeric_features)}\n"
        report += f"類別型特徵: {len(self.categorical_features)}\n\n"
        
        # 房價統計
        if '售價' in self.df.columns:
            report += "房價統計\n"
            report += "-" * 60 + "\n"
            report += f"平均房價: ${self.df['售價'].mean():,.0f}\n"
            report += f"中位數房價: ${self.df['售價'].median():,.0f}\n"
            report += f"最低房價: ${self.df['售價'].min():,.0f}\n"
            report += f"最高房價: ${self.df['售價'].max():,.0f}\n"
            report += f"標準差: ${self.df['售價'].std():,.0f}\n\n"
        
        # 區域分析
        if '地區' in self.df.columns:
            report += "區域分佈\n"
            report += "-" * 60 + "\n"
            region_counts = self.df['地區'].value_counts()
            for region, count in region_counts.items():
                percentage = (count / len(self.df)) * 100
                report += f"{region}: {count} 筆 ({percentage:.1f}%)\n"
            report += "\n"
        
        report += "分析建議\n"
        report += "-" * 60 + "\n"
        report += "1. 建議進行特徵工程,建立衍生特徵如單位面積價格\n"
        report += "2. 考慮地理位置特徵的重要性,可加入經緯度或郵遞區號\n"
        report += "3. 對右偏分佈的特徵(如房價)考慮對數轉換\n"
        report += "4. 處理高度相關的特徵,避免多重共線性問題\n"
        report += "5. 建立預測模型前,建議進行特徵標準化\n"
        
        return report

# 建立範例資料集
sample_data = {
    '專案編號': list(range(8, 28)),
    '地區': ['郊區', '農村', '郊區', '城市', '郊區', '農村', '城市', '郊區', 
            '農村', '農村', '郊區', '郊區', '郊區', '城市', '郊區', '農村',
            '郊區', '農村', '農村', '城市'],
    '房屋面積': [676, 961, 3034, 2267, 709, 1978, 667, 2782, 2615, 3840,
                1677, 3403, 2028, 575, 2306, 1070, 1678, 3446, 2464, 1455],
    '房間數': [4, 1, 4, 2, 2, 3, 3, 1, 3, 2, 4, 1, 1, 1, 3, 4, 3, 4, 4, 4],
    '浴室數': [2, 2, 2, 2, 3, 3, 3, 2, 1, 1, 3, 1, 1, 1, 1, 2, 3, 1, 1, 3],
    '售價': [240440, 259472, 182427, 404842, 689639, 333371, 241945, 570434,
            623581, 767426, 114781, 243802, 693658, 253227, 413448, 590058,
            531414, 470035, 670566, 238296]
}

# 使用範例
if __name__ == "__main__":
    # 建立分析器
    analyzer = RealEstateDataAnalyzer()
    
    # 載入資料
    df = analyzer.load_data(sample_data)
    
    # 資料品質檢查
    print("\n" + "="*60)
    analyzer.data_quality_check()
    
    # 描述統計
    print("\n" + "="*60)
    analyzer.descriptive_statistics()
    
    # 相關性分析
    print("\n" + "="*60)
    analyzer.correlation_analysis()
    
    # 分佈分析
    print("\n" + "="*60)
    analyzer.distribution_analysis('售價')
    analyzer.distribution_analysis('房屋面積')
    
    # 類別分析
    print("\n" + "="*60)
    analyzer.categorical_analysis('地區', '售價')
    
    # 異常值偵測
    print("\n" + "="*60)
    analyzer.outlier_detection('售價', method='iqr')
    
    # 產生報告
    print("\n" + analyzer.generate_summary_report())

這個房地產資料分析系統提供全面的探索性分析功能。系統首先載入資料並自動識別數值型與類別型特徵,這種自動化的型別辨識簡化後續分析流程。資料品質檢查涵蓋缺失值、重複記錄與數值範圍驗證,確保資料的完整性與合理性。描述統計分析計算多種統計量,包含中位數、變異數、偏度與峰度,全面描繪資料分佈特徵。

分佈分析功能深入探討個別特徵的統計特性。透過 Shapiro-Wilk 檢驗評估常態性,這對於選擇適當的統計方法至關重要。視覺化輸出包含直方圖、箱型圖與 Q-Q 圖,從不同角度呈現資料分佈。相關性分析計算 Pearson 相關係數並產生熱力圖,快速識別高度相關的特徵對。類別分析透過 ANOVA 檢驗評估不同區域的房價差異,視覺化呈現各類別的分佈與平均值。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

start

:原始房地產資料收集;

partition "資料品質檢查" {
    :缺失值檢測與處理;
    :重複記錄識別;
    :數值範圍驗證;
    :資料型別確認;
}

partition "描述統計分析" {
    :計算集中趨勢\n平均值與中位數;
    :計算離散程度\n標準差與變異數;
    :計算分佈形狀\n偏度與峰度;
}

partition "探索性視覺化" {
    :分佈圖\n直方圖與密度圖;
    :箱型圖\n異常值識別;
    :散佈圖\n特徵關係;
}

partition "統計檢驗" {
    :常態分佈檢驗\nShapiro-Wilk 檢驗;
    :相關性分析\nPearson 係數;
    :組間差異檢驗\nANOVA 檢驗;
}

:產生分析報告\n洞察與建議;

stop

@enduml

這個流程圖描述房地產資料探索性分析的完整流程。從原始資料收集開始,系統執行多層次的品質檢查,包含缺失值處理、重複記錄識別與數值驗證。描述統計階段計算各種統計量,從集中趨勢到分佈形狀的完整特徵。視覺化階段產生多種圖表,幫助分析師直觀理解資料。統計檢驗階段執行假設檢驗,量化特徵關係與組間差異。最後整合所有分析結果,產生包含洞察與建議的綜合報告。

機器學習驅動的房價預測模型

房價預測是典型的迴歸問題,目標是根據房屋特徵預測其市場價值。機器學習演算法能夠自動學習特徵與房價之間的複雜關係,相較於傳統的評估方法,提供更準確且一致的預測。模型建立過程包含特徵工程、演算法選擇、訓練最佳化與效能評估等環節,每個環節都需要謹慎設計以確保模型的預測能力。

特徵工程是提升模型效能的關鍵。原始特徵如房屋面積、房間數量可能不足以捕捉房價的所有變異。透過建立衍生特徵,如單位面積價格、房間與浴室比例、地區房價中位數等,能夠為模型提供更豐富的資訊。類別特徵如地區類型需要透過編碼技術轉換,常見的方法包含獨熱編碼與標籤編碼。特徵標準化確保不同量級的特徵對模型的影響相當,避免大數值特徵主導模型訓練。

演算法選擇需要考慮資料特性與問題需求。多元線性迴歸假設特徵與目標之間存在線性關係,模型簡單且易於解釋,但可能無法捕捉非線性模式。隨機森林透過集成多個決策樹,能夠處理非線性關係與特徵交互作用,同時提供特徵重要性評估。梯度提升模型採用序列式訓練策略,逐步修正前一個模型的錯誤,通常能達到最佳的預測準確度。比較多種演算法的效能,選擇最適合特定資料集的模型。

模型評估需要多個指標綜合考量。均方根誤差量化預測值與實際值的平均差距,絕對誤差百分比反映相對誤差大小,決定係數評估模型解釋變異的比例。交叉驗證技術透過多次訓練與測試分割,提供更穩健的效能估計,避免過度擬合。殘差分析檢查預測誤差的分佈,理想的模型應該產生隨機分佈的殘差,系統性的誤差模式指示模型存在偏差。

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')

class RealEstatePricePredictor:
    """
    房地產價格預測系統
    提供特徵工程、模型訓練、預測與評估功能
    """
    
    def __init__(self):
        """
        初始化預測系統
        """
        self.models = {}
        self.preprocessor = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.feature_names = []
        self.predictions = {}
        self.evaluation_results = {}
    
    def prepare_features(
        self, 
        df: pd.DataFrame, 
        target_col: str = '售價'
    ) -> Tuple[pd.DataFrame, pd.Series]:
        """
        準備特徵與目標變數
        
        參數:
            df: 原始資料
            target_col: 目標變數欄位名稱
            
        回傳:
            特徵矩陣與目標向量
        """
        # 建立衍生特徵
        df_features = df.copy()
        
        # 單位面積價格(每平方英尺價格)
        df_features['單位面積價格'] = df_features[target_col] / df_features['房屋面積']
        
        # 房間浴室比
        df_features['房浴比'] = df_features['房間數'] / (df_features['浴室數'] + 1)
        
        # 總房間數(房間 + 浴室)
        df_features['總房間數'] = df_features['房間數'] + df_features['浴室數']
        
        # 地區平均房價(分組統計特徵)
        region_avg_price = df_features.groupby('地區')[target_col].transform('mean')
        df_features['地區平均房價'] = region_avg_price
        
        # 房價與地區平均的差異
        df_features['相對地區價格'] = df_features[target_col] - df_features['地區平均房價']
        
        print("建立的衍生特徵:")
        print(f"  - 單位面積價格")
        print(f"  - 房浴比")
        print(f"  - 總房間數")
        print(f"  - 地區平均房價")
        print(f"  - 相對地區價格")
        
        # 選擇特徵欄位
        feature_cols = [
            '房屋面積', '房間數', '浴室數', '地區',
            '房浴比', '總房間數', '地區平均房價'
        ]
        
        X = df_features[feature_cols]
        y = df_features[target_col]
        
        self.feature_names = feature_cols
        
        return X, y
    
    def build_preprocessor(self, X: pd.DataFrame):
        """
        建立資料前處理管線
        
        參數:
            X: 特徵矩陣
        """
        # 識別數值與類別特徵
        numeric_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
        categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()
        
        print(f"\n數值特徵: {numeric_features}")
        print(f"類別特徵: {categorical_features}")
        
        # 建立轉換器
        # 數值特徵使用標準化
        numeric_transformer = StandardScaler()
        
        # 類別特徵使用獨熱編碼
        categorical_transformer = OneHotEncoder(drop='first', sparse_output=False)
        
        # 組合轉換器
        self.preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
                ('cat', categorical_transformer, categorical_features)
            ]
        )
        
        print("資料前處理管線建立完成")
    
    def split_data(
        self, 
        X: pd.DataFrame, 
        y: pd.Series, 
        test_size: float = 0.2,
        random_state: int = 42
    ):
        """
        分割訓練集與測試集
        
        參數:
            X: 特徵矩陣
            y: 目標向量
            test_size: 測試集比例
            random_state: 隨機種子
        """
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )
        
        print(f"\n資料分割完成:")
        print(f"  訓練集: {len(self.X_train)} 筆")
        print(f"  測試集: {len(self.X_test)} 筆")
    
    def train_linear_regression(self):
        """
        訓練線性迴歸模型
        """
        print("\n訓練線性迴歸模型...")
        
        # 建立管線
        pipeline = Pipeline([
            ('preprocessor', self.preprocessor),
            ('regressor', LinearRegression())
        ])
        
        # 訓練模型
        pipeline.fit(self.X_train, self.y_train)
        
        self.models['Linear Regression'] = pipeline
        
        # 計算訓練集 R²
        train_score = pipeline.score(self.X_train, self.y_train)
        print(f"  訓練集 R²: {train_score:.4f}")
    
    def train_ridge_regression(self, alpha: float = 1.0):
        """
        訓練 Ridge 迴歸模型(L2 正則化)
        
        參數:
            alpha: 正則化強度
        """
        print("\n訓練 Ridge 迴歸模型...")
        
        pipeline = Pipeline([
            ('preprocessor', self.preprocessor),
            ('regressor', Ridge(alpha=alpha))
        ])
        
        pipeline.fit(self.X_train, self.y_train)
        
        self.models['Ridge Regression'] = pipeline
        
        train_score = pipeline.score(self.X_train, self.y_train)
        print(f"  訓練集 R² (alpha={alpha}): {train_score:.4f}")
    
    def train_random_forest(
        self, 
        n_estimators: int = 100,
        max_depth: int = None,
        random_state: int = 42
    ):
        """
        訓練隨機森林模型
        
        參數:
            n_estimators: 決策樹數量
            max_depth: 樹的最大深度
            random_state: 隨機種子
        """
        print("\n訓練隨機森林模型...")
        
        pipeline = Pipeline([
            ('preprocessor', self.preprocessor),
            ('regressor', RandomForestRegressor(
                n_estimators=n_estimators,
                max_depth=max_depth,
                random_state=random_state,
                n_jobs=-1
            ))
        ])
        
        pipeline.fit(self.X_train, self.y_train)
        
        self.models['Random Forest'] = pipeline
        
        train_score = pipeline.score(self.X_train, self.y_train)
        print(f"  訓練集 R² (n_trees={n_estimators}): {train_score:.4f}")
    
    def train_gradient_boosting(
        self,
        n_estimators: int = 100,
        learning_rate: float = 0.1,
        max_depth: int = 3,
        random_state: int = 42
    ):
        """
        訓練梯度提升模型
        
        參數:
            n_estimators: 提升輪數
            learning_rate: 學習率
            max_depth: 樹的最大深度
            random_state: 隨機種子
        """
        print("\n訓練梯度提升模型...")
        
        pipeline = Pipeline([
            ('preprocessor', self.preprocessor),
            ('regressor', GradientBoostingRegressor(
                n_estimators=n_estimators,
                learning_rate=learning_rate,
                max_depth=max_depth,
                random_state=random_state
            ))
        ])
        
        pipeline.fit(self.X_train, self.y_train)
        
        self.models['Gradient Boosting'] = pipeline
        
        train_score = pipeline.score(self.X_train, self.y_train)
        print(f"  訓練集 R² (lr={learning_rate}): {train_score:.4f}")
    
    def evaluate_model(self, model_name: str) -> Dict:
        """
        評估模型效能
        
        參數:
            model_name: 模型名稱
            
        回傳:
            評估指標字典
        """
        if model_name not in self.models:
            print(f"錯誤: 模型 {model_name} 不存在")
            return {}
        
        model = self.models[model_name]
        
        # 預測
        y_train_pred = model.predict(self.X_train)
        y_test_pred = model.predict(self.X_test)
        
        # 儲存預測結果
        self.predictions[model_name] = {
            'train': y_train_pred,
            'test': y_test_pred
        }
        
        # 計算評估指標
        metrics = {
            'train_rmse': np.sqrt(mean_squared_error(self.y_train, y_train_pred)),
            'test_rmse': np.sqrt(mean_squared_error(self.y_test, y_test_pred)),
            'train_mae': mean_absolute_error(self.y_train, y_train_pred),
            'test_mae': mean_absolute_error(self.y_test, y_test_pred),
            'train_r2': r2_score(self.y_train, y_train_pred),
            'test_r2': r2_score(self.y_test, y_test_pred),
            'train_mape': np.mean(np.abs((self.y_train - y_train_pred) / self.y_train)) * 100,
            'test_mape': np.mean(np.abs((self.y_test - y_test_pred) / self.y_test)) * 100
        }
        
        self.evaluation_results[model_name] = metrics
        
        print(f"\n{model_name} 評估結果:")
        print("-" * 60)
        print(f"訓練集:")
        print(f"  RMSE: ${metrics['train_rmse']:,.2f}")
        print(f"  MAE: ${metrics['train_mae']:,.2f}")
        print(f"  R²: {metrics['train_r2']:.4f}")
        print(f"  MAPE: {metrics['train_mape']:.2f}%")
        print(f"\n測試集:")
        print(f"  RMSE: ${metrics['test_rmse']:,.2f}")
        print(f"  MAE: ${metrics['test_mae']:,.2f}")
        print(f"  R²: {metrics['test_r2']:.4f}")
        print(f"  MAPE: {metrics['test_mape']:.2f}%")
        
        return metrics
    
    def cross_validate_model(
        self, 
        model_name: str, 
        cv: int = 5
    ) -> Dict:
        """
        執行交叉驗證
        
        參數:
            model_name: 模型名稱
            cv: 折數
            
        回傳:
            交叉驗證結果
        """
        if model_name not in self.models:
            print(f"錯誤: 模型 {model_name} 不存在")
            return {}
        
        model = self.models[model_name]
        
        print(f"\n執行 {cv}-fold 交叉驗證: {model_name}")
        
        # 執行交叉驗證
        cv_scores = cross_val_score(
            model, 
            self.X_train, 
            self.y_train,
            cv=cv,
            scoring='r2'
        )
        
        cv_results = {
            'scores': cv_scores,
            'mean': cv_scores.mean(),
            'std': cv_scores.std()
        }
        
        print(f"  R² 分數: {cv_scores}")
        print(f"  平均 R²: {cv_results['mean']:.4f} (+/- {cv_results['std']:.4f})")
        
        return cv_results
    
    def plot_predictions(self, model_name: str):
        """
        繪製預測值與實際值的比較圖
        
        參數:
            model_name: 模型名稱
        """
        if model_name not in self.predictions:
            print(f"錯誤: 模型 {model_name} 尚未執行預測")
            return
        
        y_test_pred = self.predictions[model_name]['test']
        
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))
        
        # 散佈圖
        axes[0].scatter(self.y_test, y_test_pred, alpha=0.5)
        axes[0].plot(
            [self.y_test.min(), self.y_test.max()],
            [self.y_test.min(), self.y_test.max()],
            'r--', lw=2
        )
        axes[0].set_xlabel('實際房價')
        axes[0].set_ylabel('預測房價')
        axes[0].set_title(f'{model_name} - 預測 vs 實際')
        axes[0].grid(True, alpha=0.3)
        
        # 殘差圖
        residuals = self.y_test - y_test_pred
        axes[1].scatter(y_test_pred, residuals, alpha=0.5)
        axes[1].axhline(y=0, color='r', linestyle='--', lw=2)
        axes[1].set_xlabel('預測房價')
        axes[1].set_ylabel('殘差')
        axes[1].set_title(f'{model_name} - 殘差分析')
        axes[1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig(f'/mnt/user-data/outputs/{model_name}_predictions.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        print(f"\n預測圖表已儲存至 {model_name}_predictions.png")
    
    def feature_importance_analysis(self, model_name: str):
        """
        分析特徵重要性(僅適用於樹狀模型)
        
        參數:
            model_name: 模型名稱
        """
        if model_name not in self.models:
            print(f"錯誤: 模型 {model_name} 不存在")
            return
        
        model = self.models[model_name]
        
        # 檢查模型是否支援特徵重要性
        if not hasattr(model.named_steps['regressor'], 'feature_importances_'):
            print(f"{model_name} 不支援特徵重要性分析")
            return
        
        # 取得特徵重要性
        importances = model.named_steps['regressor'].feature_importances_
        
        # 取得特徵名稱(需要考慮獨熱編碼後的特徵)
        feature_names = self.preprocessor.get_feature_names_out()
        
        # 排序
        indices = np.argsort(importances)[::-1]
        
        print(f"\n{model_name} 特徵重要性:")
        print("-" * 60)
        for i, idx in enumerate(indices[:10], 1):
            print(f"{i}. {feature_names[idx]}: {importances[idx]:.4f}")
        
        # 視覺化
        plt.figure(figsize=(10, 6))
        plt.bar(range(len(importances)), importances[indices])
        plt.xticks(range(len(importances)), [feature_names[i] for i in indices], rotation=45, ha='right')
        plt.xlabel('特徵')
        plt.ylabel('重要性')
        plt.title(f'{model_name} 特徵重要性分析')
        plt.tight_layout()
        plt.savefig(f'/mnt/user-data/outputs/{model_name}_feature_importance.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        print(f"\n特徵重要性圖表已儲存至 {model_name}_feature_importance.png")
    
    def compare_models(self):
        """
        比較所有模型的效能
        """
        if not self.evaluation_results:
            print("錯誤: 尚未評估任何模型")
            return
        
        print("\n模型效能比較")
        print("=" * 60)
        
        # 建立比較表格
        comparison_df = pd.DataFrame(self.evaluation_results).T
        print(comparison_df[['test_rmse', 'test_mae', 'test_r2', 'test_mape']])
        
        # 視覺化比較
        metrics_to_plot = ['test_rmse', 'test_mae', 'test_r2', 'test_mape']
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        axes = axes.ravel()
        
        for i, metric in enumerate(metrics_to_plot):
            comparison_df[metric].plot(kind='bar', ax=axes[i])
            axes[i].set_title(metric.replace('_', ' ').title())
            axes[i].set_xlabel('模型')
            axes[i].set_ylabel('數值')
            axes[i].grid(True, alpha=0.3)
            plt.sca(axes[i])
            plt.xticks(rotation=45, ha='right')
        
        plt.tight_layout()
        plt.savefig('/mnt/user-data/outputs/model_comparison.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        print("\n模型比較圖表已儲存至 model_comparison.png")

# 使用範例
if __name__ == "__main__":
    # 建立預測系統
    predictor = RealEstatePricePredictor()
    
    # 載入資料(使用前面的範例資料)
    df = pd.DataFrame(sample_data)
    
    # 準備特徵
    X, y = predictor.prepare_features(df)
    
    # 建立前處理器
    predictor.build_preprocessor(X)
    
    # 分割資料
    predictor.split_data(X, y)
    
    # 訓練多個模型
    predictor.train_linear_regression()
    predictor.train_ridge_regression(alpha=10.0)
    predictor.train_random_forest(n_estimators=100)
    predictor.train_gradient_boosting(n_estimators=100, learning_rate=0.1)
    
    # 評估所有模型
    for model_name in predictor.models.keys():
        predictor.evaluate_model(model_name)
        predictor.cross_validate_model(model_name)
        predictor.plot_predictions(model_name)
        
        # 特徵重要性分析(僅樹狀模型)
        if 'Forest' in model_name or 'Boosting' in model_name:
            predictor.feature_importance_analysis(model_name)
    
    # 比較所有模型
    predictor.compare_models()

這個房價預測系統實作完整的機器學習工作流程。特徵準備階段建立多個衍生特徵,包含單位面積價格、房浴比與地區統計特徵,豐富模型的輸入資訊。資料前處理管線整合標準化與編碼轉換,確保特徵在合適的尺度上。系統訓練多種演算法,從簡單的線性迴歸到複雜的梯度提升,涵蓋不同的模型複雜度。

模型評估採用多個指標,RMSE 與 MAE 量化絕對誤差,R² 評估解釋變異比例,MAPE 反映相對誤差。交叉驗證提供穩健的效能估計,避免單次分割造成的偶然性。視覺化功能包含預測值散佈圖與殘差分析,幫助診斷模型問題。特徵重要性分析識別對房價影響最大的因素,提供可解釋的商業洞察。模型比較功能並排呈現所有模型的效能,便於選擇最優模型。

@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

start

:原始特徵資料;

partition "特徵工程" {
    :建立衍生特徵\n單位面積價格、房浴比;
    :類別編碼\n獨熱編碼地區;
    :特徵標準化\nStandardScaler;
}

:分割訓練與測試集;

partition "模型訓練" {
    :線性迴歸\n基線模型;
    :Ridge 迴歸\nL2 正則化;
    :隨機森林\n集成學習;
    :梯度提升\n序列學習;
}

partition "模型評估" {
    :計算評估指標\nRMSE、MAE、R²;
    :交叉驗證\n5-fold CV;
    :殘差分析\n誤差分佈;
}

:特徵重要性分析;

:選擇最優模型;

:部署預測服務;

stop

@enduml

這個流程圖描述機器學習房價預測的完整流程。從原始特徵開始,透過特徵工程建立衍生特徵、執行編碼轉換與標準化。資料分割為訓練與測試集後,訓練多種演算法從線性模型到集成方法。模型評估階段計算多個效能指標,執行交叉驗證與殘差分析,全面評估模型品質。特徵重要性分析識別關鍵影響因素。最後根據評估結果選擇最優模型,部署為實際的預測服務。

結語

房地產市場分析正經歷資料科學驅動的轉型。透過系統化的資料收集、嚴謹的探索性分析與先進的機器學習技術,分析師能夠更深入理解市場動態,更準確預測未來趨勢。本文建立的完整分析框架,從資料品質控制到預測模型部署,為房地產專業人士提供實務導向的技術指南。

探索性資料分析揭示房地產市場的複雜性。房價受到地理位置、房屋屬性、區域發展等多重因素影響,這些因素之間存在複雜的交互作用。透過統計方法與視覺化技術,分析師能夠識別資料特徵、發現異常模式、量化關聯強度。這些洞察不僅指引模型設計,更提供直接的商業價值,幫助投資者理解市場機會與風險。

機器學習模型展現強大的預測能力。從線性迴歸到梯度提升,不同演算法適用於不同的資料特性與預測需求。特徵工程是提升效能的關鍵,透過建立有意義的衍生特徵,模型能夠捕捉更多的價格變異。模型評估需要多個指標綜合考量,確保預測的準確性與穩健性。特徵重要性分析提供模型的可解釋性,這對於建立使用者信任至關重要。

未來的房地產分析將更加智慧化與自動化。隨著資料量的累積與計算能力的提升,深度學習技術可能在房價預測中發揮更大作用。地理資訊系統與衛星影像分析能夠納入更豐富的空間特徵。時間序列模型可以捕捉市場的動態演變。自然語言處理技術能夠從房屋描述與評論中擷取情感特徵。這些技術的整合將推動房地產分析進入新的階段,為市場參與者提供更精準的決策支援。