在 Python 的 asyncio 非同步程式設計中,優雅地關閉程式和處理系統訊號至關重要。當程式需要終止時,我們必須確保所有正在執行的任務都能妥善完成,釋放資源,並避免資料損壞或系統不穩定。我經常在處理需要長時間執行的非同步任務時,特別關注優雅關閉的議題,因為這直接關係到程式的健壯性和可靠性。
執行器與非同步任務的關閉挑戰
asyncio.run()
函式簡化了非同步程式的進入點,但它主要管理的是任務(Task)物件。當我們使用執行器(Executor)執行同步程式碼時,會產生 Future 物件,而非 Task。這使得 asyncio.run()
無法直接管理這些 Future,可能導致程式關閉時,執行器中的任務被強制中斷。
策略一:Future 轉換成 Task
一種解決方案是將 Future 物件封裝成 Task,讓 asyncio.run()
接管管理:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
async def run_in_executor_as_task(executor, func, *args):
loop = asyncio.get_running_loop()
future = loop.run_in_executor(executor, func, *args)
return await future
async def main():
with ThreadPoolExecutor() as executor:
try:
await run_in_executor_as_task(executor, time.sleep, 2) # 模擬長時間執行的任務
print("任務完成")
except asyncio.CancelledError:
print("任務被取消")
try:
asyncio.run(main())
except KeyboardInterrupt:
print("程式被中斷")
run_in_executor_as_task
函式將 loop.run_in_executor
的結果用 await
等待,並傳回結果。這使得該函式成為一個 coroutine,可以被 asyncio.create_task
轉換成 Task。如此一來,asyncio.run()
就能管理執行器中的任務,確保其在程式關閉時被正確取消。
策略二:訊號處理與執行器管理
更全面的方法是結合訊號處理和執行器管理,實作更精細的控制:
import asyncio
import signal
import time
from concurrent.futures import ThreadPoolExecutor
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as executor:
future = loop.run_in_executor(executor, time.sleep, 5) # 模擬長時間執行的任務
def signal_handler():
print("收到關閉訊號,正在取消任務...")
future.cancel()
loop.stop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, signal_handler)
try:
await future
print("任務完成")
except asyncio.CancelledError:
print("任務被取消")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
loop.close()
print("程式關閉")
此程式碼建立一個自定義的事件迴圈,並使用 add_signal_handler
註冊訊號處理器。當收到 SIGINT
或 SIGTERM
訊號時,signal_handler
會取消 Future 並停止迴圈。try...except
區塊用於捕捉 asyncio.CancelledError
,確保程式能優雅地離開。
我認為,在設計非同步程式時,預先考慮優雅關閉機制至關重要。根據任務的複雜度和需求,選擇合適的策略,才能確保程式在各種情況下都能安全穩定地離開。我個人更傾向於結合訊號處理和執行器管理的策略,因為它提供了更精細的控制,更能符合實際應用場景的需求。
graph LR B[B] D[D] E[E] A[主程式] --> B{建立執行器} B --> C[提交任務到執行器] C --> D{Future 物件} D --> E{訊號處理器} E --> F[取消 Future] F --> G[程式關閉]
sequenceDiagram participant Main Program participant Executor participant Signal Handler Main Program->>Executor: 提交任務 activate Executor Executor-->>Main Program: Future 物件 Main Program->>Signal Handler: 註冊訊號處理器 Signal Handler->>Signal Handler: 等待訊號 Signal Handler->>Main Program: 收到訊號 Main Program->>Executor: 取消 Future deactivate Executor Main Program->>Main Program: 程式關閉
在現代分散式系統中,訊息佇列扮演著至關重要的角色,它可以有效地解耦不同服務之間的依賴關係,提高系統的可靠性和可擴充套件性。Python 的 asyncio
函式庫提供了一個強大的非同步程式設計框架,非常適合構建高效能的訊息代理伺服器。本文將探討如何使用 asyncio
建立一個簡易卻功能完善的訊息代理伺服器。
訊息協定設計
一個清晰與高效的訊息協定是訊息代理伺服器的根本。我們採用一個簡單的協定:訊息以 4 位元組的二進位資料作為字首,表示訊息的長度,後續跟著訊息本身。這個設計允許伺服器和客戶端準確地解析訊息邊界,避免訊息粘黏和截斷。
# message_protocol.py
import asyncio
async def read_message(stream: asyncio.StreamReader) -> bytes:
"""從串流中讀取訊息。"""
size_bytes = await stream.readexactly(4)
size = int.from_bytes(size_bytes, byteorder='big')
data = await stream.readexactly(size)
return data
async def send_message(stream: asyncio.StreamWriter, data: bytes):
"""將訊息傳送到串流。"""
size_bytes = len(data).to_bytes(4, byteorder='big')
stream.writelines([size_bytes, data])
await stream.drain()
read_message
函式首先讀取 4 位元組的訊息長度,然後根據長度讀取訊息內容。send_message
函式則先將訊息長度轉換為 4 位元組的二進位資料,再將其和訊息內容一起傳送到串流。
系統架構
下圖展示了訊息代理伺服器和客戶端之間的互動流程:
graph LR C[C] subgraph 訊息傳送器 A[傳送訊息] --> B(訊息代理伺服器) end subgraph 訊息代理伺服器 B --> C{分發訊息} end subgraph 訊息監聽器 C --> D[接收訊息] end
訊息流程
下圖展示了訊息在伺服器內部的處理流程:
sequenceDiagram participant Client as 客戶端 participant Server as 伺服器 Client->>Server: 連線 & 訂閲頻道 activate Server Server-->>Client: 確認訂閲 deactivate Server loop 訊息傳送 Client->>Server: 傳送訊息 (頻道 + 內容) activate Server Server->>Server: 查詢訂閲者 Server->>Client: 分發訊息給訂閲者 deactivate Server end
我認為這個架構在處理大量併發連線和訊息時表現出色,因為 asyncio
的非同步特性允許伺服器高效地處理多個客戶端請求,而無需阻塞。此外,清晰的訊息協定和頻道訂閲機制確保了訊息的可靠傳輸和分發。
這個簡易的訊息代理伺服器可以作為一個很好的學習範例,但要應用於生產環境,還需要進一步完善,例如:
- 持久化訊息:將訊息儲存到磁碟或資料函式庫,防止訊息丟失。
- 認證和授權:確保只有授權的客戶端才能連線和傳送訊息。
- 更豐富的訊息路由策略:支援更複雜的訊息過濾和分發規則。
透過持續的改進和最佳化,這個根據 asyncio
的訊息代理伺服器可以成為一個強大的工具,幫助我們構建更具彈性和可擴充套件性的分散式系統。
在建構非同步訊息佇列系統時,效能瓶頸常發生於訊息的傳送與接收環節。若訊息的傳送與接收在同一個協程中處理,慢速的訂閲者將拖慢整個系統。我發現,透過解耦訊息傳送與接收,可以有效提升訊息佇列的效能。本文將探討如何使用 Python 的 asyncio
模組實作這個目標,並分享我在設計過程中的一些心得。
效能瓶頸分析
在原始設計中,伺服器收到訊息後,會立即分發給所有訂閲者。如果其中一個訂閲者處理速度慢,就會阻塞整個訊息分發流程,影響伺服器接收新訊息的效率。
graph LR C[C] A[訊息傳送者] --> B(伺服器); B --> C{訂閲者}; C -- 慢速訂閲者 --> D[效能阻塞];
圖表説明: 慢速訂閲者造成系統效能阻塞。
解耦方案:獨立傳送協程與訊息佇列
為解決此問題,我採用了將訊息傳送與接收解耦的策略。具體來説,引入訊息佇列來暫存待傳送的訊息。伺服器收到訊息後,先將訊息放入對應通道的訊息佇列,再立即傳回接收新訊息。同時,啟動獨立的傳送協程,負責從訊息佇列取出訊息並傳送給訂閲者。即使有慢速訂閲者,也不會影響伺服器接收新訊息。
graph LR D[D] A[訊息傳送者] --> B(伺服器); B --> C[訊息佇列]; C --> D{傳送協程}; D --> E[訂閲者 1]; D --> F[訂閲者 2]; D --> G[訂閲者 3];
圖表説明: 傳送協程從訊息佇列取出訊息,傳送給各個訂閲者。
Python 程式碼實作
以下程式碼展示了最佳化後的訊息佇列伺服器實作:
import asyncio
from collections import deque, defaultdict
from typing import Deque, DefaultDict
subscribers: DefaultDict[str, Deque] = defaultdict(deque)
message_queues: DefaultDict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
async def handle_client(reader, writer):
# ... (處理客戶端連線與訂閲) ...
try:
while True:
channel = await reader.readline().decode().strip()
message = await reader.readline().decode().strip()
await message_queues[channel].put(message)
except Exception as e:
# ... (錯誤處理) ...
async def channel_sender(channel):
while True:
message = await message_queues[channel].get()
for subscriber in subscribers[channel]:
# ... (將訊息傳送給訂閲者) ...
async def main():
server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
# ... (啟動 channel_sender 協程) ...
async with server:
await server.serve_forever()
asyncio.run(main())
subscribers
儲存每個通道的訂閲者列表。message_queues
儲存每個通道的訊息佇列。handle_client
協程處理客戶端連線,將接收到的訊息放入對應通道的訊息佇列。channel_sender
協程從訊息佇列取出訊息,並傳送給所有訂閲者。
整合 Twisted 與 asyncio
Twisted 是一個成熟的 Python 非同步網路框架。我發現,在某些專案中,整合 Twisted 和 asyncio 可以帶來更大的靈活性。
from twisted.internet import asyncioreactor
asyncioreactor.install()
# ... (使用 Twisted 的 API) ...
這段程式碼將 asyncio 的事件迴圈安裝到 Twisted 中,允許在 Twisted 程式碼中使用 asyncio 的功能。
透過解耦訊息傳送與接收,並結合訊息佇列和獨立的傳送協程,我們可以有效提升非同步訊息佇列的效能。同時,整合 Twisted 和 asyncio 可以提供更全面的解決方案。希望我的經驗分享能幫助你構建更高效的非同步系統。
Python 的 asyncio
函式庫為非同步程式設計提供了一個優雅的解決方案,而 ZeroMQ 則是一個高效能的非同步訊息傳遞函式庫。兩者結合,可以開發出兼具效能和可維護性的網路應用程式。本文將探討如何利用 asyncio
和 ZeroMQ 構建這樣的應用程式。
傳統輪詢方法的瓶頸
傳統的 ZeroMQ 應用程式通常採用輪詢機制來處理多個 Socket 的事件。這種方法需要不斷檢查每個 Socket 的狀態,效率較低,而與程式碼結構複雜,不易維護。以下是一個典型的例子:
import zmq
context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)
while True:
try:
socks = dict(poller.poll())
except KeyboardInterrupt:
break
if receiver in socks:
message = receiver.recv_json()
print(f"Via PULL: {message}")
if subscriber in socks:
message = subscriber.recv_json()
print(f"Via SUB: {message}")
這段程式碼使用 zmq.Poller
來監聽兩個 Socket:receiver
和 subscriber
。在 while
迴圈中,poller.poll()
會阻塞,直到其中一個 Socket 有資料可讀。然後,程式碼會檢查哪個 Socket 有資料,並接收訊息。這種方式雖然可行,但程式碼結構不夠清晰,尤其當 Socket 數量增加時,會變得更加複雜。
asyncio 的優雅解決方案
asyncio
提供了更優雅的非同步程式設計模型。透過使用 async
和 await
關鍵字,我們可以編寫非同步程式碼,而無需顯式地管理回呼或執行緒。結合 pyzmq
函式庫,我們可以將 ZeroMQ Socket 整合到 asyncio
事件迴圈中,從而簡化程式碼並提高效能。
import zmq
import asyncio
context = zmq.asyncio.Context()
async def receiver_task():
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
while True:
message = await receiver.recv_json()
print(f"Via PULL: {message}")
async def subscriber_task():
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
while True:
message = await subscriber.recv_json()
print(f"Via SUB: {message}")
async def main():
await asyncio.gather(receiver_task(), subscriber_task())
if __name__ == "__main__":
asyncio.run(main())
這段程式碼使用 zmq.asyncio.Context
建立了一個 asyncio
相容的 ZeroMQ 上下文。receiver_task
和 subscriber_task
兩個協程分別處理 PULL
和 SUB
Socket 的訊息接收。asyncio.gather
允許平行執行這兩個協程。await
關鍵字確保程式碼在等待訊息時不會阻塞事件迴圈。
效能比較與優勢分析
相比傳統的輪詢方法,使用 asyncio
的方法具有以下優勢:
- 程式碼更簡潔易讀: 使用
async
和await
可以避免複雜的回呼和執行緒管理,使程式碼更易於理解和維護。 - 更高的效能:
asyncio
可以有效地利用單執行緒處理多個 Socket,避免了執行緒切換的開銷,從而提高效能。 - 更好的擴充套件性:
asyncio
可以輕鬆地處理大量的平行連線,更適合構建高併發的網路應用程式。
graph LR B[B] C[C] F[F] A[傳統輪詢] --> B{Poller 阻塞}; B --> C{檢查 Socket 狀態}; C --> D[接收訊息]; subgraph asyncio E[asyncio] --> F{非同步等待訊息}; F --> G[接收訊息]; end
圖表説明: 該圖表展示了傳統輪詢和 asyncio 處理訊息的流程差異。asyncio 避免了阻塞式的輪詢,從而提高了效率。
我認為,asyncio 與 ZeroMQ 的結合是 Python 非同步網路程式設計的最佳實踐之一。它提供了一種優雅與高效的方式來處理大量的平行連線,同時保持程式碼的簡潔和可維護性。
在微服務架構中,有效監控每個服務的執行狀態至關重要。我將在這篇文章中分享如何利用 ZeroMQ(ØMQ)和 Python 的 Asyncio 函式庫,構建一個高效的效能指標收集和視覺化系統。此係統能即時收集各個微服務的 CPU 和記憶體使用情況,並以動態圖表的方式展現。
系統架構
我將系統劃分為三個主要部分:
- 應用層(微服務): 各個微服務應用程式,負責執行業務邏輯並定期傳送效能指標資料。
- 收集層: 中心化的伺服器,負責接收來自各個微服務的效能指標資料,並將其轉發給瀏覽器客戶端。
- 視覺化層: 瀏覽器客戶端,負責接收即時資料並使用圖表進行動態展示。
以下 圖表展示了系統的整體架構:
graph LR C[C] subgraph 應用層 [微服務] A[微服務 1] --> C{收集層} B[微服務 2] --> C D[微服務 N] --> C end C --> E[視覺化層]
應用層實作
應用層使用 psutil
函式庫收集效能指標,並透過 zmq.PUB
socket 將資料以 JSON 格式傳送到收集層。
import asyncio
import zmq
import zmq.asyncio
import psutil
import json
import datetime
async def stats_reporter(color):
ctx = zmq.asyncio.Context()
sock = ctx.socket(zmq.PUB)
sock.connect('tcp://localhost:5555') # 連線到收集層
while True:
cpu_percent = psutil.cpu_percent()
mem_percent = psutil.virtual_memory().percent
timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
data = {
"color": color,
"cpu": cpu_percent,
"mem": mem_percent,
"timestamp": timestamp
}
await sock.send_json(data) # 傳送 JSON 資料
await asyncio.sleep(1)
async def main():
await stats_reporter("red") # 使用紅色標識此微服務
if __name__ == "__main__":
asyncio.run(main())
stats_reporter
協程持續收集 CPU 和記憶體使用率,並將這些資料連同時間戳和顏色標識一起封裝成 JSON 格式,透過 ZeroMQ 的 PUB socket 傳送到收集層。main
函式則負責啟動這個協程。我選擇使用 JSON 格式,是因為它易於解析和處理,與能傳輸結構化資料。
收集層實作
收集層使用 zmq.SUB
socket 接收來自應用層的資料,並透過 Server-Sent Events (SSE) 將資料推播到瀏覽器客戶端。
import asyncio
from weakref import WeakSet
import zmq
import zmq.asyncio
from aiohttp import web
from aiohttp_sse import sse_response
import json
ctx = zmq.asyncio.Context()
connections = WeakSet()
async def collector():
sock = ctx.socket(zmq.SUB)
sock.bind('tcp://*:5555')
sock.setsockopt_string(zmq.SUBSCRIBE, '') # 訂閲所有訊息
while True:
data = await sock.recv_json()
for q in connections:
await q.put(data)
async def feed(request):
queue = asyncio.Queue()
connections.add(queue)
async with sse_response(request) as resp:
while True:
data = await queue.get()
await resp.send_json(data)
async def main():
asyncio.create_task(collector())
app = web.Application()
app.router.add_get('/feed', feed)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
await asyncio.Future() # 讓伺服器持續執行
if __name__ == "__main__":
asyncio.run(main())
collector
協程負責接收來自各個微服務的效能資料,feed
協程則負責將接收到的資料透過 SSE 傳送到已連線的瀏覽器客戶端。connections
使用 WeakSet
來追蹤所有已連線的客戶端,避免記憶體洩漏。 SSE 的使用讓瀏覽器能持續接收更新,而無需不斷輪詢伺服器,提升了效率。
視覺化層實作 (前端程式碼示意)
視覺化層使用 Smoothie Charts 函式庫在瀏覽器中繪製動態圖表。
var evtSource = new EventSource("/feed");
evtSource.onmessage = function(e) {
var data = JSON.parse(e.data);
// 使用 data.cpu, data.mem, data.timestamp 更新圖表
};
這段 JavaScript 程式碼建立了一個 EventSource
物件,用於監聽來自伺服器的 SSE。每當伺服器傳送新的資料,onmessage
事件就會觸發,程式碼會解析 JSON 資料並更新圖表。 Smoothie Charts 非常適合處理串流資料,能流暢地繪製即時圖表。
本文介紹瞭如何使用 ØMQ 和 Asyncio 構建一個微服務效能監控系統,實作了串流即時監控並以圖表動態展示。透過結合 ØMQ 的高效能訊息傳輸和 Asyncio 的非同步程式設計模型,系統能輕鬆處理大量併發連線和資料流。我加入了 圖表和詳細的程式碼解密,更清晰地闡述了系統架構和實作細節,希望能幫助你快速理解並應用到實際專案中。
在高流量的網路應用中,資料函式庫的讀寫效能往往成為瓶頸。為了提升應用程式的反應速度,快取機制是不可或缺的。然而,快取的引入也帶來了資料一致性的挑戰:當資料函式庫中的資料更新時,如何有效地讓快取中的資料失效,以確保使用者總是能看到最新的資料?
傳統的快取失效方法,例如設定過期時間或定期清除快取,效率低下與不夠精確。本文將介紹一種更優雅與高效的解決方案:利用 PostgreSQL 的 LISTEN
/NOTIFY
功能,結合 asyncpg
這個高效能的非同步 PostgreSQL 驅動程式,以及 Sanic 框架,開發一個反應迅速與穩健的快取失效系統。
Asyncpg:高效能的非同步 PostgreSQL 驅動程式
asyncpg
是一個專為 Python 設計的非同步 PostgreSQL 驅動程式,它以其卓越的效能和簡潔的 API 而聞名。在高併發的場景下,asyncpg
能夠有效地利用系統資源,最大程度地提升資料函式庫操作的效率。
以下是一個使用 asyncpg
進行資料函式庫操作的簡單範例:
import asyncio
import asyncpg
import datetime
class Database:
async def __aenter__(self):
self.conn = await asyncpg.connect(user='user', password='password', database='database', host='127.0.0.1')
return self.conn
async def __aexit__(self, exc_type, exc, tb):
await self.conn.close()
async def demo():
async with Database() as conn:
# ... (資料函式庫操作)
這個 Database
類別利用非同步的 __aenter__
和 __aexit__
方法,簡化了資料函式庫連線的管理,確保連線在使用後能被正確關閉。
PostgreSQL 的 LISTEN/NOTIFY:即時通知機制
PostgreSQL 提供了 LISTEN
/NOTIFY
機制,允許應用程式監聽特定的通道,並在其他應用程式或資料函式庫觸發 NOTIFY
命令時收到通知。這個機制非常適合用於構建快取失效系統。
import asyncio
import asyncpg
# ... (Database 類別定義)
async def invalidate_cache(conn, channel, payload):
print(f"收到快取失效通知: {payload}")
# 在此處執行快取失效邏輯,例如刪除 Redis 中的對應鍵值
async def listen_for_changes(conn):
await conn.add_listener('cache_invalidation', invalidate_cache)
async def main():
async with Database() as conn:
await listen_for_changes(conn)
# 模擬資料函式庫更新,並傳送通知
await conn.execute("NOTIFY cache_invalidation, 'users'")
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
這段程式碼展示瞭如何使用 asyncpg
的 add_listener
方法監聽名為 cache_invalidation
的通道。當資料函式庫執行 NOTIFY cache_invalidation, 'users'
命令時,invalidate_cache
函式就會被觸發,並執行快取失效邏輯。payload
引數可以攜帶額外的資訊,例如更新的資料表名稱。
整合 Sanic 框架:構建完整的快取失效系統
將上述程式碼整合到 Sanic 框架中,我們就可以構建一個完整的快取失效系統。在 Sanic 應用啟動時,我們可以啟動一個後台任務,持續監聽 PostgreSQL 的通知。
graph LR G[G] subgraph Sanic 應用 A[API 請求] --> B[查詢快取]; B -- 快取未命中 --> C[查詢資料函式庫]; C --> D[更新快取]; D --> E[回傳結果]; end subgraph PostgreSQL 資料函式庫 F[資料更新] --> G{觸發 NOTIFY 命令}; end G --> H[Asyncpg 監聽器]; H --> I[失效快取];
圖表説明: 此流程圖展示了 Sanic 應用、PostgreSQL 資料函式庫和快取失效機制之間的互動流程。當資料函式庫更新時,會觸發 NOTIFY
命令,asyncpg
監聽器收到通知後,就會執行快取失效邏輯。
sequenceDiagram participant Sanic應用 participant 快取 participant PostgreSQL Sanic應用->>快取: 查詢資料 alt 快取命中 快取->>Sanic應用: 回傳資料 else 快取未命中 Sanic應用->>PostgreSQL: 查詢資料函式庫 PostgreSQL->>Sanic應用: 回傳資料 Sanic應用->>快取: 更新快取 end PostgreSQL->>PostgreSQL: 資料更新 PostgreSQL->>Sanic應用: 傳送 NOTIFY 通知 Sanic應用->>快取: 失效快取
圖表説明: 這個循序圖更詳細地展示了快取失效的流程。當資料函式庫更新後,會傳送 NOTIFY
通知給 Sanic 應用,Sanic 應用收到通知後,會立即失效快取中的對應資料。
我選擇使用 PostgreSQL 的 LISTEN
/NOTIFY
機制,是因為它提供了即時的通知功能,並且與 asyncpg
整合。相比於其他方案,例如使用訊息佇列,這種方案更加輕量級,也更容易實作。
未來,可以考慮將快取失效邏輯更細粒度化,例如只失效受影響的資料,而不是整個快取。此外,也可以探索更進階的快取策略,例如使用不同的快取層級,以進一步提升應用程式的效能。
我認為,這個根據 asyncpg
和 PostgreSQL 通知功能的快取失效方案,能夠有效地解決資料一致性問題,並顯著提升網路應用程式的效能。它適用於各種規模的專案,並且易於擴充套件和維護。