在先前章節中,我們已經使用 SQLModel 和 Beanie 將 SQL 和 NoSQL 資料函式庫整合至 FastAPI 應用程式,實作了資料函式庫互動。然而,應用程式安全仍待加強,任何人都能新增活動。本章將使用 JWT 保護應用程式,僅允許已驗證使用者執行特定操作。首先,我們將實作使用者註冊和登入路由,並使用 bcrypt 雜湊密碼,避免以明文儲存密碼。接著,我們將介紹 OAuth2 授權機制,並使用 JWT 生成和驗證存取令牌,確保只有持有有效令牌的使用者才能存取受保護的資源。最後,我們將示範如何將 JWT 整合到 FastAPI 應用程式中,實作完整的身份驗證和授權流程,提升應用程式整體安全性。

使用 Beanie 實作 CRUD 操作與 FastAPI 應用安全

在前一章節中,我們探討瞭如何使用 SQLModel 和 Beanie 分別將 SQL 和 NoSQL 資料函式庫整合到 FastAPI 應用程式中。我們利用前面章節的知識,成功實作了資料函式庫方法並更新了現有的路由,以啟用應用程式與資料函式庫之間的互動。然而,目前的活動企劃應用程式仍然允許任何人新增活動,而不是僅限於已驗證的使用者。在本章中,我們將使用 JSON Web Token(JWT)來保護應用程式,並限制某些活動操作僅供已驗證的使用者使用。

保護 FastAPI 應用程式

保護應用程式涉及新增安全措施,以限制未授權實體對應用程式功能的存取,防止駭客攻擊或非法修改應用程式。驗證是驗證實體傳遞的憑證的過程,而授權則是授予實體執行指定操作的許可權。當憑證被驗證後,實體便被授權執行各種操作。

實作使用者註冊與登入路由

首先,我們來實作使用者註冊和登入的路由。以下程式碼展示瞭如何使用 Beanie 進行使用者註冊和登入操作:

@user_router.post("/signup")
async def create_user(user: UserSignUp) -> dict:
    user_exist = await User.find_one(User.email == user.email)
    if user_exist:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="User with email provided exists already."
        )
    await user.create()
    return {"message": "User created successfully"}

@user_router.post("/signin")
async def sign_user_in(user: UserSignIn) -> dict:
    user_exist = await User.find_one(User.email == user.email)
    if not user_exist:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User with email does not exist."
        )
    if user_exist.password == user.password:
        return {"message": "User signed in successfully."}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid details passed."
    )

內容解密:

  1. create_user函式:此函式負責處理使用者註冊請求。它首先檢查資料函式庫中是否已存在具有相同電子郵件地址的使用者。如果存在,則引發 HTTP 衝突錯誤(409)。如果不存在,則建立新使用者並傳回成功訊息。

  2. sign_user_in函式:此函式處理使用者登入請求。它首先檢查資料函式庫中是否存在具有指定電子郵件地址的使用者。如果不存在,則引發 HTTP 404 錯誤。如果存在,則驗證密碼。如果密碼正確,則傳回成功登入訊息;否則,引發 HTTP 401 未授權錯誤。

測試路由

接下來,我們啟動 MongoDB 例項和應用程式,並使用 curl 命令測試事件路由和使用者註冊、登入路由。

# 啟動 MongoDB
(venv)$ mkdir store
(venv)$ mongod --dbpath store

# 在另一個視窗中啟動應用程式
(venv)$ python main.py

使用 curl 命令測試事件相關操作和使用者註冊、登入:

# 建立事件
(venv)$ curl -X 'POST' \
'http://0.0.0.0:8080/event/new' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"title": "FastAPI Book Launch",
"image": "https://linktomyimage.com/image.png",
"description": "We will be discussing the contents of the FastAPI book in this event. Ensure to come with your own copy to win gifts!",
"tags": ["python", "fastapi", "book", "launch"],
"location": "Google Meet"
}'

# 取得所有事件
(venv)$ curl -X 'GET' \
'http://0.0.0.0:8080/event/' \
-H 'accept: application/json'

# 更新事件
(venv)$ curl -X 'PUT' \
'http://0.0.0.0:8080/event/{event_id}' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{"location": "Hybrid"}'

# 刪除事件
(venv)$ curl -X 'DELETE' \
'http://0.0.0.0:8080/event/{event_id}' \
-H 'accept: application/json'

# 使用者註冊
(venv)$ curl -X 'POST' \
'http://0.0.0.0:8080/user/signup' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{"email": "fastapi@packt.com", "password": "strong!!!", "events": []}'

# 使用者登入
(venv)$ curl -X 'POST' \
'http://0.0.0.0:8080/user/signin' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{"email": "fastapi@packt.com", "password": "strong!!!"}'

強化FastAPI應用程式的安全性

本章結束時,您將能夠為FastAPI應用程式新增身份驗證層。本章將解釋透過雜湊保護密碼、新增身份驗證層以及保護路由免受未經身份驗證的使用者存取的過程。 在本章中,我們將涵蓋以下主題:

  • FastAPI中的身份驗證方法
  • 使用OAuth2和JWT保護應用程式
  • 使用依賴注入保護路由
  • 組態CORS

