在資料科學與資料工程的領域中,Pandas 是 Python 生態系中最重要的資料處理套件之一。它提供了直觀且強大的資料結構,讓開發者能夠高效地進行資料的載入、清理、轉換和分析。無論是處理小型的實驗資料還是大規模的生產資料,Pandas 都能提供適當的工具和方法來完成任務。

然而,隨著資料量的增長,單純使用 Pandas 的基本功能可能會面臨效能瓶頸和記憶體不足的問題。本文將深入探討 Pandas 的進階使用技巧,包括如何有效處理大型資料集、最佳化記憶體使用、以及與其他資料來源的整合應用。透過這些技巧的掌握,讀者將能夠更有效率地運用 Pandas 來解決實際的資料處理挑戰。

Pandas 基礎資料結構回顧

在深入進階技巧之前,有必要先回顧 Pandas 的核心資料結構。Pandas 主要提供兩種資料結構:Series 和 DataFrame。Series 是一維的標籤陣列,可以存放任何資料型態。DataFrame 則是二維的表格結構,由多個 Series 組成,類似於 Excel 試算表或 SQL 資料表。

DataFrame 是 Pandas 中最常用的資料結構,它具有行索引和列索引,可以方便地進行資料的選取、篩選和運算。理解 DataFrame 的內部結構對於效能最佳化非常重要,因為不同的操作方式會有顯著的效能差異。

# 匯入必要的套件
# pandas 是主要的資料處理套件
# numpy 提供高效能的數值運算支援
import pandas as pd
import numpy as np

# 建立一個簡單的 DataFrame
# 使用字典的方式建立,鍵為欄位名稱,值為資料列表
data = {
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 32],
    'city': ['Taipei', 'Kaohsiung', 'Taichung', 'Taipei', 'Tainan'],
    'salary': [50000, 60000, 55000, 58000, 52000]
}
df = pd.DataFrame(data)

# 檢視 DataFrame 的基本資訊
# info() 顯示欄位名稱、非空值數量和資料型態
print(df.info())

# 檢視資料的統計摘要
# describe() 計算數值欄位的統計指標
print(df.describe())

# 檢視資料的前幾筆記錄
# head() 預設顯示前 5 筆,可以指定數量
print(df.head())

# 檢視資料的記憶體使用量
# memory_usage() 顯示每個欄位佔用的記憶體大小
print(df.memory_usage(deep=True))

資料載入的效能最佳化

資料載入是資料處理流程的第一步,也是影響整體效能的關鍵環節。Pandas 支援多種資料格式的載入,包括 CSV、Excel、JSON、Parquet、HDF5 等。不同的載入方式和參數設定會對效能和記憶體使用產生顯著影響。

在載入 CSV 檔案時,最重要的最佳化策略是指定資料型態。預設情況下,Pandas 會自動推測每個欄位的資料型態,這個過程需要讀取整個檔案,不僅耗時也可能產生不正確的型態推測。透過明確指定資料型態,可以避免型態推測的開銷,並確保資料以最有效的方式儲存。

import pandas as pd

# 定義各欄位的資料型態
# 使用更精確的型態可以大幅減少記憶體使用
# Float32 比 Float64 節省一半的記憶體
# Int16 適合小範圍的整數值
dtypes = {
    'product_id': pd.Int32Dtype(),      # 32位元整數,支援空值
    'product_name': pd.StringDtype(),   # 字串型態
    'category': pd.StringDtype(),       # 類別欄位
    'price': pd.Float32Dtype(),         # 32位元浮點數
    'quantity': pd.Int16Dtype(),        # 16位元整數
    'discount': pd.Float32Dtype()       # 折扣率
}

# 載入 CSV 檔案並指定資料型態
# usecols 指定只載入需要的欄位,減少記憶體使用
# nrows 限制載入的行數,適合測試和預覽
df = pd.read_csv(
    'data/products.csv',
    dtype=dtypes,
    usecols=dtypes.keys(),
    na_values=['', 'NULL', 'N/A']  # 指定空值的表示方式
)

# 將類別欄位轉換為 Categorical 型態
# Categorical 型態對於重複值較多的欄位可以大幅節省記憶體
# 因為它只儲存類別的整數索引而非完整字串
categorical_columns = ['category']
for col in categorical_columns:
    df[col] = df[col].astype('category')

