FastAPI 提供簡潔易用的方式與各種資料函式庫互動,實作資料的增刪改查。本文以 SQLite 為例,示範如何直接使用 sqlite3 和 aiosqlite 進行同步和非同步的資料函式庫操作,並進一步介紹如何使用 SQLAlchemy ORM 簡化開發流程,提升程式碼可讀性和可維護性。同時,也涵蓋了 Pydantic 模型的定義和依賴注入的使用,確保資料的完整性和程式碼的結構性。
# 此處放置程式碼
在FastAPI中使用資料函式庫進行CRUD操作
簡介
FastAPI應用程式可以輕鬆地與幾乎任何型別的關聯式資料函式庫進行通訊,例如SQLite、MySQL、PostgreSQL和Oracle等。為了實作這一點,需要一個符合DB-API標準的資料函式庫特定驅動程式介面。這確保了執行資料函式庫操作的函式對於任何型別的資料函式庫都是統一的。因此,如果開發人員決定更改後端資料函式庫,只需要進行最小的更改。
使用SQLite資料函式庫
在本文中,我們將使用SQLite建立一個FastAPI應用程式,以在SQLite資料函式庫的Books表中執行CRUD操作。SQLite是一個輕量級且無伺服器的資料函式庫,並且在Python的標準函式庫中具有內建支援,其形式為sqlite3模組,這是DB-API的參考實作。
建立Books表
第一步是建立與資料函式庫的連線並取得Connection物件。使用connect()函式來實作此目的。
import sqlite3
conn = sqlite3.connect("mydata.sqlite3")
connect()函式的字串引數是代表資料函式庫的檔案。如果mydata.sqlite3資料函式庫不存在,則會建立它。
建立資料函式庫遊標
要執行資料函式庫操作,需要一個資料函式庫遊標來處理所有查詢事務。
cur = conn.cursor()
在這個遊標物件上呼叫execute()方法。其字串引數包含要由SQL引擎執行的SQL查詢。我們所需的Books表由以下程式碼建立。
初始化資料函式庫
def init_db():
conn = sqlite3.connect("mydata.sqlite3")
cur = conn.cursor()
qry = '''
SELECT count(name) FROM sqlite_master WHERE type='table'
AND name='Books'
'''
cur.execute(qry)
if cur.fetchone()[0] == 0: # 如果表不存在
qry = '''
CREATE TABLE IF NOT EXISTS Books (
id INTEGER (10) PRIMARY KEY,
title STRING (50),
author STRING (20),
price INTEGER (10),
publisher STRING (20)
);
'''
#### 內容解密:
# 這段程式碼首先檢查Books表是否存在。如果不存在,則建立一個新的Books表。
# CREATE TABLE陳述式用於建立表,IF NOT EXISTS子句確保如果表已經存在,則不會嘗試重新建立它。
# Books表的欄位包括id(主鍵)、title、author、price和publisher。
cur.execute(qry)
conn.close()
init_db()
驗證Books表的建立
有多種方法可以驗證Books表是否已實際建立。可以使用任何可用的SQLite GUI工具(例如SQLite Studio)。或者,可以開啟SQLite shell,開啟資料函式庫,並檢查Books表的架構。
sqlite> .open mydata.sqlite3
sqlite> .tables
Books
sqlite> .schema Books
CREATE TABLE Books (
id INTEGER (10) PRIMARY KEY,
title STRING (50),
author STRING (20),
price INTEGER (10),
publisher STRING (20)
);
內容解密:
這段SQL程式碼用於檢查Books表的架構,顯示了表的欄位定義。
新增一本新書
如前所述,POST方法用於建立新資源,在本例中,即在Books表中新增一本新書的資料。要在請求主體中包含資料,需宣告Pydantic模型。
from pydantic import BaseModel
class Book(BaseModel):
id: int
title: str
author: str
price: int
publisher: str
內容解密:
這段程式碼定義了一個Pydantic模型Book,用於表示一本文的資料結構,包括id、title、author、price和publisher欄位。
POST操作函式
POST操作函式使用Book模型的物件作為引數。此外,該函式還需要資料函式庫上下文——連線和遊標物件——以便執行INSERT操作。可以將它們視為操作函式的依賴項。
from fastapi import Depends
def get_cursor():
conn = sqlite3.connect("mydata.db")
conn.row_factory = sqlite3.Row
cur = conn.cursor()
yield (conn, cur)
內容解密:
get_cursor函式用於提供資料函式庫連線和遊標物件作為依賴項,供其他函式使用。
新增書籍的API端點
from fastapi import FastAPI, Depends
app = FastAPI()
@app.post("/books")
def add_book(book: Book, db = Depends(get_cursor)):
id = book.id
title = book.title
author = book.author
price = book.price
publisher = book.publisher
cur = db[1]
conn = db[0]
ins = "INSERT INTO books VALUES (?,?,?,?,?)"
cur.execute(ins, (id, title, author, price, publisher))
conn.commit()
return "Record successfully added"
內容解密:
add_book函式處理新增書籍的POST請求,使用Pydantic模型Book解析請求主體,並使用依賴項get_cursor提供的資料函式庫連線和遊標執行INSERT操作,將新書籍的資料插入Books表中。
測試API端點
可以使用Swagger UI測試這個路由。啟動Uvicorn伺服器並展開add_book()函式。輸入某些測試資料並執行該函式。
檢查Books表中的記錄
可以在SQLite終端中檢查Books表中的記錄,如下所示。
sqlite> SELECT * FROM Books;
內容解密:
這段SQL程式碼用於查詢Books表中的所有記錄,以驗證新增操作的結果。
使用 SQLite 資料函式庫進行 CRUD 操作
本章節將探討如何使用 FastAPI 與 SQLite 資料函式庫進行基本的 CRUD(建立、讀取、更新、刪除)操作。我們將從設定資料函式庫開始,逐步實作各項操作,並探討如何使用非同步方式提升效能。
建立資料函式庫與資料表
首先,我們需要建立一個 SQLite 資料函式庫並建立一個名為 Books 的資料表。以下是建立資料表的 SQL 語法:
CREATE TABLE Books (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
author TEXT NOT NULL,
price REAL NOT NULL,
publisher TEXT NOT NULL
);
內容解密:
此 SQL 語法建立了一個名為 Books 的資料表,包含五個欄位:
id:主鍵,以整數表示title:書名,以文字表示且不可為空author:作者,以文字表示且不可為空price:價格,以實數表示且不可為空publisher:出版社,以文字表示且不可為空
新增資料
接下來,我們實作一個使用 @app.post() 裝飾器的函式來新增書籍資料。
@app.post("/books")
def add_book(book: Book, db=Depends(get_cursor)):
id = book.id
title = book.title
author = book.author
price = book.price
publisher = book.publisher
cur = db[1]
conn = db[0]
ins = "INSERT INTO books VALUES (?,?,?,?,?)"
cur.execute(ins, (id, title, author, price, publisher))
conn.commit()
return "Record successfully added"
內容解密:
此函式接收一個 Book 物件,並將其資料插入到 Books 資料表中。
- 從
db引數中取得資料函式庫連線和遊標物件。 - 使用
INSERT INTO語法將書籍資料插入資料表。 - 呼叫
commit()方法提交交易。 - 傳回成功訊息。
讀取所有書籍資料
我們可以使用 @app.get() 裝飾器來讀取所有書籍資料。
@app.get("/books")
def get_books(db=Depends(get_cursor)):
cur = db[1]
conn = db[0]
cur.execute("select * from Books;")
books = cur.fetchall()
return books
內容解密:
此函式從 Books 資料表中擷取所有書籍資料。
- 執行
SELECT * FROM Books;查詢。 - 使用
fetchall()方法取得所有查詢結果。 - 將結果以 JSON 格式傳回給客戶端。
讀取單一書籍資料
若要讀取特定 ID 的書籍資料,可以使用路徑引數。
@app.get("/books/{id}")
def get_book(id: int, db=Depends(get_cursor)):
cur = db[1]
conn = db[0]
cur.execute("select * from Books where id=?", (id,))
book = cur.fetchone()
return book
內容解密:
此函式根據提供的 ID 擷取單一書籍資料。
- 使用引數化的
SELECT查詢,避免 SQL 注入風險。 - 使用
fetchone()方法取得第一筆查詢結果。 - 將結果傳回給客戶端。
更新書籍資料
更新特定書籍的價格可以使用 @app.put() 裝飾器。
@app.put("/books/{id}")
def update_book(id: int, price: str = Body(), db=Depends(get_cursor)):
cur = db[1]
conn = db[0]
qry = "UPDATE Books set price=? where id=?"
cur.execute(qry, (price, id))
conn.commit()
return "Book updated successfully"
內容解密:
此函式更新特定 ID 的書籍價格。
- 使用
UPDATE語法更新指定 ID 的書籍價格。 - 提交交易以儲存變更。
- 傳回成功訊息。
刪除書籍資料
刪除特定書籍可以使用 @app.delete() 裝飾器。
@app.delete("/books/{id}")
def del_book(id: int, db=Depends(get_cursor)):
cur = db[1]
conn = db[0]
cur.execute("delete from Books where id=?", (id,))
conn.commit()
return "Book deleted successfully"
內容解密:
此函式刪除特定 ID 的書籍資料。
- 使用
DELETE FROM語法刪除指定 ID 的書籍。 - 提交交易以確認刪除。
- 傳回成功訊息。
使用 aiosqlite 進行非同步操作
預設的 sqlite3 模組不支援非同步操作,因此我們可以使用 aiosqlite 模組來實作非同步的資料函式庫操作。
首先,需要安裝 aiosqlite:
pip3 install aiosqlite
接著,將相關函式改為非同步版本:
import aiosqlite
async def get_cursor():
conn = await aiosqlite.connect("mydata.sqlite3")
conn.row_factory = aiosqlite.Row
cur = await conn.cursor()
yield (conn, cur)
內容解密:
此非同步函式建立資料函式庫連線和遊標物件。
- 使用
aiosqlite.connect()建立非同步連線。 - 設定
row_factory以便以字典形式取得查詢結果。 - 建立非同步遊標物件。
非同步的新增資料範例如下:
@app.post("/books")
async def add_book(book: Book, db=Depends(get_cursor)):
id = book.id
title = book.title
author = book.author
price = book.price
publisher = book.publisher
cur = db[1]
conn = db[0]
ins = "INSERT INTO books VALUES (?,?,?,?,?)"
await cur.execute(ins, (id, title, author, price, publisher))
await conn.commit()
return "Record successfully added"
內容解密:
此非同步函式新增書籍資料到資料函式庫。
-
使用
await關鍵字執行非同步的 SQL 陳述式。 -
提交非同步交易。
-
安全性增強:實作身分驗證和授權機制,確保只有授權使用者可以存取和修改資料。
-
效能最佳化:使用連線池和快取技術提升資料函式庫操作的效能。
-
錯誤處理:實作全面的錯誤處理和日誌記錄機制,以便於除錯和監控應用程式狀態。
透過這些改進,可以進一步提升應用程式的穩定性和使用者經驗。未來還可以考慮使用更強大的資料函式庫系統,如 PostgreSQL,以支援更大規模的應用需求。
使用SQLAlchemy進行資料函式庫操作
在前面的章節中,我們已經使用sqlite3和aiosqlite模組對SQLite資料函式庫進行了CRUD操作。對於其他資料函式庫,您需要使用相應的DB-API相容模組(例如,pymysql用於MySQL,aiomysql作為其asyncio相容版本)。在路徑操作函式中,您基本上使用請求資料來構建SQL查詢,然後執行它。這有時可能是一項繁瑣的任務。各種ORM(物件關聯對映器)使開發人員的生活變得輕鬆。
ORM技術簡介
當您需要在Python程式中與關聯式資料函式庫互動時,您面臨兩個挑戰。首先,您必須具備良好的SQL語法知識。其次,需要將資料從Python環境轉換為SQL資料型別。SQL資料型別基本上是純量型別(數字、字串等),而在Python中,您有可能包含多於一個基本資料型別的物件。
ORM技術幫助您在這些不相容的型別系統之間轉換資料。ORM中的“物件”指的是Python類別的物件。您可能記得,在關聯式資料函式庫理論中,表格被稱為關係。Python類別被對映到資料函式庫中相應結構的表格。因此,對映類別的每個物件都反映為資料函式庫表格中的一行。
ORM的優點
- 將Python物件資料按照OO原則進行操作
- 對應的SQL陳述式由ORM發出,並在後台執行CRUD操作
- 作為Python開發人員,您無需編寫任何SQL查詢
SQLAlchemy簡介
SQLAlchemy是一個非常流行的SQL工具包和物件關聯對映器API。在我們的範例中,我們一直在對Books表格執行CRUD操作。使用SQLAlchemy,我們將擁有一個Books類別,並透過其物件執行CRUD操作。
連線到資料函式庫
首先,您需要使用create_engine()函式建立與資料函式庫的連線。資料函式庫由SQLALCHEMY_DATABASE_URL常數參照(清單6-19)。它本質上與Connection物件相似。
from sqlalchemy import create_engine
from sqlalchemy.dialects.sqlite import *
SQLALCHEMY_DATABASE_URL = "sqlite:///./mydata.sqlite3"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
ORM模型
接下來,您需要宣告一個繼承自declarative_base類別的Books類別(清單6-20)。Books類別的類別屬性對應於對映到它的資料函式庫表格的所需結構。可以明確指定表格的名稱為__tablename__。
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
from sqlalchemy import Column, Integer, String
class Books(Base):
__tablename__ = 'book'
id = Column(Integer, primary_key=True, nullable=False)
title = Column(String(50), unique=True)
author = Column(String(50))
price = Column(Integer)
publisher = Column(String(50))
Base.metadata.create_all(bind=engine)
會話物件
會話物件類別似於遊標物件。它是正在使用的資料函式庫的控制程式碼。所有資料函式庫操作都是透過此會話物件完成的。Sessionmaker()函式傳回會話類別(清單6-21)。
from sqlalchemy.orm import sessionmaker, Session
session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
依賴函式
您需要將資料函式庫上下文注入到FastAPI應用程式的路徑操作函式中。我們將使用清單6-22中的函式將會話物件注入到所有操作函式中。
def get_db():
db = session()
try:
yield db
finally:
db.close()
Pydantic模型
SQLAlchemy部分的應用程式到此結束。對於FastAPI應用程式,我們需要一個Pydantic模型類別,其結構與ORM類別相匹配。清單6-23提供了Book類別的定義。
from pydantic import BaseModel
class Book(BaseModel):
id: int
title: str
author: str
price: int
publisher: str
class Config:
orm_mode = True
路徑操作函式
現在,我們將開發用於POST、GET、PUT和DELETE方法的路徑操作函式。我們將使用與前面範例中相同的URL路徑端點:
- POST /books
- GET /books
- GET /books/{id}
- PUT /books/{id}
- DELETE /books/{id}
@app.post()
您知道,INSERT操作是由HTTP POST方法執行的。使用Book Pydantic模型作為此目的response_model,並將其物件作為add_book()函式的回應體引數,如清單6-24所示。
@app.post("/books", response_model=Book)
def add_book(book: Book, db: Session = Depends(get_db)):
# 新增書籍的邏輯
pass
內容解密:
在上述程式碼中,我們定義了一個用於新增書籍的路徑操作函式add_book()。該函式使用Book Pydantic模型作為其回應模型,並將會話物件作為依賴項注入。新增書籍的邏輯將在函式內部實作。
CRUD操作的實作
以下是使用SQLAlchemy實作CRUD操作的完整程式碼範例:
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 建立FastAPI應用程式
app = FastAPI()
# 建立資料函式庫引擎
SQLALCHEMY_DATABASE_URL = "sqlite:///./mydata.sqlite3"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
# 建立基礎類別
Base = declarative_base()
# 定義Books類別
class Books(Base):
__tablename__ = 'book'
id = Column(Integer, primary_key=True, nullable=False)
title = Column(String(50), unique=True)
author = Column(String(50))
price = Column(Integer)
publisher = Column(String(50))
# 建立表格
Base.metadata.create_all(bind=engine)
# 建立會話類別
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 定義Pydantic模型
class Book(BaseModel):
id: int
title: str
author: str
price: int
publisher: str
class Config:
orm_mode = True
# 依賴函式
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 新增書籍
@app.post("/books", response_model=Book)
def add_book(book: Book, db: Session = Depends(get_db)):
db_book = Books(**book.dict())
db.add(db_book)
db.commit()
db.refresh(db_book)
return db_book
# 取得所有書籍
@app.get("/books")
def get_books(db: Session = Depends(get_db)):
return db.query(Books).all()
# 取得單一書籍
@app.get("/books/{book_id}")
def get_book(book_id: int, db: Session = Depends(get_db)):
return db.query(Books).filter(Books.id == book_id).first()
# 更新書籍
@app.put("/books/{book_id}", response_model=Book)
def update_book(book_id: int, book: Book, db: Session = Depends(get_db)):
db_book = db.query(Books).filter(Books.id == book_id).first()
for key, value in book.dict().items():
setattr(db_book, key, value)
db.commit()
db.refresh(db_book)
return db_book
# 刪除書籍
@app.delete("/books/{book_id}")
def delete_book(book_id: int, db: Session = Depends(get_db)):
db_book = db.query(Books).filter(Books.id == book_id).first()
db.delete(db_book)
db.commit()
return {"message": "Book deleted successfully"}