在 Flask 應用程式中,處理耗時任務時,若採用同步方式,容易造成請求阻塞,影響使用者經驗。本文介紹如何結合 UUID 和佇列機制,實作非同步任務處理,並進一步探討如何最佳化 HTTP 客戶端,提升應用程式整體效能。首先,利用 UUID 生成唯一任務識別碼,將任務加入佇列,並立即傳回 202 狀態碼和 Location 標頭給客戶端,讓客戶端可以非同步輪詢任務結果。後端則使用獨立執行緒持續處理佇列中的任務,並將結果儲存,供客戶端查詢。此外,對於頻繁的 HTTP 請求,使用 requests 模組的 Session 物件可以建立持續連線和連線池,減少 TCP 握手次數,降低延遲。更進一步,利用 concurrent.futures 模組可以平行處理多個 HTTP 請求,避免同步等待,充分發揮多核心處理器的效能優勢。

安裝套件

pip install flask uuid

程式碼實作

以下是使用 Flask 和 UUID 處理非同步任務的程式碼實作:

from flask import Flask, Response, jsonify, request
import uuid
from queue import Queue
import threading

app = Flask(__name__)

# 初始化結果和任務佇列
RESULTS = {}
JOBS = Queue()

def to_url(value):
    """將值轉換為 URL"""
    return str(value)

@app.route("/sum/<uuid:job>", methods=['GET'])
def get_job(job):
    """取得任務結果"""
    if job not in RESULTS:
        return Response(status=404)
    if RESULTS[job] is None:
        return jsonify({"status": "waiting"})
    return jsonify({"status": "done", "result": RESULTS[job]})

@app.route("/sum", methods=['POST'])
def post_job():
    """提交新任務"""
    # 生成隨機任務 ID
    job_id = uuid.uuid4()
    # 儲存任務到佇列中
    RESULTS[job_id] = None
    numbers = request.args.getlist('number', type=int)
    JOBS.put((job_id, numbers))
    # 傳回 202 狀態碼和 Location 標頭
    return Response(
        headers={"Location": to_url(f"/sum/{job_id}")},
        status=202
    )

def compute_jobs():
    """計算任務結果"""
    while True:
        job_id, numbers = JOBS.get()
        RESULTS[job_id] = sum(numbers)
        JOBS.task_done()

if __name__ == "__main__":
    # 啟動計算任務執行緒
    t = threading.Thread(target=compute_jobs)
    t.daemon = True  # 設定為 daemon 執行緒
    t.start()
    app.run()

解釋

  1. 我們首先初始化 Flask 應用程式和必要的資料結構,包括 RESULTSJOBS
  2. /sum/<uuid:job> 路由用於取得任務結果。如果任務不存在,傳回 404 狀態碼。如果任務正在等待,傳回 { "status": "waiting" }。如果任務已完成,傳回 { "status": "done", "result": <result> }
  3. /sum 路由用於提交新任務。它生成一個隨機的任務 ID,儲存任務到佇列中,並傳回 202 狀態碼和 Location 標頭。
  4. compute_jobs 函式是一個無限迴圈,從佇列中取出任務,計算結果,並更新 RESULTS
  5. 最後,我們啟動計算任務執行緒和 Flask 應用程式。

測試

您可以使用 curl 命令測試這個應用程式:

curl -X POST -H "Content-Type: application/json" -d "number=1&number=2&number=3" http://localhost:5000/sum

然後,您可以使用以下命令取得任務結果:

curl -X GET http://localhost:5000/sum/<job_id>

請將 <job_id> 替換為實際的任務 ID。

圖表翻譯:

  flowchart TD
    A[提交任務] --> B[生成任務 ID]
    B --> C[儲存任務到佇列中]
    C --> D[啟動計算任務執行緒]
    D --> E[計算任務結果]
    E --> F[更新 RESULTS]
    F --> G[傳回 202 狀態碼和 Location 標頭]

這個圖表展示了提交任務到取得任務結果的整個過程。

背景執行緒與非同步處理

在現代網路應用中,為了提高系統的吞吐量和回應速度,開發者經常會使用背景執行緒(daemon thread)來處理耗時的任務。這樣可以讓主執行緒專注於處理請求和回應,從而提高系統的整體效能。

基本原理

當客戶端向伺服器傳送請求時,伺服器會建立一個新的執行緒來處理這個請求。這個執行緒可以是背景執行緒,也可以是前臺執行緒。背景執行緒的特點是,它會在主程式結束後自動終止,所以不需要手動停止它。

實作方法

在Python中,可以使用threading模組來建立背景執行緒。以下是一個簡單的例子:

import threading
import queue

# 建立一個佇列來儲存資料
q = queue.Queue()

# 定義一個函式來處理資料
def process_data():
    while True:
        # 從佇列中取出資料
        data = q.get()
        # 處理資料
        result = sum(data)
        # 將結果放回佇列中
        q.put(result)
        # 標記任務完成
        q.task_done()

# 建立一個背景執行緒
t = threading.Thread(target=process_data)
t.daemon = True  # 設定為背景執行緒

# 啟動執行緒
t.start()