# 比較記憶體使用量
print("記憶體使用量:")
print(df.memory_usage(deep=True).sum() / 1024**2, "MB")

對於特別大的檔案,即使最佳化了資料型態,仍可能無法一次性載入記憶體。此時可以使用分塊處理(chunking)的方式,將檔案分成多個小塊逐一處理。

import pandas as pd

# 定義資料型態
dtypes = {
    'user_id': pd.Int32Dtype(),
    'timestamp': pd.StringDtype(),
    'event_type': pd.StringDtype(),
    'value': pd.Float32Dtype()
}

# 使用 chunksize 參數進行分塊讀取
# 每次讀取 10000 行資料
# 這會回傳一個迭代器而非 DataFrame
chunk_iter = pd.read_csv(
    'data/large_events.csv',
    dtype=dtypes,
    chunksize=10000,
    parse_dates=['timestamp']  # 自動解析日期欄位
)

# 用於儲存處理結果的列表
results = []

# 迭代處理每個資料塊
for i, chunk in enumerate(chunk_iter):
    # 對每個資料塊進行處理
    # 例如:計算每個事件類型的統計值
    chunk_result = chunk.groupby('event_type')['value'].agg(['sum', 'count', 'mean'])
    results.append(chunk_result)

    # 顯示處理進度
    if (i + 1) % 10 == 0:
        print(f"已處理 {(i + 1) * 10000} 行資料")

# 合併所有結果
# 使用 concat 將所有分塊的結果合併
final_result = pd.concat(results)

# 對合併後的結果進行最終聚合
# 因為每個分塊的統計是獨立計算的,需要重新計算總體統計
final_result = final_result.groupby(level=0).agg({
    'sum': 'sum',
    'count': 'sum',
    'mean': lambda x: (x * results[0]['count']).sum() / results[0]['count'].sum()
})

print(final_result)
@startuml
!define PLANTUML_FORMAT svg
!theme _none_

skinparam dpi auto
skinparam shadowing false
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100

title 分塊處理流程

start
:讀取大型 CSV 檔案;

:設定 chunksize 參數;

while (還有資料塊?) is (是)
    :載入一個資料塊;
    :處理資料塊;
    note right
        - 資料清理
        - 欄位轉換
        - 統計計算
    end note
    :儲存處理結果;
endwhile (否)

:合併所有結果;
:輸出最終結果;

stop

@enduml

記憶體使用最佳化

記憶體管理是處理大型資料集時最重要的考量之一。Pandas 預設使用較大的資料型態來確保精確度和相容性,但這往往會造成不必要的記憶體浪費。透過適當的資料型態選擇和轉換,可以大幅減少記憶體使用。

數值型態的最佳化是最直接有效的方法。對於整數,根據數值範圍選擇適當的位元數可以節省大量記憶體。例如,如果數值範圍在 0 到 255 之間,使用 uint8 而非預設的 int64 可以節省 87.5% 的記憶體。

import pandas as pd
import numpy as np

def optimize_numeric_columns(df):
    """
    最佳化 DataFrame 中數值欄位的記憶體使用
    自動將數值欄位轉換為最小的適當型態
    """
    # 取得原始記憶體使用量
    start_mem = df.memory_usage(deep=True).sum() / 1024**2

    # 處理整數欄位
    for col in df.select_dtypes(include=['int64', 'int32']).columns:
        col_min = df[col].min()
        col_max = df[col].max()

        # 根據數值範圍選擇最小的整數型態
        if col_min >= 0:  # 無符號整數
            if col_max <= 255:
                df[col] = df[col].astype(np.uint8)
            elif col_max <= 65535:
                df[col] = df[col].astype(np.uint16)
            elif col_max <= 4294967295:
                df[col] = df[col].astype(np.uint32)
        else:  # 有符號整數
            if col_min >= -128 and col_max <= 127:
                df[col] = df[col].astype(np.int8)
            elif col_min >= -32768 and col_max <= 32767:
                df[col] = df[col].astype(np.int16)
            elif col_min >= -2147483648 and col_max <= 2147483647:
                df[col] = df[col].astype(np.int32)

    # 處理浮點數欄位
    for col in df.select_dtypes(include=['float64']).columns:
        # 將 float64 轉換為 float32
        # 對於大多數應用場景,float32 的精確度已經足夠
        df[col] = df[col].astype(np.float32)

    # 取得最佳化後的記憶體使用量
    end_mem = df.memory_usage(deep=True).sum() / 1024**2

    print(f"記憶體使用從 {start_mem:.2f} MB 減少到 {end_mem:.2f} MB")
    print(f"減少了 {100 * (start_mem - end_mem) / start_mem:.1f}%")

    return df

