Python 的非同步魔法:asyncio 的世界
在現代軟體開發中,高效處理 I/O 密集型任務至關重要。Python 的 asyncio
函式庫提供了一種優雅而強大的非同步程式設計方式,從而實作更高的併發性和回應速度。本文將探討 asyncio
的核心概念,並結合實際案例講解其應用。
非同步程式設計基礎:併發、平行與多工
在深入 asyncio
之前,有必要釐清幾個關鍵概念:
- 併發 (Concurrency): 指在同一時間段內處理多個任務,但同一時刻只執行一個任務。這就像一位廚師在多個爐灶之間來回切換,每個爐灶都烹飪不同的菜餚,但同一時刻廚師只操作一個爐灶。
- 平行 (Parallelism): 指在同一時刻同時執行多個任務。這就像多位廚師同時在不同的爐灶上烹飪菜餚。
- 多工 (Multitasking): 作業系統執行多個任務的能力,可以是併發或平行。
asyncio
利用協作式多工處理,允許多個任務分享同一個執行緒,透過任務之間的協作來實作併發。
asyncio 的核心元件:事件迴圈、協程與任務
asyncio
的核心是事件迴圈,它負責排程和執行非同步任務。協程是特殊的函式,可以使用 async
和 await
關鍵字定義,允許任務暫停和還原執行,從而實作非阻塞操作。任務是對協程的封裝,可以被事件迴圈排程執行。
import asyncio
async def my_coroutine():
print("協程開始執行")
await asyncio.sleep(1) # 模擬 I/O 操作
print("協程執行結束")
async def main():
task = asyncio.create_task(my_coroutine())
await task
asyncio.run(main())
內容解密:
這段程式碼定義了一個簡單的協程 my_coroutine
,它模擬了一個耗時 1 秒的 I/O 操作。main
函式使用 asyncio.create_task
建立了一個任務,並將協程 my_coroutine
封裝到任務中。await task
暫停 main
函式的執行,直到任務完成。asyncio.run
函式啟動事件迴圈並執行 main
函式。
asyncio 的應用:網路請求、資料函式庫操作
asyncio
可以應用於各種 I/O 密集型場景,例如網路請求、資料函式庫操作等。
import asyncio
import aiohttp
async def fetch_data(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
urls = ["http://example.com", "http://google.com"]
tasks = [fetch_data(url) for url in urls]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
內容解密:
這段程式碼使用 aiohttp
函式庫進行非同步網路請求。fetch_data
協程使用 aiohttp.ClientSession
傳送 GET 請求並傳迴回應文字。main
函式建立多個任務併發執行網路請求,並使用 asyncio.gather
等待所有任務完成。
CPU 密集型任務的處理
對於 CPU 密集型任務,asyncio
可以與多行程結合使用,以充分利用多核 CPU 的效能。 這部分會在後續文章中詳細講解。
asyncio 的優勢
- 單執行緒: 避免了多執行緒的複雜性和效能損耗。
- 非阻塞 I/O: 允許在等待 I/O 操作完成時執行其他任務,提高效率。
- 高效能: 在 I/O 密集型應用中表現出色。
深入理解 asyncio 的併發模型
graph LR A[事件迴圈] --> B(協程1); A --> C(協程2); A --> D(協程3);
圖示説明: 上圖展示了事件迴圈如何管理多個協程。事件迴圈驅動協程的執行,並在協程等待 I/O 操作時切換到其他協程,實作併發。
sequenceDiagram participant Client participant EventLoop participant Task1 participant Task2 Client->>EventLoop: 建立任務 activate EventLoop EventLoop->>Task1: 執行任務1 activate Task1 Task1->>EventLoop: 等待 I/O deactivate Task1 EventLoop->>Task2: 執行任務2 activate Task2 Task2->>EventLoop: 等待 I/O deactivate Task2 EventLoop->>Task1: I/O 完成 activate Task1 Task1->>EventLoop: 任務1 完成 deactivate Task1 EventLoop->>Task2: I/O 完成 activate Task2 Task2->>EventLoop: 任務2 完成 deactivate Task2 deactivate EventLoop
圖示説明: 上圖展示了事件迴圈如何排程多個任務併發執行。當任務遇到 I/O 操作時,會暫停執行並將控制權交回事件迴圈,事件迴圈會排程其他任務執行,直到 I/O 操作完成。
Python 併發程式設計的核心概念
在探討 asyncio 之前,我們先來瞭解 Python 併發程式設計的一些核心概念,包括 CPU 密集型工作和 I/O 密集型工作。
CPU 密集型工作
CPU 密集型工作指的是需要大量 CPU 計算的工作,例如:
- 影像處理
- 科學計算
- 機器學習模型訓練
import time
def cpu_bound_task(n):
start_time = time.time()
result = 0
for i in range(n):
result += i * i
end_time = time.time()
print(f"CPU 密集型工作耗時:{end_time - start_time} 秒")
return result
cpu_bound_task(10000000)
內容解密:
這個函式模擬 CPU 密集型工作,透過執行大量的計算來消耗 CPU 資源。
I/O 密集型工作
I/O 密集型工作指的是需要等待外部資源(例如網路、磁碟)的工作,例如:
- 網路請求
- 檔案讀寫
- 資料函式庫查詢
import asyncio
import aiohttp
async def io_bound_task(url):
start_time = time.time()
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
await response.text()
end_time = time.time()
print(f"I/O 密集型工作耗時:{end_time - start_time} 秒")
async def main():
await io_bound_task("https://www.google.com")
asyncio.run(main())
內容解密:
這個函式模擬 I/O 密集型工作,使用 aiohttp
進行非同步網路請求。
asyncio 的單執行緒併發模型
asyncio 是一個用於在 Python 中編寫單執行緒併發程式碼的函式庫。它使用協程、事件迴圈和非同步 I/O 來實作併發。
asyncio
為 Python 非同步程式設計提供了強大的工具,能夠顯著提升 I/O 密集型應用的效能。 透過理解事件迴圈、協程和任務等核心概念,並結合實際應用場景,開發者可以構建高效能、高回應的非同步應用。 後續文章將探討 asyncio 的更多特性、高階應用以及與多行程的結合使用。
非同步 I/O:併發程式設計的 Python 之道
Python 提供了多種方法來實作併發 I/O,例如多執行緒、多行程以及 asyncio
。asyncio
代表非同步 I/O,允許我們使用非同步程式設計模型執行程式碼,從而同時處理多個 I/O 操作,並保持應用程式的回應能力。
非同步程式設計意味著一個長時間執行的任務可以在後台獨立執行,而無需阻塞主應用程式。 系統可以在等待任務完成的同時執行其他工作,並在任務完成後收到通知。
asyncio
使用 async
和 await
關鍵字定義和管理協程,協程是一種可以在等待操作完成時暫停並稍後還原的函式。 asyncio
利用單執行緒事件迴圈併發模型來執行這些協程。
I/O 密集型與 CPU 密集型
I/O 密集型操作主要受 I/O 裝置速度的限制,例如網路請求或檔案讀寫。 CPU 密集型操作主要受 CPU 處理能力的限制,例如複雜的計算或資料處理。 選擇合適的併發模型取決於操作的型別。
import os
import threading
import multiprocessing
import time
# CPU 密集型任務
def cpu_bound_task(n):
result = 0
for i in range(n):
result += i * i
return result
# 多執行緒版本
def multithreaded_version(n, num_threads):
threads = []
for _ in range(num_threads):
thread = threading.Thread(target=cpu_bound_task, args=(n,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
# 多程式版本
def multiprocess_version(n, num_processes):
processes = []
for _ in range(num_processes):
process = multiprocessing.Process(target=cpu_bound_task, args=(n,))
processes.append(process)
process.start()
for process in processes:
process.join()
if __name__ == "__main__":
n = 10000000
num_threads = 4
num_processes = 4
start_time = time.time()
multithreaded_version(n, num_threads)
end_time = time.time()
print(f"多執行緒版本耗時:{end_time - start_time:.4f} 秒")
start_time = time.time()
multiprocess_version(n, num_processes)
end_time = time.time()
print(f"多程式版本耗時:{end_time - start_time:.4f} 秒")
這段程式碼比較了多執行緒和多程式在 CPU 密集型任務上的效能差異。cpu_bound_task
函式模擬一個 CPU 密集型任務。multithreaded_version
使用多執行緒執行該任務,而 multiprocess_version
使用多程式執行。透過比較兩者的執行時間,可以看出多程式版本通常比多執行緒版本更快,尤其是在多核心處理器上。
graph LR subgraph 多執行緒 A[主執行緒] --> B(GIL) C[執行緒 1] --> B D[執行緒 2] --> B end subgraph 多程式 A1[父程式 - GIL] --> B1[子程式 1 - GIL] A1 --> C1[子程式 2 - GIL] end
圖 1.3 多執行緒模型中,所有執行緒分享同一個 GIL,而多程式模型中,每個程式都有自己的 GIL。
透過以上程式碼和圖表,我們可以更清楚地理解多執行緒和多程式的差異,以及 GIL 對 Python 程式效能的影響。選擇哪種併發模型取決於任務的性質:I/O 密集型任務適合多執行緒,而 CPU 密集型任務適合多程式。
在 Python 的併發程式設計中,理解 GIL 的限制至關重要。對於 CPU 密集型任務,多程式是更有效的選擇,因為它可以繞過 GIL 的限制,充分利用多核心處理器的優勢。而對於 I/O 密集型任務,多執行緒仍然是一個不錯的選擇。
選擇正確的併發模型可以顯著提升程式效能。深入理解 GIL 的運作機制,以及多執行緒和多程式的特性,是 Python 開發者的必備技能。
此外,非同步程式設計模型(例如使用 asyncio
)也提供了一種高效的併發解決方案,尤其適用於 I/O 密集型任務。非同步程式設計允許多個任務在單執行緒內併發執行,避免了 GIL 的限制,並且具有輕量級、高效能等優點。
import time
import threading
import multiprocessing
import os
def fibonacci(n: int) -> int:
if n <= 1:
return n
else:
return fibonacci(n-1) + fibonacci(n-2)
def display_fibonacci(number: int) -> None:
print(f"Fibonacci({number}) = {fibonacci(number)}")
def no_threading_fibonacci():
display_fibonacci(40)
display_fibonacci(41)
def threaded_fibonacci():
thread_40 = threading.Thread(target=display_fibonacci, args=(40,))
thread_41 = threading.Thread(target=display_fibonacci, args=(41,))
thread_40.start()
thread_41.start()
thread_40.join()
thread_41.join()
def process_hello():
print(f"Greetings from child process {os.getpid()}!")
def multiprocess_fibonacci():
process_40 = multiprocessing.Process(target=display_fibonacci, args=(40,))
process_41 = multiprocessing.Process(target=display_fibonacci, args=(41,))
process_40.start()
process_41.start()
process_40.join()
process_41.join()
time_start = time.time()
no_threading_fibonacci()
time_end = time.time()
print(f"Single thread execution time: {time_end - time_start:.4f} seconds")
time_threaded_start = time.time()
threaded_fibonacci()
time_threaded_end = time.time()
print(f"Multi-thread execution time: {time_threaded_end - time_threaded_start:.4f} seconds")
time_process_start = time.time()
multiprocess_fibonacci()
time_process_end = time.time()
print(f"Multi-process execution time: {time_process_end - time_process_start:.4f} seconds")
if __name__ == '__main__':
greeting_process = multiprocessing.Process(target=process_hello)
greeting_process.start()
print(f"Greetings from parent process {os.getpid()}")
greeting_process.join()
這段程式碼示範了在 Python 中計算 Fibonacci 數列的三種不同方法:單執行緒、多執行緒和多程式,並比較了它們的執行時間。Fibonacci 函式本身是一個計算密集型操作,使用遞迴方式計算。display_fibonacci 函式則負責印出計算結果。
首先,no_threading_fibonacci
函式以單執行緒方式依序計算並顯示 Fibonacci(40) 和 Fibonacci(41)。接著,threaded_fibonacci
函式使用兩個執行緒平行計算 Fibonacci(40) 和 Fibonacci(41)。最後,multiprocess_fibonacci
函式使用兩個程式平行執行相同的計算。
Python 的 GIL 與多程式的優勢
Python 的全域性直譯器鎖 (GIL) 限制了多執行緒在 CPU 密集型任務中的效能提升。由於 GIL 的存在,同一時間只有一個執行緒可以持有 Python 直譯器的控制權,即使在多核心處理器上也是如此。因此,對於 CPU 密集型任務,多執行緒的效能提升有限,甚至可能比單執行緒更慢。
相對地,多程式可以繞過 GIL 的限制。每個程式都有自己獨立的直譯器和記憶體空間,可以在不同的 CPU 核心上真正平行執行,從而充分利用多核心處理器的效能。
玄貓的效能調校心法
- 針對 I/O 密集型任務,多執行緒是提高效率的利器。
- 對於 CPU 密集型任務,多程式才能釋放多核心的威力。
- 避免在多執行緒中分享大量資料,因為 GIL 會造成效能瓶頸。
- 使用效能分析工具找出程式碼的效能瓶頸,例如 cProfile。
- 考慮使用 Cython 或 Numba 等工具將 Python 程式碼編譯成機器碼,以提升效能。
透過理解 GIL 的特性並善用多程式,才能讓 Python 程式碼在多核心時代發揮最佳效能。
藉由比較三種方法的執行時間,我們可以清楚地看到多程式在 CPU 密集型任務中的效能優勢。這段程式碼也提供了一個實際案例,説明如何根據任務型別選擇合適的平行處理方式。