開發一個穩定的網路應用程式,除了核心功能的實作外,錯誤處理和優雅關閉更是不可或缺的環節。本文將探討如何建構更健壯的 asyncio echo 伺服器,重點關注錯誤處理和優雅關閉的機制,以確保應用程式在各種情況下都能穩定執行。
訊號中斷與優雅關閉
在 Unix-like 系統中,訊號 (Signal) 是一種非同步通知程式事件的機制。常見的訊號包括 SIGINT
(按下 CTRL-C
時觸發) 和 SIGTERM
(使用 kill
命令停止程式時觸發)。為了讓應用程式在收到這些訊號時能妥善處理資源並結束執行,我們需要實作優雅關閉機制。
訊號監聽
Asyncio 的事件迴圈提供 add_signal_handler
方法,讓我們可以直接監聽指定的訊號。這個方法與 signal
模組中的 signal.signal
函式不同,它能安全地與事件迴圈互動,避免衝突。
以下程式碼示範如何新增訊號處理程式來取消所有正在執行的任務:
import asyncio
import signal
async def cancel_tasks():
print('收到 SIGINT 訊號!')
tasks = asyncio.all_tasks()
print(f'正在取消 {len(tasks)} 個任務.')
[task.cancel() for task in tasks]
async def main():
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, cancel_tasks)
await asyncio.sleep(10)
asyncio.run(main())
這段程式碼設定了一個 SIGINT
訊號處理程式 cancel_tasks
。當按下 CTRL-C
時,cancel_tasks
會被呼叫,印出訊息並取消所有正在執行的任務。asyncio.sleep(10)
模擬了一個長時間執行的任務。
等待任務完成
理想的關閉機制應該允許正在執行的任務在一定時間內完成,而不是立即終止。我們可以使用 asyncio.wait_for
來設定任務的逾時時間,並使用 try...except
結構來處理逾時錯誤。
import asyncio
import signal
from typing import List
class GracefulExit(SystemExit):
pass
def shutdown():
raise GracefulExit()
async def echo(connection, loop):
# ... existing echo code ...
pass # Placeholder for existing code
async def connection_listener(server_socket, loop):
# ... existing connection_listener code ...
pass # Placeholder for existing code
echo_tasks: List[asyncio.Task] = []
async def close_echo_tasks(echo_tasks):
waiters = [asyncio.wait_for(task, 2) for task in echo_tasks]
for task in waiters:
try:
await task
except asyncio.TimeoutError:
print("任務逾時!")
pass
async def main():
loop = asyncio.get_event_loop()
# ... existing main code ...
for signame in {'SIGINT', 'SIGTERM'}:
loop.add_signal_handler(getattr(signal, signame), shutdown)
await connection_listener(None, loop) # Placeholder - remove None in actual code
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except GracefulExit:
print("開始關閉程式...")
loop.run_until_complete(close_echo_tasks(echo_tasks))
finally:
loop.close()
close_echo_tasks
函式會等待所有 echo_tasks
中的任務完成,最多等待 2 秒。shutdown
函式引發 GracefulExit
異常來觸發關閉流程。try...except...finally
結構確保在關閉過程中資源得到釋放。
Asyncio 高效能網路應用
Asyncio 提供了強大的工具來構建高效能的網路應用。aiohttp
函式庫則是一個根據 Asyncio 的非同步 HTTP 客戶端/伺服器,它使用非阻塞 Socket 進行網路請求,大幅提升網路請求的效率。
aiohttp
的優勢
相較於 requests
等同步函式庫,aiohttp
在 Asyncio 環境下表現更出色。它允許同時發起多個網路請求,而不會阻塞事件迴圈,從而提升應用程式的回應速度。
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, "http://python.org")
print(html)
asyncio.run(main())
這段程式碼使用 aiohttp
發起一個 GET 請求到 http://python.org
。async with
語法確保資源的正確管理。fetch
函式是一個協程,它使用 session.get
發起請求,並使用 await
等待回應。
Asyncio 應用架構
graph LR C[C] A[Client] --> B(Connection Listener) B --> C{Echo Task} C --> D[Response] D --> A subgraph Asyncio Event Loop B C end
圖表說明: 此圖展示了 Asyncio Echo 伺服器的基本架構。客戶端請求經由連線監聽器傳遞到 Echo 任務,處理後將回應傳回給客戶端。所有這些操作都在 Asyncio 事件迴圈中進行。
這個架構展示了 Asyncio 如何處理併發連線。連線監聽器接收新的連線,並為每個連線建立一個 Echo 任務。這些任務在事件迴圈中併發執行,從而實作高效能的網路服務。
本文探討瞭如何建構更穩固的 Asyncio 伺服器,涵蓋了錯誤處理、優雅關閉和使用 aiohttp
進行高效能網路程式設計。這些技術對於構建可靠與高效的非同步應用程式至關重要。透過理解和應用這些概念,開發者可以更好地掌控 Asyncio 應用程式的行為,並提升其穩定性和效能。
import asyncio
import aiohttp
async def fetch_page(session: aiohttp.ClientSession, url: str) -> str:
"""非同步擷取網頁內容。"""
async with session.get(url) as response:
return await response.text()
async def process_data(data: str):
"""處理網頁資料。"""
# 在此加入資料處理邏輯
print(f"處理資料:{data[:50]}...")
async def main():
"""主程式進入點。"""
async with aiohttp.ClientSession() as session:
urls = [f"https://example.com/page/{i}" for i in range(1, 11)] # 產生10個測試網址
tasks = [fetch_page(session, url) for url in urls]
pages = await asyncio.gather(*tasks)
for page in pages:
await process_data(page)
if __name__ == "__main__":
asyncio.run(main())
graph LR B[B] A[建立 ClientSession] --> B{建立非同步任務}; B --> C[使用 asyncio.gather 併發執行]; C --> D[處理每個頁面資料]; D --> E[結束];
此程式碼示範如何使用 aiohttp
和 asyncio.gather
併發處理多個網頁請求。
fetch_page
函式:負責非同步擷取指定網址的網頁內容。利用session.get
方法發出 HTTP GET 請求,並使用await response.text()
取得網頁的文字內容。process_data
函式:模擬處理網頁資料的邏輯。實際應用中,可以根據需求修改此函式,例如解析 HTML、提取特定資訊等。main
函式:- 建立一個
aiohttp.ClientSession
物件,用於管理 HTTP 連線和資源。 - 使用 list comprehension 產生 10 個測試網址。
- 使用 list comprehension 建立多個非同步任務,每個任務呼叫
fetch_page
函式擷取一個網頁。 - 使用
asyncio.gather
併發執行所有任務,並將結果儲存在pages
列表中。 - 迴圈處理
pages
列表中的每個網頁資料,呼叫process_data
函式進行處理。
- 建立一個
if __name__ == "__main__":
區塊:確保程式碼只有在直接執行時才會執行,避免在被其他模組匯入時執行。
這個範例展示瞭如何有效地使用 aiohttp
和 asyncio.gather
進行非同步網頁抓取和處理,提升程式效能。
這個程式碼範例展示瞭如何使用 aiohttp
函式庫搭配 asyncio.gather
併發處理多個網頁請求,有效提升網頁抓取效率。同時,透過設定逾時,可以更精確地控制請求時間,避免程式卡住。
藉由這些技巧,玄貓期望能幫助讀者更有效率地使用 aiohttp
進行非同步網頁抓取,開發高效能的網路應用。
在網頁抓取或與多個網路服務互動時,效能至關重要。Python 的 asyncio
函式庫提供了一個優雅的解決方案,讓我們能夠以非同步的方式處理網路請求,從而顯著提升效能。本文將探討如何使用 asyncio
和 aiohttp
函式庫實作高效的網頁請求,並分享我在實際應用中的一些心得體會。
併發請求:asyncio.gather 的威力
asyncio.gather
允許我們併發執行多個非同步操作。以下程式碼範例展示瞭如何使用 asyncio.gather
併發取得多個網頁的狀態碼:
import asyncio
import aiohttp
from aiohttp import ClientSession
async def fetch_status(session: ClientSession, url: str) -> int:
async with session.get(url) as response:
return response.status
async def main():
async with aiohttp.ClientSession() as session:
urls = ['https://example.com' for _ in range(1000)]
tasks = [fetch_status(session, url) for url in urls]
status_codes = await asyncio.gather(*tasks)
print(status_codes)
asyncio.run(main())
這段程式碼首先建立了一個 aiohttp.ClientSession
,用於管理網路連線。接著,它建立了一個包含 1000 個 URL 的列表,並使用 fetch_status
函式為每個 URL 建立一個 coroutine。asyncio.gather
將這些 coroutine 封裝成 tasks 並併發執行,有效地節省了等待時間。status_codes
列表將包含所有請求的結果,按照請求發起的順序排列。
效能對比:同步 vs. 非同步
為了比較同步和非同步請求的效能差異,我們可以修改 main
函式,使其以同步的方式執行請求:
import asyncio
import aiohttp
from aiohttp import ClientSession
async def fetch_status(session: ClientSession, url: str) -> int:
async with session.get(url) as response:
return response.status
async def main():
async with aiohttp.ClientSession() as session:
urls = ['https://example.com' for _ in range(1000)]
status_codes = []
for url in urls:
status_codes.append(await fetch_status(session, url))
print(status_codes)
asyncio.run(main())
此版本使用迴圈和 await
關鍵字,使每個請求都以同步的方式執行,也就是說,下一個請求必須等待前一個請求完成後才能開始。在我的測試中,非同步版本耗時約 600 毫秒,而同步版本耗時約 18 秒。非同步版本的執行速度比同步版本快了 30 倍!這凸顯了非同步程式設計在提升網路請求效能方面的巨大優勢。
asyncio.gather 與結果順序
asyncio.gather
的結果順序與傳入的 coroutine 順序一致,即使 coroutine 的完成時間不同。
import asyncio
async def delay(seconds: int) -> int:
await asyncio.sleep(seconds)
return seconds
async def main():
results = await asyncio.gather(delay(3), delay(1))
print(results)
asyncio.run(main())
儘管 delay(1)
比 delay(3)
先完成,但 results
的順序仍然是 [3, 1]
,與傳入 asyncio.gather
的順序相同。這確保了結果的可預測性,方便我們處理資料。
例外處理:確保程式穩定性
當網路請求失敗時,asyncio.gather
提供了 return_exceptions
引數來控制例外處理。
import asyncio
import aiohttp
from aiohttp import ClientSession
async def fetch_status(session: ClientSession, url: str):
try:
async with session.get(url) as response:
return response.status
except aiohttp.ClientError as e:
return e
async def main():
async with aiohttp.ClientSession() as session:
urls = ['https://example.com', 'python://example.com'] # 包含一個無效 URL
tasks = [fetch_status(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
print(f"請求發生錯誤: {result}")
else:
print(f"狀態碼: {result}")
asyncio.run(main())
此程式碼示範瞭如何使用 return_exceptions=True
捕捉所有異常。fetch_status
函式現在會捕捉 aiohttp.ClientError
例外,並將其傳回。在 main
函式中,我們檢查 results
列表中的每個元素,判斷是狀態碼還是例外,並進行相應處理。
視覺化非同步流程
graph LR B[B] D[D] A[建立 ClientSession] --> B{建立多個請求任務}; B --> C[使用 asyncio.gather 併發執行]; C --> D{等待所有任務完成}; D --> E[傳回結果列表];
這個流程圖清晰地展示了 asyncio.gather
如何併發處理多個請求任務,並傳回結果列表。
透過以上技巧,我們可以利用 asyncio
和 aiohttp
構建高效與穩定的網路應用程式。非同步程式設計對於提升網路爬蟲、API 互動等場景的效能至關重要。
希望這些技巧和範例能幫助您更好地理解和應用 aiohttp
,提升您的 Python 網路程式設計技能。