# 主程式
while True:
    # 接收客戶端請求
    request = input("請輸入資料:")
    # 將資料放入佇列中
    q.put([int(x) for x in request.split(",")])
    # 等待結果
    result = q.get()
    # 輸出結果
    print("結果:", result)

在這個例子中,背景執行緒負責處理資料,主程式負責接收客戶端請求和輸出結果。

###優點和缺點 使用背景執行緒有以下優點:

  • 可以提高系統的吞吐量和回應速度
  • 可以減少主程式的負擔

但是,也有以下缺點:

  • 背景執行緒的終止可能會導致資料丟失
  • 需要合理的同步機制來避免資料競爭

改進方法

為了改進上述方法,可以使用以下兩種方案:

  1. 實作串流: 客戶端可以連線到一個特殊的端點,伺服器會在結果可用時推播結果給客戶端。
  2. 實作 webhook: 伺服器可以儲存一個 URL,當結果可用時,伺服器會呼叫這個 URL 將結果推播給客戶端。

這兩種方案都可以提高系統的效能和回應速度,但是需要客戶端和伺服器之間的額外協定和同步機制。

9.5 快速 HTTP 客戶端

在軟體開發中,撰寫客戶端程式以與伺服器軟體進行通訊是非常常見的需求。同時,隨著 REST API 的普遍性,其最佳化模式已經成為當前的必備知識。

最佳化 TCP 連線

雖然有多種方法可以最佳化底層的 TCP 連線,但是由於其中許多方法需要進行作業系統層面的修改,因此這些內容不在本文的討論範圍之內。

Python 中的 HTTP 客戶端

Python 中有許多 HTTP 客戶端函式庫,但是最常用且易於使用的函式庫是 requests。下面我們將探討如何使用 requests 來最佳化 HTTP 連線。

持續連線

首先,使用持續連線(persistent connection)是最佳化 HTTP 連線的一種方法。自 HTTP 1.1 起,持續連線就已經成為標準,但是很多應用程式並沒有充分利用這個功能。當使用 requests 的簡單模式(例如使用 get 函式)時,連線會在回應傳回後立即關閉。為了避免這個問題,應用程式需要使用 Session 物件,這允許重複使用已經開啟的連線。

使用 Session 物件

import requests

session = requests.Session()

每個連線都會被存放在一個連線池中(預設大小為 10),而連線池的大小也可以進行組態,如下例所示:

組態連線池大小

import requests

session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=100,
    pool_maxsize=100
)

重複使用 TCP 連線來傳送多個 HTTP 請求可以帶來以下幾個效能優勢:

  • 降低 CPU 和記憶體使用率(同時開啟的連線數量較少)。
  • 減少後續請求的延遲時間(不需要進行 TCP 握手)。
  • 當發生異常時,不需要支付關閉 TCP 連線的代價。

HTTP 管道技術

HTTP 協定也提供了管道技術(pipelining),允許在同一連線上傳送多個請求,而不需要等待前一個請求的回應。然而,requests 目前不支援這個功能。另外,即使管道技術可以提高效率,但是它可能不如平行傳送請求那樣快,因為 HTTP 1.1 協定要求伺服器按照請求傳送的順序傳回回應。

requests 的限制

requests 有一個主要的限制:它是同步的。當呼叫 requests.get() 時,程式會等待直到伺服器傳回完整的回應。這可能會導致應用程式等待和閒置,而不是進行其他工作。

解決方案:使用 Futures

一個聰明的應用程式可以透過使用 concurrent.futures 來緩解這個問題。它允許以非常快速的方式平行化 HTTP 請求。

使用 Futures 平行化 HTTP 請求

from concurrent import futures
import requests

with futures.ThreadPoolExecutor(max_workers=4) as executor:
    #...

這樣,可以大大提高應用程式的效率和反應速度。

非同步HTTP請求的最佳化

在進行多個HTTP請求時,同步請求會導致程式阻塞,等待伺服器回應後才繼續執行。為瞭解決這個問題,我們可以使用非同步請求的方式。

從效能最佳化視角來看,本文探討了使用 Flask 和 UUID 處理非同步任務以及最佳化 HTTP 請求的策略。分析了使用佇列、背景執行緒以及非同步 HTTP 請求提升應用程式效能的方法,並深入探討了使用 Python requests 函式庫進行 HTTP 連線最佳化的技巧,包括持續連線和連線池的組態。同時也指出了 requests 函式庫同步性的限制,以及使用 concurrent.futures 進行非同步請求以提升效能的解決方案。然而,本文並未涵蓋所有 HTTP 客戶端函式庫,也未深入探討 TCP 連線的底層最佳化。對於追求極致效能的應用,可以考慮使用更底層的網路程式設計技術,例如 asyncioTwisted,以及 HTTP/2 和 HTTP/3 等新一代 HTTP 協定。展望未來,隨著網路技術的發展,預計會有更多高效能的非同步 HTTP 客戶端函式庫和工具出現,進一步簡化非同步程式設計的複雜度,並提升網路應用程式的效能。對於開發者而言,持續關注這些新興技術,並將其應用於實際專案中,將是保持競爭力的關鍵。