技術需求

要跟隨本章的內容,需要MongoDB資料函式庫元件。您的作業系統的安裝程式可以在其官方檔案中找到。

FastAPI中的身份驗證方法

FastAPI支援多種身份驗證方法,包括基本HTTP身份驗證、Cookie和持有者令牌身份驗證。讓我們簡要介紹每種方法的內容:

  • 基本HTTP身份驗證:在此身份驗證方法中,使用者憑據(通常是使用者名稱和密碼)透過授權HTTP標頭傳送。請求反過來傳回一個WWW-Authenticate標頭,其中包含一個Basic值和一個可選的realm引數,該引數指示進行身份驗證請求的資源。
  • Cookie:當資料要儲存在客戶端(如Web瀏覽器)時,會使用Cookie。FastAPI應用程式也可以使用Cookie儲存使用者資料,伺服器可以檢索這些資料以進行身份驗證。
  • 持有者令牌身份驗證:此身份驗證方法涉及使用稱為持有者令牌的安全令牌。這些令牌與Bearer關鍵字一起在授權標頭請求中傳送。最常用的令牌是JWT,它通常是一個包含使用者ID和令牌到期時間的字典。

依賴注入

依賴注入是一種模式,其中物件(在本例中為函式)接收執行函式所需的例項變數。 在FastAPI中,透過在路徑操作函式引數中宣告依賴項來注入依賴項。

@user_router.post("/signup")
async def sign_user_up(user: User) -> dict:
    user_exist = await User.find_one(User.email == user.email)

在這段程式碼中,定義的依賴項是User模型類別,它被注入到sign_user_up()函式中。透過將User模型注入到使用者函式引數中,我們可以輕鬆檢索物件的屬性。

建立和使用依賴項

在FastAPI中,依賴項可以定義為函式或類別。建立的依賴項使我們能夠存取其底層值或方法,從而無需在繼承它們的函式中建立這些物件。依賴注入有助於減少某些情況下的程式碼重複,例如強制身份驗證和授權。

定義依賴項範例

async def get_user(token: str):
    user = decode_token(token)
    return user

此依賴項是一個函式,它接受token作為引數,並從外部函式decode_token傳回使用者引數。要使用此依賴項,將相關函式引數宣告為具有Depends引數,例如:

from fastapi import Depends

@router.get("/user/me")
async def get_user_details(user: User = Depends(get_user)):
    return user

內容解密:

  1. get_user 函式是一個依賴項,它根據提供的 token 解碼並傳回對應的使用者。
  2. Depends 類別負責執行傳遞給它的函式,並使其傳回值在端點中可用。
  3. get_user_details 路由函式依賴於 get_user 函式,這意味著要存取該路由,必須滿足 get_user 依賴項。

使用OAuth2和JWT保護應用程式

在本文中,我們將為事件規劃應用程式構建身份驗證系統。我們將使用OAuth2密碼流,該流要求客戶端將使用者名稱和密碼作為表單資料傳送。在我們的例子中,使用者名稱是建立帳戶時使用的電子郵件。

當表單資料從客戶端傳送到伺服器時,存取令牌(即簽名的JWT)將作為回應傳送。通常,在建立令牌以允許進一步授權之前,會進行後台檢查以驗證傳送到伺服器的憑據。要授權經過身份驗證的使用者,JWT在透過標頭傳送以授權伺服器上的操作時會加上Bearer字首。

JWT是什麼?為什麼要簽名?

JWT是一種編碼字串,通常包含一個字典,其中包含有效載荷、簽名及其演算法。JWT使用只有伺服器和客戶端知道的唯一金鑰進行簽名,以避免外部人員篡改編碼字串。

圖表說明:身份驗證流程

此圖示說明瞭身份驗證流程,包括使用者請求、伺服器驗證憑據、生成JWT以及使用JWT進行授權的步驟。

內容解密:

  1. 使用者向伺服器傳送使用者名稱和密碼。
  2. 伺服器驗證憑據,如果有效,則生成JWT。
  3. JWT被傳回給使用者,使用者可以使用它進行授權。
  4. 使用者在後續請求中使用JWT進行授權,伺服器驗證JWT以確定使用者的身份。

使用OAuth2和JWT保護應用程式

現在我們已經瞭解了身份驗證流程的工作原理,接下來讓我們建立必要的資料夾和檔案,以在我們的應用程式中設定身份驗證系統。

建立身份驗證相關檔案

首先,在專案資料夾中建立auth資料夾:

(venv)$ mkdir auth

接下來,在auth資料夾中建立以下檔案:

(venv)$ cd auth && touch {__init__,jwt_handler,authenticate,hash_password}.py

