Celery 作為 Python 常用的分散式任務佇列框架,能有效處理非同步任務與延遲執行。本文將詳細說明如何設定 Celery,並結合 RabbitMQ 訊息佇列,建構一個穩健的訊息處理系統。首先,我們會探討 Celery 設定檔的結構與常用引數,例如序列化方式、訊息代理地址、結果儲存後端等。接著,示範如何定義 Celery 任務,並解決本地模組名稱與系統套件衝突的問題。此外,文章也涵蓋 Systemd 整合,確保 Celery 服務穩定執行。更進一步,將介紹如何利用 RabbitMQ 的路由機制,將任務分配到特定佇列,以及如何使用 Celery 的廣播功能,將訊息同時傳送到多個工作節點,提升系統效率。最後,將探討生產者-消費者外掛架構,說明如何透過外掛擴充 Celery 應用,並以 MySQL 效能調校應用程式為例,展示如何整合 Celery 與外掛框架,實作更複雜的系統功能。

分散式訊息處理系統:Celery 設定與應用

Celery 設定檔總覽

Celery 應用程式通常不需要大量的設定。主要的設定包括指定新任務的來源(通常是執行在相同主機上的訊息佇列)、結果的儲存位置(通常是相同的訊息佇列),以及一些環境特定的設定。最簡單的管理 Celery 設定檔的方式是將所有的設定專案放在一個獨立的檔案中,並從主應用程式中匯入。

設定檔範例

在我們的範例中,設定檔被命名為 celeryconfig.py,其內容如以下所示:

CELERY_TASK_SERIALIZER = 'json'  # 只允許使用 JSON 進行物件序列化
CELERY_RESULT_SERIALIZER = 'json'  # 以前預設使用 Python pickle 物件,但它們不安全且將被棄用
CELERY_ACCEPT_CONTENT = ['json', ]  # 設定允許的序列化格式
BROKER_URL = 'amqp://guest@localhost//'  # 指定新任務的來源
CELERY_RESULT_BACKEND = 'amqp'  # 指定任務結果的儲存位置
CELERY_IMPORTS = ('celery_app.tasks.geometry', 'celery_app.tasks.arithmetics',)  # 包含 Celery 任務的模組
CELERY_TASK_RESULT_EXPIRES = 3600  # 設定任務結果的過期時間(秒)

設定專案說明

設定專案說明
CELERY_TIMEZONE指定 Celery 使用的時區,預設為 UTC。
CELERYD_CONCURRENCY指定 Celery worker 可以執行的平行程式或執行緒數量。
CELERY_RESULT_BACKEND指定任務結果的儲存後端,預設不儲存結果。
CELERY_RESULT_SERIALIZER指定任務結果的序列化方法,預設為 Python 的 pickle 方法。
CELERY_ACCEPT_CONTENT指定允許的序列化格式。
CELERY_TASK_RESULT_EXPIRES指定任務結果的過期時間(秒)。
CELERY_TASK_SERIALIZER指定傳送任務資料到 worker 程式時的序列化方法。
CELERY_IMPORTS指定 Celery worker 需要匯入的模組。

主要的 Celery 應用程式檔案

這個檔案用於初始化 Celery 應用程式並從設定模組載入設定。

from __future__ import absolute_import
from celery import Celery
from celery_app import celeryconfig

app = Celery()
app.config_from_object(celeryconfig)

if __name__ == '__main__':
    app.start()

程式碼解析

  1. 匯入必要的模組:首先,我們從 __future__ 匯入 absolute_import 以確保正確的匯入行為。然後,從 celery 匯入 Celery 類別,並從 celery_app 匯入 celeryconfig 模組。
  2. 建立 Celery 應用程式例項:使用 Celery() 建立一個新的 Celery 應用程式例項。
  3. 載入設定:呼叫 config_from_object() 方法並傳入 celeryconfig 以載入設定。
  4. 啟動應用程式:如果此指令碼被直接執行(而不是被匯入),則呼叫 app.start() 以啟動 Celery 應用程式。

分散式訊息處理系統

Celery 設定與任務管理