# 建立測試資料
np.random.seed(42)
n_rows = 100000
df = pd.DataFrame({
    'id': np.arange(n_rows),
    'category': np.random.randint(0, 10, n_rows),
    'value': np.random.randn(n_rows) * 100,
    'count': np.random.randint(0, 1000, n_rows)
})

# 執行最佳化
df_optimized = optimize_numeric_columns(df.copy())

對於字串欄位,如果欄位中的唯一值數量遠小於總行數,將其轉換為 Categorical 型態可以大幅節省記憶體。Categorical 型態內部使用整數索引來表示類別,只儲存一份類別字串的副本。

import pandas as pd
import numpy as np

def optimize_categorical_columns(df, threshold=0.5):
    """
    將適合的字串欄位轉換為 Categorical 型態

    參數:
        df: DataFrame
        threshold: 唯一值比例閾值,低於此值的欄位會被轉換
    """
    start_mem = df.memory_usage(deep=True).sum() / 1024**2

    for col in df.select_dtypes(include=['object', 'string']).columns:
        # 計算唯一值比例
        n_unique = df[col].nunique()
        n_total = len(df[col])
        ratio = n_unique / n_total

        if ratio < threshold:
            # 轉換為 Categorical 型態
            df[col] = df[col].astype('category')
            print(f"{col}: {n_unique} 個唯一值 / {n_total} 行 (比例: {ratio:.2%})")

    end_mem = df.memory_usage(deep=True).sum() / 1024**2

    print(f"\n記憶體使用從 {start_mem:.2f} MB 減少到 {end_mem:.2f} MB")
    print(f"減少了 {100 * (start_mem - end_mem) / start_mem:.1f}%")

    return df

# 建立測試資料
n_rows = 100000
categories = ['Electronics', 'Clothing', 'Food', 'Books', 'Sports']
cities = ['Taipei', 'Kaohsiung', 'Taichung', 'Tainan', 'Hsinchu']
statuses = ['Pending', 'Processing', 'Completed', 'Cancelled']

df = pd.DataFrame({
    'product_id': np.arange(n_rows),
    'category': np.random.choice(categories, n_rows),
    'city': np.random.choice(cities, n_rows),
    'status': np.random.choice(statuses, n_rows),
    'description': [f"Product description {i}" for i in range(n_rows)]  # 高唯一值比例
})

# 執行最佳化
df_optimized = optimize_categorical_columns(df.copy())

Excel 檔案的進階處理

Excel 是商業環境中最常見的資料格式之一。Pandas 提供了完整的 Excel 讀寫功能,但在處理複雜的 Excel 檔案時,需要了解一些進階技巧。

讀取特定工作表和範圍是常見的需求。Excel 檔案可能包含多個工作表,而且資料可能不是從第一行第一列開始。使用 sheet_name、skiprows 和 usecols 參數可以精確控制要讀取的內容。

import pandas as pd

# 讀取特定工作表
# sheet_name 可以是工作表名稱(字串)或索引(整數)
# 索引從 0 開始
df = pd.read_excel(
    'data/sales_report.xlsx',
    sheet_name='Q4_2024',          # 指定工作表名稱
    skiprows=3,                     # 跳過前 3 行(標題區域)
    usecols='B:F',                 # 只讀取 B 到 F 欄
    dtype_backend='numpy_nullable'  # 使用可空型態
)

# 讀取多個工作表
# 傳入工作表名稱的列表,會回傳一個字典
sheets = pd.read_excel(
    'data/sales_report.xlsx',
    sheet_name=['Q1_2024', 'Q2_2024', 'Q3_2024', 'Q4_2024'],
    dtype_backend='numpy_nullable'
)

