與 PostgreSQL 資料函式庫互動:建立連線與執行查詢
首先,我們需要建立與 PostgreSQL 資料函式庫的連線,才能執行 SQL 查詢。asyncpg 提供了 connect() 協程用於建立連線,execute() 協程用於執行 SQL 陳述式,以及 fetch() 協程用於取得查詢結果。以下程式碼示範如何插入和查詢品牌資料:
import asyncpg
import asyncio
from asyncpg import Record
from typing import List
async def main():
conn = await asyncpg.connect(host='127.0.0.1',
port=5432,
user='postgres',
database='products',
password='password')
await conn.execute("INSERT INTO brand VALUES(DEFAULT, 'Levis')")
await conn.execute("INSERT INTO brand VALUES(DEFAULT, 'Seven')")
brand_query = 'SELECT brand_id, brand_name FROM brand'
results: List[Record] = await conn.fetch(brand_query)
for brand in results:
print(f'id: {brand["brand_id"]}, name: {brand["brand_name"]}')
await conn.close()
asyncio.run(main())
這段程式碼首先透過 asyncpg.connect() 建立與 PostgreSQL 資料函式庫的連線。接著,它依序插入兩個品牌 “Levis” 和 “Seven” 到 brand 表中。然後,它利用 conn.fetch() 執行 brand_query 查詢所有品牌資料,並將結果儲存在 results 變數中,這個變數的型別是 List[Record],其中 Record 物件類別似於字典,允許我們使用欄位名稱取值。最後,程式迴圈印出每個品牌的 ID 和名稱,並關閉資料函式庫連線。
非同步查詢:提升效能的關鍵
asyncpg 的 execute() 和 fetch() 都是協程,這讓我們能充分利用 asyncio 的 API 進行非同步操作,例如 asyncio.gather()。非同步操作允許多個查詢同時執行,尤其在 I/O 密集型操作中,能大幅提升效能。
graph LR
B[B]
C[C]
A[建立連線] --> B{執行查詢 1};
A --> C{執行查詢 2};
B --> D[處理結果 1];
C --> E[處理結果 2];
D --> F[結束];
E --> F;
這個流程圖清楚地展示了非同步查詢的執行流程。建立連線後,我們可以同時執行查詢 1 和查詢 2,不必等其中一個查詢完成才能執行另一個。這種平行處理方式顯著縮短了整體執行時間。
sequenceDiagram
participant 使用者
participant 應用程式
participant 資料函式庫
使用者->>應用程式: 傳送請求
activate 應用程式
應用程式->>資料函式庫: 非同步查詢 1
應用程式->>資料函式庫: 非同步查詢 2
資料函式庫-->>應用程式: 傳回結果 1
資料函式庫-->>應用程式: 傳回結果 2
應用程式-->>使用者: 回應結果
deactivate 應用程式
此序列圖展現了使用者、應用程式和資料函式庫之間的互動流程。應用程式利用非同步查詢同時向資料函式庫傳送查詢 1 和查詢 2,並在收到兩個查詢的結果後,將結果回傳給使用者。這種非同步的處理方式有效地減少了使用者的等待時間。
我個人在開發高併發網路應用時,經常使用 asyncpg 搭配連線池來管理資料函式庫連線,有效地避免了頻繁建立和關閉連線的開銷,進一步提升了系統的整體效能和穩定性。
```python
import asyncio
import aiohttp
async def fetch_status(session, url, delay=0):
await asyncio.sleep(delay)
try:
async with session.get(url) as response:
return response.status
except aiohttp.ClientError as e:
print(f"網址 {url} 發生錯誤: {e}")
raise
async def main():
async with aiohttp.ClientSession() as session:
urls = ['https://www.example.com', 'https://www.google.com', 'https://不存在的網址.com']
tasks = [fetch_status(session, url, delay=i) for i, url in enumerate(urls)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
for task in done:
try:
result = task.result()
print(f"網址 {urls[tasks.index(task)]} 的狀態碼: {result}")
except Exception as e:
print(f"任務 {task} 丟擲異常: {e}")
for task in pending:
task.cancel()
print(f"任務 {task} 已取消")
asyncio.run(main())
這個程式碼範例示範了使用 asyncio.wait 並搭配 asyncio.FIRST_EXCEPTION 引數。當其中一個任務丟擲異常時,asyncio.wait 會立即傳回,並將已完成和未完成的任務分別放入 done 和 pending 集合中。程式碼接著取消所有未完成的任務,避免浪費資源。fetch_status 函式加入了錯誤處理,模擬網路請求可能發生的錯誤,並使用 delay 引數模擬不同任務的執行時間。
sequenceDiagram
participant main
participant task1
participant task2
participant task3
main->>task1: 建立 task1 (delay=0)
main->>task2: 建立 task2 (delay=1)
main->>task3: 建立 task3 (delay=2)
main->>+asyncio.wait: 等待任務,return_when=FIRST_EXCEPTION
alt task3 發生錯誤
task3-->>-asyncio.wait: 丟擲異常
main->>task2: 取消 task2
else task2 發生錯誤
task2-->>-asyncio.wait: 丟擲異常
main->>task3: 取消 task3
else task1 發生錯誤
task1-->>-asyncio.wait: 丟擲異常
main->>task2: 取消 task2
main->>task3: 取消 task3
else 其他情況
note right of main: 所有任務正常完成
end
使用 asyncio.wait 和 return_when=asyncio.FIRST_EXCEPTION 可以更有效地管理併發任務,特別是在需要及時處理錯誤並停止其他任務的情況下。
graph LR
ALL_COMPLETED[ALL_COMPLETED]
B[B]
C[C]
E[E]
FIRST_EXCEPTION[FIRST_EXCEPTION]
A[建立任務列表] --> B{使用 asyncio.wait 等待任務};
B -- FIRST_EXCEPTION --> C{第一個任務發生異常};
C --> D[取消剩餘任務];
B -- ALL_COMPLETED --> E{所有任務完成};
E --> F[處理結果];
import asyncio
import asyncpg
async def connect_to_database(user, password, database, host='127.0.0.1', port=5432):
conn = await asyncpg.connect(user=user, password=password,
database=database, host=host, port=port)
return conn
async def fetch_product_data(conn, product_ids):
query = "SELECT * FROM products WHERE id = ANY($1::int[])"
return await conn.fetch(query, product_ids)
async def main():
conn = await connect_to_database('user', 'password', 'products')
try:
product_ids = [1, 2, 3, 4, 5]
products = await fetch_product_data(conn, product_ids)
for product in products:
print(product)
finally:
await conn.close()
asyncio.run(main())
這段程式碼示範使用asyncpg 進行資料函式庫查詢。connect_to_database 函式建立與資料函式庫的連線。fetch_product_data 函式則執行 SQL 查詢,利用 ANY 運算元有效查詢多個產品 ID。main 函式中,我們取得產品資料並印出。finally 區塊確保資料函式庫連線關閉。
sequenceDiagram
participant User
participant Application
participant Database
User->>Application: 請求產品資料
activate Application
Application->>Database: 傳送 SQL 查詢 (asyncpg)
activate Database
Database-->>Application: 回傳查詢結果
deactivate Database
Application-->>User: 顯示產品資料
deactivate Application
graph LR
B[B]
C[C]
A[建立資料函式庫連線] --> B{執行 SQL 查詢}
B --> C{處理結果}
C --> D[關閉連線]
透過 asyncpg,我們可以有效利用非同步程式設計的優勢,提升資料函式庫查詢效率。以下是一些進階應用:
- 連線池: 建立連線池可以重複利用資料函式庫連線,減少建立連線的開銷。
asyncpg提供了create_pool函式來建立連線池。 - 交易管理:
asyncpg支援交易操作,確保資料一致性。可以使用conn.transaction()方法來管理交易。 - Prepared Statements: 使用 Prepared Statements 可以提高查詢效能,並防止 SQL 注入攻擊。
asyncpg 提供了豐富的功能和高度的靈活性,讓開發者能夠在 Python 的非同步環境中輕鬆操作 PostgreSQL 資料函式庫,開發高效能的應用程式。
藉由結合非同步程式設計和高效的資料函式庫驅動程式 asyncpg,我們可以顯著提升應用程式在高併發情境下的效能。這對於需要處理大量資料函式庫操作的應用,例如電商平台、社群網路等,至關重要。
使用 Asyncpg 進行高效能 Python 資料函式庫查詢
身為一個技術工作者,我經常需要處理大量資料,而資料函式庫查詢的效率對應用程式的效能至關重要。Asyncpg 是一個絕佳的 PostgreSQL 非同步客戶端函式庫,它允許我們以非同步的方式執行資料函式庫操作,從而提升應用程式的反應速度和吞吐量。以下,我將分享一些使用 Asyncpg 進行高效能資料函式庫查詢的技巧和心得。
建立資料函式庫連線和 Schema
在開始任何資料函式庫操作之前,我們必須先建立與資料函式庫的連線。以下程式碼片段示範如何使用 Asyncpg 連線到 PostgreSQL 資料函式庫,並檢查伺服器版本:
import asyncpg
import asyncio
async def connect_to_database(user, password, database):
conn = await asyncpg.connect(user=user, password=password, database=database)
version = conn.get_server_version()
print(f"已連線 PostgreSQL 版本:{version}")
return conn
async def main():
conn = await connect_to_database('postgres', 'password', 'products')
await conn.close()
asyncio.run(main())
這個 connect_to_database 函式封裝了連線邏輯,接收使用者名稱、密碼和資料函式庫名稱作為引數,並回傳一個 Asyncpg 連線物件。main 函式示範瞭如何使用這個函式連線到名為 products 的資料函式庫。
建立連線後,我們需要定義資料函式庫 Schema。以下 圖表展示了產品資料函式庫的 Schema 設計:
erDiagram
Brand {
int brand_id PK
text brand_name
}
Product {
int product_id PK
text product_name
int brand_id FK
}
SKU {
int sku_id PK
int product_id FK
int size_id FK
int color_id FK
}
Size {
int size_id PK
text size_name
}
Color {
int color_id PK
text color_name
}
Brand ||--o{ Product : has
Product ||--o{ SKU : has
Size ||--o{ SKU : has
Color ||--o{ SKU : has}}}}
這個實體關係圖 (ERD) 描述了產品、品牌、SKU、尺寸和顏色之間的關係。每個實體都有其主鍵,SKU 則透過外部索引鍵與產品、尺寸和顏色關聯。
接著,我們可以使用 SQL 陳述式建立這些表格:
CREATE_BRAND = """CREATE TABLE IF NOT EXISTS brand (brand_id SERIAL PRIMARY KEY, brand_name TEXT NOT NULL);"""
# ... 其他 SQL 建立表格陳述式 ...
async def create_tables(conn):
await conn.execute(CREATE_BRAND)
# ... 執行其他 SQL 建立表格陳述式 ...
async def main():
conn = await connect_to_database('postgres', 'password', 'products')
await create_tables(conn)
await conn.close()
asyncio.run(main())
create_tables 函式使用 conn.execute() 方法執行 SQL 建立表格陳述式。
執行資料函式庫查詢和操作
建立表格後,我們可以開始執行資料函式庫查詢和操作。以下程式碼示範如何插入產品資料:
INSERT_PRODUCT = """INSERT INTO product (product_name, brand_id) VALUES ($1, $2);"""
async def insert_product(conn, product_name, brand_id):
await conn.execute(INSERT_PRODUCT, product_name, brand_id)
# ...其他插入資料的函式...
insert_product 函式使用引數化查詢,有效防止 SQL 注入攻擊。
Asyncpg 也支援非同步的 fetch() 方法,用於查詢資料:
async def get_all_products(conn):
rows = await conn.fetch("SELECT * FROM product;")
return rows
get_all_products 函式查詢所有產品資料,並以列表形式回傳。
Asyncpg 提供了強大的非同步資料函式庫操作功能,能大幅提升 Python 應用程式的效能。透過結合非同步程式設計和 Asyncpg,我們可以更有效率地處理資料函式庫操作,開發更具回應性和高吞吐量的應用程式。