開發一個穩定的網路應用程式,除了核心功能的實作外,錯誤處理和優雅關閉更是不可或缺的環節。本文將探討如何建構更健壯的 asyncio echo 伺服器,重點關注錯誤處理和優雅關閉的機制,以確保應用程式在各種情況下都能穩定執行。

訊號中斷與優雅關閉

在 Unix-like 系統中,訊號 (Signal) 是一種非同步通知程式事件的機制。常見的訊號包括 SIGINT (按下 CTRL-C 時觸發) 和 SIGTERM (使用 kill 命令停止程式時觸發)。為了讓應用程式在收到這些訊號時能妥善處理資源並結束執行,我們需要實作優雅關閉機制。

訊號監聽

Asyncio 的事件迴圈提供 add_signal_handler 方法,讓我們可以直接監聽指定的訊號。這個方法與 signal 模組中的 signal.signal 函式不同,它能安全地與事件迴圈互動,避免衝突。

以下程式碼示範如何新增訊號處理程式來取消所有正在執行的任務:

import asyncio
import signal

async def cancel_tasks():
    print('收到 SIGINT 訊號!')
    tasks = asyncio.all_tasks()
    print(f'正在取消 {len(tasks)} 個任務.')
    [task.cancel() for task in tasks]

async def main():
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, cancel_tasks)
    await asyncio.sleep(10)

asyncio.run(main())

這段程式碼設定了一個 SIGINT 訊號處理程式 cancel_tasks。當按下 CTRL-C 時,cancel_tasks 會被呼叫,印出訊息並取消所有正在執行的任務。asyncio.sleep(10) 模擬了一個長時間執行的任務。

等待任務完成

理想的關閉機制應該允許正在執行的任務在一定時間內完成,而不是立即終止。我們可以使用 asyncio.wait_for 來設定任務的逾時時間,並使用 try...except 結構來處理逾時錯誤。

import asyncio
import signal
from typing import List

class GracefulExit(SystemExit):
    pass

def shutdown():
    raise GracefulExit()

async def echo(connection, loop):
    # ... existing echo code ...
    pass  # Placeholder for existing code

async def connection_listener(server_socket, loop):
    # ... existing connection_listener code ...
    pass  # Placeholder for existing code

echo_tasks: List[asyncio.Task] = []

async def close_echo_tasks(echo_tasks):
    waiters = [asyncio.wait_for(task, 2) for task in echo_tasks]
    for task in waiters:
        try:
            await task
        except asyncio.TimeoutError:
            print("任務逾時!")
            pass

async def main():
    loop = asyncio.get_event_loop()
    # ... existing main code ...
    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(getattr(signal, signame), shutdown)
    await connection_listener(None, loop)  # Placeholder - remove None in actual code


loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
except GracefulExit:
    print("開始關閉程式...")
    loop.run_until_complete(close_echo_tasks(echo_tasks))
finally:
    loop.close()

close_echo_tasks 函式會等待所有 echo_tasks 中的任務完成,最多等待 2 秒。shutdown 函式引發 GracefulExit 異常來觸發關閉流程。try...except...finally 結構確保在關閉過程中資源得到釋放。

Asyncio 高效能網路應用

Asyncio 提供了強大的工具來構建高效能的網路應用。aiohttp 函式庫則是一個根據 Asyncio 的非同步 HTTP 客戶端/伺服器,它使用非阻塞 Socket 進行網路請求,大幅提升網路請求的效率。

aiohttp 的優勢

相較於 requests 等同步函式庫,aiohttp 在 Asyncio 環境下表現更出色。它允許同時發起多個網路請求,而不會阻塞事件迴圈,從而提升應用程式的回應速度。

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "http://python.org")
        print(html)

asyncio.run(main())

這段程式碼使用 aiohttp 發起一個 GET 請求到 http://python.orgasync with 語法確保資源的正確管理。fetch 函式是一個協程,它使用 session.get 發起請求,並使用 await 等待回應。