在設定 Celery 應用程式時,開發者可能會遇到一個常見的問題:當本地模組名稱與系統套件名稱衝突時,Python 直譯器會無法正確判斷應該匯入哪一個模組。例如,在本案例中,我們將 Celery 應用程式命名為 celery.py,這與官方的 celery 套件名稱相同。為瞭解決這個問題,我們需要在檔案開頭加入特定的匯入陳述式,以確保 Python 直譯器優先考慮 sys.path 中的模組。

from __future__ import absolute_import

內容解密:

這段程式碼的作用是強制 Python 直譯器使用絕對匯入(absolute import),以避免本地模組與官方套件之間的名稱衝突。這樣可以確保我們能夠正確地匯入官方的 celery 套件,而不會被本地的 celery.py 檔案幹擾。

Celery 任務

在我們的專案結構中,所有需要 Celery 管理的背景任務都被移到了一個獨立的子模組目錄中。在這個目錄下,我們根據任務的功能將其分為不同的檔案,例如算術運算和幾何運算。

算術運算任務

from __future__ import absolute_import
from celery_app.celery import app

@app.task
def add(a, b):
    return a + b

@app.task
def sub(a, b):
    return a - b

內容解密:

這段程式碼定義了兩個算術運算任務:加法和減法。透過使用 @app.task 裝飾器,我們將這兩個函式註冊為 Celery 任務,使其能夠被 Celery 工作節點執行。

幾何運算任務

from __future__ import absolute_import
from celery_app.celery import app

@app.task
def rect_area(h, w):
    return h * w

@app.task
def circle_area(r):
    import math
    return math.pi * r ** 2

內容解密:

這段程式碼定義了兩個幾何運算任務:矩形面積和圓形面積。同樣地,這些函式被註冊為 Celery 任務,以便在背景執行。

Systemd 設定

為了讓 Celery 守護程式能夠正確地找到我們的任務模組,我們需要調整 systemd 設定檔 /etc/conf.d/celery,將 CELERY_APP 變數設定為我們的 Celery 應用程式路徑。

CELERY_APP="celery_app.celery"

內容解密:

這行設定指定了 Celery 應用程式的位置,使 systemd 能夠正確地啟動和管理 Celery 守護程式。

路由任務

在訊息佇列系統中,一個重要的功能是能夠路由訊息到特定的佇列。Celery 繼承了 RabbitMQ 的這一功能,使得我們能夠根據任務的型別將其分配到不同的工作節點。

訊息佇列系統架構

內容解密:

這張圖展示了訊息佇列系統的基本架構。應用程式呼叫背景任務後,任務詳細資訊被序列化並提交到交換機。交換機根據路由鍵將訊息轉發到相應的佇列,最終由工作節點消費並執行。

繫結工作節點到特定佇列

為了有效地使用佇列,我們需要指示工作節點繫結到特定的佇列,並標記任務以便正確路由。預設情況下,所有未標記的任務都會被送到名為「celery」的預設佇列。

建立新佇列

我們可以建立一個新的佇列,例如「calc」,以便將所有計算相關的任務送到繫結到此佇列的工作節點。

celery -A celery_app.celery worker --loglevel=info --without-gossip --without-mingle --without-heartbeat -Q calc

內容解密:

這條命令啟動了一個 Celery 工作節點,並指定它只消費來自「calc」佇列的任務。這樣,我們可以根據任務型別將其分配到不同的工作節點,提高系統的擴充套件性和效率。

分散式訊息處理系統

在分散式系統中,訊息處理扮演著至關重要的角色。本章將探討任務和訊息佇列系統,並介紹如何使用 Celery 和 RabbitMQ 實作分散式訊息處理。

設定 Celery 佇列

首先,需要在 /etc/conf.d/celery 中新增設定:

CELERY_QUEUES="calc"

接著,修改 systemd 服務定義檔案 /usr/lib/systemd/system/celery.service,以確保 Celery 守護程式啟動時使用該引數:

ExecStart=/bin/celery multi start "${CELERYD_NODES}" -A "${CELERY_APP}" -Q "${CELERY_QUEUES}" --pidfile="${CELERYD_PID_FILE}" --logfile="${CELERYD_LOG_FILE}" --loglevel="${CELERYD_LOG_LEVEL}"

