Python 的平行與非同步之路:GIL 的影響

在探討非同步 I/O 之前,讓我們先了解 Python 的全域性直譯器鎖(GIL)如何影響多執行緒的效能。

GIL 的效能瓶頸:費氏數列案例分析

以下程式碼展示了計算費氏數列的多執行緒版本:

import time
import threading

def fib(n):
    if n < 2:
        return 1
    return fib(n - 1) + fib(n - 2)

start = time.time()
threads = []
for i in range(40, 42):
    thread = threading.Thread(target=fib, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

end = time.time()
print(f"多執行緒計算耗時:{end - start:.4f} 秒")

這段程式碼使用多執行緒計算 fib(40)fib(41)。然而,由於 GIL 的存在,多執行緒版本並沒有顯著提升效能,甚至可能更慢。GIL 限制了同一時間只有一個執行緒可以執行 Python 程式碼,其他執行緒只能等待。

GIL 的釋放時機與 I/O 密集型任務

GIL 並非永遠持有,它會在 I/O 操作期間釋放。這意味著我們可以利用多執行緒來處理 I/O 密集型任務,但對於 CPU 密集型任務,多執行緒的優勢並不明顯。

以下程式碼展示了同步讀取網頁狀態碼的範例:

import time
import requests

def read_example():
    response = requests.get('https://www.example.com')
    print(response.status_code)

sync_start = time.time()
read_example()
read_example()
sync_end = time.time()
print(f'同步執行耗時:{sync_end - sync_start:.4f} 秒')

這段程式碼兩次讀取 example.com 的狀態碼。由於是同步執行,第二次讀取必須等待第一次讀取完成。

接著,我們使用多執行緒版本:

import time
import threading
import requests

def read_example():
    response = requests.get('https://www.example.com')
    print(response.status_code)

thread_1 = threading.Thread(target=read_example)
thread_2 = threading.Thread(target=read_example)
thread_start = time.time()
thread_1.start()
thread_2.start()
print('所有執行緒已啟動!')
thread_1.join()
thread_2.join()
thread_end = time.time()
print(f'多執行緒執行耗時:{thread_end - thread_start:.4f} 秒')

這段程式碼使用兩個執行緒同時讀取 example.com 的狀態碼。由於 I/O 操作會釋放 GIL,因此兩個執行緒可以平行執行,從而提高效率。

單執行緒平行:asyncio 的奧秘

asyncio 利用 I/O 操作釋放 GIL 的特性,在單執行緒內實作平行。asyncio 使用協程(coroutine)來模擬輕量級執行緒。每個協程可以執行自己的 I/O 操作,當等待 I/O 完成時,事件迴圈可以切換到其他協程執行,從而實作平行。

深入理解單執行緒平行:非阻塞 Socket 的魔力

單執行緒平行並不需要多執行緒,它可以透過非阻塞 Socket 實作。非阻塞 Socket 允許應用程式在傳送或接收資料時無需等待,可以繼續執行其他任務。作業系統會通知應用程式何時可以讀取資料。

什麼是 Socket?

Socket 是一種低階抽象,用於在網路中傳送和接收資料。它支援兩個主要操作:傳送位元組和接收位元組。預設情況下,Socket 是阻塞的,這意味著當應用程式等待伺服器回覆資料時,它會停止執行其他任務,直到收到資料、發生錯誤或超時。

然而,Socket 也可以以非阻塞模式運作。在非阻塞模式下,應用程式可以傳送或接收資料而無需等待,作業系統會在資料準備好時通知應用程式。這使得應用程式可以在等待資料的同時執行其他任務。

  graph LR
    B[B]
    A[客戶端] --> B{Socket};
    B --> C[伺服器];
    C --> B;
    B --> A;

這張圖展示了客戶端和伺服器之間透過 Socket 進行資料交換的過程。客戶端將請求傳送到 Socket,Socket 將請求傳送到伺服器。伺服器處理請求後將結果傳回給 Socket,Socket 再將結果傳回給客戶端。

非阻塞 Socket 的使用讓單執行緒平行成為可能,這也是 asyncio 的核心機制。

asyncio 的事件迴圈機制

asyncio 能夠巧妙地切換不同的事件通知系統,以適應不同的作業系統環境。這些系統包括 kqueue (FreeBSD 和 MacOS)、epoll (Linux) 以及 IOCP (Windows)。它們負責追蹤非阻塞通訊端,並在通訊端準備就緒時通知應用程式。

asyncio 採用單執行緒併發模型。當遇到 I/O 操作時,asyncio 將其交給作業系統的事件通知系統處理,Python 執行緒便可繼續執行其他程式碼。I/O 操作完成後,系統會通知 asyncio,然後繼續執行後續的 Python 程式碼。

事件迴圈:asyncio 的核心

事件迴圈是 asyncio 應用的核心。它維護一個任務佇列,任務是 coroutine 的包裝器。Coroutine 可以在遇到 I/O 繫結操作時暫停執行,讓事件迴圈執行其他不等待 I/O 操作的任務。

  graph LR
    B[B]
    A[主執行緒] --> B{事件迴圈};
    B --> C[任務 1];
    B --> D[任務 2];
    B --> E[任務 3];
    C --> F[I/O 操作 1];
    D --> G[I/O 操作 2];
    E --> H[I/O 操作 3];
    F --> I[作業系統];
    G --> I;
    H --> I;
    I --> B;

主執行緒將任務提交到事件迴圈,事件迴圈執行任務,遇到 I/O 操作則交給作業系統處理,作業系統完成後通知事件迴圈,事件迴圈喚醒對應的任務。

asyncio 執行流程模擬

假設我們有三個任務,每個任務都傳送一個非同步網頁請求。這些任務有一些 CPU 繫結的設定程式碼,然後傳送網頁請求,最後執行一些 CPU 繫結的後處理程式碼。

由於是單執行緒,只有第一個任務開始執行程式碼。當任務 1 的 CPU 繫結設定工作完成後,它遇到 I/O 繫結操作,並暫停自身。此時,任務 2 可以開始執行。任務 2 也會在遇到 I/O 操作後暫停。接著,任務 3 開始執行。

現在假設當任務 3 暫停以等待其 I/O 完成時,任務 1 的網頁請求已完成。作業系統會通知事件迴圈,事件迴圈喚醒任務 1,任務 1 繼續執行。

透過事件迴圈機制,asyncio 能夠有效地管理併發任務,在單執行緒環境下實作高效的 I/O 操作。

認識 Coroutine

Coroutine 就像擁有特殊能力的 Python 函式,它可以在執行過程中暫停,等待耗時操作完成後再繼續執行。

使用 async 關鍵字建立 Coroutine

async def my_coroutine():
    print('Hello world!')

這個 Coroutine 只是簡單地印出 “Hello world!",並沒有執行任何耗時操作。

讓我們比較一下普通函式和 Coroutine 的區別:

import asyncio

async def coroutine_add_one(number: int) -> int:
    return number + 1

def add_one(number: int) -> int:
    return number + 1

function_result = add_one(1)
coroutine_result = coroutine_add_one(1)

print(f'函式結果:{function_result},型別:{type(function_result)}')
print(f'Coroutine 結果:{coroutine_result},型別:{type(coroutine_result)}')

result = asyncio.run(coroutine_add_one(1))
print(f'asyncio.run 執行結果:{result}')

普通函式 add_one 直接傳回結果 2,而 coroutine_add_one 傳回的是一個 Coroutine 物件。要執行 Coroutine,需要使用 asyncio.run()

使用 await 關鍵字暫停執行

await 關鍵字用於暫停 Coroutine 的執行,等待耗時操作完成。

import asyncio

async def add_one(number: int) -> int:
    return number + 1

async def main():
    one_plus_one = await add_one(1)
    two_plus_one = await add_one(2)
    print(one_plus_one)
    print(two_plus_one)

asyncio.run(main())

main() 函式中,await add_one(1) 會暫停執行,等待 add_one(1) 完成並傳回結果。然後,await add_one(2) 也會執行相同的操作。

本文探討了 Python 的 GIL、asyncio 的事件迴圈機制以及 Coroutine 的基本概念。希望透過玄貓的講解,能幫助您更好地理解 Python 的非同步程式設計。

```python
import asyncio

async def hello_world_message():
    await asyncio.sleep(1)
    return "Hello World!"

async def main():
    message = await hello_world_message()
    print(message)

asyncio.run(main())

這段程式碼示範了 asyncio 中協程的基礎用法。hello_world_message() 是一個協程,使用 await asyncio.sleep(1) 模擬耗時一秒的操作。main() 函式也是一個協程,它呼叫 hello_world_message() 並等待其傳回結果。asyncio.run(main()) 啟動事件迴圈並執行 main() 協程。

import asyncio

async def delay(delay_seconds: int) -> int:
    print(f"開始等待 {delay_seconds} 秒")
    await asyncio.sleep(delay_seconds)
    print(f"等待 {delay_seconds} 秒結束")
    return delay_seconds

delay() 函式是一個協程,它接受一個整數 delay_seconds 作為引數,表示要等待的秒數。它使用 asyncio.sleep() 模擬耗時操作,並在等待前後印出訊息。

import asyncio
from util import delay  # 假設 util.py 中包含 delay 函式

async def main():
    sleep_for_three = asyncio.create_task(delay(3))
    print(type(sleep_for_three))  # 輸出: <class '_asyncio.Task'>
    result = await sleep_for_three
    print(result)

asyncio.run(main())

這段程式碼示範瞭如何使用 asyncio.create_task() 建立任務。delay(3) 協程被包裝成一個任務 sleep_for_threeprint(type(sleep_for_three)) 顯示 sleep_for_three 的型別是 _asyncio.Taskawait sleep_for_three 等待任務完成並取得結果。

  sequenceDiagram
    participant main
    participant delay_task

    main->>delay_task: asyncio.create_task(delay(3))
    activate delay_task
    main->>console: print(type(sleep_for_three))
    delay_task->>delay_task: await asyncio.sleep(3)
    delay_task-->>main: return 3
    deactivate delay_task
    main->>console: print(result)

圖表説明: 這個時序圖描述了 main() 函式和 delay 任務的執行流程。main() 函式建立 delay 任務後,會繼續執行 print 陳述式,而不會阻塞等待 delay 任務完成。

import asyncio
from util import delay

async def main():
    task1 = asyncio.create_task(delay(3))
    task2 = asyncio.create_task(delay(3))
    task3 = asyncio.create_task(delay(3))

    await task1
    await task2
    await task3

asyncio.run(main())

這段程式碼示範瞭如何併發執行多個任務。三個 delay(3) 協程被包裝成任務並併發執行。儘管每個任務都需要 3 秒才能完成,但由於它們是併發執行的,因此總執行時間大約是 3 秒,而不是 9 秒。

  gantt
    title 併發執行任務;
    dateFormat  HH:mm:ss;
    axisFormat  %H:%M:%S;
    section Task 1;
    delay(3)       :a1, 00:00:00, 3s
    section Task 2;
    delay(3)       :a2, 00:00:00, 3s
    section Task 3;
    delay(3)       :a3, 00:00:00, 3s

圖表説明: 這個甘特圖展示了三個任務的併發執行情況。它們幾乎同時開始和結束,總執行時間約為 3 秒。

import asyncio
from asyncio import CancelledError
from util import delay

async def main():
    long_task = asyncio.create_task(delay(10))
    seconds_elapsed = 0

    while not long_task.done():
        print('任務尚未完成,一秒後再次檢查。')
        await asyncio.sleep(1)
        seconds_elapsed += 1

        if seconds_elapsed == 5:
            long_task.cancel()
            try:
                await long_task
            except CancelledError:
                print('任務已取消')

asyncio.run(main())

這段程式碼示範瞭如何取消任務。long_task 是一個執行 delay(10) 的任務。程式每秒檢查一次 long_task 是否完成。如果 5 秒後 long_task 仍未完成,則會被取消。try...except 區塊用於捕捉 CancelledError 異常。

透過 asyncio,我們可以有效地管理併發任務,提升 Python 程式的效能。理解任務的建立、執行、取消和超時控制,是掌握 asyncio 關鍵的第一步。

任務取消的探討

在Python的asyncio中,取消任務的機制並非立即生效。呼叫cancel()方法後,任務並不會突然中止,而是在執行到下一個await表示式時才會引發CancelledError異常。這意味著如果任務正在執行一段普通的Python程式碼,即使呼叫了cancel(),這段程式碼也會繼續執行直到遇到await或結束。

更優雅的逾時處理:asyncio.wait_for

相較於每隔一段時間檢查任務狀態再取消,asyncio.wait_for 提供了更簡潔的逾時處理機制。它接受一個協程或任務物件以及逾時時間,如果任務在指定時間內未完成,就會引發TimeoutError並自動取消任務。

以下範例示範一個任務預計執行2秒,但我們設定了1秒的逾時:

import asyncio
from util import delay  # delay為自定義的延遲函式

async def main():
    delay_task = asyncio.create_task(delay(2))
    try:
        result = await asyncio.wait_for(delay_task, timeout=1)
        print(result)
    except asyncio.exceptions.TimeoutError:
        print('逾時!')
        print(f'任務是否已取消? {delay_task.cancelled()}')

asyncio.run(main())

輸出結果顯示任務在逾時後被成功取消:

正在休眠 2 秒...
逾時!
任務是否已取消? True

阻止任務取消:asyncio.shield

有時我們不希望任務在逾時後被取消,例如,只想通知使用者任務執行時間過長,但仍允許其繼續執行。此時可以使用asyncio.shield函式來保護任務免受取消。

import asyncio
from util import delay

async def main():
    task = asyncio.create_task(delay(10))  # 建立任務,模擬長時間操作
    try:
        await asyncio.wait_for(asyncio.shield(task), 5)  # 使用shield保護任務
        print(result)
    except TimeoutError:
        print("任務執行時間超過五秒,但它很快就會完成!")
    result = await task  # 等待任務完成
    print(result)

asyncio.run(main())

asyncio.shield(task) 保護了 task 不被取消,即使 wait_for 超時,task 依然會繼續執行直到完成。

  graph LR
    B[B]
    No[No]
    Yes[Yes]
A[建立 Task] --> B{wait_for 超時?};
B -- Yes --> C[印出提示訊息];
C --> D[等待 Task 完成];
B -- No --> D;
D --> E[程式結束];

解析 asyncio 的核心概念:Future、Task、Coroutine 和 Awaitable

Future:對未來結果的承諾

Future 物件代表一個未來可能獲得的值,它一開始處於未完成狀態,直到結果被設定。

from asyncio import Future

my_future = Future()
print(f'my_future 是否已完成? {my_future.done()}')  # False
my_future.set_result(42)
print(f'my_future 是否已完成? {my_future.done()}')  # True
print(f'my_future 的結果是什麼? {my_future.result()}')  # 42

await 與 Future

await future 表示暫停執行,直到 future 的值被設定。

from asyncio import Future
import asyncio

# ... (先前程式碼)

async def main():
    future = make_request()
    print(f'Future 是否已完成? {future.done()}')  # False
    value = await future  # 等待 future 完成
    print(f'Future 是否已完成? {future.done()}')  # True
    print(value)  # 42

asyncio.run(main())

Task:Coroutine 的執行載體

Task 結合了 coroutine 和 future。它執行 coroutine,並將結果儲存在 future 中。

Awaitable:可被 await 的物件

Awaitable 是 coroutine、future 和 task 的共同抽象基礎類別,任何實作了 __await__ 方法的物件都是 awaitable。

  graph TD
    Awaitable[Awaitable]
    Coroutine[Coroutine]
    Future[Future]
    Task[Task]
    Awaitable --> Coroutine
    Awaitable --> Future
    Future --> Task

使用裝飾器測量 Coroutine 執行時間

async_timed 裝飾器可以方便地測量 coroutine 的執行時間:

import functools
import time
from typing import Callable, Any

def async_timed():
    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapped(*args, **kwargs) -> Any:
            print(f'開始執行 {func.__name__},引數:{args} {kwargs}')
            start = time.time()
            try:
                return await func(*args, **kwargs)
            finally:
                end = time.time()
                total = end - start
                print(f'完成 {func.__name__},耗時:{total:.4f} 秒')
        return wrapped
    return wrapper

# 使用範例
@async_timed()
async def my_coroutine():
    # ...
    pass

async_timed 裝飾器利用 functools.wraps 保留了原函式的名稱和檔案字串。它記錄了 coroutine 的開始和結束時間,並計算總執行時間。

這個裝飾器可以應用於任何 coroutine,提供便捷的效能分析。

透過以上説明,我們更深入地理解了 asyncio 中任務取消、逾時處理以及核心概念之間的關係,為構建更穩健、高效的非同步應用程式奠定了基礎。