Asyncio 應用架構

  graph LR
    C[C]
    A[Client] --> B(Connection Listener)
    B --> C{Echo Task}
    C --> D[Response]
    D --> A
    subgraph Asyncio Event Loop
        B
        C
    end

圖表說明: 此圖展示了 Asyncio Echo 伺服器的基本架構。客戶端請求經由連線監聽器傳遞到 Echo 任務,處理後將回應傳回給客戶端。所有這些操作都在 Asyncio 事件迴圈中進行。

這個架構展示了 Asyncio 如何處理併發連線。連線監聽器接收新的連線,並為每個連線建立一個 Echo 任務。這些任務在事件迴圈中併發執行,從而實作高效能的網路服務。

本文探討瞭如何建構更穩固的 Asyncio 伺服器,涵蓋了錯誤處理、優雅關閉和使用 aiohttp 進行高效能網路程式設計。這些技術對於構建可靠與高效的非同步應用程式至關重要。透過理解和應用這些概念,開發者可以更好地掌控 Asyncio 應用程式的行為,並提升其穩定性和效能。

import asyncio
import aiohttp

async def fetch_page(session: aiohttp.ClientSession, url: str) -> str:
    """非同步擷取網頁內容。"""
    async with session.get(url) as response:
        return await response.text()

async def process_data(data: str):
    """處理網頁資料。"""
    #  在此加入資料處理邏輯
    print(f"處理資料:{data[:50]}...")


async def main():
    """主程式進入點。"""
    async with aiohttp.ClientSession() as session:
        urls = [f"https://example.com/page/{i}" for i in range(1, 11)]  # 產生10個測試網址
        tasks = [fetch_page(session, url) for url in urls]
        pages = await asyncio.gather(*tasks)

        for page in pages:
            await process_data(page)


if __name__ == "__main__":
    asyncio.run(main())
  graph LR
    B[B]
    A[建立 ClientSession] --> B{建立非同步任務};
    B --> C[使用 asyncio.gather 併發執行];
    C --> D[處理每個頁面資料];
    D --> E[結束];

此程式碼示範如何使用 aiohttpasyncio.gather 併發處理多個網頁請求。

  1. fetch_page 函式:負責非同步擷取指定網址的網頁內容。利用 session.get 方法發出 HTTP GET 請求,並使用 await response.text() 取得網頁的文字內容。

  2. process_data 函式:模擬處理網頁資料的邏輯。實際應用中,可以根據需求修改此函式,例如解析 HTML、提取特定資訊等。

  3. main 函式:

    • 建立一個 aiohttp.ClientSession 物件,用於管理 HTTP 連線和資源。
    • 使用 list comprehension 產生 10 個測試網址。
    • 使用 list comprehension 建立多個非同步任務,每個任務呼叫 fetch_page 函式擷取一個網頁。
    • 使用 asyncio.gather 併發執行所有任務,並將結果儲存在 pages 列表中。
    • 迴圈處理 pages 列表中的每個網頁資料,呼叫 process_data 函式進行處理。
  4. if __name__ == "__main__": 區塊:確保程式碼只有在直接執行時才會執行,避免在被其他模組匯入時執行。

這個範例展示瞭如何有效地使用 aiohttpasyncio.gather 進行非同步網頁抓取和處理,提升程式效能。

這個程式碼範例展示瞭如何使用 aiohttp 函式庫搭配 asyncio.gather 併發處理多個網頁請求,有效提升網頁抓取效率。同時,透過設定逾時,可以更精確地控制請求時間,避免程式卡住。

藉由這些技巧,玄貓期望能幫助讀者更有效率地使用 aiohttp 進行非同步網頁抓取,開發高效能的網路應用。

在網頁抓取或與多個網路服務互動時,效能至關重要。Python 的 asyncio 函式庫提供了一個優雅的解決方案,讓我們能夠以非同步的方式處理網路請求,從而顯著提升效能。本文將探討如何使用 asyncioaiohttp 函式庫實作高效的網頁請求,並分享我在實際應用中的一些心得體會。

併發請求:asyncio.gather 的威力