# 合併所有季度資料
# 為每個季度添加識別欄位後合併
all_quarters = []
for quarter, df in sheets.items():
    df['quarter'] = quarter
    all_quarters.append(df)

combined_df = pd.concat(all_quarters, ignore_index=True)

# 讀取所有工作表
# 傳入 None 會讀取所有工作表
all_sheets = pd.read_excel(
    'data/sales_report.xlsx',
    sheet_name=None,
    dtype_backend='numpy_nullable'
)

print(f"工作表數量: {len(all_sheets)}")
for name, df in all_sheets.items():
    print(f"  {name}: {len(df)} 行")

處理具有層級結構的 Excel 資料是另一個常見需求。企業報表經常使用多層次的行標題和列標題來組織資料。Pandas 的 MultiIndex 功能可以完美處理這種結構。

import pandas as pd

# 讀取具有層級結構的 Excel 檔案
# index_col 指定哪些欄位作為行索引(支援多層)
# header 指定哪些行作為列標題(支援多層)
df = pd.read_excel(
    'data/hierarchical_report.xlsx',
    index_col=[0, 1],      # 使用第一和第二欄作為行索引
    header=[0, 1],         # 使用第一和第二行作為列標題
    dtype_backend='numpy_nullable'
)

# 存取層級索引的資料
# 使用 xs(cross-section)方法選取特定層級
# 例如:選取 "Asia" 區域的所有資料
asia_data = df.xs('Asia', level=0)

# 使用 loc 進行多層級選取
# 需要傳入元組來指定多個層級
specific_data = df.loc[('Asia', 'Taiwan'), :]

# 重設索引將 MultiIndex 轉換為一般欄位
# 這在需要進行資料庫匯出或一般處理時很有用
df_flat = df.reset_index()

# 堆疊(stack)操作:將列索引轉換為行索引
# 適合將寬表轉換為長表
df_stacked = df.stack(level=0)

# 樞紐(unstack)操作:將行索引轉換為列索引
# 適合將長表轉換為寬表
df_unstacked = df_stacked.unstack(level=1)

print("層級索引結構:")
print(f"行索引層級: {df.index.names}")
print(f"列索引層級: {df.columns.names}")

將 DataFrame 寫入 Excel 時,也可以使用各種格式化選項來產生專業的報表。

import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils.dataframe import dataframe_to_rows

# 建立範例資料
df = pd.DataFrame({
    'Product': ['Widget A', 'Widget B', 'Widget C', 'Widget D'],
    'Category': ['Electronics', 'Electronics', 'Clothing', 'Food'],
    'Q1 Sales': [1500, 2300, 800, 1200],
    'Q2 Sales': [1800, 2100, 950, 1400],
    'Q3 Sales': [2000, 1900, 1100, 1600],
    'Q4 Sales': [2200, 2500, 1300, 1800]
})

# 計算總計
df['Total'] = df[['Q1 Sales', 'Q2 Sales', 'Q3 Sales', 'Q4 Sales']].sum(axis=1)

# 使用 ExcelWriter 寫入並設定格式
with pd.ExcelWriter('output/sales_report.xlsx', engine='openpyxl') as writer:
    # 寫入資料
    df.to_excel(writer, sheet_name='Sales', index=False, startrow=1)

    # 取得工作表物件進行格式化
    workbook = writer.book
    worksheet = writer.sheets['Sales']

    # 設定標題
    worksheet['A1'] = '2024 年度銷售報表'
    worksheet['A1'].font = Font(size=16, bold=True)
    worksheet.merge_cells('A1:G1')

    # 設定欄寬
    column_widths = {'A': 15, 'B': 15, 'C': 12, 'D': 12, 'E': 12, 'F': 12, 'G': 12}
    for col, width in column_widths.items():
        worksheet.column_dimensions[col].width = width

    # 設定標題行格式
    header_fill = PatternFill(start_color='4472C4', end_color='4472C4', fill_type='solid')
    header_font = Font(color='FFFFFF', bold=True)

    for cell in worksheet[2]:
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = Alignment(horizontal='center')

    # 設定數字格式
    for row in range(3, len(df) + 3):
        for col in range(3, 8):  # C 到 G 欄
            cell = worksheet.cell(row=row, column=col)
            cell.number_format = '#,##0'
            cell.alignment = Alignment(horizontal='right')