內容解密:

  • /etc/conf.d/celery:Celery 的設定檔,用於定義佇列名稱。
  • /usr/lib/systemd/system/celery.service:systemd 服務定義檔案,用於管理 Celery 守護程式。
  • CELERY_QUEUES:定義 Celery 佇列名稱的變數。
  • ExecStart:定義 Celery 守護程式啟動命令的引數。

重新啟動 Celery 程式後,可以看到程式繫結到新的佇列:

# ps auxww | grep celery
celery 4760 0.7 1.0 246404 21472 ? S 14:27 0:00 /usr/bin/python -m celery worker -n worker@fedora.local -A celery_app.celery --loglevel=INFO -Q calc --logfile=/var/log/celery/worker.log --pidfile=/run/celery/worker.pid

內容解密:

  • ps auxww | grep celery:用於檢視 Celery 程式的命令。
  • -Q calc:指定 Celery 工作程式監聽 calc 佇列。

任務路由

如果執行 calculator.py 應用程式,會發現任務沒有被處理,因為工作程式不再監聽預設佇列。使用 rabbitmqctl 命令檢視 RabbitMQ 中的佇列狀態:

# rabbitmqctl list_queues
Listing queues ...
1159cf27f68247da9885495e63c7dd1c 0
calc 0
celery 2
celeryev.601d558c-6354-4265-9704-a225948bb052 0
e289f4c20f754489944f75e1ee7c8ac6 0
worker@fedora.local.celery.pidbox 0
...done.

內容解密:

  • rabbitmqctl list_queues:用於檢視 RabbitMQ 中佇列狀態的命令。
  • celerycalc:佇列名稱。

為瞭解決任務未被處理的問題,需要在 calculator.py 中指定佇列名稱:

r = tasks.geometry.rect_area.apply_async((2, 2), queue="calc")

內容解密:

  • apply_async:用於非同步呼叫任務的方法。
  • queue="calc":指定任務傳送到 calc 佇列。

廣播訊息

Celery 提供廣播訊息機制,可以將訊息傳送到所有監聽特定佇列的工作程式。在 celeryconfig.py 中定義兩個佇列:

from kombu import Queue
from kombu.common import Broadcast

CELERY_QUEUES = (
    Queue("calc"),
    Broadcast("broadcast_calc"),
)

內容解密:

  • QueueBroadcast:用於定義佇列的類別。
  • CELERY_QUEUES:定義 Celery 佇列的變數。

修改 calculator.py 以傳送任務到不同的佇列:

def test_tasks():
    print("Submitting job...")
    r = tasks.geometry.rect_area.apply_async((2, 2), queue="calc")
    print(r.info)
    print("Job completed")
    print("Submitting broadcast job...")
    r = tasks.arithmetics.add.apply_async((1, 1), queue="broadcast_calc")
    print(r.info)
    print("Job completed")

內容解密:

  • test_tasks:測試任務的函式。
  • apply_async:用於非同步呼叫任務的方法。

Celery 工作流程

此圖示展示了 Celery 的工作流程,從任務提交到工作程式處理任務並傳回結果的整個過程。

圖表內容解密:

  • A[任務提交]:表示任務被提交到系統中。
  • B(Celery 佇列):表示 Celery 中的佇列,用於暫存待處理的任務。
  • C[工作程式]:表示監聽佇列並處理任務的工作程式。
  • D[結果]:表示工作程式處理任務後產生的結果。
  • E[任務提交者]:表示提交任務的客戶端或應用程式。

自動化 MySQL 資料函式庫效能調校

在這一章中,我們將擴充套件在第6章中建立的外掛框架。該框架允許透過在主應用程式碼之外實作新方法來擴充套件應用程式的功能。新的框架將允許外掛生成資料並將其提交回應用程式,以便其他外掛也能使用它。根據新的框架,我們將建立一個檢查 MySQL 資料函式庫組態和即時統計資料並提出效能調校建議的應用程式。我們將研究一些調校引數並編寫一些外掛。

需求規格與設計

