資料科學時代的技術革新與挑戰
在當代數位經濟的浪潮中,資料已經成為企業最寶貴的資產之一。每天都有數以億計的資料從各種管道產生,包括社群媒體平台、電子商務網站、物聯網裝置和企業內部系統。如何有效地收集、處理和分析這些資料,從中萃取有價值的洞察,已經成為企業競爭力的關鍵所在。然而,資料科學的實踐並非僅僅是運行幾個演算法這麼簡單,它需要一套完整的技術棧和系統化的工作流程。
Python 作為資料科學領域的首選程式語言,其地位已經無可撼動。這不僅因為 Python 具有簡潔優雅的語法,更重要的是其擁有豐富且成熟的生態系統。從資料擷取的 Requests 和 BeautifulSoup,到資料處理的 Pandas 和 NumPy,再到機器學習的 Scikit-learn 和深度學習的 TensorFlow,Python 提供了完整的工具鏈來支援資料科學的各個環節。這種一站式的解決方案讓資料科學家能夠專注於問題的本質,而不需要在不同的程式語言和工具之間切換。
然而,掌握這些工具只是資料科學實踐的起點。真正的挑戰在於如何將這些工具有機地組合起來,建構出適合特定業務場景的解決方案。從社群媒體平台擷取即時資料流,從網頁爬取結構化資訊,將非結構化的原始資料轉換為可分析的格式,透過視覺化技術呈現資料的內在模式,再運用統計方法和機器學習演算法進行深度分析,每一個環節都需要紮實的技術功底和豐富的實務經驗。
本文將帶領讀者深入探討 Python 資料科學的完整實踐流程。我們會從最基礎的資料擷取開始,詳細介紹如何使用 Twitter API 取得社群媒體資料,以及如何透過 BeautifulSoup 進行網頁爬蟲。接著會探討資料的儲存與管理,展示如何使用 MongoDB 這類 NoSQL 資料庫來處理非結構化資料。在資料處理環節,我們會深入 Pandas 的各種操作技巧,包括資料清理、轉換和聚合。視覺化部分則會介紹 Matplotlib 和 Seaborn 的進階應用,展示如何建立具有洞察力的統計圖表。最後,我們會探討蒙特卡羅模擬和主成分分析這些進階技術,展示如何運用統計方法解決實際的商業問題。
Twitter 社群資料擷取的完整實作
社群媒體平台已經成為現代資料科學中極為重要的資料來源。Twitter 作為全球最大的即時資訊分享平台之一,每天產生數億則推文,這些資料包含了豐富的使用者意見、市場趨勢和社會現象。對於企業來說,能夠即時監控品牌聲譽、追蹤競爭對手動態、分析消費者情緒,這些能力都可以透過 Twitter 資料分析來實現。然而,要從 Twitter 平台有效地擷取資料,需要理解其 API 架構和認證機制。
Twitter API 提供了多種存取方式,包括 REST API 和 Streaming API。REST API 適合用來取得歷史資料或進行特定的搜尋查詢,而 Streaming API 則能夠即時接收符合條件的推文流。在實務應用中,我們通常會根據具體需求選擇適當的 API 端點。無論使用哪種方式,都需要先在 Twitter Developer Portal 註冊應用程式,取得必要的認證憑證,包括 Consumer Key、Consumer Secret、Access Token 和 Access Token Secret。
Python 生態系統中有多個函式庫可以簡化 Twitter API 的使用,其中 TwitterSearch 函式庫提供了簡潔的介面來進行推文搜尋。這個函式庫封裝了底層的 HTTP 請求細節,讓開發者可以專注於搜尋邏輯的實作。在使用這個函式庫時,需要注意 Twitter API 的速率限制,避免在短時間內發送過多請求而被暫時封鎖。
# Twitter 資料擷取完整實作
# 此程式展示如何使用 TwitterSearch 函式庫進行推文搜尋與資料儲存
from TwitterSearch import TwitterSearch, TwitterSearchOrder
import json
import sys
import os
from datetime import datetime
def load_credentials():
"""
載入 Twitter API 認證資訊
認證資訊應儲存在 JSON 檔案中,格式如下:
{
"consumer_key": "你的 Consumer Key",
"consumer_secret": "你的 Consumer Secret",
"access_token": "你的 Access Token",
"access_token_secret": "你的 Access Token Secret"
}
Returns:
dict: 包含認證資訊的字典
"""
# 檢查認證檔案是否存在
credentials_file = 'twitter_credentials.json'
if not os.path.exists(credentials_file):
raise FileNotFoundError(
f"找不到認證檔案: {credentials_file}\n"
"請建立此檔案並填入 Twitter API 認證資訊"
)
# 讀取並解析 JSON 認證檔案
with open(credentials_file, 'r', encoding='utf-8') as file:
credentials = json.load(file)
# 驗證必要欄位是否存在
required_fields = [
'consumer_key',
'consumer_secret',
'access_token',
'access_token_secret'
]
for field in required_fields:
if field not in credentials:
raise ValueError(f"認證檔案缺少必要欄位: {field}")
return credentials
def create_unicode_mapper():
"""
建立 Unicode 字元對應表
Twitter 資料可能包含 BMP (Basic Multilingual Plane) 以外的 Unicode 字元
這些字元在某些環境下可能導致編碼問題
此函式建立一個對應表,將非 BMP 字元轉換為替代字元
Returns:
dict: Unicode 字元對應字典
"""
# 建立從 U+10000 到 sys.maxunicode 的字元對應
# 將這些字元對應到 U+FFFD (替代字元)
non_bmp_map = dict.fromkeys(
range(0x10000, sys.maxunicode + 1),
0xfffd
)
return non_bmp_map
def save_to_json(filename, data, pretty=True):
"""
將資料儲存為 JSON 檔案
Args:
filename (str): 目標檔案名稱
data: 要儲存的資料 (必須可序列化為 JSON)
pretty (bool): 是否格式化 JSON 輸出,預設為 True
"""
try:
with open(filename, 'w', encoding='utf-8') as file:
if pretty:
# 格式化輸出,便於人工閱讀
json.dump(
data,
file,
ensure_ascii=False, # 保留中文字元
indent=4, # 縮排 4 個空格
sort_keys=True # 按鍵排序
)
else:
# 緊湊輸出,節省空間
json.dump(data, file, ensure_ascii=False)
print(f"成功儲存 {len(data)} 筆資料至 {filename}")
except IOError as e:
print(f"儲存檔案時發生錯誤: {e}")
raise
def load_from_json(filename):
"""
從 JSON 檔案載入資料
Args:
filename (str): 來源檔案名稱
Returns:
載入的資料結構
"""
try:
with open(filename, 'r', encoding='utf-8') as file:
return json.load(file)
except FileNotFoundError:
print(f"找不到檔案: {filename}")
return None
except json.JSONDecodeError as e:
print(f"JSON 解析錯誤: {e}")
return None
def extract_tweet_data(tweet):
"""
從原始推文物件中提取關鍵資訊
Twitter API 返回的推文物件包含大量欄位
此函式提取我們關注的核心資訊
Args:
tweet (dict): 原始推文物件
Returns:
dict: 精簡的推文資料
"""
# 提取使用者資訊
user_info = tweet.get('user', {})
# 建立精簡的資料結構
extracted_data = {
# 推文識別碼
'tweet_id': tweet.get('id_str', ''),
# 推文內容 (優先使用完整文字)
'text': tweet.get('full_text') or tweet.get('text', ''),
# 使用者資訊
'user': {
'screen_name': user_info.get('screen_name', ''),
'name': user_info.get('name', ''),
'followers_count': user_info.get('followers_count', 0),
'verified': user_info.get('verified', False)
},
# 互動統計
'engagement': {
'retweet_count': tweet.get('retweet_count', 0),
'favorite_count': tweet.get('favorite_count', 0),
'reply_count': tweet.get('reply_count', 0)
},
# 時間戳記
'created_at': tweet.get('created_at', ''),
# 語言
'lang': tweet.get('lang', ''),
# 是否為轉推
'is_retweet': 'retweeted_status' in tweet,
# 主題標籤
'hashtags': [
tag['text']
for tag in tweet.get('entities', {}).get('hashtags', [])
]
}
return extracted_data
def search_tweets(keywords, max_results=100, language='zh'):
"""
搜尋符合關鍵字的推文
Args:
keywords (list): 搜尋關鍵字清單
max_results (int): 最大結果數量,預設 100
language (str): 語言代碼,預設為中文 'zh'
Returns:
list: 推文資料清單
"""
try:
# 載入認證資訊
credentials = load_credentials()
# 建立 TwitterSearch 物件
ts = TwitterSearch(
consumer_key=credentials['consumer_key'],
consumer_secret=credentials['consumer_secret'],
access_token=credentials['access_token'],
access_token_secret=credentials['access_token_secret']
)
# 設定搜尋條件
search_order = TwitterSearchOrder()
search_order.set_keywords(keywords)
search_order.set_language(language)
search_order.set_include_entities(True) # 包含實體資訊
# 收集推文資料
tweets = []
print(f"開始搜尋包含關鍵字 {keywords} 的推文...")
for tweet in ts.search_tweets_iterable(search_order):
# 提取並儲存推文資料
extracted = extract_tweet_data(tweet)
tweets.append(extracted)
# 即時顯示進度
if len(tweets) % 10 == 0:
print(f"已收集 {len(tweets)} 則推文...")
# 達到目標數量後停止
if len(tweets) >= max_results:
break
print(f"搜尋完成,共收集 {len(tweets)} 則推文")
return tweets
except Exception as e:
print(f"搜尋推文時發生錯誤: {e}")
return []
def display_tweets(tweets, limit=10):
"""
格式化顯示推文資料
Args:
tweets (list): 推文資料清單
limit (int): 顯示數量限制,預設 10
"""
# 建立 Unicode 字元對應表
unicode_map = create_unicode_mapper()
print("\n" + "="*80)
print(f"推文資料預覽 (顯示前 {min(limit, len(tweets))} 則)")
print("="*80 + "\n")
for i, tweet in enumerate(tweets[:limit], 1):
# 處理可能的 Unicode 問題
text = tweet['text'].translate(unicode_map)
# 截斷過長的文字
if len(text) > 100:
text = text[:100] + "..."
# 格式化顯示
print(f"推文 #{i}")
print(f"ID: {tweet['tweet_id']}")
print(f"使用者: @{tweet['user']['screen_name']}")
print(f"內容: {text}")
print(f"互動: ♥ {tweet['engagement']['favorite_count']} "
f"↻ {tweet['engagement']['retweet_count']}")
print(f"主題標籤: {', '.join(tweet['hashtags']) if tweet['hashtags'] else '無'}")
print(f"建立時間: {tweet['created_at']}")
print("-" * 80)
def analyze_tweet_stats(tweets):
"""
分析推文統計資訊
Args:
tweets (list): 推文資料清單
Returns:
dict: 統計分析結果
"""
if not tweets:
return {}
# 計算各種統計指標
total_tweets = len(tweets)
total_favorites = sum(t['engagement']['favorite_count'] for t in tweets)
total_retweets = sum(t['engagement']['retweet_count'] for t in tweets)
# 語言分佈
language_dist = {}
for tweet in tweets:
lang = tweet['lang']
language_dist[lang] = language_dist.get(lang, 0) + 1
# 最受歡迎的主題標籤
hashtag_freq = {}
for tweet in tweets:
for tag in tweet['hashtags']:
hashtag_freq[tag] = hashtag_freq.get(tag, 0) + 1
# 排序主題標籤
top_hashtags = sorted(
hashtag_freq.items(),
key=lambda x: x[1],
reverse=True
)[:10]
stats = {
'total_tweets': total_tweets,
'avg_favorites': total_favorites / total_tweets if total_tweets > 0 else 0,
'avg_retweets': total_retweets / total_tweets if total_tweets > 0 else 0,
'language_distribution': language_dist,
'top_hashtags': top_hashtags
}
return stats
def main():
"""
主程式流程
"""
# 設定搜尋參數
keywords = ['機器學習', '人工智慧', 'AI']
max_results = 50
output_file = 'twitter_data.json'
# 執行搜尋
tweets = search_tweets(keywords, max_results)
if tweets:
# 儲存資料
save_to_json(output_file, tweets)
# 顯示推文
display_tweets(tweets, limit=5)
# 分析統計
stats = analyze_tweet_stats(tweets)
print("\n" + "="*80)
print("統計分析結果")
print("="*80)
print(f"總推文數: {stats['total_tweets']}")
print(f"平均按讚數: {stats['avg_favorites']:.2f}")
print(f"平均轉推數: {stats['avg_retweets']:.2f}")
print(f"\n語言分佈:")
for lang, count in stats['language_distribution'].items():
print(f" {lang}: {count} ({count/stats['total_tweets']*100:.1f}%)")
if stats['top_hashtags']:
print(f"\n熱門主題標籤:")
for tag, freq in stats['top_hashtags'][:5]:
print(f" #{tag}: {freq} 次")
else:
print("未能取得推文資料")
if __name__ == '__main__':
main()
這個完整的 Twitter 資料擷取程式展示了資料科學專案中的多個重要概念。首先是錯誤處理的重要性,在每個可能出錯的地方都加入了適當的異常處理機制,確保程式的健壯性。其次是資料的結構化提取,從 Twitter API 返回的複雜 JSON 物件中提取關鍵資訊,建立清晰的資料結構。最後是資料的持久化儲存,將擷取的資料儲存為 JSON 格式,便於後續的處理和分析。
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
title Twitter 資料擷取處理流程架構
actor "資料科學家" as user
participant "Python 程式\n(TwitterSearch)" as python
participant "Twitter API\n(REST Endpoint)" as api
database "本地儲存\n(JSON 檔案)" as storage
participant "資料處理模組\n(分析與清理)" as processor
user -> python : 設定搜尋關鍵字\n指定擷取數量
activate python
python -> python : 載入 API 認證資訊\n驗證必要欄位
python -> api : 發送搜尋請求\nGET /search/tweets
activate api
api -> api : 驗證認證憑證\n檢查速率限制
api --> python : 返回推文資料\n(JSON 格式)
deactivate api
python -> python : 提取關鍵欄位\n建立結構化資料
note right
提取資訊包括:
推文 ID 與內容
使用者資訊
互動統計
時間戳記
主題標籤
end note
python -> python : Unicode 字元處理\n避免編碼問題
python -> storage : 儲存至 JSON 檔案\n格式化輸出
activate storage
storage --> python : 確認儲存成功
deactivate storage
python -> processor : 傳遞資料進行分析
activate processor
processor -> processor : 計算統計指標\n分析語言分佈\n提取熱門標籤
processor --> python : 返回分析結果
deactivate processor
python --> user : 顯示推文預覽\n輸出統計報告
deactivate python
note bottom
重要考量事項
API 速率限制
Twitter 標準 API 有請求頻率限制
需要實作適當的等待機制
資料品質控制
過濾垃圾推文與機器人帳號
處理重複資料與轉推
隱私與合規
遵守 Twitter 服務條款
保護使用者隱私資訊
符合資料保護法規
end note
@enduml網頁爬蟲技術的深度實踐
除了社群媒體平台,網際網路上還有大量有價值的資料散佈在各種網站中。從電子商務網站的商品資訊、新聞媒體的文章內容,到政府開放資料平台的統計數據,這些資料往往以 HTML 網頁的形式呈現。網頁爬蟲技術讓我們能夠自動化地收集這些資料,建立自己的資料集用於分析和研究。
網頁爬蟲的核心挑戰在於如何從非結構化的 HTML 文件中提取結構化的資料。HTML 是一種標記語言,主要用於描述網頁的視覺呈現,而非資料的語義結構。因此,我們需要使用解析工具來理解 HTML 的樹狀結構,並透過 CSS 選擇器或 XPath 表達式來定位目標元素。BeautifulSoup 是 Python 生態系統中最受歡迎的 HTML 解析函式庫,它提供了直觀的 API 來遍歷和搜尋解析樹。
在實作網頁爬蟲時,需要特別注意幾個重要的議題。首先是合法性和道德考量,我們應該遵守網站的 robots.txt 規則,尊重網站的服務條款,避免對目標網站造成過大的負載。其次是技術挑戰,包括處理動態載入的內容、應對反爬蟲機制、處理不同的字元編碼等。最後是資料品質問題,網頁上的資料可能包含錯誤、不一致或缺失值,需要進行適當的清理和驗證。
# 網頁爬蟲完整實作
# 此程式展示如何使用 BeautifulSoup 從網站擷取結構化資料
from bs4 import BeautifulSoup
import requests
import json
import time
from urllib.parse import urljoin, urlparse
from datetime import datetime
import re
class WebScraper:
"""
網頁爬蟲類別
提供完整的網頁資料擷取功能
包含錯誤處理、速率限制和資料清理
"""
def __init__(self, base_url, delay=1.0):
"""
初始化爬蟲
Args:
base_url (str): 基礎 URL
delay (float): 請求間隔秒數,避免過度請求
"""
self.base_url = base_url
self.delay = delay
self.session = requests.Session()
# 設定 User-Agent,模擬正常瀏覽器行為
self.session.headers.update({
'User-Agent': (
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/91.0.4472.124 Safari/537.36'
)
})
def fetch_page(self, url, max_retries=3):
"""
取得網頁內容
Args:
url (str): 目標 URL
max_retries (int): 最大重試次數
Returns:
BeautifulSoup: 解析後的網頁物件,失敗時返回 None
"""
for attempt in range(max_retries):
try:
# 發送 HTTP GET 請求
response = self.session.get(url, timeout=10)
# 檢查 HTTP 狀態碼
response.raise_for_status()
# 設定正確的編碼
# 優先使用 Content-Type header 指定的編碼
if response.encoding == 'ISO-8859-1':
# 如果是預設編碼,嘗試從內容偵測
response.encoding = response.apparent_encoding
# 解析 HTML
soup = BeautifulSoup(response.text, 'lxml')
# 實作速率限制,避免對伺服器造成壓力
time.sleep(self.delay)
return soup
except requests.exceptions.RequestException as e:
print(f"請求失敗 (嘗試 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
# 指數退避策略
wait_time = (2 ** attempt) * self.delay
print(f"等待 {wait_time} 秒後重試...")
time.sleep(wait_time)
else:
print(f"無法取得網頁: {url}")
return None
return None
def extract_text(self, element):
"""
從 HTML 元素中提取純文字
Args:
element: BeautifulSoup 元素
Returns:
str: 清理後的文字內容
"""
if element is None:
return ""
# 取得文字並清理
text = element.get_text(strip=True)
# 移除多餘的空白字元
text = re.sub(r'\s+', ' ', text)
return text.strip()
def extract_attribute(self, element, attribute, default=""):
"""
從元素中提取屬性值
Args:
element: BeautifulSoup 元素
attribute (str): 屬性名稱
default: 預設值
Returns:
屬性值或預設值
"""
if element is None:
return default
return element.get(attribute, default)
def scrape_book_listing(self, url):
"""
爬取書籍清單頁面
此函式示範如何從圖書搜尋結果頁面
提取書籍資訊
Args:
url (str): 搜尋結果頁面 URL
Returns:
list: 書籍資料清單
"""
# 取得網頁內容
soup = self.fetch_page(url)
if soup is None:
return []
books = []
# 尋找所有書籍條目
# 根據網站結構調整選擇器
book_elements = soup.find_all('article', class_='product-result')
print(f"找到 {len(book_elements)} 本書籍")
for index, book_elem in enumerate(book_elements, 1):
try:
# 提取書名
title_elem = book_elem.find('p', class_='title')
title = self.extract_text(title_elem)
# 清理書名(移除尾部的破折號)
title = self.clean_title(title)
# 提取作者資訊
author_elem = book_elem.find('p', class_='note')
author = self.extract_text(author_elem)
# 提取發布日期
date_elem = book_elem.find('p', class_='note date2')
date_info = self.extract_date(date_elem)
# 提取詳細連結
link_elem = book_elem.find('a', class_='learn-more')
detail_url = self.extract_attribute(link_elem, 'href')
# 轉換為完整 URL
if detail_url:
detail_url = urljoin(self.base_url, detail_url)
# 提取圖片 URL
img_elem = book_elem.find('img')
image_url = self.extract_attribute(img_elem, 'src')
if image_url:
image_url = urljoin(self.base_url, image_url)
# 建立書籍資料字典
book_data = {
'id': index,
'title': title,
'author': author,
'publish_date': date_info.get('date', ''),
'date_prefix': date_info.get('prefix', ''),
'detail_url': detail_url,
'image_url': image_url,
'scraped_at': datetime.now().isoformat()
}
books.append(book_data)
# 顯示進度
if index % 5 == 0:
print(f"已處理 {index} 本書籍...")
except Exception as e:
print(f"處理第 {index} 本書籍時發生錯誤: {e}")
continue
return books
def clean_title(self, title):
"""
清理書名
移除尾部的破折號和多餘空白
Args:
title (str): 原始書名
Returns:
str: 清理後的書名
"""
# 分割字串
parts = title.split()
# 移除尾部的破折號
cleaned_parts = []
for part in parts:
if part == '-':
break
cleaned_parts.append(part)
return ' '.join(cleaned_parts)
def extract_date(self, date_element):
"""
提取發布日期資訊
Args:
date_element: 包含日期的 HTML 元素
Returns:
dict: 包含 prefix 和 date 的字典
"""
if date_element is None:
return {'prefix': '', 'date': ''}
# 取得完整文字
text = self.extract_text(date_element)
parts = text.split()
if len(parts) < 3:
return {'prefix': text, 'date': ''}
# 分離前綴和日期
prefix = f"{parts[0]} {parts[1]}"
if len(parts) == 5:
date = f"{parts[2]} {parts[3]} {parts[4]}"
elif len(parts) >= 4:
date = f"{parts[2]} {parts[3]}"
else:
date = parts[2] if len(parts) > 2 else ''
return {
'prefix': prefix,
'date': date
}
def save_to_json(self, data, filename):
"""
儲存資料至 JSON 檔案
Args:
data: 要儲存的資料
filename (str): 檔案名稱
"""
try:
with open(filename, 'w', encoding='utf-8') as file:
json.dump(
data,
file,
ensure_ascii=False,
indent=4,
sort_keys=True
)
print(f"成功儲存 {len(data)} 筆資料至 {filename}")
except IOError as e:
print(f"儲存檔案時發生錯誤: {e}")
def display_books(self, books, limit=5):
"""
格式化顯示書籍資訊
Args:
books (list): 書籍資料清單
limit (int): 顯示數量限制
"""
print("\n" + "="*80)
print(f"書籍資料預覽 (顯示前 {min(limit, len(books))} 本)")
print("="*80 + "\n")
for book in books[:limit]:
print(f"書名: {book['title']}")
print(f"作者: {book['author']}")
print(f"發布: {book['date_prefix']} {book['publish_date']}")
print(f"連結: {book['detail_url']}")
print("-" * 80)
def main():
"""
主程式流程
"""
# 設定目標 URL
search_url = f"{base_url}/?q=data+science&type=book"
# 建立爬蟲實例
scraper = WebScraper(base_url, delay=1.5)
print(f"開始爬取網頁: {search_url}")
# 執行爬蟲
books = scraper.scrape_book_listing(search_url)
if books:
# 儲存資料
output_file = 'scraped_books.json'
scraper.save_to_json(books, output_file)
# 顯示結果
scraper.display_books(books, limit=6)
# 簡單統計
print(f"\n總共爬取 {len(books)} 本書籍")
else:
print("未能取得書籍資料")
if __name__ == '__main__':
main()
這個網頁爬蟲程式展示了物件導向設計在資料擷取中的應用。透過將爬蟲功能封裝成類別,我們可以更好地組織程式碼,提高可重用性和可維護性。程式中實作了多個重要的功能,包括錯誤處理與重試機制、速率限制以避免對目標網站造成負擔、字元編碼的正確處理,以及資料的結構化提取和清理。
@startuml
!define DISABLE_LINK
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
title 網頁爬蟲資料擷取處理架構
package "Python 爬蟲程式" {
component [WebScraper 類別\n核心爬蟲引擎] as scraper
component [HTML 解析器\nBeautifulSoup] as parser
component [HTTP 客戶端\nRequests Session] as http
component [資料清理模組\n正規表示式處理] as cleaner
}
cloud "目標網站\nBook Search" as website
database "本地儲存系統" {
file "scraped_books.json\n結構化資料" as jsonfile
file "error_log.txt\n錯誤記錄" as logfile
}
actor "資料分析師" as analyst
analyst --> scraper : 設定搜尋參數\n啟動爬蟲程序
scraper --> http : 發送 HTTP 請求\n設定 User-Agent
http --> website : GET /search?q=keywords
website --> http : 返回 HTML 內容\n狀態碼 200
http --> scraper : 原始 HTML 文件
scraper --> parser : 傳遞 HTML\n建立解析樹
note right of parser
解析流程:
- 解析 HTML 結構建立 DOM 樹
- 定位目標元素使用 CSS 選擇器
選擇器範例:
article.product-result
p.title
a.learn-more
p.note.date2
end note
parser --> scraper : 返回元素物件
scraper --> cleaner : 提取文字內容\n清理資料
note right of cleaner
清理流程:
- 移除 HTML 標籤處理空白字元
- 正規化日期格式清理特殊字元
end note
cleaner --> scraper : 結構化資料物件
scraper --> jsonfile : 序列化並儲存 JSON 格式
scraper --> logfile : 記錄執行狀態錯誤資訊
scraper --> analyst : 返回爬取結果統計資訊
note bottom
網頁爬蟲最佳實務
遵守 robots.txt 規範:
- 檢查網站的爬蟲政策
- 避免爬取禁止的內容
實作速率限制:
- 設定合理的請求間隔
- 使用指數退避重試
- 避免對伺服器造成壓力
錯誤處理機制:
- 處理網路異常
- 應對 HTTP 錯誤碼
- 記錄詳細的錯誤日誌
資料品質控制:
- 驗證提取的資料
- 處理缺失值
- 標準化資料格式
end note
@endumlPandas 資料處理的進階技巧
當我們成功擷取到資料後,下一步就是對資料進行處理和分析。Pandas 是 Python 資料科學生態系統中最重要的函式庫之一,它提供了強大且靈活的資料結構來處理結構化資料。Pandas 的兩個核心資料結構是 Series(一維陣列)和 DataFrame(二維表格),它們建立在 NumPy 陣列之上,但提供了更豐富的功能和更直觀的 API。
在實務的資料科學專案中,原始資料往往不能直接用於分析。資料可能包含缺失值、重複記錄、不一致的格式,或是需要進行轉換和聚合。Pandas 提供了完整的工具集來處理這些問題。透過 Pandas 的資料清理功能,我們可以處理缺失值、移除重複項、轉換資料型別。透過資料轉換功能,我們可以建立新的衍生欄位、進行分組聚合、重塑資料結構。透過資料合併功能,我們可以結合來自不同來源的資料集。
資料的切片(Slicing)和切塊(Dicing)是資料探索的基本操作。切片讓我們能夠選取資料的子集,無論是特定的行、列,還是滿足某些條件的記錄。Pandas 提供了多種索引方式,包括位置索引(iloc)、標籤索引(loc)和布林索引。理解這些不同的索引方式,以及它們的適用場景,是高效使用 Pandas 的關鍵。
# Pandas 資料處理完整範例
# 展示資料載入、清理、轉換和分析的完整流程
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
def load_twitter_data(filename):
"""
載入 Twitter 資料並轉換為 DataFrame
Args:
filename (str): JSON 檔案路徑
Returns:
pd.DataFrame: Twitter 資料框
"""
# 載入 JSON 資料
with open(filename, 'r', encoding='utf-8') as file:
data = json.load(file)
# 轉換為 DataFrame
df = pd.DataFrame(data)
# 展開巢狀的使用者資訊
if 'user' in df.columns:
# 將巢狀字典展開為獨立欄位
user_df = pd.json_normalize(df['user'])
user_df.columns = ['user_' + col for col in user_df.columns]
# 合併回主 DataFrame
df = pd.concat([df.drop('user', axis=1), user_df], axis=1)
# 展開互動統計
if 'engagement' in df.columns:
engagement_df = pd.json_normalize(df['engagement'])
engagement_df.columns = ['engagement_' + col for col in engagement_df.columns]
df = pd.concat([df.drop('engagement', axis=1), engagement_df], axis=1)
# 轉換日期欄位為 datetime 型別
if 'created_at' in df.columns:
# Twitter 日期格式: 'Wed Oct 10 20:19:24 +0000 2018'
df['created_at'] = pd.to_datetime(
df['created_at'],
format='%a %b %d %H:%M:%S %z %Y',
errors='coerce' # 無法解析的日期設為 NaT
)
return df
def clean_data(df):
"""
清理資料框
處理缺失值、重複項和異常值
Args:
df (pd.DataFrame): 原始資料框
Returns:
pd.DataFrame: 清理後的資料框
"""
print("開始資料清理...")
print(f"原始資料形狀: {df.shape}")
# 1. 移除完全重複的記錄
before_dup = len(df)
df = df.drop_duplicates()
after_dup = len(df)
if before_dup > after_dup:
print(f"移除 {before_dup - after_dup} 筆重複記錄")
# 2. 處理缺失值
# 檢查每個欄位的缺失情況
missing_summary = df.isnull().sum()
missing_summary = missing_summary[missing_summary > 0]
if not missing_summary.empty:
print("\n缺失值統計:")
for col, count in missing_summary.items():
pct = count / len(df) * 100
print(f" {col}: {count} ({pct:.1f}%)")
# 3. 處理文字欄位的缺失值
text_columns = df.select_dtypes(include=['object']).columns
for col in text_columns:
df[col] = df[col].fillna('')
# 4. 處理數值欄位的缺失值
numeric_columns = df.select_dtypes(include=['int64', 'float64']).columns
for col in numeric_columns:
# 使用中位數填補
df[col] = df[col].fillna(df[col].median())
# 5. 移除文字長度異常的記錄
if 'text' in df.columns:
# 移除過短的推文(可能是無效資料)
min_length = 10
before_filter = len(df)
df = df[df['text'].str.len() >= min_length]
after_filter = len(df)
if before_filter > after_filter:
print(f"移除 {before_filter - after_filter} 筆文字過短的記錄")
print(f"清理後資料形狀: {df.shape}\n")
return df
def add_derived_features(df):
"""
新增衍生特徵
根據現有欄位計算新的特徵
Args:
df (pd.DataFrame): 原始資料框
Returns:
pd.DataFrame: 包含衍生特徵的資料框
"""
print("新增衍生特徵...")
# 1. 文字長度特徵
if 'text' in df.columns:
df['text_length'] = df['text'].str.len()
df['word_count'] = df['text'].str.split().str.len()
# 2. 時間相關特徵
if 'created_at' in df.columns:
# 提取小時、星期幾等資訊
df['hour'] = df['created_at'].dt.hour
df['day_of_week'] = df['created_at'].dt.dayofweek
df['is_weekend'] = df['day_of_week'].isin([5, 6])
# 計算距離現在的時間差
now = pd.Timestamp.now(tz='UTC')
df['days_since_post'] = (now - df['created_at']).dt.days
# 3. 互動指標
if all(col in df.columns for col in ['engagement_favorite_count', 'engagement_retweet_count']):
# 總互動數
df['total_engagement'] = (
df['engagement_favorite_count'] +
df['engagement_retweet_count']
)
# 互動率(考慮追蹤者數量)
if 'user_followers_count' in df.columns:
# 避免除以零
df['engagement_rate'] = df.apply(
lambda row: (
row['total_engagement'] / row['user_followers_count'] * 100
if row['user_followers_count'] > 0
else 0
),
axis=1
)
# 4. 主題標籤特徵
if 'hashtags' in df.columns:
# 主題標籤數量
df['hashtag_count'] = df['hashtags'].apply(
lambda x: len(x) if isinstance(x, list) else 0
)
# 5. 使用者等級分類
if 'user_followers_count' in df.columns:
# 根據追蹤者數量分類使用者影響力
df['user_tier'] = pd.cut(
df['user_followers_count'],
bins=[0, 100, 1000, 10000, float('inf')],
labels=['微型', '小型', '中型', '大型']
)
print(f"新增 {len([c for c in df.columns if c not in ['text', 'created_at']])} 個特徵\n")
return df
def analyze_data(df):
"""
執行探索性資料分析
Args:
df (pd.DataFrame): 資料框
"""
print("="*80)
print("資料集概覽")
print("="*80)
# 基本資訊
print(f"\n資料形狀: {df.shape[0]} 列 × {df.shape[1]} 欄")
print(f"記憶體使用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
# 資料型別分佈
print("\n資料型別分佈:")
print(df.dtypes.value_counts())
# 數值欄位統計
numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
if len(numeric_cols) > 0:
print("\n" + "="*80)
print("數值欄位統計摘要")
print("="*80)
print(df[numeric_cols].describe().round(2))
# 分類欄位分析
if 'lang' in df.columns:
print("\n" + "="*80)
print("語言分佈")
print("="*80)
lang_dist = df['lang'].value_counts()
for lang, count in lang_dist.head(5).items():
pct = count / len(df) * 100
print(f"{lang}: {count} ({pct:.1f}%)")
# 互動統計
if 'total_engagement' in df.columns:
print("\n" + "="*80)
print("互動統計")
print("="*80)
print(f"平均總互動數: {df['total_engagement'].mean():.2f}")
print(f"中位數總互動數: {df['total_engagement'].median():.2f}")
print(f"最高互動數: {df['total_engagement'].max()}")
# 時間分析
if 'hour' in df.columns:
print("\n" + "="*80)
print("發文時段分析")
print("="*80)
hour_dist = df['hour'].value_counts().sort_index()
peak_hour = hour_dist.idxmax()
print(f"最活躍時段: {peak_hour}:00 ({hour_dist[peak_hour]} 則推文)")
def slice_and_dice_demo(df):
"""
示範資料切片與切塊操作
Args:
df (pd.DataFrame): 資料框
"""
print("\n" + "="*80)
print("資料切片與切塊示範")
print("="*80)
# 1. 位置索引 (iloc)
print("\n1. 使用位置索引選取前 3 列:")
print(df.iloc[:3, :3]) # 前 3 列,前 3 欄
# 2. 標籤索引 (loc)
if 'text' in df.columns and 'user_screen_name' in df.columns:
print("\n2. 使用標籤索引選取特定欄位:")
print(df.loc[:2, ['user_screen_name', 'text', 'total_engagement']])
# 3. 布林索引
if 'engagement_favorite_count' in df.columns:
print("\n3. 使用布林索引篩選高互動推文:")
high_engagement = df[df['engagement_favorite_count'] > 100]
print(f"高互動推文數量: {len(high_engagement)}")
if len(high_engagement) > 0:
print(high_engagement[['user_screen_name', 'text', 'engagement_favorite_count']].head(3))
# 4. 組合條件
if all(col in df.columns for col in ['is_weekend', 'total_engagement']):
print("\n4. 組合條件查詢(週末且高互動):")
weekend_popular = df[
(df['is_weekend'] == True) &
(df['total_engagement'] > df['total_engagement'].median())
]
print(f"符合條件的推文數量: {len(weekend_popular)}")
def group_analysis_demo(df):
"""
示範分組聚合分析
Args:
df (pd.DataFrame): 資料框
"""
print("\n" + "="*80)
print("分組聚合分析示範")
print("="*80)
# 按語言分組
if 'lang' in df.columns and 'total_engagement' in df.columns:
print("\n按語言分組的平均互動數:")
lang_engagement = df.groupby('lang')['total_engagement'].agg([
('平均', 'mean'),
('中位數', 'median'),
('最大值', 'max'),
('數量', 'count')
]).round(2)
print(lang_engagement.head())
# 按時段分組
if 'hour' in df.columns and 'total_engagement' in df.columns:
print("\n按發文時段分組的平均互動數:")
hour_engagement = df.groupby('hour')['total_engagement'].mean().round(2)
print(hour_engagement.head(10))
# 按使用者等級分組
if 'user_tier' in df.columns:
print("\n按使用者等級分組的統計:")
tier_stats = df.groupby('user_tier').agg({
'total_engagement': ['mean', 'median'],
'text_length': 'mean',
'hashtag_count': 'mean'
}).round(2)
print(tier_stats)
def main():
"""
主程式流程
"""
# 假設已有 Twitter 資料檔案
twitter_file = 'twitter_data.json'
try:
# 載入資料
print("載入資料...")
df = load_twitter_data(twitter_file)
# 清理資料
df = clean_data(df)
# 新增衍生特徵
df = add_derived_features(df)
# 探索性分析
analyze_data(df)
# 切片示範
slice_and_dice_demo(df)
# 分組分析
group_analysis_demo(df)
# 儲存處理後的資料
output_file = 'processed_twitter_data.csv'
df.to_csv(output_file, index=False, encoding='utf-8-sig')
print(f"\n處理後的資料已儲存至: {output_file}")
except FileNotFoundError:
print(f"找不到檔案: {twitter_file}")
print("請先執行 Twitter 資料擷取程式")
except Exception as e:
print(f"發生錯誤: {e}")
if __name__ == '__main__':
main()
這個完整的 Pandas 資料處理範例展示了資料科學工作流程中的關鍵步驟。從載入原始資料開始,我們進行了系統化的資料清理,包括處理缺失值、移除重複項和過濾異常值。接著建立了多個衍生特徵,這些特徵可以幫助我們從不同角度理解資料。最後透過探索性資料分析,我們對資料的分佈、趨勢和模式有了初步的認識。
資料視覺化的藝術與科學
資料視覺化是將抽象的數據轉化為直觀圖形的過程,它在資料科學中扮演著至關重要的角色。一個設計良好的視覺化圖表不僅能夠清晰地呈現資料的分佈和趨勢,更能夠揭示隱藏在資料中的模式和洞察。Python 提供了豐富的視覺化函式庫,其中 Matplotlib 是基礎函式庫,提供了底層的繪圖功能。Seaborn 則建立在 Matplotlib 之上,提供了更高階的統計圖表介面。
在選擇視覺化方式時,需要考慮資料的型別和分析目的。對於數值變數的分佈,我們可以使用直方圖或密度圖。對於類別變數,長條圖或圓餅圖可能更合適。對於變數之間的關係,散點圖和相關性熱力圖(Heatmap)是常用的選擇。時間序列資料則適合使用折線圖來展示趨勢。
熱力圖是一種特別有用的視覺化工具,它使用顏色的深淺來表示數值的大小。在資料科學中,熱力圖最常用於展示變數之間的相關性矩陣。透過相關性分析,我們可以快速識別哪些變數之間存在強相關關係,這對於特徵選擇和模型建構都非常重要。
# 資料視覺化完整範例
# 展示使用 Seaborn 和 Matplotlib 建立各種統計圖表
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
# 設定中文字型與視覺化風格
plt.rcParams['font.sans-serif'] = ['Microsoft JhengHei'] # 微軟正黑體
plt.rcParams['axes.unicode_minus'] = False # 正確顯示負號
sns.set_style("whitegrid") # 設定背景風格
sns.set_palette("husl") # 設定色彩方案
def create_correlation_heatmap(df, numeric_cols=None, figsize=(12, 10)):
"""
建立相關性熱力圖
Args:
df (pd.DataFrame): 資料框
numeric_cols (list): 要分析的數值欄位清單,None 表示全部
figsize (tuple): 圖表大小
"""
# 選取數值欄位
if numeric_cols is None:
numeric_df = df.select_dtypes(include=['int64', 'float64'])
else:
numeric_df = df[numeric_cols]
# 計算相關係數矩陣
correlation_matrix = numeric_df.corr()
# 建立圖表
plt.figure(figsize=figsize)
# 繪製熱力圖
# annot=True: 在每個格子中顯示數值
# fmt='.2f': 數值格式為小數點後兩位
# cmap='coolwarm': 使用冷暖色調色盤
# center=0: 將色階的中心設在 0
# square=True: 使格子呈正方形
# linewidths=1: 格子之間的線寬
# cbar_kws: 色條的參數設定
sns.heatmap(
correlation_matrix,
annot=True,
fmt='.2f',
cmap='coolwarm',
center=0,
square=True,
linewidths=1,
cbar_kws={"shrink": 0.8, "label": "相關係數"}
)
plt.title('變數相關性熱力圖', fontsize=16, fontweight='bold', pad=20)
plt.xlabel('變數', fontsize=12)
plt.ylabel('變數', fontsize=12)
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
# 儲存圖表
plt.savefig('correlation_heatmap.png', dpi=300, bbox_inches='tight')
plt.show()
# 找出強相關的變數對
print("\n強相關變數對 (|r| > 0.7):")
# 取得上三角矩陣的索引(避免重複)
mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))
correlation_matrix_masked = correlation_matrix.mask(mask)
# 找出絕對值大於 0.7 的相關係數
strong_corr = correlation_matrix_masked[abs(correlation_matrix_masked) > 0.7].stack()
if len(strong_corr) > 0:
for (var1, var2), corr in strong_corr.items():
print(f"{var1} vs {var2}: {corr:.3f}")
else:
print("沒有發現強相關的變數對")
def create_distribution_plots(df, columns, figsize=(15, 10)):
"""
建立分佈圖
Args:
df (pd.DataFrame): 資料框
columns (list): 要繪製的欄位清單
figsize (tuple): 圖表大小
"""
n_cols = len(columns)
n_rows = (n_cols + 2) // 3 # 每行 3 個子圖
fig, axes = plt.subplots(n_rows, 3, figsize=figsize)
axes = axes.flatten() if n_cols > 1 else [axes]
for idx, col in enumerate(columns):
ax = axes[idx]
# 繪製直方圖與核密度估計
sns.histplot(
data=df,
x=col,
kde=True, # 顯示核密度估計曲線
ax=ax,
bins=30,
color='skyblue',
edgecolor='black',
alpha=0.7
)
# 添加平均值線
mean_val = df[col].mean()
ax.axvline(
mean_val,
color='red',
linestyle='--',
linewidth=2,
label=f'平均值: {mean_val:.2f}'
)
# 添加中位數線
median_val = df[col].median()
ax.axvline(
median_val,
color='green',
linestyle='--',
linewidth=2,
label=f'中位數: {median_val:.2f}'
)
ax.set_title(f'{col} 分佈', fontsize=12, fontweight='bold')
ax.set_xlabel(col, fontsize=10)
ax.set_ylabel('頻率', fontsize=10)
ax.legend()
ax.grid(True, alpha=0.3)
# 隱藏多餘的子圖
for idx in range(n_cols, len(axes)):
axes[idx].set_visible(False)
plt.suptitle('變數分佈分析', fontsize=16, fontweight='bold', y=1.02)
plt.tight_layout()
plt.savefig('distribution_plots.png', dpi=300, bbox_inches='tight')
plt.show()
def create_scatter_matrix(df, columns, figsize=(15, 15)):
"""
建立散點圖矩陣
Args:
df (pd.DataFrame): 資料框
columns (list): 要分析的欄位清單
figsize (tuple): 圖表大小
"""
# 使用 Seaborn 的 pairplot 功能
# diag_kind='kde': 對角線顯示核密度估計
# plot_kws: 散點圖的參數
# diag_kws: 對角線圖的參數
pairplot = sns.pairplot(
df[columns],
diag_kind='kde',
plot_kws={'alpha': 0.6, 's': 50, 'edgecolor': 'k'},
diag_kws={'alpha': 0.7}
)
pairplot.fig.suptitle('變數關係散點圖矩陣', fontsize=16, fontweight='bold', y=1.01)
plt.tight_layout()
plt.savefig('scatter_matrix.png', dpi=300, bbox_inches='tight')
plt.show()
def create_time_series_plot(df, time_col, value_col, figsize=(15, 6)):
"""
建立時間序列圖
Args:
df (pd.DataFrame): 資料框
time_col (str): 時間欄位名稱
value_col (str): 數值欄位名稱
figsize (tuple): 圖表大小
"""
plt.figure(figsize=figsize)
# 確保時間欄位為 datetime 型別
if not pd.api.types.is_datetime64_any_dtype(df[time_col]):
df[time_col] = pd.to_datetime(df[time_col])
# 依時間排序
df_sorted = df.sort_values(time_col)
# 繪製折線圖
plt.plot(
df_sorted[time_col],
df_sorted[value_col],
linewidth=2,
marker='o',
markersize=4,
alpha=0.7,
label=value_col
)
# 計算移動平均
window_size = min(7, len(df_sorted) // 10)
if window_size > 2:
rolling_mean = df_sorted[value_col].rolling(window=window_size).mean()
plt.plot(
df_sorted[time_col],
rolling_mean,
linewidth=3,
color='red',
alpha=0.8,
label=f'{window_size}期移動平均'
)
plt.title(f'{value_col} 時間序列趨勢', fontsize=16, fontweight='bold', pad=20)
plt.xlabel('時間', fontsize=12)
plt.ylabel(value_col, fontsize=12)
plt.legend()
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.savefig('time_series_plot.png', dpi=300, bbox_inches='tight')
plt.show()
def create_box_plots(df, category_col, value_cols, figsize=(15, 5)):
"""
建立箱形圖
Args:
df (pd.DataFrame): 資料框
category_col (str): 分類欄位
value_cols (list): 數值欄位清單
figsize (tuple): 圖表大小
"""
n_plots = len(value_cols)
fig, axes = plt.subplots(1, n_plots, figsize=figsize)
if n_plots == 1:
axes = [axes]
for idx, value_col in enumerate(value_cols):
ax = axes[idx]
# 繪製箱形圖
sns.boxplot(
data=df,
x=category_col,
y=value_col,
ax=ax,
palette='Set2'
)
# 添加數據點
sns.stripplot(
data=df,
x=category_col,
y=value_col,
ax=ax,
color='black',
alpha=0.3,
size=3
)
ax.set_title(f'{value_col} 分佈比較', fontsize=12, fontweight='bold')
ax.set_xlabel(category_col, fontsize=10)
ax.set_ylabel(value_col, fontsize=10)
ax.grid(True, alpha=0.3, axis='y')
# 旋轉 x 軸標籤
ax.tick_params(axis='x', rotation=45)
plt.suptitle(f'按 {category_col} 分組的箱形圖分析', fontsize=16, fontweight='bold', y=1.02)
plt.tight_layout()
plt.savefig('box_plots.png', dpi=300, bbox_inches='tight')
plt.show()
def main():
"""
視覺化示範主程式
"""
# 載入 Iris 資料集作為範例
from sklearn.datasets import load_iris
iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df['species'] = pd.Categorical.from_codes(iris.target, iris.target_names)
# 重新命名欄位為中文
df.columns = ['萼片長度', '萼片寬度', '花瓣長度', '花瓣寬度', '品種']
print("建立視覺化圖表...")
# 1. 相關性熱力圖
print("\n1. 建立相關性熱力圖...")
numeric_cols = ['萼片長度', '萼片寬度', '花瓣長度', '花瓣寬度']
create_correlation_heatmap(df, numeric_cols)
# 2. 分佈圖
print("\n2. 建立分佈圖...")
create_distribution_plots(df, numeric_cols)
# 3. 散點圖矩陣
print("\n3. 建立散點圖矩陣...")
create_scatter_matrix(df, numeric_cols)
# 4. 箱形圖
print("\n4. 建立箱形圖...")
create_box_plots(df, '品種', numeric_cols[:2])
print("\n所有圖表已建立完成!")
if __name__ == '__main__':
main()
蒙特卡羅模擬的實務應用
蒙特卡羅模擬(Monte Carlo Simulation)是一種基於隨機抽樣的數值計算方法,廣泛應用於金融風險評估、專案管理、科學計算等領域。這種方法的核心思想是透過大量的隨機抽樣來估計複雜系統的行為或計算難以直接求解的數學問題。蒙特卡羅這個名稱來自於摩納哥的蒙地卡羅賭場,因為這種方法的本質就是透過隨機性來進行計算。
在資料科學中,蒙特卡羅模擬常用於不確定性分析和風險評估。例如,在金融領域,我們可以使用蒙特卡羅模擬來估計投資組合的風險值(Value at Risk, VaR)。在專案管理中,可以模擬專案完成時間的分佈。在科學計算中,可以用來估計複雜的數學積分或解決高維度的最佳化問題。
蒙特卡羅模擬的有效性取決於兩個關鍵因素。第一是隨機數產生器的品質,我們需要確保產生的隨機數具有良好的統計特性。第二是抽樣數量,一般來說,抽樣數量越多,估計結果越準確,但計算成本也越高。在實務應用中,我們需要在精確度和計算效率之間取得平衡。
# 蒙特卡羅模擬完整範例
# 展示多種蒙特卡羅模擬的應用場景
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
plt.rcParams['font.sans-serif'] = ['Microsoft JhengHei']
plt.rcParams['axes.unicode_minus'] = False
def estimate_pi(n_samples=100000):
"""
使用蒙特卡羅方法估計圓周率 π
原理:
在單位正方形內隨機撒點
落在單位圓內的點的比例約為 π/4
Args:
n_samples (int): 隨機樣本數量
Returns:
float: π 的估計值
"""
print(f"\n使用 {n_samples:,} 個樣本估計圓周率...")
# 產生隨機點的座標
# 範圍為 [0, 1] × [0, 1]
x = np.random.uniform(0, 1, n_samples)
y = np.random.uniform(0, 1, n_samples)
# 計算每個點到原點的距離
distances = np.sqrt(x**2 + y**2)
# 判斷點是否在單位圓內
# 距離小於等於 1 表示在圓內
inside_circle = distances <= 1
# 計算在圓內的點的比例
ratio = np.sum(inside_circle) / n_samples
# 估計 π
# 圓的面積 = πr² (r=1 時為 π)
# 正方形面積 = 1
# 比例 = π/4
# 因此 π = 4 × 比例
pi_estimate = 4 * ratio
# 計算誤差
error = abs(pi_estimate - np.pi)
error_pct = error / np.pi * 100
print(f"估計值: {pi_estimate:.6f}")
print(f"真實值: {np.pi:.6f}")
print(f"絕對誤差: {error:.6f}")
print(f"相對誤差: {error_pct:.4f}%")
# 視覺化
plt.figure(figsize=(8, 8))
# 只繪製前 5000 個點以提高效能
sample_size = min(5000, n_samples)
plt.scatter(
x[:sample_size][inside_circle[:sample_size]],
y[:sample_size][inside_circle[:sample_size]],
c='blue', s=1, alpha=0.5, label='圓內'
)
plt.scatter(
x[:sample_size][~inside_circle[:sample_size]],
y[:sample_size][~inside_circle[:sample_size]],
c='red', s=1, alpha=0.5, label='圓外'
)
# 繪製單位圓
theta = np.linspace(0, 2*np.pi, 100)
plt.plot(np.cos(theta), np.sin(theta), 'k-', linewidth=2, label='單位圓')
plt.xlim(0, 1)
plt.ylim(0, 1)
plt.xlabel('X 座標')
plt.ylabel('Y 座標')
plt.title(f'蒙特卡羅估計 π\n估計值: {pi_estimate:.6f}', fontweight='bold')
plt.legend()
plt.axis('equal')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('monte_carlo_pi.png', dpi=300, bbox_inches='tight')
plt.show()
return pi_estimate
def simulate_stock_price(
initial_price=100,
mu=0.1,
sigma=0.2,
days=252,
n_simulations=1000
):
"""
模擬股票價格路徑
使用幾何布朗運動模型 (Geometric Brownian Motion)
Args:
initial_price (float): 初始股價
mu (float): 年化預期報酬率
sigma (float): 年化波動率
days (int): 交易天數
n_simulations (int): 模擬路徑數量
Returns:
np.ndarray: 股價路徑矩陣 (n_simulations × days)
"""
print(f"\n模擬 {n_simulations:,} 條股價路徑...")
print(f"初始股價: ${initial_price}")
print(f"預期年化報酬率: {mu*100:.1f}%")
print(f"年化波動率: {sigma*100:.1f}%")
print(f"模擬天數: {days}")
# 時間步長 (以年為單位)
dt = 1 / 252 # 假設一年 252 個交易日
# 建立儲存股價路徑的矩陣
prices = np.zeros((n_simulations, days))
prices[:, 0] = initial_price
# 產生隨機衝擊
# 使用標準常態分佈
random_shocks = np.random.normal(0, 1, (n_simulations, days - 1))
# 模擬股價路徑
for t in range(1, days):
# 幾何布朗運動公式
# dS = μS dt + σS dW
# S(t+1) = S(t) × exp((μ - σ²/2)dt + σ√dt × Z)
# 其中 Z ~ N(0,1)
drift = (mu - 0.5 * sigma**2) * dt
diffusion = sigma * np.sqrt(dt) * random_shocks[:, t-1]
prices[:, t] = prices[:, t-1] * np.exp(drift + diffusion)
# 計算統計資訊
final_prices = prices[:, -1]
mean_final = np.mean(final_prices)
median_final = np.median(final_prices)
std_final = np.std(final_prices)
# 計算風險指標
# VaR (Value at Risk) at 95% confidence level
var_95 = np.percentile(final_prices, 5)
loss_95 = initial_price - var_95
print(f"\n最終股價統計:")
print(f"平均值: ${mean_final:.2f}")
print(f"中位數: ${median_final:.2f}")
print(f"標準差: ${std_final:.2f}")
print(f"\n風險指標:")
print(f"95% 信賴水準的 VaR: ${var_95:.2f}")
print(f"潛在損失: ${loss_95:.2f} ({loss_95/initial_price*100:.2f}%)")
# 視覺化
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
# 繪製部分模擬路徑
sample_paths = min(100, n_simulations)
for i in range(sample_paths):
ax1.plot(prices[i, :], alpha=0.3, linewidth=0.5)
# 繪製平均路徑
mean_path = np.mean(prices, axis=0)
ax1.plot(mean_path, 'r-', linewidth=2, label='平均路徑')
# 繪製信賴區間
percentile_5 = np.percentile(prices, 5, axis=0)
percentile_95 = np.percentile(prices, 95, axis=0)
ax1.fill_between(
range(days),
percentile_5,
percentile_95,
alpha=0.2,
color='blue',
label='90% 信賴區間'
)
ax1.axhline(initial_price, color='k', linestyle='--', linewidth=1, label='初始股價')
ax1.set_xlabel('交易天數')
ax1.set_ylabel('股價 ($)')
ax1.set_title('股價模擬路徑', fontweight='bold')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 繪製最終股價分佈
ax2.hist(final_prices, bins=50, density=True, alpha=0.7, edgecolor='black')
ax2.axvline(mean_final, color='r', linestyle='--', linewidth=2, label=f'平均值: ${mean_final:.2f}')
ax2.axvline(var_95, color='orange', linestyle='--', linewidth=2, label=f'95% VaR: ${var_95:.2f}')
# 添加理論分佈
x = np.linspace(final_prices.min(), final_prices.max(), 100)
# 對數常態分佈
log_mean = np.log(initial_price) + (mu - 0.5 * sigma**2) * (days * dt)
log_std = sigma * np.sqrt(days * dt)
theoretical_pdf = stats.lognorm.pdf(x, s=log_std, scale=np.exp(log_mean))
ax2.plot(x, theoretical_pdf, 'g-', linewidth=2, label='理論分佈')
ax2.set_xlabel('最終股價 ($)')
ax2.set_ylabel('機率密度')
ax2.set_title('最終股價分佈', fontweight='bold')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('monte_carlo_stock.png', dpi=300, bbox_inches='tight')
plt.show()
return prices
def simulate_project_completion(
task_estimates,
n_simulations=10000
):
"""
模擬專案完成時間
每個任務的時間服從三角分佈 (樂觀、最可能、悲觀)
Args:
task_estimates (list): 任務時間估計列表
每個元素為 (樂觀, 最可能, 悲觀) 的三元組
n_simulations (int): 模擬次數
Returns:
np.ndarray: 專案完成時間的模擬結果
"""
print(f"\n模擬專案完成時間 ({n_simulations:,} 次)...")
n_tasks = len(task_estimates)
print(f"任務數量: {n_tasks}")
# 儲存每次模擬的專案完成時間
completion_times = np.zeros(n_simulations)
for sim in range(n_simulations):
project_time = 0
for optimistic, likely, pessimistic in task_estimates:
# 從三角分佈抽樣任務時間
task_time = np.random.triangular(optimistic, likely, pessimistic)
project_time += task_time
completion_times[sim] = project_time
# 統計分析
mean_time = np.mean(completion_times)
median_time = np.median(completion_times)
std_time = np.std(completion_times)
# 信賴區間
ci_50 = np.percentile(completion_times, 50)
ci_80 = np.percentile(completion_times, 80)
ci_95 = np.percentile(completion_times, 95)
print(f"\n專案完成時間統計:")
print(f"平均值: {mean_time:.1f} 天")
print(f"中位數: {median_time:.1f} 天")
print(f"標準差: {std_time:.1f} 天")
print(f"\n信賴水準:")
print(f"50% 機率在 {ci_50:.1f} 天內完成")
print(f"80% 機率在 {ci_80:.1f} 天內完成")
print(f"95% 機率在 {ci_95:.1f} 天內完成")
# 視覺化
plt.figure(figsize=(12, 6))
# 繪製直方圖
plt.hist(completion_times, bins=50, density=True, alpha=0.7, edgecolor='black')
# 添加統計線
plt.axvline(mean_time, color='r', linestyle='--', linewidth=2, label=f'平均值: {mean_time:.1f}天')
plt.axvline(ci_80, color='orange', linestyle='--', linewidth=2, label=f'80%信賴: {ci_80:.1f}天')
plt.axvline(ci_95, color='green', linestyle='--', linewidth=2, label=f'95%信賴: {ci_95:.1f}天')
plt.xlabel('專案完成時間 (天)')
plt.ylabel('機率密度')
plt.title('專案完成時間分佈', fontweight='bold')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('monte_carlo_project.png', dpi=300, bbox_inches='tight')
plt.show()
return completion_times
def main():
"""
蒙特卡羅模擬示範主程式
"""
# 1. 估計圓周率
pi_estimate = estimate_pi(n_samples=100000)
# 2. 股價模擬
stock_prices = simulate_stock_price(
initial_price=100,
mu=0.10,
sigma=0.25,
days=252,
n_simulations=1000
)
# 3. 專案完成時間模擬
# 任務估計: (樂觀, 最可能, 悲觀)
task_estimates = [
(5, 7, 12), # 任務 1
(3, 5, 8), # 任務 2
(10, 15, 25), # 任務 3
(7, 10, 15), # 任務 4
(4, 6, 10) # 任務 5
]
completion_times = simulate_project_completion(task_estimates, n_simulations=10000)
print("\n所有蒙特卡羅模擬完成!")
if __name__ == '__main__':
main()
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam linetype ortho
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
title 蒙特卡羅模擬應用架構與流程
package "蒙特卡羅模擬引擎" {
component [隨機數產生器\nNumPy Random] as rng
component [統計分析模組\nSciPy Stats] as stats
component [視覺化模組\nMatplotlib] as viz
}
package "應用場景" {
usecase "數學計算\n(估計圓周率)" as math
usecase "金融模擬\n(股價路徑)" as finance
usecase "專案管理\n(完成時間)" as project
}
actor "資料科學家\n決策者" as user
user --> math : 設定樣本數量
user --> finance : 設定市場參數
user --> project : 設定任務估計
math --> rng : 產生均勻分佈\n隨機座標
note right of math
演算法:
1. 產生隨機點 (x, y)
2. 計算距離 d = √(x²+y²)
3. 判斷是否在圓內
4. 計算比例估計 π
end note
finance --> rng : 產生常態分佈\n隨機衝擊
note right of finance
幾何布朗運動:
dS = μS dt + σS dW
參數:
μ = 預期報酬率
σ = 波動率
dW = 隨機過程
end note
project --> rng : 產生三角分佈\n任務時間
note right of project
三參數估計:
樂觀時間 (a)
最可能時間 (m)
悲觀時間 (b)
期望值:
E = (a + 4m + b) / 6
end note
rng --> stats : 傳遞模擬結果
stats --> stats : 計算統計指標
note right of stats
統計分析包括:
集中趨勢
平均值、中位數
離散程度
標準差、變異數
風險指標
VaR、信賴區間
百分位數
end note
stats --> viz : 傳遞分析結果
viz --> viz : 建立視覺化圖表
note right of viz
圖表類型:
分佈圖
直方圖、密度圖
路徑圖
時間序列、軌跡
統計圖
箱形圖、分位數
end note
viz --> user : 返回分析報告\n視覺化結果
note bottom
蒙特卡羅模擬優勢
適用範圍廣
處理複雜非線性系統
評估多變數相互作用
直觀易懂
透過大量抽樣逼近真實分佈
結果可視覺化呈現
實務價值高
風險評估與決策支援
不確定性量化分析
注意事項
樣本數量影響精確度
需要高品質隨機數產生器
計算成本與精度的平衡
適當的統計檢驗驗證結果
end note
@enduml主成分分析的降維技術
主成分分析(Principal Component Analysis, PCA)是一種廣泛使用的降維技術,在資料科學中佔有重要地位。當我們面對高維度的資料集時,往往會遇到維度詛咒(Curse of Dimensionality)的問題。隨著特徵數量的增加,資料點在高維空間中變得越來越稀疏,這會導致許多機器學習演算法的效能下降。PCA 透過找到資料的主要變異方向,將高維資料投影到低維空間,在降低維度的同時盡可能保留原始資料的資訊。
PCA 的數學基礎是線性代數中的特徵值分解。它會找到資料協方差矩陣的特徵向量,這些特徵向量指向資料變異最大的方向,稱為主成分。第一主成分對應最大的特徵值,捕捉了資料中最多的變異。第二主成分與第一主成分正交,捕捉了剩餘變異中最多的部分,以此類推。透過選擇前幾個主成分,我們可以在保留大部分資訊的同時大幅降低資料的維度。
在實務應用中,PCA 不僅用於降維,還可以用於資料視覺化、去除噪音、特徵提取等多種場景。在視覺化方面,我們可以將高維資料投影到二維或三維空間進行繪圖。在特徵工程中,主成分可以作為新的特徵用於機器學習模型。在資料壓縮中,可以用較少的主成分來近似原始資料,減少儲存空間和計算成本。
# 主成分分析完整範例
# 展示 PCA 在資料降維、視覺化和特徵提取中的應用
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import load_iris, load_wine
from mpl_toolkits.mplot3d import Axes3D
plt.rcParams['font.sans-serif'] = ['Microsoft JhengHei']
plt.rcParams['axes.unicode_minus'] = False
def perform_pca_analysis(X, y, feature_names, target_names, n_components=2):
"""
執行完整的 PCA 分析
Args:
X (np.ndarray): 特徵矩陣
y (np.ndarray): 目標變數
feature_names (list): 特徵名稱列表
target_names (list): 類別名稱列表
n_components (int): 要保留的主成分數量
Returns:
tuple: (PCA 模型, 轉換後的資料)
"""
print("="*80)
print("主成分分析 (PCA)")
print("="*80)
# 1. 資料標準化
# PCA 對特徵的尺度敏感,需要先標準化
print("\n步驟 1: 資料標準化")
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
print(f"原始資料形狀: {X.shape}")
print(f"特徵數量: {X.shape[1]}")
print(f"樣本數量: {X.shape[0]}")
# 顯示標準化前後的統計量
print(f"\n標準化前:")
print(f"平均值範圍: [{X.mean(axis=0).min():.2f}, {X.mean(axis=0).max():.2f}]")
print(f"標準差範圍: [{X.std(axis=0).min():.2f}, {X.std(axis=0).max():.2f}]")
print(f"\n標準化後:")
print(f"平均值範圍: [{X_scaled.mean(axis=0).min():.2e}, {X_scaled.mean(axis=0).max():.2e}]")
print(f"標準差: {X_scaled.std(axis=0)[0]:.2f} (全部特徵)")
# 2. 執行 PCA
print(f"\n步驟 2: 執行 PCA (保留 {n_components} 個主成分)")
pca = PCA(n_components=n_components)
X_pca = pca.fit_transform(X_scaled)
# 3. 分析解釋變異量
print(f"\n步驟 3: 分析主成分")
explained_variance = pca.explained_variance_ratio_
cumulative_variance = np.cumsum(explained_variance)
print(f"\n各主成分解釋的變異量:")
for i, (var, cum_var) in enumerate(zip(explained_variance, cumulative_variance), 1):
print(f" PC{i}: {var*100:.2f}% (累積: {cum_var*100:.2f}%)")
# 4. 分析特徵貢獻
print(f"\n步驟 4: 分析特徵對主成分的貢獻")
components_df = pd.DataFrame(
pca.components_.T,
columns=[f'PC{i+1}' for i in range(n_components)],
index=feature_names
)
print("\n主成分載荷矩陣:")
print(components_df.round(3))
# 找出每個主成分的主要貢獻特徵
print(f"\n主要貢獻特徵:")
for i in range(n_components):
pc_name = f'PC{i+1}'
abs_loadings = components_df[pc_name].abs()
top_features = abs_loadings.nlargest(3)
print(f"\n{pc_name}:")
for feat, loading in top_features.items():
sign = '+' if components_df.loc[feat, pc_name] > 0 else '-'
print(f" {sign} {feat}: {abs(loading):.3f}")
return pca, X_pca, X_scaled
def visualize_pca_2d(X_pca, y, target_names, pca, figsize=(12, 5)):
"""
二維 PCA 視覺化
Args:
X_pca (np.ndarray): PCA 轉換後的資料
y (np.ndarray): 目標變數
target_names (list): 類別名稱
pca: PCA 模型
figsize (tuple): 圖表大小
"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)
# 左圖: 散點圖
colors = plt.cm.Set1(np.linspace(0, 1, len(target_names)))
for target, color in zip(range(len(target_names)), colors):
mask = y == target
ax1.scatter(
X_pca[mask, 0],
X_pca[mask, 1],
c=[color],
label=target_names[target],
alpha=0.7,
s=100,
edgecolors='k',
linewidth=0.5
)
ax1.set_xlabel(
f'第一主成分 (解釋 {pca.explained_variance_ratio_[0]*100:.1f}% 變異)',
fontsize=11
)
ax1.set_ylabel(
f'第二主成分 (解釋 {pca.explained_variance_ratio_[1]*100:.1f}% 變異)',
fontsize=11
)
ax1.set_title('PCA 二維投影', fontweight='bold', fontsize=13)
ax1.legend()
ax1.grid(True, alpha=0.3)
ax1.axhline(y=0, color='k', linestyle='--', linewidth=0.5)
ax1.axvline(x=0, color='k', linestyle='--', linewidth=0.5)
# 右圖: 解釋變異量
explained_var = pca.explained_variance_ratio_
cumulative_var = np.cumsum(explained_var)
x_pos = np.arange(len(explained_var))
ax2.bar(x_pos, explained_var * 100, alpha=0.7, label='個別變異量')
ax2.plot(x_pos, cumulative_var * 100, 'ro-', linewidth=2, label='累積變異量')
ax2.set_xlabel('主成分', fontsize=11)
ax2.set_ylabel('解釋的變異量 (%)', fontsize=11)
ax2.set_title('主成分解釋變異量', fontweight='bold', fontsize=13)
ax2.set_xticks(x_pos)
ax2.set_xticklabels([f'PC{i+1}' for i in x_pos])
ax2.legend()
ax2.grid(True, alpha=0.3, axis='y')
# 添加累積百分比標籤
for i, (var, cum) in enumerate(zip(explained_var, cumulative_var)):
ax2.text(i, cum * 100 + 2, f'{cum*100:.1f}%', ha='center', fontsize=9)
plt.tight_layout()
plt.savefig('pca_2d_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
def visualize_pca_3d(X_pca, y, target_names, pca, figsize=(10, 8)):
"""
三維 PCA 視覺化
Args:
X_pca (np.ndarray): PCA 轉換後的資料 (至少 3 個主成分)
y (np.ndarray): 目標變數
target_names (list): 類別名稱
pca: PCA 模型
figsize (tuple): 圖表大小
"""
if X_pca.shape[1] < 3:
print("需要至少 3 個主成分才能建立三維視覺化")
return
fig = plt.figure(figsize=figsize)
ax = fig.add_subplot(111, projection='3d')
colors = plt.cm.Set1(np.linspace(0, 1, len(target_names)))
for target, color in zip(range(len(target_names)), colors):
mask = y == target
ax.scatter(
X_pca[mask, 0],
X_pca[mask, 1],
X_pca[mask, 2],
c=[color],
label=target_names[target],
alpha=0.7,
s=100,
edgecolors='k',
linewidth=0.5
)
ax.set_xlabel(
f'PC1 ({pca.explained_variance_ratio_[0]*100:.1f}%)',
fontsize=10
)
ax.set_ylabel(
f'PC2 ({pca.explained_variance_ratio_[1]*100:.1f}%)',
fontsize=10
)
ax.set_zlabel(
f'PC3 ({pca.explained_variance_ratio_[2]*100:.1f}%)',
fontsize=10
)
ax.set_title(
f'PCA 三維投影\n(累積解釋 {np.sum(pca.explained_variance_ratio_[:3])*100:.1f}% 變異)',
fontweight='bold',
fontsize=13
)
ax.legend()
plt.tight_layout()
plt.savefig('pca_3d_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
def determine_optimal_components(X, max_components=None):
"""
決定最佳主成分數量
使用累積解釋變異量來判斷
Args:
X (np.ndarray): 標準化後的特徵矩陣
max_components (int): 最大主成分數量
Returns:
int: 建議的主成分數量
"""
if max_components is None:
max_components = min(X.shape)
# 執行完整 PCA
pca = PCA(n_components=max_components)
pca.fit(X)
# 計算累積解釋變異量
cumulative_var = np.cumsum(pca.explained_variance_ratio_)
# 尋找解釋 95% 變異所需的主成分數量
n_components_95 = np.argmax(cumulative_var >= 0.95) + 1
# 視覺化
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(range(1, max_components + 1), cumulative_var * 100, 'bo-', linewidth=2)
plt.axhline(y=95, color='r', linestyle='--', linewidth=2, label='95% 門檻')
plt.axvline(x=n_components_95, color='g', linestyle='--', linewidth=2,
label=f'建議: {n_components_95} 個主成分')
plt.xlabel('主成分數量')
plt.ylabel('累積解釋變異量 (%)')
plt.title('主成分數量選擇', fontweight='bold')
plt.legend()
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
plt.bar(range(1, max_components + 1), pca.explained_variance_ratio_ * 100, alpha=0.7)
plt.xlabel('主成分')
plt.ylabel('解釋變異量 (%)')
plt.title('各主成分貢獻', fontweight='bold')
plt.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.savefig('pca_component_selection.png', dpi=300, bbox_inches='tight')
plt.show()
print(f"\n建議使用 {n_components_95} 個主成分")
print(f"可解釋 {cumulative_var[n_components_95-1]*100:.2f}% 的變異")
return n_components_95
def main():
"""
PCA 分析示範主程式
"""
# 載入 Iris 資料集
print("="*80)
print("範例 1: Iris 資料集 PCA 分析")
print("="*80)
iris = load_iris()
X_iris = iris.data
y_iris = iris.target
feature_names = iris.feature_names
target_names = iris.target_names
# 標準化
scaler = StandardScaler()
X_iris_scaled = scaler.fit_transform(X_iris)
# 決定最佳主成分數量
optimal_n = determine_optimal_components(X_iris_scaled)
# 執行 PCA 分析 (2D)
pca_2d, X_pca_2d, _ = perform_pca_analysis(
X_iris, y_iris, feature_names, target_names, n_components=2
)
# 二維視覺化
visualize_pca_2d(X_pca_2d, y_iris, target_names, pca_2d)
# 執行 PCA 分析 (3D)
pca_3d, X_pca_3d, _ = perform_pca_analysis(
X_iris, y_iris, feature_names, target_names, n_components=3
)
# 三維視覺化
visualize_pca_3d(X_pca_3d, y_iris, target_names, pca_3d)
print("\nPCA 分析完成!")
if __name__ == '__main__':
main()
結語
Python 在資料科學領域的應用已經發展成為一個完整且成熟的生態系統。從本文的深入探討中,我們看到了資料科學工作流程的各個環節,從資料擷取開始,透過 Twitter API 和網頁爬蟲技術收集原始資料。接著使用 Pandas 進行資料清理、轉換和聚合,建立適合分析的資料結構。在資料視覺化環節,我們運用 Matplotlib 和 Seaborn 建立各種統計圖表,讓資料的模式和趨勢一目了然。最後透過蒙特卡羅模擬和主成分分析這些進階技術,我們展示了如何運用統計方法來解決實際的商業問題。
在實務應用中,資料科學專案的成功不僅取決於技術的選擇,更在於對問題本質的理解和對整個工作流程的掌握。每個環節都需要仔細考量,從資料擷取時的合法性和道德問題,到資料處理中的品質控制,再到分析結果的解釋和呈現,都需要專業的判斷和決策。Python 提供了強大的工具,但真正發揮這些工具的價值,需要深厚的領域知識和實務經驗。
隨著人工智慧和機器學習技術的快速發展,資料科學的應用範圍還在不斷擴大。從傳統的商業智慧分析,到深度學習的影像辨識和自然語言處理,Python 都扮演著核心的角色。掌握本文介紹的基礎技術,是進入這個令人興奮的領域的第一步。透過持續的學習和實踐,結合具體的業務場景,資料科學家能夠為企業創造真正的價值,推動數據驅動的決策文化。記住,資料科學不僅是技術,更是一種思維方式,一種從資料中發現洞察、解決問題的能力。