print("報表已產生:output/sales_report.xlsx")

資料庫整合應用

Pandas 可以與各種關聯式資料庫無縫整合,透過 SQLAlchemy 作為資料庫連線的抽象層。這讓開發者能夠直接將資料庫查詢結果載入為 DataFrame,或將 DataFrame 寫入資料庫。

import pandas as pd
from sqlalchemy import create_engine, text

# 建立資料庫連線
# SQLAlchemy 支援多種資料庫,包括 PostgreSQL、MySQL、SQLite 等
# 連線字串格式:dialect+driver://username:password@host:port/database

# SQLite 範例(用於本機測試)
sqlite_engine = create_engine('sqlite:///data/example.db')

# PostgreSQL 範例
# pg_engine = create_engine('postgresql://user:password@localhost:5432/dbname')

# MySQL 範例
# mysql_engine = create_engine('mysql+pymysql://user:password@localhost:3306/dbname')

# 從資料庫讀取資料
# 使用 SQL 查詢並直接載入為 DataFrame
query = """
SELECT
    o.order_id,
    o.order_date,
    c.customer_name,
    p.product_name,
    oi.quantity,
    oi.unit_price,
    oi.quantity * oi.unit_price AS subtotal
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
WHERE o.order_date >= '2024-01-01'
ORDER BY o.order_date DESC
"""

df = pd.read_sql(query, con=sqlite_engine)

# 使用參數化查詢防止 SQL 注入
# 這是更安全的做法
from sqlalchemy import text

parameterized_query = text("""
SELECT * FROM orders
WHERE order_date BETWEEN :start_date AND :end_date
AND status = :status
""")

df = pd.read_sql(
    parameterized_query,
    con=sqlite_engine,
    params={
        'start_date': '2024-01-01',
        'end_date': '2024-12-31',
        'status': 'completed'
    }
)

# 分塊讀取大型查詢結果
# 適合記憶體有限的情況
chunk_iter = pd.read_sql(
    "SELECT * FROM large_table",
    con=sqlite_engine,
    chunksize=10000
)

for chunk in chunk_iter:
    # 處理每個資料塊
    process_chunk(chunk)

將 DataFrame 寫入資料庫同樣簡單,Pandas 提供了 to_sql 方法來完成這項工作。

import pandas as pd
from sqlalchemy import create_engine

# 建立連線
engine = create_engine('sqlite:///data/output.db')

# 建立範例 DataFrame
df = pd.DataFrame({
    'product_id': range(1, 101),
    'product_name': [f'Product {i}' for i in range(1, 101)],
    'category': ['A', 'B', 'C', 'D'] * 25,
    'price': [10.0 + i * 0.5 for i in range(100)],
    'stock': [100 - i for i in range(100)]
})

# 寫入資料庫
# if_exists 參數控制表格已存在時的行為:
# 'fail': 拋出錯誤(預設)
# 'replace': 刪除舊表格並建立新表格
# 'append': 將資料附加到現有表格
df.to_sql(
    name='products',          # 資料表名稱
    con=engine,               # 資料庫連線
    if_exists='replace',      # 如果表格存在則替換
    index=False,              # 不將索引寫入資料庫
    chunksize=1000,           # 分批寫入
    method='multi'            # 使用多值 INSERT 語法提升效能
)

# 驗證寫入結果
result = pd.read_sql("SELECT COUNT(*) as count FROM products", con=engine)
print(f"已寫入 {result['count'].values[0]} 筆資料")

# 使用自訂的資料型態
# 預設情況下 Pandas 會自動推測 SQL 資料型態
# 但有時需要明確指定
from sqlalchemy import Integer, String, Float

df.to_sql(
    name='products_typed',
    con=engine,
    if_exists='replace',
    index=False,
    dtype={
        'product_id': Integer,
        'product_name': String(100),
        'category': String(10),
        'price': Float,
        'stock': Integer
    }
)

效能最佳化技巧

