現在我們已經建立了PyPI資料模型,讓我們將其與FastAPI整合,建立一個完整的API應用。
設定FastAPI應用
首先,我們需要安裝必要的套件並設定基本應用結構:
pip install fastapi uvicorn
# main.py
import asyncio
from fastapi import FastAPI, Depends, HTTPException, Query
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from typing import List, Optional
from datetime import datetime
# 匯入我們的模型
from models import User, Package, Download
# 建立FastAPI應用
app = FastAPI(title="PyPI API Clone")
# 資料函式庫初始化
@app.on_event("startup")
async def startup_db_client():
client = AsyncIOMotorClient("mongodb://localhost:27017")
await init_beanie(
database=client.pypi_clone,
document_models=[User, Package, Download]
)
這段程式碼設定了一個基本的FastAPI應用:
首先匯入必要的模組,包括FastAPI相關元件、Beanie和Motor用於MongoDB連線,以及一些標準函式庫。
匯入我們前面定義的資料模型:
User
、Package
和Download
。建立一個FastAPI應使用案例項,設定標題為"PyPI API Clone"。
定義一個啟動事件處理函式
startup_db_client
,使用@app.on_event("startup")
裝飾器註冊。這確保在應用啟動時初始化資料函式庫連線。在啟動函式中:
- 建立一個Motor客戶端連線到本地MongoDB
- 使用
init_beanie
初始化Beanie,指定資料函式庫和檔案模型
這種設定方式確保了在處理第一個請求之前,資料函式庫連線已經準備就緒。使用FastAPI的事件系統是初始化資源的推薦方式,特別是對於非同步資源。
定義API路由
接下來,我們定義各種API端點:
# 套件相關端點
@app.get("/packages/", response_model=List[dict])
async def list_packages(
q: Optional[str] = None,
classifier: Optional[str] = None,
maintainer: Optional[str] = None,
sort: str = "name",
limit: int = Query(10, le=100),
skip: int = 0
):
"""列出套件,支援搜尋、過濾和分頁"""
query = {}
# 搜尋
if q:
query["$text"] = {"$search": q}
# 過濾
if classifier:
query["classifiers"] = classifier
if maintainer:
query["maintainers"] = maintainer
# 排序
sort_field = {
"name": "normalized_name",
"downloads": "-download_count",
"updated": "-last_updated"
}.get(sort, "normalized_name")
# 執行查詢
packages = await Package.find(query).sort(sort_field).skip(skip).limit(limit).to_list()
# 簡化回應
return [
{
"name": pkg.name,
"summary": pkg.summary,
"latest_version": pkg.latest_version,
"maintainers": pkg.maintainers,
"download_count": pkg.download_count
}
for pkg in packages
]
@app.get("/packages/{package_name}")
async def get_package(package_name: str):
"""取得特定套件的詳細訊息"""
package = await Package.find_one(Package.normalized_name == package_name.lower())
if not package:
raise HTTPException(status_code=404, detail="Package not found")
return package
@app.get("/packages/{package_name}/releases")
async def get_package_releases(package_name: str):
"""取得套件的所有發布版本"""
package = await Package.find_one(Package.normalized_name == package_name.lower())
if not package:
raise HTTPException(status_code=404, detail="Package not found")
return package.releases
@app.get("/packages/{package_name}/releases/{version}")
async def get_release(package_name: str, version: str):
"""取得特定版本的詳細訊息"""
package = await Package.find_one(Package.normalized_name == package_name.lower())
if not package:
raise HTTPException(status_code=404, detail="Package not found")
release = package.releases.get(version)
if not release:
raise HTTPException(status_code=404, detail="Release not found")
return release
這段程式碼定義了幾個與套件相關的API端點:
列出套件 (
/packages/
):- 支援多種查詢引數:文字搜尋(
q
)、分類別過濾(classifier
)、維護者過濾(maintainer
) - 支援排序(
sort
):按名稱、下載量或更新時間 - 支援分頁:使用
skip
和limit
引數 - 回傳簡化的套件資訊列表,而非完整檔案
- 支援多種查詢引數:文字搜尋(
取得套件詳情 (
/packages/{package_name}
):- 根據套件名稱查詢特定套件
- 使用標準化名稱(小寫)進行不區分大小寫的查詢
- 如果找不到套件,回傳404錯誤
取得套件版本列表 (
/packages/{package_name}/releases
):- 回傳特定套件的所有發布版本
- 這些版本儲存在套件檔案的
releases
字典中
取得特定版本詳情 (
/packages/{package_name}/releases/{version}
):- 回傳特定套件特定版本的詳細資訊
- 如果找不到套件或版本,回傳404錯誤
這些API端點展示了FastAPI與Beanie的無縫整合:
- 使用非同步函式處理請求,與Beanie的非同步操作完美配合
- 使用路徑引數和查詢引數接收輸入
- 使用
response_model
指定回應格式 - 使用
HTTPException
處理錯誤情況
使用者相關端點
# 使用者相關端點
@app.get("/users/{username}")
async def get_user(username: str):
"""取得使用者訊息"""
user = await User.find_one(User.username == username)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.get("/users/{username}/packages")
async def get_user_packages(username: str):
"""取得使用者維護的套件"""
packages = await Package.find(Package.maintainers.contains(username)).to_list()
return packages
這段程式碼定義了兩個與使用者相關的API端點:
取得使用者資訊 (
/users/{username}
):- 根據使用者名稱查詢特定使用者
- 如果找不到使用者,回傳404錯誤
- 回傳完整的使用者檔案
取得使用者維護的套件 (
/users/{username}/packages
):- 查詢特定使用者維護的所有套件
- 使用
contains
方法在maintainers
陣列中查詢使用者名稱 - 回傳套件列表
這些端點展示瞭如何處理MongoDB中的關聯查詢。在關聯式資料函式庫中,這類別查詢通常需要JOIN操作,而在MongoDB中,我們可以:
- 使用陣列欄位(
maintainers
)儲存多對多關係 - 使用
contains
方法直接查詢陣列中的值 - 避免複雜的JOIN操作,提高查詢效能
統計相關端點
# 統計相關端點
@app.get("/stats/top_packages")
async def get_top_packages(limit: int = 10):
"""取得最受歡迎的套件"""
top_packages = await Package.find().sort(-Package.download_count).limit(limit).to_list()
return [
{
"name": pkg.name,
"download_count": pkg.download_count,
"latest_version": pkg.latest_version
}
for pkg in top_packages
]
@app.get("/stats/recent_updates")
async def get_recent_updates(limit: int = 10):
"""取得最近更新的套件"""
recent_packages = await Package.find().sort(-Package.last_updated).limit(limit).to_list()
return [
{
"name": pkg.name,
"last_updated": pkg.last_updated,
"latest_version": pkg.latest_version
}
for pkg in recent_packages
]
這段程式碼定義了兩個統計相關的API端點:
最受歡迎的套件 (
/stats/top_packages
):- 根據下載量排序套件,回傳下載量最高的前N個
- 使用
sort(-Package.download_count)
進行降序排序 - 使用
limit(limit)
限制結果數量 - 回傳簡化的套件資訊,只包含名稱、下載量和最新版本
最近更新的套件 (
/stats/recent_updates
):- 根據更新時間排序套件,回傳最近更新的前N個
- 使用
sort(-Package.last_updated)
進行降序排序 - 同樣使用
limit
引數控制回傳數量 - 回傳包含更新時間的簡化套件資訊
這些統計端點展示了MongoDB的強大排序和分頁能力。透過適當的索引(如在download_count
和last_updated
欄位上),這些查詢可以非常高效,即使在包含大量檔案的集合中也能快速回傳結果。
資料操作端點
最後,我們增加一些用於資料操作的端點:
from fastapi import Body
from pydantic import BaseModel
class ReleaseCreate(BaseModel):
version: str
summary: Optional[str] = None
description: Optional[str] = None
requires_dist: List[str] = []
requires_python: Optional[str] = None
@app.post("/packages/")
async def create_package(
name: str = Body(...),
summary: Optional[str] = Body(None),
author: Optional[str] = Body(None),
maintainers: List[str] = Body([])
):
"""建立新套件"""
# 檢查套件是否已存在
existing = await Package.find_one(Package.normalized_name == name.lower())
if existing:
raise HTTPException(status_code=400, detail="Package already exists")
# 建立新套件
package = Package(
name=name,
normalized_name=name.lower(),
summary=summary,
author=author,
maintainers=maintainers
)
await package.insert()
return package
@app.post("/packages/{package_name}/releases")
async def add_release(package_name: str, release: ReleaseCreate):
"""增加新版本"""
package = await Package.find_one(Package.normalized_name == package_name.lower())
if not package:
raise HTTPException(status_code=404, detail="Package not found")
# 檢查版本是否已存在
if release.version in package.releases:
raise HTTPException(status_code=400, detail="Version already exists")
# 增加新版本
package.releases[release.version] = Release(
version=release.version,
released_at=datetime.now(),
summary=release.summary,
description=release.description,
requires_dist=release.requires_dist
)
# 更新最新版本(如果適用)
if not package.latest_version or release.version > package.latest_version:
package.latest_version = release.version
package.last_updated = datetime.now()
await package.save()
return package.releases[release.version]
@app.post("/downloads/record")
async def record_download(
package_name: str = Body(...),
version: str = Body(...),
file_name: str = Body(...),
ip_address: Optional[str] = Body(None),
user_agent: Optional[str] = Body(None)
):
"""記錄下載"""
# 檢查套件是否存在
package = await Package.find_one(Package.normalized_name == package_name.lower())
if not package:
raise HTTPException(status_code=404, detail="Package not found")
# 記錄下載
download = Download(
package_name=package_name,
version=version,
file_name=file_name,
ip_address=ip_address,
user_agent=user_agent
)
await download.insert()
# 更新套件下載計數(使用原子更新)
await Package.find_one(
Package.normalized_name == package_name.lower()
).update({"$inc": {"download_count": 1}})
return {"status": "success"}
這段程式碼定義了三個用於資料操作的API端點:
建立新套件 (
POST /packages/
):- 接收套件基本資訊:名稱、摘要、作者和維護者列表
- 檢查套件是否已存在,避免重複
- 建立新的
Package
物件並儲存到資料函式庫 - 回傳建立的套件物件
增加新版本 (
POST /packages/{package_name}/releases
):- 接收一個
ReleaseCreate
物件,包含版本資訊 - 檢查套件是否存在,以及版本是否已存在
- 建立新的
Release
物件並增加到套件的releases
字典中 - 更新套件的
latest_version
和last_updated
欄位 - 回傳建立的版本物件
- 接收一個
記錄下載 (
POST /downloads/record
):- 接收下載資訊:套件名稱、版本、檔案名稱等
- 檢查套件是否存在
- 建立新的
Download
物件記錄下載事件 - 使用MongoDB的原子更新操作增加套件的下載計數
- 回傳成功狀態
這些端點展示了FastAPI與Beanie結合處理資料操作的方式:
- 使用
Body
引數接收請求體中的資料 - 使用Pydantic模型(如
ReleaseCreate
)驗證輸入資料 - 使用Beanie的
insert
、save
和update
方法操作資料函式庫 - 使用MongoDB的原子更新操作(如
$inc
)確保資料一致性
啟動應用
最後,我們可以使用Uvicorn啟動應用:
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
這個FastAPI應用展示瞭如何將Beanie模型與Web API整合,提供了一個功能完整的PyPI API克隆。主要特點包括:
- 非同步資料函式庫操作,提高效能
- 使用Pydantic模型進行資料驗證
- 豐富的查詢功能,包括文字搜尋、過濾和排序
- 原子更新操作,確保資料一致性
- 完整的RESTful API設計
MongoDB效能最佳化:從慢到快
MongoDB雖然強大,但並非自動就能達到最佳效能。在本文中,我們將探討如何最佳化MongoDB查詢,使應用從「慢」變「快」。
索引最佳化
索引是提升查詢效能的最重要工具:
# 在Beanie模型中定義索引
class Package(Document):
name: Indexed(str, unique=True)
normalized_name: Indexed(str, unique=True)
# 其他欄位...
class Settings:
name = "packages"
indexes = [
"keywords", # 單欄位索引
"classifiers",
"maintainers",
[("normalized_name", "text"), ("summary", "text"), ("description", "text")], # 複合全文索引
[("download_count", -1)], # 排序索引
[("last_updated", -1)]
]
這段程式碼展示瞭如何在Beanie模型中定義索引:
單欄位索引:
- 使用
Indexed
裝飾器直接在欄位定義中建立索引:name: Indexed(str, unique=True)
- 在
Settings
類別中使用字串列表定義簡單索引:"keywords", "classifiers", "maintainers"
- 使用
複合索引:
- 使用巢狀列表定義複合索引:
[("normalized_name", "text"), ("summary", "text"), ("description", "text")]
- 這建立了一個全文索引,涵蓋三個欄位
- 使用巢狀列表定義複合索引:
排序索引:
- 為經常用於排序的欄位建立索引:
[("download_count", -1)]
-1
表示降序索引,適合sort(-Package.download_count)
這樣的查詢
- 為經常用於排序的欄位建立索引:
索引最佳化策略:
- 為常用查詢建立索引:分析應用中最常用的查詢,為這些查詢的條件欄位建立索引
- 複合索引:對於多欄位查詢,建立複合索引比多個單欄位索引更有效
- 索引順序:在複合索引中,將高基數(唯一值多)的欄位放在前面
- 覆寫索引:設計索引使查詢只需要索引中的欄位,不需要讀取檔案
- 避免過多索引:索引會佔用空間並減慢寫入操作,只為必要的查詢建立索引
查詢最佳化查詢本身也很重要:
# 最佳化前
all_packages = await Package.find().to_list()
python_packages = [p for p in all_packages if "Python" in p.classifiers]
# 最佳化後
python_packages = await Package.find(
Package.classifiers.contains("Python")
).to_list()
# 只取得名稱和摘要
packages = await Package.find().project({"name": 1, "summary": 1}).to_list()
# 分頁查詢
page = 1
page_size = 20
packages = await Package.find().skip((page - 1) * page_size).limit(page_size).to_list()
# 取得每個維護者的套件數量
maintainer_stats = await Package.find().aggregate([
{"$unwind": "$maintainers"},
{"$group": {"_id": "$maintainers", "package_count": {"$sum": 1}}},
{"$sort": {"package_count": -1}}
]).to_list()
# 避免
packages = await Package.find({"name": {"$regex": ".*python.*"}}).to_list()
# 更好的方式
packages = await Package.find(
{"$text": {"$search": "python"}}
).to_list()
這段程式碼展示了多種查詢最佳化技巧:
使用資料函式庫過濾而非程式碼過濾:
- 不佳做法:先取得所有套件,再在Python中過濾
- 最佳做法:直接在查詢中使用
contains
方法過濾,讓資料函式庫完成過濾工作
使用投影(Projection):
- 使用
.project({"name": 1, "summary": 1})
只選擇需要的欄位 - 這減少了網路傳輸量和記憶體使用
- 使用
實作分頁:
- 使用
skip
和limit
實作基本分頁 - 這避免了一次性載入大量資料
- 使用
使用聚合管道:
- 使用MongoDB的聚合功能進行複雜資料處理
- 範例中計算每個維護者的套件數量,並按數量排序
避免低效的正規表示式查詢:
- 不佳做法:使用
$regex
進行全文搜尋,特別是使用.*
字首的正規表示式 - 最佳做法:使用全文索引和
$text
運算元進行搜尋
- 不佳做法:使用
查詢最佳化策略:
- 使用投影:只選擇需要的欄位
- 限制結果數量:使用
limit()
和分頁 - 使用聚合管道:對於複雜查詢,使用聚合管道可能更高效
- 避免正規表示式:盡量使用精確比對或字首比對
檔案結構最佳化
檔案結構也會影響效能:
# 在訂單中儲存產品名稱,避免每次都查詢產品集合
class OrderItem(BaseModel):
product_id: PydanticObjectId
product_name: str # 冗餘儲存
price: float
# 使用子檔案儲存版本訊息
releases: Dict[str, Release] = {} # 版本號 -> 發布訊息
這段程式碼展示了檔案結構最佳化的兩個關鍵技巧:
適當冗餘:
- 在
OrderItem
中儲存product_name
,即使這是來自Product
檔案的資訊 - 這種冗餘可以避免在查詢訂單時還需要額外查詢產品資訊
- 特別適合「讀多寫少」的場景,如訂單一旦建立很少修改
- 在
使用字典儲存關聯資料:
- 使用字典(
Dict[str, Release]
)而非列表儲存版本資訊 - 這使得按版本號查詢特定版本變得非常高效(O(1)時間複雜度)
- 適合需要頻繁按鍵查詢的場景
- 使用字典(
檔案結構最佳化策略:
- 避免過深巢狀:過深的巢狀會使查詢複雜與效率低下
- 適當冗餘:有時候適當的冗餘可以減少查詢次數
- 使用子檔案而非陣列:對於需要單獨查詢的專案,考慮使用子檔案而非陣列
批次操作
對於大量操作,使用批次方法:
# 單個插入
for package in packages:
await package.insert()
# 批次插入(更高效)
await Package.insert_many(packages)
這段程式碼對比了單個插入和批次插入的差異:
單個插入:
- 使用迴圈逐個插入檔案
- 每個插入操作都需要一次網路往往返
- 對於大量檔案,效能較差
批次插入:
- 使用
insert_many
一次性插入多個檔案 - 只需要一次網路往往返
- 大幅減少網路延遲和伺服器處理開銷
- 使用
批次操作不僅適用於插入,還適用於更新和刪除操作。MongoDB提供了update_many
和delete_many
等批次操作方法,可以大幅提高處理大量檔案的效能。
連線池最佳化資料函式庫連線:
# 設定連線池
client = AsyncIOMotorClient(
"mongodb://localhost:27017",
maxPoolSize=100, # 最大連線數
minPoolSize=10, # 最小連線數
maxIdleTimeMS=30000 # 連線最大閒置時間
)
這段程式碼展示瞭如何最佳化MongoDB連線池設定:
最大連線數(maxPoolSize):
- 設定連線池可以維護的最大連線數
- 應根據應用的並發需求和伺服器資源來設定
- 太小會限制並發處理能力,太大可能浪費資源
最小連線數(minPoolSize):
- 連線池中保持的最小連線數
- 有助於減少在流量突增時建立新連線的延遲
最大閒置時間(maxIdleTimeMS):
- 連線在閒置多長時間後被關閉
- 有助於釋放長時間不用的連線資源
連線池設定對於高流量應用特別重要。適當的連線池設定可以:
- 減少連線建立的開銷
- 提高並發處理能力
- 避免連線資源耗盡
- 平衡資源使用和應用效能
使用讀寫關注級別
根據需求調整讀寫關注級別:
# 對於不需要立即一致性的查詢,使用較低的讀關注
packages = await Package.find().read_concern("local").to_list()
# 對於重要的寫操作,使用較高的寫關注
await package.save(write_concern={"w": "majority"})
這段程式碼展示瞭如何調整MongoDB的讀寫關注級別:
讀關注(Read Concern):
local
:回傳節點的最新資料,不保證資料已被大多數節點確認available
:最低的讀關注級別,可能回傳已被回復的資料majority
:只回傳已被大多數節點確認的資料linearizable
:最高的讀關注級別,保證讀取最新的確認寫入
寫關注(Write Concern):
{w: 1}
:預設值,寫入操作只需要主節點確認{w: "majority"}
:寫入操作需要大多數節點確認{w: 0}
:不需要確認,最快但最不可靠
調整讀寫關注級別可以在一致性和效能之間取得平衡:
- 對於不重要的讀取(如統計資訊),可以使用較低的讀關注提高效能
- 對於關鍵的寫入操作(如金融交易),應使用較高的寫關注確保資料安全
監控與分析
使用MongoDB的監控工具識別效能問題:
explain_result = await Package.find(Package.name == "requests").explain()
print(explain_result)
indexes = await client.pypi_clone.packages.index_information()
print(indexes)
這段程式碼展示了兩種重要的MongoDB效能分析工具:
查詢計劃分析(explain):
- 使用
.explain()
方法分析查詢計劃 - 可以檢視MongoDB如何執行查詢,是否使用了索引
- 有助於識別低效查詢和缺失的索引
- 使用
索引資訊(index_information):
- 取得集合的所有索引資訊
- 可以檢查索引是否正確建立
- 有助於識別冗餘或未使用的索引
除了這些基本工具外,MongoDB還提供了更多監控選項:
- MongoDB Compass:圖形化工具,提供查詢分析和索引建議
- MongoDB Atlas監控:如果使用Atlas雲端服務,提供詳細的效能指標
- 慢查詢日誌:記錄執行時間超過閾值的查詢
透過這些最佳化技術,我們可以顯著提高MongoDB應用的效能,從而實作像文章開頭提到的那樣快速的回應時間。
MongoDB佈署與託管:選擇適合的方案
在將MongoDB應用佈署到生產環境時,我們有多種選擇。本文將探討不同的佈署選項及其優缺點。
MongoDB Atlas:雲端託管服務
MongoDB Atlas是MongoDB官方提供的雲端資料函式庫服務,提供多種優勢:
- 簡單設定:幾分鐘內完成設定
- 自動擴充套件:根據需求自動調整資源
- 內建監控:提供詳細的監控和警示
- 自動備份:定期備份和時間點還原
- 多雲佈署:支援AWS、Azure和GCP
設定Atlas的基本步驟:
- 註冊MongoDB Atlas帳戶
- 建立新叢集
- 設定網路存取控制
- 建立資料函式庫使用者
- 取得連線字元串
# 連線到Atlas
client = AsyncIOMotorClient(
"mongodb+srv://username:password@cluster0.mongodb.net/mydb?retryWrites=true&w=majority"
)
這段程式碼展示瞭如何連線到MongoDB Atlas:
- 使用
mongodb+srv
協定,這是MongoDB Atlas的標準連線方式 - 包含使用者名稱和密碼進行身份驗證
- 指定叢集地址(
cluster0.mongodb.net
) - 指定資料函式庫名稱(
mydb
) - 包含額外引數:
retryWrites=true
:啟用自動重試寫入操作w=majority
:使用多數寫入確認,確保資料安全
MongoDB Atlas是最簡單的佈署選項,特別適合:
- 初創企業和小型團隊
- 不想管理基礎設施的開發團隊
- 需要快速上線的專案
- 需要全球分散式佈署的應用
自託管選項
對於想要更多控制或有特殊需求的組織,自託管是一個選擇:
單節點佈署
最簡單的佈署方式,適合開發和小型應用:
# 使用Docker佈署單節點MongoDB
docker run -d --name mongodb \
-p 27017:27017 \
-v mongodb_data:/data/db \
mongo:latest
這個Docker命令啟動了一個單節點MongoDB例項:
-d
:在背景執行容器--name mongodb
:指定容器名稱-p 27017:27017
:將容器的27017連線埠對映到主機的27017連線埠-v mongodb_data:/data/db
:建立一個持久化卷儲存資料mongo:latest
:使用最新版本的MongoDB官方映像
單節點佈署簡單快速,但缺乏高用性和自動容錯移轉能力,主要適合:
- 開發和測試環境
- 低流量的生產應用
- 資料不是極度關鍵的場景
複製集佈署
提供高用性和自動容錯移轉:
# 啟動三節點複製集
docker run -d --name mongo1 -p 27017:27017 mongo:latest --replSet rs0
docker run -d --name mongo2 -p 27018:27017 mongo:latest --replSet rs0
docker run -d --name mongo3 -p 27019:27017 mongo:latest --replSet rs0
# 初始化複製集
docker exec -it mongo1 mongo --eval 'rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "localhost:27017" },
{ _id: 1, host: "localhost:27018" },
{ _id: 2, host: "localhost:27019" }
]
})'
連線到複製集:
client = AsyncIOMotorClient(
"mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0"
)
這段程式碼展示瞭如何設定MongoDB複製集:
啟動複製集節點:
- 啟動三個MongoDB例項,每個都使用
--replSet rs0
引數指定複製集名稱 - 將每個例項對映到不同的主機連線埠
- 啟動三個MongoDB例項,每個都使用
初始化複製集:
- 使用
rs.initiate
命令初始化複製集 - 指定複製集ID和成員列表
- 每個成員有唯一的ID和主機地址
- 使用
連線到複製集:
- 在連線字串中列出所有節點地址
- 使用
replicaSet
引數指定複製集名稱
複製集提供了多項優勢:
- 高用性:如果主節點故障,會自動選舉新的主節點
- 自動容錯移轉:應用可以無縫切換到新的主節點
- 讀取擴充套件:可以從次要節點讀取資料,分散讀取負載
複製集是生產環境中的推薦設定,特別適合:
- 需要高用性的應用
- 不能容忍資料丟失的業務
- 中等規模的資料量和流量
分片叢集
對於需要水平擴充套件的大型應用:
- 設定伺服器複製集
- 設定分片複製集
- 設定mongos路由伺服器
- 設定分片
這種設定較為複雜,通常需要專業的資料函式函式倉管理員。
分片叢集是MongoDB最複雜但也最具擴充套件性的佈署方式:
設定伺服器複製集:
- 儲存叢集的中繼資料和設定
- 通常是一個三節點的複製集
分片複製集:
- 每個分片是一個獨立的複製集
- 資料分佈在多個分片上
- 每個分片負責一部分資料
mongos路由伺服器:
- 作為應用程式和分片叢集之間的介面
- 將查詢路由到適當的分片
- 合併來自多個分片的結果
分片設定:
- 選擇分片鍵
- 設定資料分佈策略
分片叢集適合:
- 超大規模的資料量(TB級以上)
- 需要高吞吐量的應用
- 需要無限水平擴充套件能力的場景
佈署考量因素
選擇佈署方案時,考慮以下因素:
- 可用性需求:需要什麼級別的可用性?
- 擴充套件需求:預期的資料量和流量是多少?
- 安全需求:需要什麼級別的安全性?
- 成本預算:可以投入多少資源?
- 管理能力:團隊是否有管理MongoDB的專業知識?
安全最佳實踐
無論選擇哪種佈署方式,都應遵循這些安全最佳實踐:
- 啟用認證:始終要求使用者名和密碼
- 使用TLS/SSL:加密傳輸中的資料
- 網路隔離:限制資料函式庫的網路存取
- 最小許可權原則:為使用者分配最小必要許可權
- 定期備份:實施定期備份策略
- 監控與稽核:啟用日誌記錄和監控
備份策略
實施可靠的備份策略:
- 定期備份:設定自動定期備份
- 測試還原:定期測試備份還原過程
- 異地備份:將備份儲存在不同的地理位置
- 時間點還原:對於關鍵應用,啟用時間點還原
選擇適合的MongoDB佈署方案對於應用的可靠性和效能至關重要。根據應用需求和團隊能力,選擇最適合的方案。
MongoDB與Async Python:系統負載評估應用效能
在將應用佈署到生產環境之前,進行負載測試是確保系統能夠處理預期流量的關鍵步驟。本文將介紹如何使用Locust對我們的PyPI API克隆進行負載測試。
Locust簡介
Locust是一個用Python編寫的開放原始碼負載測試工具,具有以下特點:
- 使用Python定義測試場景:無需學習新的語法
- 分散式負載生成:可以從多台機器生成負載
- Web UI:提供實時監控和控制
- 可擴充套件:可以自定義負載生成行為
安裝Locust
pip install locust
定義負載測試場景
為我們的PyPI API建立一個負載測試指令碼:
# locustfile.py
from locust import HttpUser, task, between
import random
class PyPIUser(HttpUser):
wait_time = between(1, 5) # 使用者在任務之間等待1-5秒
def on_start(self):
"""使用者開始時執行"""
# 可以在這裡進行登入等初始化操作
pass
@task(10) # 權重為10
def view_packages(self):
"""瀏覽套件列表"""
self.client.get("/packages/?limit=20")
@task(5)
def search_packages(self):
"""搜尋套件"""
search_terms = ["http", "api", "web", "data", "async", "test", "cloud", "security"]
term = random.choice(search_terms)
self.client.get(f"/packages/?q={term}&limit=20")
@task(3)
def view_package_details(self):
"""檢視套件詳情"""
popular_packages = ["requests", "flask", "django", "pandas", "numpy", "pytest"]
package = random.choice(popular_packages)
self.client.get(f"/packages/{package}")
@task(2)
def view_package_releases(self):
"""檢視套件版本"""
popular_packages = ["requests", "flask", "django", "pandas", "numpy", "pytest"]
package = random.choice(popular_packages)
self.client.get(f"/packages/{package}/releases")
@task(1)
def record_download(self):
"""記錄下載"""
popular_packages = ["requests", "flask", "django", "pandas", "numpy", "pytest"]
package = random.choice(popular_packages)
versions = ["1.0.0", "2.0.0", "3.0.0"]
version = random.choice(versions)
self.client.post("/downloads/record", json={
"package_name": package,
"version": version,
"file_name": f"{package}-{version}.tar.gz",
"user_agent": "pip/21.0.1"
})
@task(1)
def view_stats(self):
"""檢視統計訊息"""
self.client.get("/stats/top_packages?limit=10")
self.client.get("/stats/recent_updates?limit=10")
這個Locust測試檔案定義了一個模擬PyPI API使用者的測試類別:
使用者行為模型:
wait_time = between(1, 5)
:模擬使用者在執行任務之間等待1-5秒- 這種隨機等待時間使負載測試更接近真實使用者行為
任務定義:
- 使用
@task
裝飾器定義不同的使用者任務 - 括號中的數字表示任務的相對權重,數字越大執行頻率越高
- 例如,
view_packages
的權重為10,而record_download
的權重為1,表示瀏覽套件的頻率是記錄下載的10倍
- 使用
模擬不同API操作:
- 瀏覽套件列表:模擬使用者瀏覽套件目錄
- 搜尋套件:使用隨機關鍵字搜尋
- 檢視套件詳情:隨機選擇熱門套件檢視詳情
- 檢視套件版本:檢視特定套件的版本列表
- 記錄下載:模擬套件下載事件
- 檢視統計資訊:檢視熱門套件和最近更新
隨機性:
- 使用
random.choice
從預定義列表中隨機選擇值 - 這使測試更接近真實流量模式,避免總是請求相同的資源
- 使用
這種測試設計能夠全面評估API的各個端點,特別關注高頻操作(如瀏覽和搜尋)和低頻但重要的操作(如下載記錄)。
執行負載測試
locust -f locustfile.py --host=http://localhost:8000
然後在瀏覽器中存取http://localhost:8089來設定和啟動測試。
分析測試結果
Locust提供了多種指標來評估系統效能:
- 回應時間:平均、中位數、90%和99%百分位數
- 請求率:每秒請求數
- 失敗率:失敗請求的百分比
- 使用者數:模擬的並發使用者數
識別效能瓶頸
透過負載測試,我們可以識別系統中的效能瓶頸:
- 資料函式庫查詢:某些查詢可能需要最佳化
- 資源限制:CPU、記憶體或網路可能成為限制因素
- 程式碼效率:某些處理邏輯可能需要最佳化
最佳化策略
根據測試結果,我們可以實施以下最佳化策略:
資料函式庫最佳化:
- 增加或調整索引
- 最佳化查詢
- 考慮讀寫分離
應用最佳化:
- 實施快取
- 最佳化程式碼
- 減少不必要的處理
基礎設施最佳化:
- 擴充套件資源
- 調整設定
- 考慮水平擴充套件
實施快取策略
快取是提高效能的有效方法:
from fastapi import FastAPI, Depends, HTTPException, Query
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis
app = FastAPI()
@app.on_event("startup")
async def startup():
redis = aioredis.from_url("redis://localhost:6379")
FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
@app.get("/packages/", response_model=List[dict])
@cache(expire=60) # 快取60秒
async def list_packages(
q: Optional[str] = None,
classifier: Optional[str] = None,
maintainer: Optional[str] = None,
sort: str = "name",
limit: int = Query(10, le=100),
skip: int = 0
):
# 原始查詢邏輯...
pass
這段程式碼展示瞭如何在FastAPI應用中實施Redis快取:
設定Redis快取:
- 匯入必要的快取相關模組
- 在應用啟動時初始化Redis連線
- 設定FastAPICache使用Redis作為後端
使用快取裝飾器:
- 使用
@cache(expire=60)
裝飾API端點 - 這會將端點的回應快取60秒
- 相同引數的請求在60秒內會直接從快取回傳,不會重新執行查詢
- 使用
快取策略考量:
- 只快取讀取操作,不快取寫入操作
- 設定合適的過期時間,平衡資料新鮮度和效能
- 考慮快取鍵的設計,確保不同參陣列合有不同的快取項
快取可以顯著提高API的回應速度和吞吐量,特別是對於頻繁請求但不常變更的資料。在實際應用中,可能需要根據不同端點的特性調整快取策略。
持續監控與最佳化
負載測試不應該是一次性活動,而應該是持續過程:
- 持續整合:將負載測試整合到CI/CD流程中
- 效能基準:建立效能基準並監控變化
- 定期審查:定期審查系統效能並進行最佳化
透過系統負載測試,我們可以確保應用能夠處理預期的流量,並在問題影響使用者之前識別和解決效能瓶頸。
深入理解檔案資料函式庫:MongoDB查詢語法與應用
在探索現代資料函式庫技術時,檔案資料函式庫以其靈活性和高效能而脫穎而出。本文將深入解析檔案資料函式庫的運作原理,特別聚焦於MongoDB的原生查詢語法。雖然在實際應用開發中,我們通常會使用Pydantic、Beanie和Async/Await等工具,但瞭解MongoDB的原生查詢能力對於資料探索和問題診斷仍然至關重要。
檔案資料函式庫的運作原理
檔案資料函式庫的核心優勢在於其靈活的資料模型。以一個實際的章節記錄為例,我們可以看到檔案資料函式庫如何儲存結構化資料:
{
"_id": "chapter123",
"title": "MongoDB查詢語法",
"course_id": "mongodb101",
"duration": 3600,
"lectures": [
{
"id": "lec101",
"title": "歡迎與介紹",
"duration": 300
},
{
"id": "lec102",
"title": "Linux安裝",
"duration": 450
}
// 更多講座...
]
}
這個JSON檔案展示了MongoDB檔案的典型結構:
檔案結構:
- 每個檔案都有一個唯一的
_id
欄位,作為主鍵 - 檔案包含各種型別的欄位:字串、數字、陣列等
- 檔案可以包含嵌入式檔案或陣列
- 每個檔案都有一個唯一的
嵌入式檔案:
lectures
是一個陣列,包含多個講座物件- 每個講座都是一個完整的嵌入式檔案,有自己的ID、標題和時長
- 這種結構允許在單一查詢中取得章節及其所有講座
這種資料結構可以分為兩個部分:
- 傳統欄位:如
_id
、title
、course_id
和duration
,類別似於關聯式資料函式庫中的欄位 - 嵌入式檔案:如
lectures
陣列,包含多個講座物件
嵌入式檔案是檔案資料函式庫的獨特性,可以視為「預先計算的連線」(pre-computed join)。這意味著當你查詢章節資料時,相關的講座資料已經包含在結果中,無需額外的連線操作,大幅提升查詢效能。
MongoDB Shell基礎操作
MongoDB Shell (mongosh)是與MongoDB互動的命令列工具。以下是一些基本操作:
# 連線到MongoDB
mongosh
# 顯示所有資料函式庫
show dbs
# 使用特定資料函式庫
use training
# 查詢檔案
db.courses.find({ "_id": 30 }).pretty()
這段程式碼展示了MongoDB Shell的基本操作:
連線MongoDB:
- 使用
mongosh
命令啟動MongoDB Shell - 預設連線到本地的MongoDB伺服器
- 使用
資料函式庫操作:
show dbs
列出所有可用的資料函式庫use training
切換到名為"training"的資料函式庫
查詢操作:
db.courses.find({ "_id": 30 })
查詢ID為30的課程pretty()
方法使結果以格式化的方式顯示,更易於閱讀
在MongoDB中,db
指向當前使用的資料函式庫,courses
是集合名稱,find()
是查詢方法。查詢條件以JSON物件的形式提供,這使得查詢語法非常直覺。
進階查詢範例
讓我們看一個更複雜的查詢範例:
# 切換到書店資料函式庫
use bookstore
# 顯示所有集合
show collections
# 分頁查詢:跳過前15筆,限制回傳5筆
db.Book.find().skip(15).limit(5)
# 根據標題查詢書籍
db.Book.find({ "title": "From the Corner of His Eye" })
# 複合條件查詢
db.Book.find({
"title": "From the Corner of His Eye",
"ISBN": "0553582747"
})
# 查詢嵌入式檔案
db.Book.find({ "ratings.user_id": ObjectId("507f1f77bcf86cd799439011") })
這段程式碼展示了更多MongoDB查詢功能:
集合操作:
show collections
顯示資料函式庫中的所有集合- MongoDB中的集合類別似於關聯式資料函式庫中的表
分頁查詢:
skip(15)
跳過前15個結果limit(5)
限制回傳5個結果- 這兩個方法結合使用實作基本分頁功能
條件查詢:
- 使用欄位名和值進行精確比對
- 可以組合多個條件進行複合查詢
嵌入式檔案查詢:
- 使用點表示法(
ratings.user_id
)查詢嵌入式檔案中的欄位 - 這是MongoDB的強大特性,允許直接查詢嵌入式檔案
- 使用點表示法(
這些範例展示了MongoDB查詢的靈活性,特別是對嵌入式檔案的查詢能力。注意MongoDB是區分大小寫的,所以Book
和book
是兩個不同的集合。
MongoDB查詢運算元
MongoDB提供了多種查詢運算元,用於表達複雜的查詢條件:
// 等於
{ field: { $eq: value } }
// 大於
{ field: { $gt: value } }
// 大於等於
{ field: { $gte: value } }
// 小於
{ field: { $lt: value } }
// 小於等於
{ field: { $lte: value } }
// 不等於
{ field: { $ne: value } }
// 在...之中
{ field: { $in: [value1, value2, ...] } }
例如,要查詢評分大於等於9的書籍:
db.Book.find({ "ratings.value": { $gte: 9 } })
這段程式碼展示了MongoDB的比較運算元:
比較運算元:
$eq
:等於(通常可以省略,直接使用{ field: value }
)$gt
:大於$gte
:大於等於$lt
:小於$lte
:小於等於$ne
:不等於$in
:在指定的陣列中
運算元語法:
- 運算元總是以
$
開頭 - 運算元放在欄位值的位置,作為一個物件的鍵
- 可以組合多個運算元建立複雜條件
- 運算元總是以
嵌入式檔案中的運算元:
- 範例中使用
"ratings.value": { $gte: 9 }
查詢評分大於等於9的書籍 - 這展示瞭如何在嵌入式檔案的欄位上使用比較運算元
- 範例中使用
這些運算元使MongoDB的查詢能力大增強,能夠表達各種複雜的查詢條件。在實際應用中,這些運算元通常透過ODM(如Beanie)的API間接使用。
邏輯運算元
MongoDB也支援邏輯運算元,用於組合多個查詢條件:
// OR運算元
db.collection.find({
$or: [
{ field1: value1 },
{ field2: value2 }
]
})
// AND運算元 (通常直接使用多條件)
db.collection.find({
field1: value1,
field2: value2
})
// NOT運算元
db.collection.find({
field: { $not: { $eq: value } }
})
這段程式碼展示了MongoDB的邏輯運算元:
OR運算元:
- 使用
$or
後跟一個條件陣列 - 當任一條件滿足時,檔案會被包含在結果中
- 例如:查詢標題包含"MongoDB"或作者為"John"的書籍
- 使用
AND運算元:
- MongoDB中的多個條件預設是AND關係
- 可以直接在查詢物件中列出多個條件
- 所有條件都必須滿足,檔案才會被包含在結果中
NOT運算元:
- 使用
$not
否定一個條件 - 通常與其他運算元結合使用
- 例如:查詢價格不等於9.99的書籍
- 使用
這些邏輯運算元可以組合使用,建立非常複雜的查詢條件。例如,可以使用巢狀的$or
和$and
運算元表達複雜的布林邏輯。
投影:最佳化查詢效能
投影(Projection)是MongoDB中的一個重要概念,它允許你只回傳檔案中的特定欄位,而不是整個檔案。這對於提升查詢效能至關重要,特別是當檔案包含大量嵌入式資料時。
// 只回傳ISBN和標題
db.Book.find({}, { "ISBN": 1, "title": 1 })
// 排除_id欄位
db.Book.find({}, { "ISBN": 1, "title": 1, "_id": 0 })
這段程式碼展示了MongoDB的投影功能:
投影語法:
- 投影是
find()
方法的第二個引數 - 使用
{ field: 1 }
包含特定欄位 - 使用
{ field: 0 }
排除特定欄位 - 預設情況下,
_id
欄位總是包含在結果中,除非明確排除
- 投影是
投影的好處:
- 減少網路傳輸量
- 減少客戶端記憶體使用
- 減少序列化和反序列化的開銷
投影限制:
- 在同一個投影中,不能同時使用包含和排除(除了
_id
欄位) - 例如,不能同時使用
{ field1: 1, field2: 0 }
- 在同一個投影中,不能同時使用包含和排除(除了
在實際應用中,合理使用投影可以顯著提升API效能。玄貓曾經遇到一個案例,某個API端點的回應時間從900毫秒降到了10-20毫秒,僅是因為使用了投影來限制回傳的資料量。
實際應用案例
在開發一個電子商務平台時,玄貓曾面臨一個挑戰:如何高效地查詢包含特定評論的產品。傳統關聯式資料函式庫需要複雜的連線操作,而使用MongoDB的嵌入式檔案模型,這變得異常簡單:
db.products.find({ "reviews.user_id": userObjectId })
這個簡單的查詢展示了MongoDB在實際應用中的強大之處:
嵌入式檔案查詢:
- 使用點表示法直接查詢嵌入在產品檔案中的評論
- 不需要任何JOIN操作或多次查詢
效能優勢:
- 單一查詢即可取得所有相關資料
- 透過在
reviews.user_id
上建立索引,可以進一步提高查詢效能 - 這種查詢可以達到與主鍵查詢相當的效能
應用場景:
- 使用者個人資料頁面顯示使用者的所有評論
- 管理員審核特定使用者的評論
- 分析使用者評論行為
這個例子展示了MongoDB如何簡化資料模型和查詢邏輯,同時提供卓越的效能。在關聯式資料函式庫中,這種查詢通常需要複雜的JOIN操作,而在MongoDB中,它只是一個簡單的查詢。
非同步程式設計:Python中的Async與平行處理
打破對非同步程式設計的迷思
在Python世界中,非同步程式設計(Async)常被視為一個複雜與危險的領域。許多開發者會警告你:「非同步很難」、「執行緒和平行處理會讓你自找麻煩」、「你會在第一天就把自己搞得一團糟」。這些警告在某些情況下確實有道理,特別是當你需要處理大量執行緒,並透過事件、訊號、臨界區和訊號量來協調它們時。
然而,玄貓想強調的是,我們今天討論的非同步處理其實相當簡單直觀。我們關注的是像這樣的基本需求:
- 向資料函式庫傳送查詢時,不希望系統被阻塞
- 同時處理多個獨立的API請求
- 與多個HTTP服務同時通訊
這些是「簡易模式」的非同步和平行處理,但這正是大多數開發者日常所需的功能,特別是當你使用像Beanie這樣的非同步框架時。
為什麼需要非同步程式設計?
使用非同步協程(Async-Coroutine)程式設計主要有兩個原因:
- 等待操作時提高效率:當程式在等待資料函式庫查詢或API呼叫時,可以執行其他工作
- 充分利用現代硬體:現代電腦通常有多個核心,非同步程式設計可以更有效地利用這些資源
硬體效能的演進
讓我們先簡單瞭解硬體效能的演進。從摩爾定律的角度來看,直到2008年左右,電腦的電晶體數量、單執行緒效能和時脈速度都呈現穩定增長。如果你的程式不夠快,等一年就會因為硬體升級而變快。
但在2008年左右,情況發生了變化。由於熱量和物理限制,我們開始轉向多核心設計而非單純提高時脈速度。現代電腦(如Apple Silicon M2 Pro)擁有10個核心,這意味著如果你只寫單執行緒程式,就只能利用其中一個核心的能力。
以下是一個簡單的Python程式示範:
# 單執行緒程式
while True:
n = 997
n = (n * n) % 997
這是一個極簡單的單執行緒程式,它不斷計算一個數字的平方並取模:
單執行緒特性:
- 程式只使用一個執行緒執行
- 即使在多核心繫統上,也只能使用一個核心
- 這種程式無法充分利用現代多核心處理器
資源使用情況:
- 即使這個程式使CPU達到100%,在一個16核心的系統上,它也只使用了約7%的系統資源
- 這是因為它只能使用一個核心(1/16 ≈ 6.25%)
這就是為什麼我們需要多執行緒和平行處理來充分利用現代硬體。傳統上,Python在這方面並不出色,主要是因為GIL(全域直譯器鎖)的存在。不過,有多種方法可以繞過GIL,如使用Cython、匯入C或Rust函式庫、使用多處理等。
非同步處理與系統擴充套件性
對於需要與資料函式庫通訊的開發者來說,非同步處理的真正價值在於提高系統擴充套件性。
擴充套件性指的是系統能夠處理越來越多請求而不會顯著降低效能的能力。如果我將系統的請求數量增加四倍,效能是否會下降到四分之一或更糟?理想情況下,即使請求數量增加,每個請求的處理速度也應該保持相對穩定。
同步處理的限制
想像一個處理API或網頁請求的網頁伺服器。如果有三個請求(請求1、2和3)依次到達,每個請求需要一定的處理時間:
- 請求1需要較長時間處理
- 請求2需要類別似的處理時間
- 請求3處理很快
在同步處理模式下,請求必須依次處理。這意味著:
- 請求1的回應時間等於其處理時間
- 請求2必須等待請求1完成,因此其回應時間更長
- 請求3雖然處理很快,但必須等待前兩個請求完成,導致其回應時間最長
如果我們深入分析一個請求的處理過程,會發現許多時間都花在等待上:
- 等待資料函式庫查詢
- 等待API回應
- 等待網路傳輸
在同步模式下,當程式在等待這些操作時,它無法處理其他請求,這導致了資源的浪費。
非同步處理的優勢
使用非同步執行,我們可以在等待一個請求的I/O操作時處理其他請求:
- 請求1開始處理,遇到資料函式庫操作時進入等待狀態
- 系統開始處理請求2,直到它也需要等待
- 系統處理請求3
- 當請求1的資料函式庫操作完成時,繼續處理請求1
- 以此類別推
這種方式下,每個請求的回應時間接近於其實際處理時間,而不是所有前面請求的總和。這就是擴充套件性的體現:無論有一個請求還是多個請求,感知的回應時間基本保持不變。
實作非同步程式
讓我們透過一個實際例子來理解非同步程式設計。我們將使用Python的async
和await
關鍵字來實作非同步API呼叫。
首先,我們需要一個支援非同步的HTTP客戶端。Python的requests
函式庫不支援非同步,但httpx
函式庫提供了這個功能:
import httpx
import asyncio
from datetime import datetime
# 城市URL列表
locations = [
"https://weather.example.com/api?city=Taipei&state=TP",
"https://weather.example.com/api?city=TaiNan&state=TN",
# 更多城市...
]
async def get_report(url):
print(f"連線 {url}")
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
return response.json()
async def main():
start = datetime.now()
reports = []
for url in locations:
report = await get_report(url)
reports.append(report)
print(f"位置: {report['location']}")
print(f"預報: {report['forecast']}")
end = datetime.now()
print(f"總耗時: {(end-start).total_seconds() * 1000:.1f} 毫秒")
if __name__ == "__main__":
asyncio.run(main())
這段程式碼展示了基本的非同步HTTP請求:
非同步函式定義:
- 使用
async def
定義非同步函式 get_report
函式使用await
等待HTTP請求完成main
函式是主要的非同步入口點
- 使用
非同步HTTP客戶端:
- 使用
httpx.AsyncClient()
建立非同步HTTP客戶端 - 使用
await client.get(url)
傳送非同步GET請求 - 使用
async with
確保客戶端正確關閉
- 使用
執行流程:
- 迴圈遍歷URL列表
- 對每個URL傳送請求並等待結果
- 處理並顯示結果