作為系統管理員,您可能被要求提高 MySQL 資料函式庫伺服器的效能。這是一項創造性和具有挑戰性的任務,但同時也可能相當令人望而生畏。資料函式庫軟體本身就是一個複雜的軟體,而且您還必須考慮外部因素,例如執行環境——CPU核心數量和記憶體大小。除此之外,實際的表結構和 SQL 陳述式結構也扮演著非常重要的角色。

設計挑戰

您可能已經開發了自己的策略來解決這個問題。我提到「自己的策略」的原因是,不幸的是,沒有一個通用的解決方案來調校 MySQL 資料函式庫。每個安裝都是獨特的,需要個別的方法。有各種解決方案可幫助您識別資料函式庫中最常見的問題,包括商業選項,如 MySQL Enterprise Monitor(http://mysql.com/products/enterprise/monitor.html)和開源工具,如 MySQLTuner(http://blog.mysqltuner.com/)。這些工具的主要目的是透過提供系統組態和行為的洞察來自動化調校過程。

基本應用程式需求

在第6章中,我們討論了根據外掛的架構的優點。在這種架構中,主(宿主)應用程式為外掛提供了一些通用服務,外掛要麼擴充套件了主應用程式的功能,要麼實際提供了服務。從使用者的角度來看,系統作為一個實體運作。

這使我們列出了要在本章中構建的應用程式的基本需求列表:

  • 應用程式應該易於擴充套件、修改和增強新功能。
  • 應用程式應該專注於收集和處理來自 MySQL 資料函式庫的效能觀察結果。
  • 效能調校規則應該易於在不同應用程式例項之間傳輸和交換。

系統設計

作為應用程式的基礎,我們將使用在第6章中建立的外掛框架。我們可以直接使用它,用 MySQL 資料收集功能替換讀取日誌行的部分,並開始編寫消耗資料的外掛模組。這種方法在短期內會很好地為我們服務,但從長遠來看,它可能不是最具擴充套件性的解決方案。問題在於,雖然我們可以立即識別 MySQL 組態引數和狀態變數,但我們將難以處理作業系統狀態引數。這是因為沒有確定的資訊來源。每個系統都不同,可能需要不同的工具來報告狀態。

生產者-消費者外掛架構

解決這個問題的方法是將生成資訊的任務從宿主應用程式移動到外掛模組。換句話說,一些外掛將生成資料,其他外掛將依賴這些資料進行計算,並最終提出效能改進建議。在這種場景中,宿主應用程式僅充當排程器,它提供的唯一服務是與資料函式庫伺服器的連線。其餘功能由外掛提供。圖13-1顯示了生產者/消費者外掛架構的示意圖。

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Celery分散式任務佇列設定與應用

package "NumPy 陣列操作" {
    package "陣列建立" {
        component [ndarray] as arr
        component [zeros/ones] as init
        component [arange/linspace] as range
    }

    package "陣列操作" {
        component [索引切片] as slice
        component [形狀變換 reshape] as reshape
        component [堆疊 stack/concat] as stack
        component [廣播 broadcasting] as broadcast
    }

    package "數學運算" {
        component [元素運算] as element
        component [矩陣運算] as matrix
        component [統計函數] as stats
        component [線性代數] as linalg
    }
}

arr --> slice : 存取元素
arr --> reshape : 改變形狀
arr --> broadcast : 自動擴展
arr --> element : +, -, *, /
arr --> matrix : dot, matmul
arr --> stats : mean, std, sum
arr --> linalg : inv, eig, svd

note right of broadcast
  不同形狀陣列
  自動對齊運算
end note

@enduml

此圖示展示了生產者/消費者外掛架構,其中宿主應用程式負責排程命令,而外掛則負責生成和使用分享資訊。

內容解密:

圖13-1中的Plantuml圖表描述了一個生產者/消費者外掛架構。在這個架構中,外掛框架包含一個外掛登入檔和一個外掛管理器,用於管理生產者和消費者外掛。宿主應用程式負責執行生產者和消費者,並排程命令給外掛管理器。這種設計允許宿主應用程式保持簡單,而將複雜的邏輯委託給外掛。