除了記憶體最佳化之外,還有許多技巧可以提升 Pandas 的運算效能。了解這些技巧對於處理大型資料集尤其重要。

向量化運算是 Pandas 效能最佳化的核心原則。Pandas 建構在 NumPy 之上,NumPy 的向量化運算比 Python 原生迴圈快很多倍。因此,應該盡量使用 Pandas 提供的內建方法,而不是使用迴圈逐行處理資料。

import pandas as pd
import numpy as np
import time

# 建立測試資料
n_rows = 100000
df = pd.DataFrame({
    'a': np.random.randn(n_rows),
    'b': np.random.randn(n_rows)
})

# 不好的做法:使用迴圈逐行處理
start = time.time()
result_loop = []
for i in range(len(df)):
    result_loop.append(df.iloc[i]['a'] ** 2 + df.iloc[i]['b'] ** 2)
df['c_loop'] = result_loop
print(f"迴圈方式耗時: {time.time() - start:.3f} 秒")

# 好的做法:使用向量化運算
start = time.time()
df['c_vectorized'] = df['a'] ** 2 + df['b'] ** 2
print(f"向量化方式耗時: {time.time() - start:.3f} 秒")

# 使用 apply 函式(中等效能)
# apply 比迴圈快,但比向量化慢
start = time.time()
df['c_apply'] = df.apply(lambda row: row['a'] ** 2 + row['b'] ** 2, axis=1)
print(f"apply 方式耗時: {time.time() - start:.3f} 秒")

# 使用 NumPy 函式(最佳效能)
start = time.time()
df['c_numpy'] = np.square(df['a'].values) + np.square(df['b'].values)
print(f"NumPy 方式耗時: {time.time() - start:.3f} 秒")

使用適當的資料選取方法也會影響效能。loc 使用標籤選取,iloc 使用位置選取,at 和 iat 則用於選取單一值。選擇正確的方法可以提升效能。

import pandas as pd
import numpy as np
import time

# 建立測試資料
n_rows = 100000
df = pd.DataFrame({
    'a': np.random.randn(n_rows),
    'b': np.random.randn(n_rows)
}, index=[f'row_{i}' for i in range(n_rows)])

# 選取單一值時,使用 at/iat 比 loc/iloc 更快
# loc 方式
start = time.time()
for i in range(1000):
    _ = df.loc['row_500', 'a']
print(f"loc 選取單一值: {time.time() - start:.3f} 秒")

# at 方式
start = time.time()
for i in range(1000):
    _ = df.at['row_500', 'a']
print(f"at 選取單一值: {time.time() - start:.3f} 秒")

# iloc 方式
start = time.time()
for i in range(1000):
    _ = df.iloc[500, 0]
print(f"iloc 選取單一值: {time.time() - start:.3f} 秒")

# iat 方式
start = time.time()
for i in range(1000):
    _ = df.iat[500, 0]
print(f"iat 選取單一值: {time.time() - start:.3f} 秒")

使用 eval 和 query 方法可以在某些情況下提升效能,特別是對於複雜的條件運算。這些方法使用 numexpr 套件進行最佳化,可以避免建立中間結果。

import pandas as pd
import numpy as np

# 建立測試資料
n_rows = 1000000
df = pd.DataFrame({
    'a': np.random.randn(n_rows),
    'b': np.random.randn(n_rows),
    'c': np.random.randn(n_rows),
    'd': np.random.randn(n_rows)
})

# 一般方式
%timeit df['e'] = df['a'] + df['b'] * df['c'] - df['d']

# 使用 eval 方式
# eval 會解析字串表達式並進行最佳化
%timeit df.eval('e = a + b * c - d', inplace=True)

# 使用 query 進行條件篩選
# 比一般的布林索引更快,語法也更清晰
%timeit df_filtered = df[(df['a'] > 0) & (df['b'] < 0) & (df['c'] > 0.5)]
%timeit df_filtered = df.query('a > 0 and b < 0 and c > 0.5')

高效能儲存格式

對於需要頻繁讀寫的大型資料集,選擇適當的儲存格式可以大幅提升效能。Parquet 和 Feather 是兩種常用的高效能格式,它們都支援欄位壓縮和快速隨機存取。

