在資料科學與資料工程的領域中,Pandas 是 Python 生態系中最重要的資料處理套件之一。它提供了直觀且強大的資料結構,讓開發者能夠高效地進行資料的載入、清理、轉換和分析。無論是處理小型的實驗資料還是大規模的生產資料,Pandas 都能提供適當的工具和方法來完成任務。
然而,隨著資料量的增長,單純使用 Pandas 的基本功能可能會面臨效能瓶頸和記憶體不足的問題。本文將深入探討 Pandas 的進階使用技巧,包括如何有效處理大型資料集、最佳化記憶體使用、以及與其他資料來源的整合應用。透過這些技巧的掌握,讀者將能夠更有效率地運用 Pandas 來解決實際的資料處理挑戰。
Pandas 基礎資料結構回顧
在深入進階技巧之前,有必要先回顧 Pandas 的核心資料結構。Pandas 主要提供兩種資料結構:Series 和 DataFrame。Series 是一維的標籤陣列,可以存放任何資料型態。DataFrame 則是二維的表格結構,由多個 Series 組成,類似於 Excel 試算表或 SQL 資料表。
DataFrame 是 Pandas 中最常用的資料結構,它具有行索引和列索引,可以方便地進行資料的選取、篩選和運算。理解 DataFrame 的內部結構對於效能最佳化非常重要,因為不同的操作方式會有顯著的效能差異。
# 匯入必要的套件
# pandas 是主要的資料處理套件
# numpy 提供高效能的數值運算支援
import pandas as pd
import numpy as np
# 建立一個簡單的 DataFrame
# 使用字典的方式建立,鍵為欄位名稱,值為資料列表
data = {
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
'age': [25, 30, 35, 28, 32],
'city': ['Taipei', 'Kaohsiung', 'Taichung', 'Taipei', 'Tainan'],
'salary': [50000, 60000, 55000, 58000, 52000]
}
df = pd.DataFrame(data)
# 檢視 DataFrame 的基本資訊
# info() 顯示欄位名稱、非空值數量和資料型態
print(df.info())
# 檢視資料的統計摘要
# describe() 計算數值欄位的統計指標
print(df.describe())
# 檢視資料的前幾筆記錄
# head() 預設顯示前 5 筆,可以指定數量
print(df.head())
# 檢視資料的記憶體使用量
# memory_usage() 顯示每個欄位佔用的記憶體大小
print(df.memory_usage(deep=True))
資料載入的效能最佳化
資料載入是資料處理流程的第一步,也是影響整體效能的關鍵環節。Pandas 支援多種資料格式的載入,包括 CSV、Excel、JSON、Parquet、HDF5 等。不同的載入方式和參數設定會對效能和記憶體使用產生顯著影響。
在載入 CSV 檔案時,最重要的最佳化策略是指定資料型態。預設情況下,Pandas 會自動推測每個欄位的資料型態,這個過程需要讀取整個檔案,不僅耗時也可能產生不正確的型態推測。透過明確指定資料型態,可以避免型態推測的開銷,並確保資料以最有效的方式儲存。
import pandas as pd
# 定義各欄位的資料型態
# 使用更精確的型態可以大幅減少記憶體使用
# Float32 比 Float64 節省一半的記憶體
# Int16 適合小範圍的整數值
dtypes = {
'product_id': pd.Int32Dtype(), # 32位元整數,支援空值
'product_name': pd.StringDtype(), # 字串型態
'category': pd.StringDtype(), # 類別欄位
'price': pd.Float32Dtype(), # 32位元浮點數
'quantity': pd.Int16Dtype(), # 16位元整數
'discount': pd.Float32Dtype() # 折扣率
}
# 載入 CSV 檔案並指定資料型態
# usecols 指定只載入需要的欄位,減少記憶體使用
# nrows 限制載入的行數,適合測試和預覽
df = pd.read_csv(
'data/products.csv',
dtype=dtypes,
usecols=dtypes.keys(),
na_values=['', 'NULL', 'N/A'] # 指定空值的表示方式
)
# 將類別欄位轉換為 Categorical 型態
# Categorical 型態對於重複值較多的欄位可以大幅節省記憶體
# 因為它只儲存類別的整數索引而非完整字串
categorical_columns = ['category']
for col in categorical_columns:
df[col] = df[col].astype('category')
# 比較記憶體使用量
print("記憶體使用量:")
print(df.memory_usage(deep=True).sum() / 1024**2, "MB")
對於特別大的檔案,即使最佳化了資料型態,仍可能無法一次性載入記憶體。此時可以使用分塊處理(chunking)的方式,將檔案分成多個小塊逐一處理。
import pandas as pd
# 定義資料型態
dtypes = {
'user_id': pd.Int32Dtype(),
'timestamp': pd.StringDtype(),
'event_type': pd.StringDtype(),
'value': pd.Float32Dtype()
}
# 使用 chunksize 參數進行分塊讀取
# 每次讀取 10000 行資料
# 這會回傳一個迭代器而非 DataFrame
chunk_iter = pd.read_csv(
'data/large_events.csv',
dtype=dtypes,
chunksize=10000,
parse_dates=['timestamp'] # 自動解析日期欄位
)
# 用於儲存處理結果的列表
results = []
# 迭代處理每個資料塊
for i, chunk in enumerate(chunk_iter):
# 對每個資料塊進行處理
# 例如:計算每個事件類型的統計值
chunk_result = chunk.groupby('event_type')['value'].agg(['sum', 'count', 'mean'])
results.append(chunk_result)
# 顯示處理進度
if (i + 1) % 10 == 0:
print(f"已處理 {(i + 1) * 10000} 行資料")
# 合併所有結果
# 使用 concat 將所有分塊的結果合併
final_result = pd.concat(results)
# 對合併後的結果進行最終聚合
# 因為每個分塊的統計是獨立計算的,需要重新計算總體統計
final_result = final_result.groupby(level=0).agg({
'sum': 'sum',
'count': 'sum',
'mean': lambda x: (x * results[0]['count']).sum() / results[0]['count'].sum()
})
print(final_result)
@startuml
!define PLANTUML_FORMAT svg
!theme _none_
skinparam dpi auto
skinparam shadowing false
skinparam roundcorner 5
skinparam defaultFontName "Microsoft JhengHei UI"
skinparam defaultFontSize 16
skinparam minClassWidth 100
title 分塊處理流程
start
:讀取大型 CSV 檔案;
:設定 chunksize 參數;
while (還有資料塊?) is (是)
:載入一個資料塊;
:處理資料塊;
note right
- 資料清理
- 欄位轉換
- 統計計算
end note
:儲存處理結果;
endwhile (否)
:合併所有結果;
:輸出最終結果;
stop
@enduml記憶體使用最佳化
記憶體管理是處理大型資料集時最重要的考量之一。Pandas 預設使用較大的資料型態來確保精確度和相容性,但這往往會造成不必要的記憶體浪費。透過適當的資料型態選擇和轉換,可以大幅減少記憶體使用。
數值型態的最佳化是最直接有效的方法。對於整數,根據數值範圍選擇適當的位元數可以節省大量記憶體。例如,如果數值範圍在 0 到 255 之間,使用 uint8 而非預設的 int64 可以節省 87.5% 的記憶體。
import pandas as pd
import numpy as np
def optimize_numeric_columns(df):
"""
最佳化 DataFrame 中數值欄位的記憶體使用
自動將數值欄位轉換為最小的適當型態
"""
# 取得原始記憶體使用量
start_mem = df.memory_usage(deep=True).sum() / 1024**2
# 處理整數欄位
for col in df.select_dtypes(include=['int64', 'int32']).columns:
col_min = df[col].min()
col_max = df[col].max()
# 根據數值範圍選擇最小的整數型態
if col_min >= 0: # 無符號整數
if col_max <= 255:
df[col] = df[col].astype(np.uint8)
elif col_max <= 65535:
df[col] = df[col].astype(np.uint16)
elif col_max <= 4294967295:
df[col] = df[col].astype(np.uint32)
else: # 有符號整數
if col_min >= -128 and col_max <= 127:
df[col] = df[col].astype(np.int8)
elif col_min >= -32768 and col_max <= 32767:
df[col] = df[col].astype(np.int16)
elif col_min >= -2147483648 and col_max <= 2147483647:
df[col] = df[col].astype(np.int32)
# 處理浮點數欄位
for col in df.select_dtypes(include=['float64']).columns:
# 將 float64 轉換為 float32
# 對於大多數應用場景,float32 的精確度已經足夠
df[col] = df[col].astype(np.float32)
# 取得最佳化後的記憶體使用量
end_mem = df.memory_usage(deep=True).sum() / 1024**2
print(f"記憶體使用從 {start_mem:.2f} MB 減少到 {end_mem:.2f} MB")
print(f"減少了 {100 * (start_mem - end_mem) / start_mem:.1f}%")
return df
# 建立測試資料
np.random.seed(42)
n_rows = 100000
df = pd.DataFrame({
'id': np.arange(n_rows),
'category': np.random.randint(0, 10, n_rows),
'value': np.random.randn(n_rows) * 100,
'count': np.random.randint(0, 1000, n_rows)
})
# 執行最佳化
df_optimized = optimize_numeric_columns(df.copy())
對於字串欄位,如果欄位中的唯一值數量遠小於總行數,將其轉換為 Categorical 型態可以大幅節省記憶體。Categorical 型態內部使用整數索引來表示類別,只儲存一份類別字串的副本。
import pandas as pd
import numpy as np
def optimize_categorical_columns(df, threshold=0.5):
"""
將適合的字串欄位轉換為 Categorical 型態
參數:
df: DataFrame
threshold: 唯一值比例閾值,低於此值的欄位會被轉換
"""
start_mem = df.memory_usage(deep=True).sum() / 1024**2
for col in df.select_dtypes(include=['object', 'string']).columns:
# 計算唯一值比例
n_unique = df[col].nunique()
n_total = len(df[col])
ratio = n_unique / n_total
if ratio < threshold:
# 轉換為 Categorical 型態
df[col] = df[col].astype('category')
print(f"{col}: {n_unique} 個唯一值 / {n_total} 行 (比例: {ratio:.2%})")
end_mem = df.memory_usage(deep=True).sum() / 1024**2
print(f"\n記憶體使用從 {start_mem:.2f} MB 減少到 {end_mem:.2f} MB")
print(f"減少了 {100 * (start_mem - end_mem) / start_mem:.1f}%")
return df
# 建立測試資料
n_rows = 100000
categories = ['Electronics', 'Clothing', 'Food', 'Books', 'Sports']
cities = ['Taipei', 'Kaohsiung', 'Taichung', 'Tainan', 'Hsinchu']
statuses = ['Pending', 'Processing', 'Completed', 'Cancelled']
df = pd.DataFrame({
'product_id': np.arange(n_rows),
'category': np.random.choice(categories, n_rows),
'city': np.random.choice(cities, n_rows),
'status': np.random.choice(statuses, n_rows),
'description': [f"Product description {i}" for i in range(n_rows)] # 高唯一值比例
})
# 執行最佳化
df_optimized = optimize_categorical_columns(df.copy())
Excel 檔案的進階處理
Excel 是商業環境中最常見的資料格式之一。Pandas 提供了完整的 Excel 讀寫功能,但在處理複雜的 Excel 檔案時,需要了解一些進階技巧。
讀取特定工作表和範圍是常見的需求。Excel 檔案可能包含多個工作表,而且資料可能不是從第一行第一列開始。使用 sheet_name、skiprows 和 usecols 參數可以精確控制要讀取的內容。
import pandas as pd
# 讀取特定工作表
# sheet_name 可以是工作表名稱(字串)或索引(整數)
# 索引從 0 開始
df = pd.read_excel(
'data/sales_report.xlsx',
sheet_name='Q4_2024', # 指定工作表名稱
skiprows=3, # 跳過前 3 行(標題區域)
usecols='B:F', # 只讀取 B 到 F 欄
dtype_backend='numpy_nullable' # 使用可空型態
)
# 讀取多個工作表
# 傳入工作表名稱的列表,會回傳一個字典
sheets = pd.read_excel(
'data/sales_report.xlsx',
sheet_name=['Q1_2024', 'Q2_2024', 'Q3_2024', 'Q4_2024'],
dtype_backend='numpy_nullable'
)
# 合併所有季度資料
# 為每個季度添加識別欄位後合併
all_quarters = []
for quarter, df in sheets.items():
df['quarter'] = quarter
all_quarters.append(df)
combined_df = pd.concat(all_quarters, ignore_index=True)
# 讀取所有工作表
# 傳入 None 會讀取所有工作表
all_sheets = pd.read_excel(
'data/sales_report.xlsx',
sheet_name=None,
dtype_backend='numpy_nullable'
)
print(f"工作表數量: {len(all_sheets)}")
for name, df in all_sheets.items():
print(f" {name}: {len(df)} 行")
處理具有層級結構的 Excel 資料是另一個常見需求。企業報表經常使用多層次的行標題和列標題來組織資料。Pandas 的 MultiIndex 功能可以完美處理這種結構。
import pandas as pd
# 讀取具有層級結構的 Excel 檔案
# index_col 指定哪些欄位作為行索引(支援多層)
# header 指定哪些行作為列標題(支援多層)
df = pd.read_excel(
'data/hierarchical_report.xlsx',
index_col=[0, 1], # 使用第一和第二欄作為行索引
header=[0, 1], # 使用第一和第二行作為列標題
dtype_backend='numpy_nullable'
)
# 存取層級索引的資料
# 使用 xs(cross-section)方法選取特定層級
# 例如:選取 "Asia" 區域的所有資料
asia_data = df.xs('Asia', level=0)
# 使用 loc 進行多層級選取
# 需要傳入元組來指定多個層級
specific_data = df.loc[('Asia', 'Taiwan'), :]
# 重設索引將 MultiIndex 轉換為一般欄位
# 這在需要進行資料庫匯出或一般處理時很有用
df_flat = df.reset_index()
# 堆疊(stack)操作:將列索引轉換為行索引
# 適合將寬表轉換為長表
df_stacked = df.stack(level=0)
# 樞紐(unstack)操作:將行索引轉換為列索引
# 適合將長表轉換為寬表
df_unstacked = df_stacked.unstack(level=1)
print("層級索引結構:")
print(f"行索引層級: {df.index.names}")
print(f"列索引層級: {df.columns.names}")
將 DataFrame 寫入 Excel 時,也可以使用各種格式化選項來產生專業的報表。
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils.dataframe import dataframe_to_rows
# 建立範例資料
df = pd.DataFrame({
'Product': ['Widget A', 'Widget B', 'Widget C', 'Widget D'],
'Category': ['Electronics', 'Electronics', 'Clothing', 'Food'],
'Q1 Sales': [1500, 2300, 800, 1200],
'Q2 Sales': [1800, 2100, 950, 1400],
'Q3 Sales': [2000, 1900, 1100, 1600],
'Q4 Sales': [2200, 2500, 1300, 1800]
})
# 計算總計
df['Total'] = df[['Q1 Sales', 'Q2 Sales', 'Q3 Sales', 'Q4 Sales']].sum(axis=1)
# 使用 ExcelWriter 寫入並設定格式
with pd.ExcelWriter('output/sales_report.xlsx', engine='openpyxl') as writer:
# 寫入資料
df.to_excel(writer, sheet_name='Sales', index=False, startrow=1)
# 取得工作表物件進行格式化
workbook = writer.book
worksheet = writer.sheets['Sales']
# 設定標題
worksheet['A1'] = '2024 年度銷售報表'
worksheet['A1'].font = Font(size=16, bold=True)
worksheet.merge_cells('A1:G1')
# 設定欄寬
column_widths = {'A': 15, 'B': 15, 'C': 12, 'D': 12, 'E': 12, 'F': 12, 'G': 12}
for col, width in column_widths.items():
worksheet.column_dimensions[col].width = width
# 設定標題行格式
header_fill = PatternFill(start_color='4472C4', end_color='4472C4', fill_type='solid')
header_font = Font(color='FFFFFF', bold=True)
for cell in worksheet[2]:
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal='center')
# 設定數字格式
for row in range(3, len(df) + 3):
for col in range(3, 8): # C 到 G 欄
cell = worksheet.cell(row=row, column=col)
cell.number_format = '#,##0'
cell.alignment = Alignment(horizontal='right')
print("報表已產生:output/sales_report.xlsx")
資料庫整合應用
Pandas 可以與各種關聯式資料庫無縫整合,透過 SQLAlchemy 作為資料庫連線的抽象層。這讓開發者能夠直接將資料庫查詢結果載入為 DataFrame,或將 DataFrame 寫入資料庫。
import pandas as pd
from sqlalchemy import create_engine, text
# 建立資料庫連線
# SQLAlchemy 支援多種資料庫,包括 PostgreSQL、MySQL、SQLite 等
# 連線字串格式:dialect+driver://username:password@host:port/database
# SQLite 範例(用於本機測試)
sqlite_engine = create_engine('sqlite:///data/example.db')
# PostgreSQL 範例
# pg_engine = create_engine('postgresql://user:password@localhost:5432/dbname')
# MySQL 範例
# mysql_engine = create_engine('mysql+pymysql://user:password@localhost:3306/dbname')
# 從資料庫讀取資料
# 使用 SQL 查詢並直接載入為 DataFrame
query = """
SELECT
o.order_id,
o.order_date,
c.customer_name,
p.product_name,
oi.quantity,
oi.unit_price,
oi.quantity * oi.unit_price AS subtotal
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
WHERE o.order_date >= '2024-01-01'
ORDER BY o.order_date DESC
"""
df = pd.read_sql(query, con=sqlite_engine)
# 使用參數化查詢防止 SQL 注入
# 這是更安全的做法
from sqlalchemy import text
parameterized_query = text("""
SELECT * FROM orders
WHERE order_date BETWEEN :start_date AND :end_date
AND status = :status
""")
df = pd.read_sql(
parameterized_query,
con=sqlite_engine,
params={
'start_date': '2024-01-01',
'end_date': '2024-12-31',
'status': 'completed'
}
)
# 分塊讀取大型查詢結果
# 適合記憶體有限的情況
chunk_iter = pd.read_sql(
"SELECT * FROM large_table",
con=sqlite_engine,
chunksize=10000
)
for chunk in chunk_iter:
# 處理每個資料塊
process_chunk(chunk)
將 DataFrame 寫入資料庫同樣簡單,Pandas 提供了 to_sql 方法來完成這項工作。
import pandas as pd
from sqlalchemy import create_engine
# 建立連線
engine = create_engine('sqlite:///data/output.db')
# 建立範例 DataFrame
df = pd.DataFrame({
'product_id': range(1, 101),
'product_name': [f'Product {i}' for i in range(1, 101)],
'category': ['A', 'B', 'C', 'D'] * 25,
'price': [10.0 + i * 0.5 for i in range(100)],
'stock': [100 - i for i in range(100)]
})
# 寫入資料庫
# if_exists 參數控制表格已存在時的行為:
# 'fail': 拋出錯誤(預設)
# 'replace': 刪除舊表格並建立新表格
# 'append': 將資料附加到現有表格
df.to_sql(
name='products', # 資料表名稱
con=engine, # 資料庫連線
if_exists='replace', # 如果表格存在則替換
index=False, # 不將索引寫入資料庫
chunksize=1000, # 分批寫入
method='multi' # 使用多值 INSERT 語法提升效能
)
# 驗證寫入結果
result = pd.read_sql("SELECT COUNT(*) as count FROM products", con=engine)
print(f"已寫入 {result['count'].values[0]} 筆資料")
# 使用自訂的資料型態
# 預設情況下 Pandas 會自動推測 SQL 資料型態
# 但有時需要明確指定
from sqlalchemy import Integer, String, Float
df.to_sql(
name='products_typed',
con=engine,
if_exists='replace',
index=False,
dtype={
'product_id': Integer,
'product_name': String(100),
'category': String(10),
'price': Float,
'stock': Integer
}
)
效能最佳化技巧
除了記憶體最佳化之外,還有許多技巧可以提升 Pandas 的運算效能。了解這些技巧對於處理大型資料集尤其重要。
向量化運算是 Pandas 效能最佳化的核心原則。Pandas 建構在 NumPy 之上,NumPy 的向量化運算比 Python 原生迴圈快很多倍。因此,應該盡量使用 Pandas 提供的內建方法,而不是使用迴圈逐行處理資料。
import pandas as pd
import numpy as np
import time
# 建立測試資料
n_rows = 100000
df = pd.DataFrame({
'a': np.random.randn(n_rows),
'b': np.random.randn(n_rows)
})
# 不好的做法:使用迴圈逐行處理
start = time.time()
result_loop = []
for i in range(len(df)):
result_loop.append(df.iloc[i]['a'] ** 2 + df.iloc[i]['b'] ** 2)
df['c_loop'] = result_loop
print(f"迴圈方式耗時: {time.time() - start:.3f} 秒")
# 好的做法:使用向量化運算
start = time.time()
df['c_vectorized'] = df['a'] ** 2 + df['b'] ** 2
print(f"向量化方式耗時: {time.time() - start:.3f} 秒")
# 使用 apply 函式(中等效能)
# apply 比迴圈快,但比向量化慢
start = time.time()
df['c_apply'] = df.apply(lambda row: row['a'] ** 2 + row['b'] ** 2, axis=1)
print(f"apply 方式耗時: {time.time() - start:.3f} 秒")
# 使用 NumPy 函式(最佳效能)
start = time.time()
df['c_numpy'] = np.square(df['a'].values) + np.square(df['b'].values)
print(f"NumPy 方式耗時: {time.time() - start:.3f} 秒")
使用適當的資料選取方法也會影響效能。loc 使用標籤選取,iloc 使用位置選取,at 和 iat 則用於選取單一值。選擇正確的方法可以提升效能。
import pandas as pd
import numpy as np
import time
# 建立測試資料
n_rows = 100000
df = pd.DataFrame({
'a': np.random.randn(n_rows),
'b': np.random.randn(n_rows)
}, index=[f'row_{i}' for i in range(n_rows)])
# 選取單一值時,使用 at/iat 比 loc/iloc 更快
# loc 方式
start = time.time()
for i in range(1000):
_ = df.loc['row_500', 'a']
print(f"loc 選取單一值: {time.time() - start:.3f} 秒")
# at 方式
start = time.time()
for i in range(1000):
_ = df.at['row_500', 'a']
print(f"at 選取單一值: {time.time() - start:.3f} 秒")
# iloc 方式
start = time.time()
for i in range(1000):
_ = df.iloc[500, 0]
print(f"iloc 選取單一值: {time.time() - start:.3f} 秒")
# iat 方式
start = time.time()
for i in range(1000):
_ = df.iat[500, 0]
print(f"iat 選取單一值: {time.time() - start:.3f} 秒")
使用 eval 和 query 方法可以在某些情況下提升效能,特別是對於複雜的條件運算。這些方法使用 numexpr 套件進行最佳化,可以避免建立中間結果。
import pandas as pd
import numpy as np
# 建立測試資料
n_rows = 1000000
df = pd.DataFrame({
'a': np.random.randn(n_rows),
'b': np.random.randn(n_rows),
'c': np.random.randn(n_rows),
'd': np.random.randn(n_rows)
})
# 一般方式
%timeit df['e'] = df['a'] + df['b'] * df['c'] - df['d']
# 使用 eval 方式
# eval 會解析字串表達式並進行最佳化
%timeit df.eval('e = a + b * c - d', inplace=True)
# 使用 query 進行條件篩選
# 比一般的布林索引更快,語法也更清晰
%timeit df_filtered = df[(df['a'] > 0) & (df['b'] < 0) & (df['c'] > 0.5)]
%timeit df_filtered = df.query('a > 0 and b < 0 and c > 0.5')
高效能儲存格式
對於需要頻繁讀寫的大型資料集,選擇適當的儲存格式可以大幅提升效能。Parquet 和 Feather 是兩種常用的高效能格式,它們都支援欄位壓縮和快速隨機存取。
import pandas as pd
import numpy as np
import time
# 建立測試資料
n_rows = 1000000
df = pd.DataFrame({
'id': np.arange(n_rows),
'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n_rows),
'value1': np.random.randn(n_rows),
'value2': np.random.randn(n_rows),
'timestamp': pd.date_range('2024-01-01', periods=n_rows, freq='s')
})
# 將類別欄位轉換為 Categorical 型態
df['category'] = df['category'].astype('category')
# 比較不同格式的讀寫效能
# CSV 格式
start = time.time()
df.to_csv('data/test.csv', index=False)
csv_write_time = time.time() - start
start = time.time()
df_csv = pd.read_csv('data/test.csv')
csv_read_time = time.time() - start
# Parquet 格式
# Parquet 是一種列式儲存格式,對於分析查詢非常有效
start = time.time()
df.to_parquet('data/test.parquet', index=False, compression='snappy')
parquet_write_time = time.time() - start
start = time.time()
df_parquet = pd.read_parquet('data/test.parquet')
parquet_read_time = time.time() - start
# Feather 格式
# Feather 專為快速序列化設計,適合 Python 和 R 之間的資料交換
start = time.time()
df.to_feather('data/test.feather')
feather_write_time = time.time() - start
start = time.time()
df_feather = pd.read_feather('data/test.feather')
feather_read_time = time.time() - start
# 輸出比較結果
import os
print("格式比較:")
print(f"{'格式':<10} {'寫入時間':<12} {'讀取時間':<12} {'檔案大小':<12}")
print("-" * 46)
print(f"{'CSV':<10} {csv_write_time:<12.3f} {csv_read_time:<12.3f} {os.path.getsize('data/test.csv')/1024**2:<12.1f} MB")
print(f"{'Parquet':<10} {parquet_write_time:<12.3f} {parquet_read_time:<12.3f} {os.path.getsize('data/test.parquet')/1024**2:<12.1f} MB")
print(f"{'Feather':<10} {feather_write_time:<12.3f} {feather_read_time:<12.3f} {os.path.getsize('data/test.feather')/1024**2:<12.1f} MB")
Parquet 格式特別適合需要選取特定欄位的分析場景,因為它是列式儲存,可以只讀取需要的欄位而不必讀取整個檔案。
import pandas as pd
# 只讀取特定欄位
# 這比讀取整個檔案後再選取欄位快很多
df_selected = pd.read_parquet(
'data/test.parquet',
columns=['id', 'value1']
)
# 使用 pyarrow 進行更進階的操作
import pyarrow.parquet as pq
# 讀取 Parquet 檔案的元資料
parquet_file = pq.ParquetFile('data/test.parquet')
print(f"行數: {parquet_file.metadata.num_rows}")
print(f"欄數: {parquet_file.metadata.num_columns}")
print(f"列群組數: {parquet_file.metadata.num_row_groups}")
# 讀取特定的列群組
# 這在處理超大型檔案時很有用
table = parquet_file.read_row_group(0, columns=['id', 'value1'])
df_row_group = table.to_pandas()
結語
Pandas 是一個功能強大且靈活的資料處理工具,但要充分發揮其潛力,需要深入了解它的內部運作機制和最佳實踐。本文介紹了資料載入最佳化、記憶體管理、Excel 進階處理、資料庫整合以及效能調校等主題,這些都是實際工作中經常會遇到的挑戰。
在處理大型資料集時,分塊處理和記憶體最佳化是關鍵策略。透過指定適當的資料型態、使用 Categorical 型態處理低基數欄位、以及選擇高效能的儲存格式,可以顯著減少記憶體使用並提升處理速度。向量化運算則是效能最佳化的核心原則,應該盡量避免使用 Python 迴圈逐行處理資料。
隨著資料量的持續增長,Pandas 的這些進階技巧變得越來越重要。同時,對於超大規模的資料處理需求,也可以考慮使用 Dask、Vaex 或 Polars 等平行運算框架,它們提供了類似 Pandas 的 API,但能夠處理超出記憶體容量的資料集。掌握 Pandas 的基礎和進階技巧,將為學習這些更進階的工具奠定堅實的基礎。