在高併發的網路應用程式中,資料函式庫快取是提升效能的關鍵。然而,維護快取資料的一致性卻是一大挑戰。當資料函式庫資s料更新時,如何有效地使快取失效,避擴音供過時的資訊?本文將探討如何利用 asyncpg
這個高效能的 PostgreSQL 非同步客戶端函式庫,結合 PostgreSQL 的通知機制,開發一個自動化的快取失效方案,並以 Sanic 網路框架為例,展示其在實際應用中的優勢。
我發現,在建構高效能 Web 應用時,資料函式庫快取和失效策略的設計至關重要。傳統的快取失效方法,例如設定過期時間或定期清除,往往不夠精確與效率不高。而利用 PostgreSQL 的 LISTEN
/NOTIFY
機制,可以實作更精準、實時的快取失效。
asyncpg 監聽 PostgreSQL 事件:實作精準快取失效
asyncpg
提供了監聽 PostgreSQL 事件通知的功能,允許應用程式訂閲特定資料表的變更事件。當資料函式庫中發生資料更新、插入或刪除操作時,PostgreSQL 會傳送通知給所有已訂閲的客戶端,讓它們可以立即更新或失效其快取。
import asyncpg
from sanic import Sanic
from sanic.response import json
import model # 假設 model.py 包含資料函式庫操作函式
app = Sanic(__name__)
# ..程式碼...
@app.listener('before_server_start')
async def setup_database_listener(app, loop):
app.pool = await asyncpg.create_pool(...) # 建立資料函式庫連線池
await app.pool.add_listener("patron_updated", handle_cache_invalidation)
async def handle_cache_invalidation(connection, pid, channel, payload):
# 這個函式是 PostgreSQL 通知事件的處理器。
# connection: 資料函式庫連線物件
# pid: 傳送通知的 PostgreSQL 後端程式 ID
# channel: 通知頻道名稱,此處為 "patron_updated"
# payload: 通知訊息的有效負載,包含需要失效的快取鍵值,例如更新的資料 ID
invalidated_id = int(payload)
print(f"收到通知,ID: {invalidated_id} 的快取已失效")
# 從快取中移除對應 ID 的資料
del app.cache[invalidated_id] # app.cache 為應用程式的快取物件
# ...其他程式碼...
@app.route("/patron/<id:int>", methods=["PUT"])
async def update_patron(request, id):
# ... 更新資料函式庫邏輯 ...
# 傳送通知,觸發快取失效
await app.pool.execute("NOTIFY patron_updated, $1", str(id))
return json({"msg": "ok"})
# ...其他程式碼...
這段程式碼展示瞭如何使用 asyncpg
監聽 PostgreSQL 的通知,並在收到通知後失效快取。setup_database_listener
函式在伺服器啟動時建立資料函式庫連線池,並使用 add_listener
方法訂閲 patron_updated
頻道。handle_cache_invalidation
函式則作為事件處理器,從通知的 payload
中提取失效的 ID,並從應用程式的快取中移除對應的資料。update_patron
函式在更新資料函式庫後,使用 NOTIFY
命令向 patron_updated
頻道傳送通知,觸發快取失效。
sequenceDiagram participant Client participant Sanic App participant Cache participant PostgreSQL Client->>Sanic App: PUT /patron/123 Sanic App->>PostgreSQL: 更新資料 PostgreSQL-->>Sanic App: 更新完成 Sanic App->>PostgreSQL: NOTIFY patron_updated, 123 PostgreSQL->>Sanic App: 通知傳送 Sanic App->>Cache: 失效 ID 123 的快取 Sanic App-->>Client: 回應
圖表説明: 此序列圖展示了客戶端更新資料、伺服器更新資料函式庫、傳送通知和失效快取的完整流程。
graph LR subgraph 應用程式 A[API 請求] --> B{快取命中?} B -- 命中 --> C[傳回快取資料] B -- 未命中 --> D[查詢資料函式庫] D --> E[更新快取] E --> F[傳回資料函式庫資料] end subgraph PostgreSQL G[資料函式庫更新] --> H[傳送通知 NOTIFY] end H --> I[失效快取] C --> A F --> A
圖表説明: 此流程圖展示了 API 請求、快取機制和資料函式庫通知之間的互動關係。當 API 請求到達時,首先檢查快取。如果快取命中,則直接傳回快取資料;如果快取未命中,則查詢資料函式庫,並將結果更新到快取後傳回。當資料函式庫更新時,會傳送通知觸發快取失效。
透過 asyncpg
和 PostgreSQL 的通知機制,我們可以實作更精準、實時的快取失效,避擴音供過時資料,提升應用程式效能和使用者經驗。這種方法尤其適用於高併發、資料更新頻繁的場景。我認為,這種根據事件的快取失效策略,是現代 Web 應用程式提升效能的利器。
現代網路應用中,資料函式庫的即時互動至關重要。本文將探討如何結合 asyncpg
函式庫和 PostgreSQL 的非同步通知功能,開發高效能的資料函式庫互動。我將以圖書館使用者系統為例,示範如何利用非同步通知機制在資料變更時即時更新應用程式的快取,避免頻繁輪詢資料函式庫,從而提升系統效率。
graph LR B[B] D[D] A[客戶端請求] --> B{API 端點}; B --> C[資料函式庫查詢]; C --> D{快取命中?}; D -- 是 --> E[回傳快取資料]; E --> F[客戶端回應]; D -- 否 --> G[資料函式庫查詢]; G --> H[更新快取]; H --> E; subgraph "PostgreSQL 非同步通知" I[資料函式庫觸發器] --> J[通知通道]; J --> K[asyncpg 監聽器]; K --> H; end
圖表説明: 此圖展示了客戶端請求資料的流程,以及 PostgreSQL 非同步通知如何更新快取。
API 伺服器架構
我們使用 Sanic 框架搭建 API 伺服器。add_route()
方法將 /patron/<id:int>
URL 的請求傳送到 PatronAPI
類別型 view。PatronAPI
類別中的方法決定如何處理不同型別的 HTTP 請求,例如 GET
請求會呼叫 PatronAPI.get()
方法。
# server.py
from sanic import Sanic
from views import PatronAPI
app = Sanic(__name__)
app.add_route(PatronAPI.as_view(), '/patron/<id:int>')
這段程式碼建立 Sanic 應用程式,並將 PatronAPI
類別型 view 繫結到 /patron/<id:int>
路徑,使所有針對此路徑的 HTTP 請求都由 PatronAPI
處理。
資料函式庫模型與操作
資料函式庫模型定義了 patron
表的結構,包含 id
、name
和 fav_dish
等欄位。以下程式碼展示了資料函式庫的增刪改查操作,以及如何利用 boltons
函式庫的 LRU
快取提升效能。
# model.py
import logging
from json import loads, dumps
from boltons.cacheutils import LRU
# ... (其他程式碼)
CACHE = LRU(max_size=65536)
async def get_patron(conn, id: int) -> dict:
if id not in CACHE:
# ... (從資料函式庫讀取資料並更新快取)
return CACHE.get(id)
# ... (其他資料函式庫操作函式)
這段程式碼定義了 CACHE
,一個 LRU 快取,用於儲存使用者資料。get_patron
函式首先檢查快取中是否存在指定 ID 的使用者資料;若存在,直接傳回;否則,從資料函式庫讀取資料,更新快取後傳回。
非同步通知機制
PostgreSQL 的 LISTEN
/NOTIFY
機制允許應用程式訂閲特定通道,並在資料函式庫發生變更時收到通知。我們利用此機制,在 patron
表資料變更時,透過 chan_patron
通道傳送通知,觸發 db_event
回呼函式更新應用程式快取。
# model.py
# ... (其他程式碼)
def db_event(conn, pid, channel, payload):
event = loads(payload)
# ... (根據事件型別更新快取)
async def create_table_if_missing(conn):
# ... (建立表格和觸發器)
db_event
函式是 asyncpg
的事件回呼函式。當資料函式庫傳送通知時,asyncpg
會呼叫此函式。函式會解析通知的 payload,並根據事件型別(INSERT、UPDATE 或 DELETE)更新快取。create_table_if_missing
函式負責建立資料表和觸發器,確保通知機制正常運作。
sequenceDiagram participant Client participant API participant Cache participant Database Client->>API: 請求資料 API->>Cache: 檢查快取 Cache-->>API: 快取未命中 API->>Database: 查詢資料函式庫 Database-->>API: 傳回資料 API->>Cache: 更新快取 API-->>Client: 傳回資料 Database->>Channel: 傳送通知 (資料變更) Channel->>API: 觸發 db_event API->>Cache: 更新快取
圖表説明: 此序列圖展示了資料請求流程以及非同步通知如何觸發快取更新。
asyncio 的學習心得與反思
學習 Python 的 asyncio 框架並不容易,即使我擁有近二十年的 Python 經驗,與先前使用過 Twisted 和 Tornado 等根據事件的程式設計框架。我發現 asyncio 的 API 比預期中複雜,部分原因是缺乏高階檔案。
現在,在花時間學習 asyncio 之後,我感覺更自在,相信你的學習歷程也會類別似。asyncio 的 API 設計背後有著一致的結構和目標。asyncio 的 API 龐大與細粒度,因為它同時滿足了框架設計者和終端使用者的需求。這表示我們作為終端使用者開發者,必須學習 API 的哪些部分適用於我們,哪些不適用。隨著時間推移,隨著 asyncio 的第三方函式庫生態系統的發展和成熟,我們可能會發現自己更多地使用這些函式庫的 API,而不是原始的 asyncio 標準函式庫 API。現有的函式庫,如 aiohttp 和 Sanic 就是很好的例子。
我建議學習 asyncio 的最佳方法是:實驗、嘗試,並享受其中的樂趣。
透過結合 Sanic、asyncpg
和 PostgreSQL 的非同步通知功能,我們可以開發高效能的資料函式庫互動系統。非同步通知機制避免了頻繁的資料函式庫輪詢,有效降低系統負載,提升應用程式回應速度。這個案例展示瞭如何利用非同步程式設計和資料函式庫特性最佳化系統效能,並提供更佳的使用者經驗。
在現代軟體開發中,高效的平行處理是提升應用程式效能的關鍵。我發現結合 Python 的 asyncio 協程和 PostgreSQL 資料函式庫的非同步通知機制,可以開發出回應迅速與資源利用率高的應用程式。本文將探討這兩種技術的結合,並提供實用的程式碼範例和詳細的解説。
asyncio 協程:解決競爭條件的利器
在多執行緒環境下,競爭條件是一個常見的難題。當多個執行緒同時存取和修改分享資源時,可能會導致資料不一致或程式當機。我以餐廳餐具管理為例,説明如何使用 asyncio 協程來避免這種問題。
想像一下多個機器人服務生同時為多張餐桌準備和清理餐具的場景。若沒有適當的同步機制,多個機器人可能會同時修改餐具庫存,造成庫存數量錯誤。
asyncio 提供了優雅的解決方案。它允許以單執行緒的方式執行多個協程,並透過 await
關鍵字精確控制協程的切換時機,確保一次只有一個協程修改分享資源。
import asyncio
from attr import attrs, attrib
@attrs
class Cutlery:
knives = attrib(default=0)
forks = attrib(default=0)
def give(self, to: 'Cutlery', knives=0, forks=0):
self.change(-knives, -forks)
to.change(knives, forks)
def change(self, knives, forks):
self.knives += knives
self.forks += forks
class CoroBot():
def __init__(self):
self.cutlery = Cutlery(knives=0, forks=0)
self.tasks = asyncio.Queue()
async def manage_table(self):
while True:
task = await self.tasks.get()
if task == 'prepare table':
kitchen.give(to=self.cutlery, knives=4, forks=4)
elif task == 'clear table':
self.cutlery.give(to=kitchen, knives=4, forks=4)
elif task == 'shutdown':
return
kitchen = Cutlery(knives=100, forks=100)
bots = [CoroBot() for i in range(10)]
import sys
for b in bots:
for i in range(int(sys.argv[1])):
b.tasks.put_nowait('prepare table')
b.tasks.put_nowait('clear table')
b.tasks.put_nowait('shutdown')
print('Kitchen inventory before service:', kitchen)
loop = asyncio.get_event_loop()
tasks = []
for b in bots:
t = loop.create_task(b.manage_table())
tasks.append(t)
task_group = asyncio.gather(*tasks)
loop.run_until_complete(task_group)
print('Kitchen inventory after service:', kitchen)
Cutlery
類別代表餐具,CoroBot
類別模擬機器人服務生。每個 CoroBot
有一個 tasks
佇列接收指令。manage_table
協程不斷從佇列取指令執行。await self.tasks.get()
是關鍵,它暫停當前協程,讓其他協程執行,確保一次只有一個協程修改 kitchen
物件,避免競爭條件。
sequenceDiagram participant CoroBot1 participant CoroBot2 participant Kitchen CoroBot1->>Kitchen: prepare table (await) activate Kitchen Kitchen-->>CoroBot1: 餐具減少 deactivate Kitchen CoroBot2->>Kitchen: prepare table (await) activate Kitchen Kitchen-->>CoroBot2: 餐具減少 deactivate Kitchen CoroBot1->>Kitchen: clear table (await) activate Kitchen Kitchen-->>CoroBot1: 餐具增加 deactivate Kitchen CoroBot2->>Kitchen: clear table (await) activate Kitchen Kitchen-->>CoroBot2: 餐具增加 deactivate Kitchen
圖表説明: 序列圖清楚地展示了 CoroBot 如何與 Kitchen 互動,以及 await
如何確保每個操作的原子性,防止競爭條件。
PostgreSQL 非同步通知:即時掌握資料函式庫變更
PostgreSQL 的非同步通知機制讓應用程式能即時回應資料函式庫的變更。結合 asyncpg 函式庫和資料函式庫觸發器,可以實作高效的非同步通知處理。
建立通知觸發器
以下 SQL 程式碼建立一個觸發器函式,在資料變更時傳送通知:
CREATE OR REPLACE FUNCTION notify_trigger() RETURNS trigger AS $$
DECLARE
payload JSON;
BEGIN
payload := json_build_object(
'table', TG_TABLE_NAME,
'action', TG_OP,
'data', CASE
WHEN TG_OP = 'INSERT' THEN row_to_json(NEW)
WHEN TG_OP = 'DELETE' THEN row_to_json(OLD)
WHEN TG_OP = 'UPDATE' THEN json_build_object(
'old', row_to_json(OLD),
'new', row_to_json(NEW),
'diff', hstore_to_json(hstore(NEW) - hstore(OLD))
)
END
);
PERFORM pg_notify('data_change', payload::text);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
觸發器函式 notify_trigger()
在資料表發生 INSERT、UPDATE 或 DELETE 操作時觸發,構建包含變更資訊的 JSON payload
,並透過 pg_notify()
傳送到 data_change
通道。
啟用 hstore 並連線觸發器
CREATE EXTENSION IF NOT EXISTS hstore;
CREATE TRIGGER patrons_notify_trigger
AFTER INSERT OR UPDATE OR DELETE ON patrons
FOR EACH ROW EXECUTE PROCEDURE notify_trigger();
啟用 hstore
擴充功能以計算資料差異。接著,將觸發器 patrons_notify_trigger
連結到 patrons
表,使其在資料變更時觸發 notify_trigger()
函式。
Python 程式碼:監聽 PostgreSQL 通知
import asyncio
import asyncpg
import json
async def listen_for_notifications(conn):
await conn.add_listener('data_change', handle_notification)
async def handle_notification(conn, pid, channel, payload):
data = json.loads(payload)
print(f"Received notification on channel {channel}: {data}")
async def main():
conn = await asyncpg.connect(user='user', password='password', database='database', host='127.0.0.1')
await listen_for_notifications(conn)
# ... 其他程式碼 ...
await conn.close()
asyncio.run(main())
使用 asyncpg 連線到 PostgreSQL 資料函式庫,並透過 add_listener()
監聽 data_change
通道。handle_notification()
函式處理接收到的通知,解析 JSON payload
並印出資訊。
asyncio 協程和 PostgreSQL 非同步通知是構建高效能平行應用的利器。透過 asyncio,我們可以有效管理平行任務,避免競爭條件。PostgreSQL 的非同步通知機制則讓應用程式能即時回應資料函式庫變更,提升使用者經驗。 我相信,掌握這兩種技術將大幅提升您的程式開發能力。
在現代應用程式開發中,即時資料處理和反應速度至關重要。我發現結合 PostgreSQL 的 LISTEN
/NOTIFY
機制與 Python 的 asyncpg
函式庫,能有效地實作非同步資料函式庫通知,大幅提升應用程式的效率。
PostgreSQL 的 LISTEN
/NOTIFY
機制
PostgreSQL 提供了內建的 LISTEN
/NOTIFY
功能,允許伺服器端主動推播通知給客戶端。當資料函式庫特定事件發生時,例如資料表更新,伺服器會傳送通知到指定的通道。客戶端應用程式可以監聽這些通道,並在收到通知時觸發對應的動作。
Python asyncpg
函式庫
asyncpg
是一個高效能的 PostgreSQL 非同步客戶端函式庫,完美支援 LISTEN
/NOTIFY
機制。它允許 Python 應用程式以非同步的方式接收資料函式庫通知,無需輪詢或阻塞主執行緒。
程式碼實作範例
以下程式碼示範如何使用 asyncpg
監聽 PostgreSQL 的通知:
import asyncio
import asyncpg
async def handle_notification(connection, pid, channel, payload):
print(f"收到來自通道 {channel} 的通知:{payload}")
async def main():
conn = await asyncpg.connect(user='user', password='password', database='database', host='127.0.0.1')
await conn.add_listener("data_change", handle_notification)
print("開始監聽 data_change 通道...")
# 持續執行,等待通知
await asyncio.Future()
asyncio.run(main())
這段程式碼首先建立與 PostgreSQL 資料函式庫的非同步連線。接著,使用 add_listener()
方法註冊一個監聽器,指定要監聽的通道 “data_change” 以及收到通知時要執行的回呼函式 handle_notification()
。asyncio.Future()
讓程式持續執行,等待並處理來自資料函式庫的通知。
流程圖解
graph LR B[B] subgraph PostgreSQL 伺服器 A[資料表更新] --> B{觸發器} B --> C[NOTIFY 命令] C --> D[data_change 通道] end D --> E[asyncpg 監聽器] E --> F[應用程式邏輯]
圖表説明: 此流程圖展示了 PostgreSQL 通知機制與 asyncpg
的互動流程。當資料表更新時,觸發器會執行 NOTIFY
命令,將通知傳送到 data_change
通道。asyncpg
監聽器接收到通知後,會觸發應用程式邏輯進行處理。
技術優勢與應用場景
這個方法的優勢在於非同步處理,應用程式無需主動輪詢資料函式庫,就能即時回應資料函式庫的變化。我認為這在以下場景特別有用:
- 即時聊天應用程式: 當新訊息送達時,伺服器可以立即通知客戶端更新訊息列表。
- 股票交易平台: 當股票價格變動時,系統可以即時通知使用者。
- 物聯網應用: 當感測器資料更新時,系統可以觸發相應的動作。
透過結合 PostgreSQL 的 LISTEN
/NOTIFY
和 Python 的 asyncpg
函式庫,我們可以輕鬆地實作非同步資料函式庫通知,開發高效能與即時反應的應用程式。我建議在開發需要即時資料處理的應用程式時,優先考慮這個方案。
sequenceDiagram participant PostgreSQL participant asyncpg participant Application activate PostgreSQL PostgreSQL->>asyncpg: 傳送通知 (data_change) deactivate PostgreSQL activate asyncpg asyncpg->>Application: 呼叫 handle_notification() deactivate asyncpg activate Application Application->>Application: 執行應用程式邏輯 deactivate Application
圖表説明: 這個序列圖更詳細地展示了 asyncpg
如何處理 PostgreSQL 的通知,並觸發應用程式邏輯的執行流程。