import pandas as pd
import numpy as np
import time

# 建立測試資料
n_rows = 1000000
df = pd.DataFrame({
    'id': np.arange(n_rows),
    'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n_rows),
    'value1': np.random.randn(n_rows),
    'value2': np.random.randn(n_rows),
    'timestamp': pd.date_range('2024-01-01', periods=n_rows, freq='s')
})

# 將類別欄位轉換為 Categorical 型態
df['category'] = df['category'].astype('category')

# 比較不同格式的讀寫效能

# CSV 格式
start = time.time()
df.to_csv('data/test.csv', index=False)
csv_write_time = time.time() - start

start = time.time()
df_csv = pd.read_csv('data/test.csv')
csv_read_time = time.time() - start

# Parquet 格式
# Parquet 是一種列式儲存格式,對於分析查詢非常有效
start = time.time()
df.to_parquet('data/test.parquet', index=False, compression='snappy')
parquet_write_time = time.time() - start

start = time.time()
df_parquet = pd.read_parquet('data/test.parquet')
parquet_read_time = time.time() - start

# Feather 格式
# Feather 專為快速序列化設計,適合 Python 和 R 之間的資料交換
start = time.time()
df.to_feather('data/test.feather')
feather_write_time = time.time() - start

start = time.time()
df_feather = pd.read_feather('data/test.feather')
feather_read_time = time.time() - start

# 輸出比較結果
import os
print("格式比較:")
print(f"{'格式':<10} {'寫入時間':<12} {'讀取時間':<12} {'檔案大小':<12}")
print("-" * 46)
print(f"{'CSV':<10} {csv_write_time:<12.3f} {csv_read_time:<12.3f} {os.path.getsize('data/test.csv')/1024**2:<12.1f} MB")
print(f"{'Parquet':<10} {parquet_write_time:<12.3f} {parquet_read_time:<12.3f} {os.path.getsize('data/test.parquet')/1024**2:<12.1f} MB")
print(f"{'Feather':<10} {feather_write_time:<12.3f} {feather_read_time:<12.3f} {os.path.getsize('data/test.feather')/1024**2:<12.1f} MB")

Parquet 格式特別適合需要選取特定欄位的分析場景,因為它是列式儲存,可以只讀取需要的欄位而不必讀取整個檔案。

import pandas as pd

# 只讀取特定欄位
# 這比讀取整個檔案後再選取欄位快很多
df_selected = pd.read_parquet(
    'data/test.parquet',
    columns=['id', 'value1']
)

# 使用 pyarrow 進行更進階的操作
import pyarrow.parquet as pq

# 讀取 Parquet 檔案的元資料
parquet_file = pq.ParquetFile('data/test.parquet')
print(f"行數: {parquet_file.metadata.num_rows}")
print(f"欄數: {parquet_file.metadata.num_columns}")
print(f"列群組數: {parquet_file.metadata.num_row_groups}")

# 讀取特定的列群組
# 這在處理超大型檔案時很有用
table = parquet_file.read_row_group(0, columns=['id', 'value1'])
df_row_group = table.to_pandas()

結語

Pandas 是一個功能強大且靈活的資料處理工具,但要充分發揮其潛力,需要深入了解它的內部運作機制和最佳實踐。本文介紹了資料載入最佳化、記憶體管理、Excel 進階處理、資料庫整合以及效能調校等主題,這些都是實際工作中經常會遇到的挑戰。

在處理大型資料集時,分塊處理和記憶體最佳化是關鍵策略。透過指定適當的資料型態、使用 Categorical 型態處理低基數欄位、以及選擇高效能的儲存格式,可以顯著減少記憶體使用並提升處理速度。向量化運算則是效能最佳化的核心原則,應該盡量避免使用 Python 迴圈逐行處理資料。

隨著資料量的持續增長,Pandas 的這些進階技巧變得越來越重要。同時,對於超大規模的資料處理需求,也可以考慮使用 Dask、Vaex 或 Polars 等平行運算框架,它們提供了類似 Pandas 的 API,但能夠處理超出記憶體容量的資料集。掌握 Pandas 的基礎和進階技巧,將為學習這些更進階的工具奠定堅實的基礎。