asyncio.gather 允許我們併發執行多個非同步操作。以下程式碼範例展示瞭如何使用 asyncio.gather 併發取得多個網頁的狀態碼:

import asyncio
import aiohttp
from aiohttp import ClientSession

async def fetch_status(session: ClientSession, url: str) -> int:
    async with session.get(url) as response:
        return response.status

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ['https://example.com' for _ in range(1000)]
        tasks = [fetch_status(session, url) for url in urls]
        status_codes = await asyncio.gather(*tasks)
        print(status_codes)

asyncio.run(main())

這段程式碼首先建立了一個 aiohttp.ClientSession,用於管理網路連線。接著,它建立了一個包含 1000 個 URL 的列表,並使用 fetch_status 函式為每個 URL 建立一個 coroutine。asyncio.gather 將這些 coroutine 封裝成 tasks 並併發執行,有效地節省了等待時間。status_codes 列表將包含所有請求的結果,按照請求發起的順序排列。

效能對比:同步 vs. 非同步

為了比較同步和非同步請求的效能差異,我們可以修改 main 函式,使其以同步的方式執行請求:

import asyncio
import aiohttp
from aiohttp import ClientSession

async def fetch_status(session: ClientSession, url: str) -> int:
    async with session.get(url) as response:
        return response.status

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ['https://example.com' for _ in range(1000)]
        status_codes = []
        for url in urls:
            status_codes.append(await fetch_status(session, url))
        print(status_codes)

asyncio.run(main())

此版本使用迴圈和 await 關鍵字,使每個請求都以同步的方式執行,也就是說,下一個請求必須等待前一個請求完成後才能開始。在我的測試中,非同步版本耗時約 600 毫秒,而同步版本耗時約 18 秒。非同步版本的執行速度比同步版本快了 30 倍!這凸顯了非同步程式設計在提升網路請求效能方面的巨大優勢。

asyncio.gather 與結果順序

asyncio.gather 的結果順序與傳入的 coroutine 順序一致,即使 coroutine 的完成時間不同。

import asyncio

async def delay(seconds: int) -> int:
    await asyncio.sleep(seconds)
    return seconds

async def main():
    results = await asyncio.gather(delay(3), delay(1))
    print(results)

asyncio.run(main())

儘管 delay(1)delay(3) 先完成,但 results 的順序仍然是 [3, 1],與傳入 asyncio.gather 的順序相同。這確保了結果的可預測性,方便我們處理資料。

例外處理:確保程式穩定性

當網路請求失敗時,asyncio.gather 提供了 return_exceptions 引數來控制例外處理。

import asyncio
import aiohttp
from aiohttp import ClientSession

async def fetch_status(session: ClientSession, url: str):
    try:
        async with session.get(url) as response:
            return response.status
    except aiohttp.ClientError as e:
        return e

async def main():
    async with aiohttp.ClientSession() as session:
        urls = ['https://example.com', 'python://example.com']  # 包含一個無效 URL
        tasks = [fetch_status(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(f"請求發生錯誤: {result}")
            else:
                print(f"狀態碼: {result}")

asyncio.run(main())

此程式碼示範瞭如何使用 return_exceptions=True 捕捉所有異常。fetch_status 函式現在會捕捉 aiohttp.ClientError 例外,並將其傳回。在 main 函式中,我們檢查 results 列表中的每個元素,判斷是狀態碼還是例外,並進行相應處理。

視覺化非同步流程

  graph LR
    B[B]
    D[D]
    A[建立 ClientSession] --> B{建立多個請求任務};
    B --> C[使用 asyncio.gather 併發執行];
    C --> D{等待所有任務完成};
    D --> E[傳回結果列表];

這個流程圖清晰地展示了 asyncio.gather 如何併發處理多個請求任務,並傳回結果列表。

透過以上技巧,我們可以利用 asyncioaiohttp 構建高效與穩定的網路應用程式。非同步程式設計對於提升網路爬蟲、API 互動等場景的效能至關重要。

希望這些技巧和範例能幫助您更好地理解和應用 aiohttp,提升您的 Python 網路程式設計技能。