Python 的平行與非同步之路:GIL 的影響
在探討非同步 I/O 之前,讓我們先了解 Python 的全域性直譯器鎖(GIL)如何影響多執行緒的效能。
GIL 的效能瓶頸:費氏數列案例分析
以下程式碼展示了計算費氏數列的多執行緒版本:
import time
import threading
def fib(n):
if n < 2:
return 1
return fib(n - 1) + fib(n - 2)
start = time.time()
threads = []
for i in range(40, 42):
thread = threading.Thread(target=fib, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
end = time.time()
print(f"多執行緒計算耗時:{end - start:.4f} 秒")
這段程式碼使用多執行緒計算 fib(40)
和 fib(41)
。然而,由於 GIL 的存在,多執行緒版本並沒有顯著提升效能,甚至可能更慢。GIL 限制了同一時間只有一個執行緒可以執行 Python 程式碼,其他執行緒只能等待。
GIL 的釋放時機與 I/O 密集型任務
GIL 並非永遠持有,它會在 I/O 操作期間釋放。這意味著我們可以利用多執行緒來處理 I/O 密集型任務,但對於 CPU 密集型任務,多執行緒的優勢並不明顯。
以下程式碼展示了同步讀取網頁狀態碼的範例:
import time
import requests
def read_example():
response = requests.get('https://www.example.com')
print(response.status_code)
sync_start = time.time()
read_example()
read_example()
sync_end = time.time()
print(f'同步執行耗時:{sync_end - sync_start:.4f} 秒')
這段程式碼兩次讀取 example.com
的狀態碼。由於是同步執行,第二次讀取必須等待第一次讀取完成。
接著,我們使用多執行緒版本:
import time
import threading
import requests
def read_example():
response = requests.get('https://www.example.com')
print(response.status_code)
thread_1 = threading.Thread(target=read_example)
thread_2 = threading.Thread(target=read_example)
thread_start = time.time()
thread_1.start()
thread_2.start()
print('所有執行緒已啟動!')
thread_1.join()
thread_2.join()
thread_end = time.time()
print(f'多執行緒執行耗時:{thread_end - thread_start:.4f} 秒')
這段程式碼使用兩個執行緒同時讀取 example.com
的狀態碼。由於 I/O 操作會釋放 GIL,因此兩個執行緒可以平行執行,從而提高效率。
單執行緒平行:asyncio 的奧秘
asyncio
利用 I/O 操作釋放 GIL 的特性,在單執行緒內實作平行。asyncio
使用協程(coroutine)來模擬輕量級執行緒。每個協程可以執行自己的 I/O 操作,當等待 I/O 完成時,事件迴圈可以切換到其他協程執行,從而實作平行。
深入理解單執行緒平行:非阻塞 Socket 的魔力
單執行緒平行並不需要多執行緒,它可以透過非阻塞 Socket 實作。非阻塞 Socket 允許應用程式在傳送或接收資料時無需等待,可以繼續執行其他任務。作業系統會通知應用程式何時可以讀取資料。
什麼是 Socket?
Socket 是一種低階抽象,用於在網路中傳送和接收資料。它支援兩個主要操作:傳送位元組和接收位元組。預設情況下,Socket 是阻塞的,這意味著當應用程式等待伺服器回覆資料時,它會停止執行其他任務,直到收到資料、發生錯誤或超時。
然而,Socket 也可以以非阻塞模式運作。在非阻塞模式下,應用程式可以傳送或接收資料而無需等待,作業系統會在資料準備好時通知應用程式。這使得應用程式可以在等待資料的同時執行其他任務。
graph LR B[B] A[客戶端] --> B{Socket}; B --> C[伺服器]; C --> B; B --> A;
這張圖展示了客戶端和伺服器之間透過 Socket 進行資料交換的過程。客戶端將請求傳送到 Socket,Socket 將請求傳送到伺服器。伺服器處理請求後將結果傳回給 Socket,Socket 再將結果傳回給客戶端。
非阻塞 Socket 的使用讓單執行緒平行成為可能,這也是 asyncio
的核心機制。
asyncio 的事件迴圈機制
asyncio 能夠巧妙地切換不同的事件通知系統,以適應不同的作業系統環境。這些系統包括 kqueue (FreeBSD 和 MacOS)、epoll (Linux) 以及 IOCP (Windows)。它們負責追蹤非阻塞通訊端,並在通訊端準備就緒時通知應用程式。
asyncio 採用單執行緒併發模型。當遇到 I/O 操作時,asyncio 將其交給作業系統的事件通知系統處理,Python 執行緒便可繼續執行其他程式碼。I/O 操作完成後,系統會通知 asyncio,然後繼續執行後續的 Python 程式碼。
事件迴圈:asyncio 的核心
事件迴圈是 asyncio 應用的核心。它維護一個任務佇列,任務是 coroutine 的包裝器。Coroutine 可以在遇到 I/O 繫結操作時暫停執行,讓事件迴圈執行其他不等待 I/O 操作的任務。
graph LR B[B] A[主執行緒] --> B{事件迴圈}; B --> C[任務 1]; B --> D[任務 2]; B --> E[任務 3]; C --> F[I/O 操作 1]; D --> G[I/O 操作 2]; E --> H[I/O 操作 3]; F --> I[作業系統]; G --> I; H --> I; I --> B;
主執行緒將任務提交到事件迴圈,事件迴圈執行任務,遇到 I/O 操作則交給作業系統處理,作業系統完成後通知事件迴圈,事件迴圈喚醒對應的任務。
asyncio 執行流程模擬
假設我們有三個任務,每個任務都傳送一個非同步網頁請求。這些任務有一些 CPU 繫結的設定程式碼,然後傳送網頁請求,最後執行一些 CPU 繫結的後處理程式碼。
由於是單執行緒,只有第一個任務開始執行程式碼。當任務 1 的 CPU 繫結設定工作完成後,它遇到 I/O 繫結操作,並暫停自身。此時,任務 2 可以開始執行。任務 2 也會在遇到 I/O 操作後暫停。接著,任務 3 開始執行。
現在假設當任務 3 暫停以等待其 I/O 完成時,任務 1 的網頁請求已完成。作業系統會通知事件迴圈,事件迴圈喚醒任務 1,任務 1 繼續執行。
透過事件迴圈機制,asyncio 能夠有效地管理併發任務,在單執行緒環境下實作高效的 I/O 操作。
認識 Coroutine
Coroutine 就像擁有特殊能力的 Python 函式,它可以在執行過程中暫停,等待耗時操作完成後再繼續執行。
使用 async
關鍵字建立 Coroutine
async def my_coroutine():
print('Hello world!')
這個 Coroutine 只是簡單地印出 “Hello world!",並沒有執行任何耗時操作。
讓我們比較一下普通函式和 Coroutine 的區別:
import asyncio
async def coroutine_add_one(number: int) -> int:
return number + 1
def add_one(number: int) -> int:
return number + 1
function_result = add_one(1)
coroutine_result = coroutine_add_one(1)
print(f'函式結果:{function_result},型別:{type(function_result)}')
print(f'Coroutine 結果:{coroutine_result},型別:{type(coroutine_result)}')
result = asyncio.run(coroutine_add_one(1))
print(f'asyncio.run 執行結果:{result}')
普通函式 add_one
直接傳回結果 2,而 coroutine_add_one
傳回的是一個 Coroutine 物件。要執行 Coroutine,需要使用 asyncio.run()
。
使用 await
關鍵字暫停執行
await
關鍵字用於暫停 Coroutine 的執行,等待耗時操作完成。
import asyncio
async def add_one(number: int) -> int:
return number + 1
async def main():
one_plus_one = await add_one(1)
two_plus_one = await add_one(2)
print(one_plus_one)
print(two_plus_one)
asyncio.run(main())
main()
函式中,await add_one(1)
會暫停執行,等待 add_one(1)
完成並傳回結果。然後,await add_one(2)
也會執行相同的操作。
本文探討了 Python 的 GIL、asyncio 的事件迴圈機制以及 Coroutine 的基本概念。希望透過玄貓的講解,能幫助您更好地理解 Python 的非同步程式設計。
```python
import asyncio
async def hello_world_message():
await asyncio.sleep(1)
return "Hello World!"
async def main():
message = await hello_world_message()
print(message)
asyncio.run(main())
這段程式碼示範了 asyncio 中協程的基礎用法。hello_world_message()
是一個協程,使用 await asyncio.sleep(1)
模擬耗時一秒的操作。main()
函式也是一個協程,它呼叫 hello_world_message()
並等待其傳回結果。asyncio.run(main())
啟動事件迴圈並執行 main()
協程。
import asyncio
async def delay(delay_seconds: int) -> int:
print(f"開始等待 {delay_seconds} 秒")
await asyncio.sleep(delay_seconds)
print(f"等待 {delay_seconds} 秒結束")
return delay_seconds
delay()
函式是一個協程,它接受一個整數 delay_seconds
作為引數,表示要等待的秒數。它使用 asyncio.sleep()
模擬耗時操作,並在等待前後印出訊息。
import asyncio
from util import delay # 假設 util.py 中包含 delay 函式
async def main():
sleep_for_three = asyncio.create_task(delay(3))
print(type(sleep_for_three)) # 輸出: <class '_asyncio.Task'>
result = await sleep_for_three
print(result)
asyncio.run(main())
這段程式碼示範瞭如何使用 asyncio.create_task()
建立任務。delay(3)
協程被包裝成一個任務 sleep_for_three
。print(type(sleep_for_three))
顯示 sleep_for_three
的型別是 _asyncio.Task
。await sleep_for_three
等待任務完成並取得結果。
sequenceDiagram participant main participant delay_task main->>delay_task: asyncio.create_task(delay(3)) activate delay_task main->>console: print(type(sleep_for_three)) delay_task->>delay_task: await asyncio.sleep(3) delay_task-->>main: return 3 deactivate delay_task main->>console: print(result)
圖表説明: 這個時序圖描述了 main()
函式和 delay
任務的執行流程。main()
函式建立 delay
任務後,會繼續執行 print
陳述式,而不會阻塞等待 delay
任務完成。
import asyncio
from util import delay
async def main():
task1 = asyncio.create_task(delay(3))
task2 = asyncio.create_task(delay(3))
task3 = asyncio.create_task(delay(3))
await task1
await task2
await task3
asyncio.run(main())
這段程式碼示範瞭如何併發執行多個任務。三個 delay(3)
協程被包裝成任務並併發執行。儘管每個任務都需要 3 秒才能完成,但由於它們是併發執行的,因此總執行時間大約是 3 秒,而不是 9 秒。
gantt title 併發執行任務; dateFormat HH:mm:ss; axisFormat %H:%M:%S; section Task 1; delay(3) :a1, 00:00:00, 3s section Task 2; delay(3) :a2, 00:00:00, 3s section Task 3; delay(3) :a3, 00:00:00, 3s
圖表説明: 這個甘特圖展示了三個任務的併發執行情況。它們幾乎同時開始和結束,總執行時間約為 3 秒。
import asyncio
from asyncio import CancelledError
from util import delay
async def main():
long_task = asyncio.create_task(delay(10))
seconds_elapsed = 0
while not long_task.done():
print('任務尚未完成,一秒後再次檢查。')
await asyncio.sleep(1)
seconds_elapsed += 1
if seconds_elapsed == 5:
long_task.cancel()
try:
await long_task
except CancelledError:
print('任務已取消')
asyncio.run(main())
這段程式碼示範瞭如何取消任務。long_task
是一個執行 delay(10)
的任務。程式每秒檢查一次 long_task
是否完成。如果 5 秒後 long_task
仍未完成,則會被取消。try...except
區塊用於捕捉 CancelledError
異常。
透過 asyncio,我們可以有效地管理併發任務,提升 Python 程式的效能。理解任務的建立、執行、取消和超時控制,是掌握 asyncio 關鍵的第一步。
任務取消的探討
在Python的asyncio中,取消任務的機制並非立即生效。呼叫cancel()
方法後,任務並不會突然中止,而是在執行到下一個await
表示式時才會引發CancelledError
異常。這意味著如果任務正在執行一段普通的Python程式碼,即使呼叫了cancel()
,這段程式碼也會繼續執行直到遇到await
或結束。
更優雅的逾時處理:asyncio.wait_for
相較於每隔一段時間檢查任務狀態再取消,asyncio.wait_for
提供了更簡潔的逾時處理機制。它接受一個協程或任務物件以及逾時時間,如果任務在指定時間內未完成,就會引發TimeoutError
並自動取消任務。
以下範例示範一個任務預計執行2秒,但我們設定了1秒的逾時:
import asyncio
from util import delay # delay為自定義的延遲函式
async def main():
delay_task = asyncio.create_task(delay(2))
try:
result = await asyncio.wait_for(delay_task, timeout=1)
print(result)
except asyncio.exceptions.TimeoutError:
print('逾時!')
print(f'任務是否已取消? {delay_task.cancelled()}')
asyncio.run(main())
輸出結果顯示任務在逾時後被成功取消:
正在休眠 2 秒...
逾時!
任務是否已取消? True
阻止任務取消:asyncio.shield
有時我們不希望任務在逾時後被取消,例如,只想通知使用者任務執行時間過長,但仍允許其繼續執行。此時可以使用asyncio.shield
函式來保護任務免受取消。
import asyncio
from util import delay
async def main():
task = asyncio.create_task(delay(10)) # 建立任務,模擬長時間操作
try:
await asyncio.wait_for(asyncio.shield(task), 5) # 使用shield保護任務
print(result)
except TimeoutError:
print("任務執行時間超過五秒,但它很快就會完成!")
result = await task # 等待任務完成
print(result)
asyncio.run(main())
asyncio.shield(task)
保護了 task
不被取消,即使 wait_for
超時,task
依然會繼續執行直到完成。
graph LR B[B] No[No] Yes[Yes] A[建立 Task] --> B{wait_for 超時?}; B -- Yes --> C[印出提示訊息]; C --> D[等待 Task 完成]; B -- No --> D; D --> E[程式結束];
解析 asyncio 的核心概念:Future、Task、Coroutine 和 Awaitable
Future:對未來結果的承諾
Future 物件代表一個未來可能獲得的值,它一開始處於未完成狀態,直到結果被設定。
from asyncio import Future
my_future = Future()
print(f'my_future 是否已完成? {my_future.done()}') # False
my_future.set_result(42)
print(f'my_future 是否已完成? {my_future.done()}') # True
print(f'my_future 的結果是什麼? {my_future.result()}') # 42
await
與 Future
await future
表示暫停執行,直到 future 的值被設定。
from asyncio import Future
import asyncio
# ... (先前程式碼)
async def main():
future = make_request()
print(f'Future 是否已完成? {future.done()}') # False
value = await future # 等待 future 完成
print(f'Future 是否已完成? {future.done()}') # True
print(value) # 42
asyncio.run(main())
Task:Coroutine 的執行載體
Task 結合了 coroutine 和 future。它執行 coroutine,並將結果儲存在 future 中。
Awaitable:可被 await
的物件
Awaitable 是 coroutine、future 和 task 的共同抽象基礎類別,任何實作了 __await__
方法的物件都是 awaitable。
graph TD Awaitable[Awaitable] Coroutine[Coroutine] Future[Future] Task[Task] Awaitable --> Coroutine Awaitable --> Future Future --> Task
使用裝飾器測量 Coroutine 執行時間
async_timed
裝飾器可以方便地測量 coroutine 的執行時間:
import functools
import time
from typing import Callable, Any
def async_timed():
def wrapper(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapped(*args, **kwargs) -> Any:
print(f'開始執行 {func.__name__},引數:{args} {kwargs}')
start = time.time()
try:
return await func(*args, **kwargs)
finally:
end = time.time()
total = end - start
print(f'完成 {func.__name__},耗時:{total:.4f} 秒')
return wrapped
return wrapper
# 使用範例
@async_timed()
async def my_coroutine():
# ...
pass
async_timed
裝飾器利用 functools.wraps
保留了原函式的名稱和檔案字串。它記錄了 coroutine 的開始和結束時間,並計算總執行時間。
這個裝飾器可以應用於任何 coroutine,提供便捷的效能分析。
透過以上説明,我們更深入地理解了 asyncio 中任務取消、逾時處理以及核心概念之間的關係,為構建更穩健、高效的非同步應用程式奠定了基礎。