上述命令建立了四個檔案:

  • jwt_handler.py:此檔案將包含編碼和解碼JWT字串所需的函式。
  • authenticate.py:此檔案將包含authenticate依賴項,該依賴項將被注入到我們的路由中以強制執行身份驗證和授權。
  • hash_password.py:此檔案將包含在註冊過程中加密使用者密碼以及在登入過程中比較密碼的函式。
  • __init__.py:此檔案表示該資料夾的內容是一個模組。

雜湊密碼

在前一章中,我們以純文字形式儲存使用者密碼。這在構建API時是一種非常不安全且被禁止的做法。密碼應該使用適當的函式庫進行加密或雜湊。我們將使用bcrypt來加密使用者密碼。

首先,安裝passlib函式庫:

(venv)$ pip install passlib[bcrypt]

接下來,在hash_password.py中建立雜湊密碼的函式:

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class HashPassword:
    def create_hash(self, password: str):
        return pwd_context.hash(password)

    def verify_hash(self, plain_password: str, hashed_password: str):
        return pwd_context.verify(plain_password, hashed_password)

在上述程式碼區塊中,我們首先匯入了CryptContext,它採用bcrypt方案來雜湊傳遞給它的字串。上下文儲存在pwd_context變數中,使我們能夠存取執行任務所需的方法。

更新註冊路由

現在我們已經建立了一個類別來處理密碼的雜湊,接下來讓我們更新註冊路由,以便在將使用者密碼儲存到資料函式庫之前對其進行雜湊:

# routes/users.py
from auth.hash_password import HashPassword
from database.connection import Database

user_database = Database(User)
hash_password = HashPassword()

@user_router.post("/signup")
async def sign_user_up(user: User) -> dict:
    user_exist = await User.find_one(User.email == user.email)
    if user_exist:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="User with email provided exists already."
        )
    hashed_password = hash_password.create_hash(user.password)
    user.password = hashed_password
    await user_database.save(user)
    return {
        "message": "User created successfully"
    }

內容解密:

  1. 匯入必要的模組和類別,包括HashPasswordDatabase
  2. 建立HashPassword例項,用於雜湊使用者密碼。
  3. 在註冊路由中,首先檢查使用者是否已經存在。
  4. 如果使用者不存在,則對使用者密碼進行雜湊,並將雜湊後的密碼儲存到資料函式庫中。

建立和驗證存取令牌

建立JWT使我們更進一步地保護我們的應用程式。令牌的有效負載將包含使用者ID和到期時間,然後編碼為一個長字串。

JWT結構圖示

@startuml
skinparam backgroundColor #FEFEFE
skinparam sequenceArrowThickness 2

title FastAPI Beanie CRUD 操作與應用安全

actor "客戶端" as client
participant "API Gateway" as gateway
participant "認證服務" as auth
participant "業務服務" as service
database "資料庫" as db
queue "訊息佇列" as mq

client -> gateway : HTTP 請求
gateway -> auth : 驗證 Token
auth --> gateway : 認證結果

alt 認證成功
    gateway -> service : 轉發請求
    service -> db : 查詢/更新資料
    db --> service : 回傳結果
    service -> mq : 發送事件
    service --> gateway : 回應資料
    gateway --> client : HTTP 200 OK
else 認證失敗
    gateway --> client : HTTP 401 Unauthorized
end

@enduml

此圖示展示了JWT的基本結構,包括標頭、有效負載和簽名。

更新設定類別和環境檔案

首先,更新database/database.py中的Settings類別,以及.env環境檔案,以包含一個SECRET_KEY變數,用於簽署JWT:

# database/database.py
class Settings(BaseSettings):
    SECRET_KEY: Optional[str] = None
# .env
SECRET_KEY=HI5HL3V3L$3CR3T

內容解密:

  1. Settings類別中新增SECRET_KEY屬性,用於儲存簽署JWT的金鑰。
  2. .env檔案中設定SECRET_KEY的值。

建立存取令牌

接下來,在jwt_handler.py中建立一個函式,用於建立存取令牌:

import time
from datetime import datetime
from fastapi import HTTPException, status
from jose import jwt, JWTError
from database.database import Settings

settings = Settings()

def create_access_token(user: str) -> str:
    payload = {
        "user": user,
        "expires": time.time() + 3600
    }
    token = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
    return token

內容解密:

  1. 匯入必要的模組和類別,包括jwtSettings
  2. 建立一個函式,用於建立存取令牌。
  3. 在有效負載中包含使用者ID和到期時間。
  4. 使用HS256演算法對有效負載進行簽署。

驗證存取令牌

接下來,建立一個函式,用於驗證存取令牌的真實性:

def verify_access_token(token: str) -> dict:
    try:
        data = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
        expire = data.get("expires")
        if expire is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="No access token supplied"
            )
        if datetime.utcnow() > datetime.utcfromtimestamp(expire):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Token expired!"
            )
        return data
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid token"
        )

內容解密:

  1. 使用jwt.decode函式對令牌進行解碼。
  2. 檢查令牌是否包含到期時間。
  3. 檢查令牌是否已過期。
  4. 如果令牌無效或已過期,則引發HTTP異常。