與 PostgreSQL 資料函式庫互動:建立連線與執行查詢
首先,我們需要建立與 PostgreSQL 資料函式庫的連線,才能執行 SQL 查詢。asyncpg 提供了 connect()
協程用於建立連線,execute()
協程用於執行 SQL 陳述式,以及 fetch()
協程用於取得查詢結果。以下程式碼示範如何插入和查詢品牌資料:
import asyncpg
import asyncio
from asyncpg import Record
from typing import List
async def main():
conn = await asyncpg.connect(host='127.0.0.1',
port=5432,
user='postgres',
database='products',
password='password')
await conn.execute("INSERT INTO brand VALUES(DEFAULT, 'Levis')")
await conn.execute("INSERT INTO brand VALUES(DEFAULT, 'Seven')")
brand_query = 'SELECT brand_id, brand_name FROM brand'
results: List[Record] = await conn.fetch(brand_query)
for brand in results:
print(f'id: {brand["brand_id"]}, name: {brand["brand_name"]}')
await conn.close()
asyncio.run(main())
這段程式碼首先透過 asyncpg.connect()
建立與 PostgreSQL 資料函式庫的連線。接著,它依序插入兩個品牌 “Levis” 和 “Seven” 到 brand
表中。然後,它利用 conn.fetch()
執行 brand_query
查詢所有品牌資料,並將結果儲存在 results
變數中,這個變數的型別是 List[Record]
,其中 Record
物件類別似於字典,允許我們使用欄位名稱取值。最後,程式迴圈印出每個品牌的 ID 和名稱,並關閉資料函式庫連線。
非同步查詢:提升效能的關鍵
asyncpg 的 execute()
和 fetch()
都是協程,這讓我們能充分利用 asyncio 的 API 進行非同步操作,例如 asyncio.gather()
。非同步操作允許多個查詢同時執行,尤其在 I/O 密集型操作中,能大幅提升效能。
graph LR B[B] C[C] A[建立連線] --> B{執行查詢 1}; A --> C{執行查詢 2}; B --> D[處理結果 1]; C --> E[處理結果 2]; D --> F[結束]; E --> F;
這個流程圖清楚地展示了非同步查詢的執行流程。建立連線後,我們可以同時執行查詢 1 和查詢 2,不必等其中一個查詢完成才能執行另一個。這種平行處理方式顯著縮短了整體執行時間。
sequenceDiagram participant 使用者 participant 應用程式 participant 資料函式庫 使用者->>應用程式: 傳送請求 activate 應用程式 應用程式->>資料函式庫: 非同步查詢 1 應用程式->>資料函式庫: 非同步查詢 2 資料函式庫-->>應用程式: 傳回結果 1 資料函式庫-->>應用程式: 傳回結果 2 應用程式-->>使用者: 回應結果 deactivate 應用程式
此序列圖展現了使用者、應用程式和資料函式庫之間的互動流程。應用程式利用非同步查詢同時向資料函式庫傳送查詢 1 和查詢 2,並在收到兩個查詢的結果後,將結果回傳給使用者。這種非同步的處理方式有效地減少了使用者的等待時間。
我個人在開發高併發網路應用時,經常使用 asyncpg 搭配連線池來管理資料函式庫連線,有效地避免了頻繁建立和關閉連線的開銷,進一步提升了系統的整體效能和穩定性。
```python
import asyncio
import aiohttp
async def fetch_status(session, url, delay=0):
await asyncio.sleep(delay)
try:
async with session.get(url) as response:
return response.status
except aiohttp.ClientError as e:
print(f"網址 {url} 發生錯誤: {e}")
raise
async def main():
async with aiohttp.ClientSession() as session:
urls = ['https://www.example.com', 'https://www.google.com', 'https://不存在的網址.com']
tasks = [fetch_status(session, url, delay=i) for i, url in enumerate(urls)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
for task in done:
try:
result = task.result()
print(f"網址 {urls[tasks.index(task)]} 的狀態碼: {result}")
except Exception as e:
print(f"任務 {task} 丟擲異常: {e}")
for task in pending:
task.cancel()
print(f"任務 {task} 已取消")
asyncio.run(main())
這個程式碼範例示範了使用 asyncio.wait
並搭配 asyncio.FIRST_EXCEPTION
引數。當其中一個任務丟擲異常時,asyncio.wait
會立即傳回,並將已完成和未完成的任務分別放入 done
和 pending
集合中。程式碼接著取消所有未完成的任務,避免浪費資源。fetch_status
函式加入了錯誤處理,模擬網路請求可能發生的錯誤,並使用 delay
引數模擬不同任務的執行時間。
sequenceDiagram participant main participant task1 participant task2 participant task3 main->>task1: 建立 task1 (delay=0) main->>task2: 建立 task2 (delay=1) main->>task3: 建立 task3 (delay=2) main->>+asyncio.wait: 等待任務,return_when=FIRST_EXCEPTION alt task3 發生錯誤 task3-->>-asyncio.wait: 丟擲異常 main->>task2: 取消 task2 else task2 發生錯誤 task2-->>-asyncio.wait: 丟擲異常 main->>task3: 取消 task3 else task1 發生錯誤 task1-->>-asyncio.wait: 丟擲異常 main->>task2: 取消 task2 main->>task3: 取消 task3 else 其他情況 note right of main: 所有任務正常完成 end
使用 asyncio.wait
和 return_when=asyncio.FIRST_EXCEPTION
可以更有效地管理併發任務,特別是在需要及時處理錯誤並停止其他任務的情況下。
graph LR ALL_COMPLETED[ALL_COMPLETED] B[B] C[C] E[E] FIRST_EXCEPTION[FIRST_EXCEPTION] A[建立任務列表] --> B{使用 asyncio.wait 等待任務}; B -- FIRST_EXCEPTION --> C{第一個任務發生異常}; C --> D[取消剩餘任務]; B -- ALL_COMPLETED --> E{所有任務完成}; E --> F[處理結果];
import asyncio
import asyncpg
async def connect_to_database(user, password, database, host='127.0.0.1', port=5432):
conn = await asyncpg.connect(user=user, password=password,
database=database, host=host, port=port)
return conn
async def fetch_product_data(conn, product_ids):
query = "SELECT * FROM products WHERE id = ANY($1::int[])"
return await conn.fetch(query, product_ids)
async def main():
conn = await connect_to_database('user', 'password', 'products')
try:
product_ids = [1, 2, 3, 4, 5]
products = await fetch_product_data(conn, product_ids)
for product in products:
print(product)
finally:
await conn.close()
asyncio.run(main())
這段程式碼示範使用asyncpg
進行資料函式庫查詢。connect_to_database
函式建立與資料函式庫的連線。fetch_product_data
函式則執行 SQL 查詢,利用 ANY
運算元有效查詢多個產品 ID。main
函式中,我們取得產品資料並印出。finally
區塊確保資料函式庫連線關閉。
sequenceDiagram participant User participant Application participant Database User->>Application: 請求產品資料 activate Application Application->>Database: 傳送 SQL 查詢 (asyncpg) activate Database Database-->>Application: 回傳查詢結果 deactivate Database Application-->>User: 顯示產品資料 deactivate Application
graph LR B[B] C[C] A[建立資料函式庫連線] --> B{執行 SQL 查詢} B --> C{處理結果} C --> D[關閉連線]
透過 asyncpg
,我們可以有效利用非同步程式設計的優勢,提升資料函式庫查詢效率。以下是一些進階應用:
- 連線池: 建立連線池可以重複利用資料函式庫連線,減少建立連線的開銷。
asyncpg
提供了create_pool
函式來建立連線池。 - 交易管理:
asyncpg
支援交易操作,確保資料一致性。可以使用conn.transaction()
方法來管理交易。 - Prepared Statements: 使用 Prepared Statements 可以提高查詢效能,並防止 SQL 注入攻擊。
asyncpg
提供了豐富的功能和高度的靈活性,讓開發者能夠在 Python 的非同步環境中輕鬆操作 PostgreSQL 資料函式庫,開發高效能的應用程式。
藉由結合非同步程式設計和高效的資料函式庫驅動程式 asyncpg
,我們可以顯著提升應用程式在高併發情境下的效能。這對於需要處理大量資料函式庫操作的應用,例如電商平台、社群網路等,至關重要。
使用 Asyncpg 進行高效能 Python 資料函式庫查詢
身為一個技術工作者,我經常需要處理大量資料,而資料函式庫查詢的效率對應用程式的效能至關重要。Asyncpg 是一個絕佳的 PostgreSQL 非同步客戶端函式庫,它允許我們以非同步的方式執行資料函式庫操作,從而提升應用程式的反應速度和吞吐量。以下,我將分享一些使用 Asyncpg 進行高效能資料函式庫查詢的技巧和心得。
建立資料函式庫連線和 Schema
在開始任何資料函式庫操作之前,我們必須先建立與資料函式庫的連線。以下程式碼片段示範如何使用 Asyncpg 連線到 PostgreSQL 資料函式庫,並檢查伺服器版本:
import asyncpg
import asyncio
async def connect_to_database(user, password, database):
conn = await asyncpg.connect(user=user, password=password, database=database)
version = conn.get_server_version()
print(f"已連線 PostgreSQL 版本:{version}")
return conn
async def main():
conn = await connect_to_database('postgres', 'password', 'products')
await conn.close()
asyncio.run(main())
這個 connect_to_database
函式封裝了連線邏輯,接收使用者名稱、密碼和資料函式庫名稱作為引數,並回傳一個 Asyncpg 連線物件。main
函式示範瞭如何使用這個函式連線到名為 products
的資料函式庫。
建立連線後,我們需要定義資料函式庫 Schema。以下 圖表展示了產品資料函式庫的 Schema 設計:
erDiagram Brand { int brand_id PK text brand_name } Product { int product_id PK text product_name int brand_id FK } SKU { int sku_id PK int product_id FK int size_id FK int color_id FK } Size { int size_id PK text size_name } Color { int color_id PK text color_name } Brand ||--o{ Product : has Product ||--o{ SKU : has Size ||--o{ SKU : has Color ||--o{ SKU : has}}}}
這個實體關係圖 (ERD) 描述了產品、品牌、SKU、尺寸和顏色之間的關係。每個實體都有其主鍵,SKU 則透過外部索引鍵與產品、尺寸和顏色關聯。
接著,我們可以使用 SQL 陳述式建立這些表格:
CREATE_BRAND = """CREATE TABLE IF NOT EXISTS brand (brand_id SERIAL PRIMARY KEY, brand_name TEXT NOT NULL);"""
# ... 其他 SQL 建立表格陳述式 ...
async def create_tables(conn):
await conn.execute(CREATE_BRAND)
# ... 執行其他 SQL 建立表格陳述式 ...
async def main():
conn = await connect_to_database('postgres', 'password', 'products')
await create_tables(conn)
await conn.close()
asyncio.run(main())
create_tables
函式使用 conn.execute()
方法執行 SQL 建立表格陳述式。
執行資料函式庫查詢和操作
建立表格後,我們可以開始執行資料函式庫查詢和操作。以下程式碼示範如何插入產品資料:
INSERT_PRODUCT = """INSERT INTO product (product_name, brand_id) VALUES ($1, $2);"""
async def insert_product(conn, product_name, brand_id):
await conn.execute(INSERT_PRODUCT, product_name, brand_id)
# ...其他插入資料的函式...
insert_product
函式使用引數化查詢,有效防止 SQL 注入攻擊。
Asyncpg 也支援非同步的 fetch()
方法,用於查詢資料:
async def get_all_products(conn):
rows = await conn.fetch("SELECT * FROM product;")
return rows
get_all_products
函式查詢所有產品資料,並以列表形式回傳。
Asyncpg 提供了強大的非同步資料函式庫操作功能,能大幅提升 Python 應用程式的效能。透過結合非同步程式設計和 Asyncpg,我們可以更有效率地處理資料函式庫操作,開發更具回應性和高吞吐量的應用程式。