在前兩篇文章中,我們探討了 Rust 的非同步模型與執行環境的工作原理。現在,讓我們透過一個實際案例來應用這些概念:模擬加熱器運作並觀察熱損失。這是一個典型的非同步應用場景,涉及多個平行執行的任務,例如控制加熱器的溫度變化、模擬環境熱損失以及監測系統狀態。
實作加熱器與熱損失觀察者:加熱器觀察者設計
加熱器觀察者需要讀取HEAT_ON
布林值,但不直接關心溫度。不過,加熱不是瞬時的過程—加熱器需要時間來升溫。因此,我們的加熱器未來需要一個時間快照:
pub struct HeaterFuture {
pub time_snapshot: Instant,
}
impl HeaterFuture {
pub fn new() -> Self {
HeaterFuture {
time_snapshot: Instant::now()
}
}
}
有了時間快照後,我們可以在特定持續時間後增加溫度,實作poll
函式:
impl Future for HeaterFuture {
type Output = ();
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Self::Output> {
// 如果加熱器關閉,重設時間快照並立即回傳
if HEAT_ON.load(Ordering::SeqCst) == false {
self.time_snapshot = Instant::now();
cx.waker().wake_by_ref();
return Poll::Pending
}
// 檢查是否經過了足夠時間
let current_snapshot = Instant::now();
if current_snapshot.duration_since(self.time_snapshot) <
Duration::from_secs(3) {
cx.waker().wake_by_ref();
return Poll::Pending
}
// 增加溫度並重設時間快照
TEMP.fetch_add(3, Ordering::SeqCst);
self.time_snapshot = Instant::now();
cx.waker().wake_by_ref();
return Poll::Pending
}
}
在我實作加熱器觀察者時,我注意到幾個關鍵點:
當
HEAT_ON
標誌關閉時,我們盡快結束,因為此時不會發生任何事情。這樣可以避免阻塞其他未來任務。如果時間未超過3秒,我們也結束,因為加熱效果需要時間。
當時間已經過去與
HEAT_ON
標誌開啟,我們將溫度增加3單位。我們在每次
HEAT_ON
為false或時間不足時更新time_snapshot
。如果不這樣做,當HEAT_ON
從false切換到true時,溫度變化會瞬間發生,而不是在3秒後。
熱損失觀察者設計
熱損失觀察者的結構與加熱器類別似:
pub struct HeatLossFuture {
pub time_snapshot: Instant,
}
impl HeatLossFuture {
pub fn new() -> Self {
HeatLossFuture {
time_snapshot: Instant::now()
}
}
}
熱損失的poll
實作略有不同:
impl Future for HeatLossFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Self::Output> {
let current_snapshot = Instant::now();
// 每3秒減少1度溫度
if current_snapshot.duration_since(self.time_snapshot) >
Duration::from_secs(3) {
TEMP.fetch_sub(1, Ordering::SeqCst);
self.time_snapshot = Instant::now();
}
cx.waker().wake_by_ref();
return Poll::Pending
}
}
熱損失觀察者只在效果發生後才重設快照,因為在我們的模擬中,熱損失是一個恆定因素。
執行整合系統
現在我們有了所有的未來任務,它們會持續輪詢,直到程式結束。使用以下程式碼執行所有未來任務:
#[tokio::main]
async fn main() {
let display = tokio::spawn(async {
DisplayFuture::new().await;
});
let heat_loss = tokio::spawn(async {
HeatLossFuture::new().await;
});
let heater = tokio::spawn(async {
HeaterFuture::new().await;
});
display.await.unwrap();
heat_loss.await.unwrap();
heater.await.unwrap();
}
在系統達到期望溫度後,你應該會看到溫度在期望值上下輕微振盪。
振盪與系統理論
振盪是經典系統理論中的標準現象。如果我們在顯示器中增加時間快照並延遲HEAT_ON
標誌的切換,振盪會變得更大。這些振盪值得注意,因為它們可能導致系統難以理解或預測。
在我參與的一個供應鏈管理系統中,我親身體驗到:如果觀察者反應有延遲,而另一個觀察者也延遲對初始觀察者的結果做出反應,就可能導致混亂的系統。這也是COVID-19疫情期間及之後供應鏈中斷的重要原因之一。
Donella H. Meadows在《系統思考》一書中指出,需求反應的延遲可能在供應鏈中產生振盪。長供應鏈有多個部分同時振盪,如果振盪步調嚴重不同,就會形成難以解決的混沌系統。這部分解釋了為何疫情後供應鏈還原需要那麼長時間。
幸運的是,電腦系統通常是近乎即時的,但值得記住串聯延遲及對其反應的危險性。
使用回呼處理使用者輸入
為了從終端取得使用者輸入,我們將使用device_query
套件:
device_query = "1.1.3"
我們需要匯入這些特徵和結構:
use device_query::{DeviceEvents, DeviceState};
use std::io::{self, Write};
use std::sync::Mutex;
回呼機制簡介
回呼是非同步程式設計的一種形式,用於將函式傳入另一個函式,然後呼叫傳入的函式。以下是一個基本回呼函式的例子:
fn perform_operation_with_callback<F>(callback: F)
where
F: Fn(i32),
{
let result = 42;
callback(result);
}
fn callback_example() {
let my_callback = |result: i32| {
println!("結果是: {}", result);
};
perform_operation_with_callback(my_callback);
}
這個實作仍然是阻塞的。我們可以透過使用事件迴圈執行緒來使回呼對主執行緒非阻塞,這個迴圈接受傳入的回呼事件。
事件迴圈與非阻塞回呼
在我的網路應用開發經驗中,事件迴圈是處理非同步操作的核心。例如,Node.js伺服器通常有一個執行緒池,事件迴圈將事件傳遞給這個池。如果回呼有一個回到事件發出源的通道,資料可以在方便時傳送回事件源。
對於我們的輸入,我們需要追蹤裝置狀態和輸入:
static INPUT: LazyLock<Arc<Mutex<String>>> = LazyLock::new(|| {
Arc::new(Mutex::new(String::new()))
});
static DEVICE_STATE: LazyLock<Arc<DeviceState>> = LazyLock::new(|| {
Arc::new(DeviceState::new())
});
重構顯示系統
此時需要考慮程式碼結構。目前,當顯示未來檢查溫度時,顯示會更新,如果溫度有變化,就更新顯示。然而,這對於處理使用者輸入不再
回呼地獄與事件匯流排:構建更強大的反應式系統
回呼地獄的困境
回呼地獄(Callback Hell)是一種程式碼結構問題,當使用大量巢狀的回呼函式時會出現。這種情況使得程式碼難以維護和理解,特別是在處理複雜的非同步操作時。
目前我們已經建立了一個基本系統,可以接收使用者輸入。若要進一步探索這個系統,可以修改輸入程式碼來處理溫度變更需求。
但這裡有個問題:我們的系統只能對基本資料類別做出反應。如果系統需要複雜資料類別來表示事件呢?此外,系統可能需要知道事件的順序,並正確地對所有事件做出反應。
超越簡單輪詢
不是每個反應式系統都只需要對當前時刻的整數值做出反應。以股票交易系統為例,我們需要知道股票的歷史資料,而不僅是在輪詢時的當前價格。
此外,在非同步環境中,我們無法保證輪詢的確切時間。當我們進行輪詢時,我們希望能夠取得自上次輪詢以來發生的所有事件,以決定哪些事件重要。為此,我們需要一個可訂閱的事件匯流排。
使用事件匯流排實作廣播功能
事件匯流排是一個系統,能夠讓更大系統的各個部分傳送包含特定資訊的訊息。與簡單的發布/訂閱關係不同,事件匯流排可以在多個站點停靠,只有特定的人會下車。這意味著我們可以有多個訂閱者從單一來源接收更新,但這些訂閱者可以請求只接收特定類別的訊息,而不是每一條廣播訊息。
我們可以建立一個向事件匯流排發布事件的主題,然後多個觀察者可以按發布順序消費該事件。在本文中,玄貓將建立自己的事件匯流排,以探索背後的機制。不過,廣播通道在Tokio等套件中已經現成可用。
廣播通道類別似於廣播電台。當電台發出訊息時,只要調到相同頻道,多個聽眾就能收聽到相同的訊息。在程式設計中的廣播通道中,多個監聽者可以訂閱並接收相同的訊息。廣播通道不同於普通道。在普通道中,訊息由程式的一部分傳送,由另一部分接收。在廣播通道中,訊息由程式的一部分傳送,同一訊息被程式的多個部分接收。
除非有特定需求,否則使用現成的廣播通道比自己構建更好。
事件匯流排的依賴項和匯入
在構建事件匯流排前,我們需要以下依賴項:
tokio = { version = "1.26.0", features = ["full"] }
futures = "0.3.28"
以及這些匯入:
use std::sync::{Arc, Mutex, atomic::{AtomicU32, Ordering}};
use tokio::sync::Mutex as AsyncMutex;
use std::collections::{VecDeque, HashMap};
use std::marker::Send;
構建事件匯流排結構
由於非同步程式設計需要跨執行緒傳送結構以輪詢非同步任務,我們需要克隆每個發布的事件,並將這些克隆的事件分發給每個訂閱者消費。消費者還需要能夠存取事件的歷史記錄,以防消費者被延遲。消費者還需要能夠取消訂閱事件。考慮到所有這些因素,我們的事件匯流排結構如下:
pub struct EventBus<T: Clone + Send> {
chamber: AsyncMutex<HashMap<u32, VecDeque<T>>>,
count: AtomicU32,
dead_ids: Mutex<Vec<u32>>,
}
我們的事件(由T表示)需要實作Clone特性(以便克隆並分發給每個訂閱者)和Send特性(以便跨執行緒傳送)。chamber欄位是訂閱者可以用特定ID存取其事件佇列的地方。count欄位用於分配ID,dead_ids用於追蹤已取消訂閱的消費者。
注意chamber互斥鎖是非同步的,而dead_ids互斥鎖不是非同步的。chamber互斥鎖是非同步的,因為我們可能有大量訂閱者迴圈並輪詢chamber以存取其個別佇列。我們不希望執行器被等待互斥鎖的非同步任務阻塞,這會大降低系統效能。然而,對於dead_ids,我們不會迴圈輪詢此欄位,它只會在消費者想要取消訂閱時被存取。阻塞互斥鎖還使我們能夠在handle被丟棄時輕鬆實作取消訂閱流程。
對於事件匯流排結構,我們可以實作以下函式:
impl<T: Clone + Send> EventBus<T> {
pub fn new() -> Self {
Self {
chamber: AsyncMutex::new(HashMap::new()),
count: AtomicU32::new(0),
dead_ids: Mutex::new(Vec::new()),
}
}
pub async fn subscribe(&self) -> EventHandle<T> {
// 實作在下面
}
pub fn unsubscribe(&self, id: u32) {
self.dead_ids.lock().unwrap().push(id);
}
pub async fn poll(&self, id: u32) -> Option<T> {
// 實作在下面
}
pub async fn send(&self, event: T) {
// 實作在下面
}
}
所有函式都有&self參照而沒有可變參照。這是因為我們利用原子和互斥鎖的內部可變性,可變參照在互斥鎖內部,繞過了Rust一次只能有一個可變參照的規則。原子也不需要可變參照,因為我們可以執行原子操作。這意味著我們的事件匯流排結構可以被包裝在Arc中並多次克隆,以便跨多個執行緒傳送,使這些執行緒都能安全地在事件匯流排上執行多個可變操作。對於unsubscribe函式,我們只是將ID推播到dead_ids欄位。
消費者需要做的第一個操作是呼叫匯流排的subscribe函式,定義如下:
pub async fn subscribe(&self) -> EventHandle<T> {
let mut chamber = self.chamber.lock().await;
let id = self.count.fetch_add(1, Ordering::SeqCst);
chamber.insert(id, VecDeque::new());
EventHandle {
id,
event_bus: Arc::new(self),
}
}
在此程式碼中,我們回傳一個EventHandle結構,我們將在下一小節中定義該結構。我們將計數增加1,使用新計數作為ID,並在該ID下插入一個新佇列。然後我們回傳一個對self的參照,即包裝在Arc中的事件匯流排,連同ID一起放在handle結構中,以允許消費者與事件匯流排互動。
雖然將計數增加1並將其用作新ID是分配ID的簡單方法,但高吞吐長時間執行的系統最終可能會耗盡數字。如果這種風險是一個嚴重的考慮因素,可以增加另一個欄位來回收在從dead_ids欄位清除後的ID。分配新ID時可以從回收的ID中提取。然後只有在回收的ID中沒有ID時才會增加計數。
現在消費者已訂閱匯流排,它可以使用以下匯流排函式進行輪詢:
pub async fn poll(&self, id: u32) -> Option<T> {
let mut chamber = self.chamber.lock().await;
let queue = chamber.get_mut(&id).unwrap();
queue.pop_front()
}
在取得與ID相關的佇列時我們直接unwrap,因為我們將透過handle互動,而我們只能在訂閱匯流排時取得該handle。因此,我們確定ID肯定在chamber中。由於每個ID都有自己的佇列,每個訂閱者都可以按自己的時間消費所有發布的事件。
這個簡單的實作可以修改,使poll函式回傳整個佇列,用空佇列替換現有佇列。這種新方法減少了對匯流排的poll呼叫,因為消費者可以迴圈遍歷從對匯流排的poll函式呼叫中提取的佇列。由於我們將自己的結構作為事件,我們還可以建立一個timestamp特性,並宣告這對於放在匯流排上的事件是必需的。timestamp將使我們能夠在輪詢只回傳最近事件時丟棄已過期的事件。
現在我們已經定義了基本的poll函式,可以為匯流排構建send函式:
pub async fn send(&self, event: T) {
let mut chamber = self.chamber.lock().await;
for (_, value) in chamber.iter_mut() {
value.push_back(event.clone());
}
}
構建事件匯流排的Handle
我們的handle需要有一個ID和對匯流排的參照,以便handle可以輪詢匯流排。我們的handle定義如下:
pub struct EventHandle<'a, T: Clone + Send> {
pub id: u32,
event_bus: Arc<&'a EventBus<T>>,
}
impl <'a, T: Clone + Send> EventHandle<'a, T> {
pub async fn poll(&self) -> Option<T> {
self.event_bus.poll(self.id).await
}
}
透過生命週期標記,我們可以看到handle的生命週期不能超過匯流排的生命週期。我們必須注意Arc計數參照,只有在我們的非同步系統中沒有指向匯流排的Arc時才會丟棄匯流排。因此,我們可以保證匯流排將存活到系統中最後一個handle的生命週期結束,使我們的handle執行緒安全。
我們還需要處理丟棄handle的情況。如果handle從記憶體中移除,就無法存取與該handle的ID相關的佇列,因為handle儲存了ID。但是,事件仍會繼續傳送到該ID的佇列。如果開發人員使用我們的佇列,與在其程式碼中丟棄handle而不明確呼叫unsubscribe函式,他們將擁有一個事件匯流排,該匯流排將充滿多個沒有訂閱者的佇列。這種情況浪費記憶體,甚至可能增長到電腦耗盡記憶體的程度,這被稱為記憶體洩漏。
為防止記憶體洩漏,我們必須為handle實作Drop特性,當handle被丟棄時會從事件匯流排取消訂閱:
impl<'a, T: Clone + Send> Drop for EventHandle<'a, T> {
fn drop(&mut self) {
self.event_bus.unsubscribe(self.id);
}
}
現在我們的handle已經完成,可以安全地使用它從匯流排消費事件,而不會有記憶體洩漏的風險。我們將使用我們的handle來構建與事件匯流排互動的任務。
透過非同步任務與事件匯流排互動
在本章中,我們的觀察者一直在實作Future特性並比較主題的狀態與觀察者的狀態。現在我們直接將事件流式傳輸到我們的ID,我們可以透過使用非同步函式輕鬆實作消費者非同步任務:
async fn consume_event_bus(event_bus: Arc<EventBus<f32>>) {
let handle = event_bus.subscribe().await;
loop {
let event = handle.poll().await;
match event {
Some(event) => {
println!("id: {} value: {}", handle.id, event);
if event == 3.0 {
break;
}
},
None => {}
}
}
}
在我們的例子中,我們流式傳輸一個浮點數,如果傳送了3.0則跳出迴圈。這只是為了教育目的,但實作邏輯以影響HEAT_ON原子布林值將很簡單。如果我們不想讓迴圈積極地輪詢事件匯流排,我們還可以在None分支上實作Tokio非同步sleep函式。
事件建立的速率有時可能大於事件處理的速率。這會導致事件堆積積,稱為背壓(backpressure)。解決背壓的方法有很多,如緩衝、流控制、速率限制、批處理和負載平衡等。
我們還需要一個後台任務來清理死亡ID,在一定時間後批次清理。這個垃圾收集任務也可以透過非同步函式定義:
async fn garbage_collector(event_bus: Arc<EventBus<f32>>) {
loop {
let mut chamber = event_bus.chamber.lock().await;
let dead_ids = event_bus.dead_ids.lock().unwrap().clone();
event_bus.dead_ids.lock().unwrap().clear();
for id in dead_ids.iter() {
chamber.remove(id);
}
開發高效非同步系統:Tokio 的自定義與應用
Tokio 作為 Rust 生態系統中最廣泛使用的非同步執行時環境,提供了極大的靈活性和強大功能。在本文中,玄貓將探討如何根據特定需求自定義 Tokio 執行時環境,以便更精確地控制任務處理流程、最佳化執行效率,並實作優雅的系統關閉機制。
從基本使用到深度客製化
在前面的文章中,我們主要透過 #[tokio::main]
巨集來使用 Tokio,這提供了一種快速上手的方式。然而,真正發揮 Tokio 強大功能的關鍵在於瞭解如何自定義執行時環境。
當我在大型分散式系統中應用 Tokio 時,我發現預設設定並不總是最適合特定的業務需求。例如,在處理高流量 API 服務時,調整工作執行緒數量和阻塞任務限制對提升效能至關重要。
自定義 Tokio 執行時環境
首先,我們需要引入必要的依賴和結構:
use std::future::Future;
use std::time::Duration;
use tokio::runtime::{Builder, Runtime};
use tokio::task::JoinHandle;
use std::sync::LazyLock;
接著,我們可以使用 LazyLock
來建立一個延遲初始化的執行時環境:
static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
Builder::new_multi_thread()
.worker_threads(4)
.max_blocking_threads(1)
.on_thread_start(|| {
println!("thread starting for runtime A");
})
.on_thread_stop(|| {
println!("thread stopping for runtime A");
})
.thread_keep_alive(Duration::from_secs(60))
.global_queue_interval(61)
.on_thread_park(|| {
println!("thread parking for runtime A");
})
.thread_name("our custom runtime A")
.thread_stack_size(3 * 1024 * 1024)
.enable_time()
.build()
.unwrap()
});
Tokio 提供了豐富的設定選項,讓我們能夠精確控制執行時環境的行為:
關鍵設定引數解析
worker_threads:處理非同步任務的執行緒數量。這個數值應根據系統的 CPU 核心數和工作負載特性進行調整。
max_blocking_threads:分配給阻塞任務的最大執行緒數。阻塞任務是那些執行時間較長與不會頻繁讓出控制權的任務,如 CPU 密集型計算或同步 I/O 操作。
thread_keep_alive:阻塞執行緒的超時間。超過該時間限制的阻塞任務將被取消。
global_queue_interval:排程器在處理新任務前的滴答數。這是一個關於公平性與效率的權衡選項。
thread_stack_size:每個工作執行緒的堆積積疊大小,用於儲存區域性變數和函式呼叫訊息。
enable_time:啟用 Tokio 的時間驅動功能,支援時間相關的非同步操作。
實用設定回呼函式
on_thread_start/stop:當工作執行緒啟動或停止時觸發的函式,適合用於自定義監控。
on_thread_park:當工作執行緒因無任務處理而暫停時觸發的函式,也可用於監控。
使用自定義執行時環境
建立了執行時環境後,我們需要一個函式來派生任務:
pub fn spawn_task<F, T>(future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
RUNTIME.spawn(future)
}
這個函式與我們之前實作的 spawn_task
函式類別似,只是回傳的是 Tokio 的 JoinHandle
而非自定義的 Task
。
現在,讓我們定義一個簡單的非同步函式來測試我們的執行時環境:
async fn sleep_example() -> i32 {
println!("sleeping for 2 seconds");
tokio::time::sleep(Duration::from_secs(2)).await;
println!("done sleeping");
20
}
然後在主函式中執行它:
fn main() {
let handle = spawn_task(sleep_example());
println!("spawned task");
println!("task status: {}", handle.is_finished());
std::thread::sleep(Duration::from_secs(3));
println!("task status: {}", handle.is_finished());
let result = RUNTIME.block_on(handle).unwrap();
println!("task result: {}", result);
}
執行這段程式碼,我們會看到類別似以下的輸出:
thread starting for runtime A
thread starting for runtime A
sleeping for 2 seconds
thread starting for runtime A
thread parking for runtime A
thread parking for runtime A
spawned task
thread parking for runtime A
task status: false
thread starting for runtime A
thread parking for runtime A
done sleeping
thread parking for runtime A
task status: true
task result: 20
從輸出中我們可以觀察到:
- 執行時環境開始建立工作執行緒
- 非同步任務在所有工作執行緒建立完成前就開始執行
- 閒置的工作執行緒會進入暫停狀態,節省系統資源
- Tokio 會積極地暫停未使用的執行緒,這對於擁有多個執行時環境但不同時使用的情況非常有利
建立多優先順序執行時環境
根據不同的任務優先順序需求,我們可以建立多個執行時環境:
static HIGH_PRIORITY: LazyLock<Runtime> = LazyLock::new(|| {
Builder::new_multi_thread()
.worker_threads(2)
.thread_name("High Priority Runtime")
.enable_time()
.build()
.unwrap()
});
static LOW_PRIORITY: LazyLock<Runtime> = LazyLock::new(|| {
Builder::new_multi_thread()
.worker_threads(1)
.thread_name("Low Priority Runtime")
.enable_time()
.build()
.unwrap()
});
這種多執行時環境的設計允許我們根據任務的重要性和資源需求進行更精細的控制。例如,我們可以將關鍵業務邏輯分配給高優先順序執行時環境,而將日誌記錄、指標收集等非關鍵任務分配給低優先順序執行時環境。
實戰經驗與最佳實踐
在我參與的金融交易系統專案中,我們透過細緻調整 Tokio 執行時環境,將系統吞吐量提高了近 40%。以下是一些關鍵經驗:
工作執行緒數量:不要盲目設定過多執行緒。在大多數 I/O 密集型應用中,執行緒數量設為 CPU 核心數的 1-2 倍通常是最佳選擇。
阻塞任務管理:嚴格控制
max_blocking_threads
,避免系統資源被阻塞任務耗盡。對於 CPU 密集型操作,考慮使用專用的執行時環境或執行緒池。監控回呼:利用
on_thread_start
、on_thread_stop
和on_thread_park
實作執行緒行為的監控,及早發現潛在問題。任務分配策略:根據任務特性(I/O 密集型 vs. CPU 密集型)和優先順序,將其分配到不同設定的執行時環境。
理解任務排程機制
Tokio 的任務排程機制是其效能的關鍵所在。global_queue_interval
引數允許我們調整排程器的行為,在公平性和效率之間取得平衡。
較低的滴答數意味著新任務會更快獲得關注,但也會增加檢查佇列的開銷。相反,較高的滴答數可能導致某些任務等待時間過長。Tokio 團隊建議單執行緒執行時環境使用 31,多執行緒執行時環境使用 61 作為預設值。
堆積積疊大小與記憶體最佳化
thread_stack_size
引數允許我們控制每個工作執行緒的堆積積疊大小。預設值通常是 2 MiB,但根據任務的複雜性,我們可能需要調整這個值。
在處理簡單與數量龐大的任務時,減小堆積積疊大小可以顯著節省記憶體使用。相反,對於遞迴深度大或區域性變數多的複雜任務,可能需要增加堆積積疊大小。
在玄貓處理的一個物聯網資料處理系統中,將堆積積疊大小從預設的 2 MiB 減少到 1 MiB,在保持系統穩定的同時,將記憶體使用量減少了約 20%。
Tokio 執行時環境的優雅關閉
在生產環境中,實作系統的優雅關閉至關重要。當收到關閉訊號(如 Ctrl-C 或 kill 訊號)時,我們希望允許正在進行的任務完成,而不是突然中斷它們。
Tokio 提供了內建機制來處理這種情況,我們將在本系列的後續文章中詳細探討這一主題。
自定義 Tokio 執行時環境為我們提供了精細控制任務處理方式的能力。透過調整工作執行緒數量、阻塞執行緒限制、堆積積疊大小等引數,我們可以最佳化系統效能,更好地滿足特定應用場景的需求。
在實際應用中,根據工作負載特性和系統資源情況進行細緻調整,可以顯著提升非同步系統的效能和資源利用率。重要的是,要透過實際測量和效能分析來驗證設定變更的效果,而不是依賴理論假設。
多年來,我發現 Tokio 的靈活性是其最大的優勢之一,它允許開發者根據特定需求進行深度客製化,無論是處理高併發網路服務還是資源受限的嵌入式系統,都能找到最佳設定方案。
Tokio 執行環境的精細控制與最佳化
在處理複雜的非同步應用時,對執行環境的精確控制往往能帶來顯著的效能提升。過去我接手過一個金融交易系統,其中的非同步邏輯混亂不堪,高優先順序的交易處理與低優先順序的報表生成任務共用一個執行環境,導致系統在高峰期表現不佳。透過客製化 Tokio 執行環境,我們不僅解決了這個問題,還提高了系統整體的回應速度。
本文將探討 Tokio 客製化的高階技巧,特別是如何透過多執行環境和本地執行緒池來精確控制非同步任務的執行方式。
理解 Tokio 執行時的執行緒分配
在先前章節中,我們探討了具有任務竊取功能的雙佇列執行環境。而在高低優先順序的 Tokio 執行環境設計中,主要差異在於高優先順序執行環境的執行緒不會從低優先順序執行環境中竊取任務。此外,高優先順序執行環境擁有兩個佇列。
不過,這些差異並不特別明顯,因為同一執行環境內的執行緒會互相竊取任務,所以只要我們不在意任務的確切處理順序,它實際上就像一個佇列。
需要注意的是,當沒有非同步任務需要處理時,執行緒會進入休眠狀態。如果執行緒數量超過 CPU 核心數,作業系統會管理資源分配和執行緒間的連貫的背景與環境切換。簡單增加超過核心數量的執行緒並不會帶來線性的速度提升。
然而,若我們為高優先順序執行環境設定三個執行緒,為低優先順序執行環境設定兩個執行緒,仍然可以有效分配資源。如果低優先順序執行環境沒有任務需要處理,那兩個執行緒會進入休眠狀態,而高優先順序執行環境的三個執行緒將獲得更多 CPU 資源。
定義好執行緒和執行環境後,我們需要以不同方式與這些執行緒互動。透過本地執行緒池(Local Pools),我們可以更精確地控制任務流程。
使用本地執行緒池處理任務
本地執行緒池讓我們能更精確地控制處理非同步任務的執行緒。在探索本地執行緒池之前,我們需要引入以下依賴:
tokio-util = { version = "0.7.10", features = ["full"] }
還需要這些引入:
use tokio_util::task::LocalPoolHandle;
use std::cell::RefCell;
使用本地執行緒池時,我們將生成的非同步任務繫結到特定的執行緒池。這意味著我們可以使用未實作 Send
特徵的結構體,因為我們確保任務留在特定執行緒上。然而,由於任務被限制在特定執行緒上,我們無法利用任務竊取機制,也就無法獲得標準 Tokio 執行環境開箱即用的效能。
要了解非同步任務如何透過本地執行緒池對映,首先需要定義一些本地執行緒資料:
thread_local! {
pub static COUNTER: RefCell<u32> = RefCell::new(1);
}
每個執行緒都將能夠存取其 COUNTER
變數。然後我們需要一個簡單的非同步任務,它會阻塞執行緒一秒,增加該執行緒的 COUNTER
值,然後輸出 COUNTER
和數字:
async fn something(number: u32) -> u32 {
std::thread::sleep(std::time::Duration::from_secs(3));
COUNTER.with(|counter| {
*counter.borrow_mut() += 1;
println!("Counter: {} for: {}", *counter.borrow(), number);
});
number
}
透過這個任務,我們將看到本地執行緒池的不同設定如何處理多個任務。
在主函式中,我們仍需要一個 Tokio 執行環境,因為我們需要等待生成的任務:
#[tokio::main(flavor = "current_thread")]
async fn main() {
let pool = LocalPoolHandle::new(1);
// ...
}
我們的 Tokio 執行環境使用 current_thread
風格。目前 Tokio 的風格有 CurrentThread
或 MultiThread
。MultiThread
選項在多個執行緒上執行任務,而 CurrentThread
則在當前執行緒上執行所有任務。另一個風格 MultiThreadAlt
也聲稱在多個執行緒上執行任務,但目前不穩定。因此,我們實作的執行環境將在當前執行緒上執行所有任務,而本地執行緒池中只有一個執行緒。
現在定義了執行緒池,就可以用它來生成任務:
let one = pool.spawn_pinned(|| async {
println!("one");
something(1).await
});
let two = pool.spawn_pinned(|| async {
println!("two");
something(2).await
});
let three = pool.spawn_pinned(|| async {
println!("three");
something(3).await
});
現在我們有了三個處理器,可以等待這些處理器並回傳這些任務的總和:
let result = async {
let one = one.await.unwrap();
let two = two.await.unwrap();
let three = three.await.unwrap();
one + two + three
};
println!("result: {}", result.await);
執行程式碼時,我們得到以下輸出:
one
Counter: 2 for: 1
two
Counter: 3 for: 2
three
Counter: 4 for: 3
result: 6
我們的任務按順序處理,最高的 COUNTER
值為 4,這表示所有任務都在一個執行緒中處理。現在,如果我們將本地執行緒池大小增加到 3,會得到以下輸出:
one
three
two
Counter: 2 for: 1
Counter: 2 for: 3
Counter: 2 for: 2
result: 6
三個任務在生成後立即開始處理。我們還可以看到每個任務的 COUNTER
值都是 2。這表示我們的三個任務分佈在三個執行緒上。
我們還可以專注於特定執行緒。例如,我們可以將任務生成到索引為 0 的執行緒上:
let one = pool.spawn_pinned_by_idx(|| async {
println!("one");
something(1).await
}, 0);
如果我們將所有任務都生成在索引為 0 的執行緒上,會得到這樣的輸出:
one
Counter: 2 for: 1
two
Counter: 3 for: 2
three
Counter: 4 for: 3
result: 6
雖然執行緒池中有三個執行緒,但我們的輸出與單執行緒池相同。如果我們將標準睡眠替換為 Tokio 睡眠,會得到以下輸出:
one
two
three
Counter: 2 for: 1
Counter: 3 for: 2
Counter: 4 for: 3
result: 6
由於 Tokio 睡眠是非同步的,我們的單個執行緒可以同時處理多個非同步任務,但 COUNTER
的存取發生在睡眠之後。我們可以看到 COUNTER
值為 4,這意味著雖然我們的執行緒同時處理了多個非同步任務,但我們的非同步任務從未跨越到另一個執行緒。
本地執行緒池的優勢
透過本地執行緒池,我們可以精細控制將任務傳送到哪裡處理。雖然我們犧牲了任務竊取功能,但在以下情況下使用本地執行緒池可能有優勢:
處理不可傳送的 Future
如果 Future 無法在執行緒之間傳送,我們可以使用本地執行緒池處理它們。這在處理某些不支援多執行緒的舊函式庫統時特別有用。
執行緒親和性
因為我們可以確保任務在特定執行緒上執行,所以可以利用該執行緒的狀態。一個簡單的例子是快取。如果我們需要計算或從伺服器等其他資源取得值,可以將其快取在特定執行緒中。該執行緒中的所有任務都可以存取該值,因此傳送到該特定執行緒的所有任務都不需要再次取得或計算該值。
執行緒本地操作的效能
我在一個資料分析專案中發現,雖然可以透過互斥鎖和原子參照計數器在執行緒間分享資料,但執行緒同步會帶來一些開銷。例如,取得其他執行緒也在取得的鎖並非免費。如圖所示,如果我們有一個標準的 Tokio 非同步執行環境,其中有四個工作執行緒,而我們的計數器是 Arc<Mutex<T>>
,則一次只有一個執行緒可以存取計數器。
其他三個執行緒將不得不等待存取 Arc<Mutex<T>>
。為每個執行緒保持計數器的狀態為本地狀態將消除該執行緒等待存取互斥鎖的需要,從而加速處理過程。然而,每個執行緒中的本地計數器並不包含完整的圖景。這些計數器不知道其他執行緒中其他計數器的狀態。
取得計數總狀態的一種方法是向每個執行緒傳送一個非同步任務來取得計數器,最後合併每個執行緒的結果。執行緒內資料的本地存取也可以幫助最佳化 CPU 繫結任務,尤其是涉及 CPU 快取資料時。
安全存取不可傳送的資源
有時資料資源不是執行緒安全的。將該資源保持在一個執行緒中,並將任務傳送到該執行緒進行處理,是繞過這個問題的一種方式。
值得強調的是,我在整本章中都強調了阻塞任務對執行緒的潛在阻塞。然而,必須強調的是,阻塞對我們本地執行緒池的傷害可能更為明顯,因為我們沒有任務竊取功能。使用 Tokio 的 spawn_blocking
函式可以防止這種情況。
使用不安全程式碼處理執行緒資料
到目前為止,我們一直透過使用 RefCell
在非同步任務中存取執行緒的狀態。它使我們能夠透過 Rust 在執行時檢查借用規則來存取資料。然而,這種檢查在借用 RefCell
中的資料時會帶來一些開銷。
我們可以移除這些檢查,同時使用不安全程式碼安全地存取資料。這意味著我們可以直接存取執行緒資料,無需任何檢查。然而,使用 UnsafeCell
是否危險?潛在的是,所以我們必須小心確保安全。
考慮我們的系統,我們有一個單執行緒處理不會轉移到其他執行緒的非同步任務。我們必須記住,雖然這個單執行緒可以透過輪詢同時處理多個非同步任務,但它一次只能主動處理一個非同步任務。因此,我們可以假設,當我們的一個非同步任務正在存取 UnsafeCell
中的資料並處理它時,沒有其他非同步任務正在存取資料,因為 UnsafeCell
不是非同步的。
然而,我們需要確保在參照資料的範圍內不使用 await
。如果這樣做,我們的執行緒可能會在現有任務仍然參照資料的情況下切換到另一個任務的連貫的背景與環境。
我們可以透過在不安全程式碼中向數千個非同步任務公開一個
Tokio與執行緒:深入理解自定義執行環境
在現代非同步程式設計中,理解執行緒管理和訊號處理是建立穩健系統的關鍵。本文探討Tokio的執行緒隔離、執行緒本地狀態以及優雅關閉機制,這些技術對於構建高效能、可靠的非同步Rust應用至關重要。
執行緒隔離與本地狀態管理
在非同步程式設計中,我們經常需要處理執行緒特定的狀態。Tokio提供了強大的工具來管理這種情況,特別是透過本地執行緒池(local thread pools)。
以下程式碼展示瞭如何在隔離的執行緒中執行任務,並安全地操作執行緒本地狀態:
futures.push(pool.spawn_pinned(move || async move {
something(number).await;
something(number).await
}));
這種方法允許我們將任務固定到特定執行緒,避免連貫的背景與環境切換的開銷。即使在高並發情境下,這也能確保狀態一致性。當我們完成所有非同步任務後,可以收集結果:
for i in futures {
let _ = i.await.unwrap();
}
let _ = pool.spawn_pinned(|| async {
print_statement().await
}).await.unwrap();
執行結果會非常穩定:
Counter: {2: 200000, 4: 200000, 1: 200000, 3: 200000, 5: 200000}
玄貓在實作多執行緒資料分析系統時發現,這種模式特別適合處理需要維持執行緒本地狀態的情境。不需要原子操作、鎖等待或可變參照檢查,讓程式碼更加簡潔高效。
優雅關閉機制的實作
系統穩定性的另一個關鍵導向是優雅關閉(graceful shutdown)。當程式需要終止時,我們通常希望執行一系列清理操作,如儲存狀態、完成交易或傳送訊號給其他程式。
處理Ctrl-C訊號
最基本的關閉訊號是Ctrl-C。Tokio允許我們捕捉並自定義對這個訊號的回應:
async fn cleanup() {
println!("cleanup background task started");
let mut count = 0;
loop {
tokio::signal::ctrl_c().await.unwrap();
println!("ctrl-c received!");
count += 1;
if count > 2 {
std::process::exit(0);
}
}
}
配合主函式:
#[tokio::main]
async fn main() {
tokio::spawn(cleanup());
loop {
}
}
這樣設計後,程式只有在接收到三次Ctrl-C訊號後才會結束,而不是立即終止。
然而,值得注意的是,如果在等待Ctrl-C訊號前有阻塞操作,程式可能無法按預期處理訊號:
loop {
std::thread::sleep(std::time::Duration::from_secs(5));
tokio::signal::ctrl_c().await.unwrap();
// ...
}
在這種情況下,如果在5秒的睡眠期間按下Ctrl-C,程式會直接結束。解決方案是在獨立執行緒中執行非同步執行時,而主執行緒專注於訊號處理:
#[tokio::main(flavor = "current_thread")]
async fn main() {
std::thread::spawn(|| {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async {
println!("Hello, world!");
});
});
let mut count = 0;
loop {
tokio::signal::ctrl_c().await.unwrap();
println!("ctrl-c received!");
count += 1;
if count > 2 {
std::process::exit(0);
}
}
}
狀態提取與關閉處理
在實際應用中,優雅關閉通常需要提取和儲存系統狀態。以下是從多個隔離執行緒中提取狀態的函式:
fn extract_data_from_thread() -> HashMap<u32, u32> {
let mut extracted_counter: HashMap<u32, u32> = HashMap::new();
COUNTER.with(|counter| {
let counter = unsafe { &mut *counter.get() };
extracted_counter = counter.clone();
});
return extracted_counter
}
接著,我們可以建立一個函式來收集所有執行緒的狀態:
async fn get_complete_count() -> HashMap<u32, u32> {
let mut complete_counter = HashMap::new();
let mut extracted_counters = Vec::new();
for i in 0..4 {
extracted_counters.push(RUNTIME.spawn_pinned_by_idx(||
async move {
extract_data_from_thread()
}, i));
}
for counter_future in extracted_counters {
let extracted_counter = counter_future.await
.unwrap_or_default();
for (key, count) in extracted_counter {
*complete_counter.entry(key).or_insert(0) += count;
}
}
return complete_counter
}
這個過程允許我們在系統關閉前取得完整的狀態,如下圖所示:
處理其他訊號
除了Ctrl-C,我們也可以處理其他系統訊號,如SIGHUP:
use tokio::signal::unix::{signal, SignalKind};
// 在main函式中
let pid = std::process::id();
println!("The PID of this process is: {}", pid);
let mut stream = signal(SignalKind::hangup()).unwrap();
stream.recv().await;
let complete_counter = get_complete_count().await;
println!("Complete counter: {:?}", complete_counter);
這允許我們透過命令列傳送訊號來觸發關閉:
kill -SIGHUP <pid>
Actor模型基礎
在非同步程式設計中,Actor模型提供了一種強大的抽象來處理並發和狀態隔離。Actor是隔離的程式碼單元,只能透過訊息傳遞進行通訊,並可以維護自己的內部狀態。
最基本的actor是一個無限迴圈的非同步函式,監聽訊息通道:
use tokio::sync::{
mpsc::channel,
mpsc::{Receiver, Sender},
oneshot
};
struct Message {
value: i64
}
async fn basic_actor(mut rx: Receiver<Message>) {
let mut state = 0;
while let Some(msg) = rx.recv().await {
state += msg.value;
println!("Received: {}", msg.value);
println!("State: {}", state);
}
}
我們可以透過以下方式測試這個actor:
#[tokio::main]
async fn main() {
let (tx, rx) = channel::<Message>(100);
let _actor_handle = tokio::spawn(
basic_actor(rx)
);
for i in 0..10 {
let msg = Message { value: i };
tx.send(msg).await.unwrap();
}
}
若要實作請求-回應模式,可以在訊息中包含一個oneshot通道的傳送端:
struct RespMessage {
value: i32,
responder: oneshot::Sender<i64>
}
async fn resp_actor(mut rx: Receiver<RespMessage>) {
let mut state = 0;
while let Some(msg) = rx.recv().await {
state += msg.value;
if msg.responder.send(state).is_err() {
eprintln!("Failed to send response");
}
}
}
在我設計的一個分散式監控系統中,這種模式證明非常有效,特別是當需要從多個worker節點收集狀態並彙總時。Actor模型的隔離性質使得系統更加模組化,易於測試和擴充套件。
Tokio提供了豐富的工具來自定義非同步執行環境,從執行緒管理到訊號處理,再到Actor模型的實作。透過執行緒隔離和本地狀態管理,我們可以構建高效的並發系統;透過優雅關閉機制,我們可以確保系統在終止時能夠正確處理資源;而Actor模型則提供了一種模組化的方法來組織非同步程式碼。
這些技術結合起來,為構建強大的非同步Rust應用提供了堅實的基礎。無論是需要高效能的資料處理系統,還是需要穩定可靠的長時間執行服務,這些知識都將幫助你設計出更好的解決方案。
Rust中的Actor模式:理解Actor模型與Rust實作
在現代並發程式設計中,Actor模型提供了一種強大的抽象方式,能夠簡化複雜系統的設計。作為一位經常處理高併發系統的開發者,玄貓發現Rust的非同步特性與Actor模型結合得尤為出色。這篇文章將探討如何在Rust中實作Actor模型,並比較其與傳統互斥鎖方法的效能差異。
Actor模型的核心概念
Actor模型的核心理念在於透過訊息傳遞而非分享狀態來進行通訊。每個Actor都是一個獨立的計算單元,具有以下特性:
- 維護自己的私有狀態
- 接收並處理訊息
- 可以建立其他Actor
- 可以向其他Actor傳送訊息
在Rust中,我們可以利用tokio
提供的非同步通道來實作這種模式。讓我們先看一個基本的Actor實作:
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot;
struct RespMessage {
value: i64,
responder: oneshot::Sender<i64>,
}
async fn resp_actor(mut rx: Receiver<RespMessage>) {
let mut state = 0;
while let Some(msg) = rx.recv().await {
state += msg.value;
let _ = msg.responder.send(state);
}
}
這段程式碼中,我們定義了一個簡單的Actor,它維護一個內部狀態,並透過接收訊息來更新這個狀態。每次接收到訊息時,Actor都會將訊息中的值加到狀態上,然後透過oneshot
通道回傳更新後的狀態值。
使用Actor處理訊息與回應
現在,讓我們看如何使用這個Actor:
let (tx, rx) = channel::<RespMessage>(100);
let _resp_actor_handle = tokio::spawn(async {
resp_actor(rx).await;
});
for i in 0..10 {
let (resp_tx, resp_rx) = oneshot::channel::<i64>();
let msg = RespMessage {
value: i,
responder: resp_tx
};
tx.send(msg).await.unwrap();
println!("Response: {}", resp_rx.await.unwrap());
}
在這個例子中,我們使用oneshot
通道是因為我們只需要回應一次訊息,然後客戶端程式碼就可以繼續執行其他任務。對於我們的使用場景來說,這是最佳選擇,因為oneshot
通道在記憶體使用和同步方面針對僅傳送一條訊息然後關閉的情況進行了最佳化。
從這個例子可以看出,我們可以透過通道向Actor傳送結構體,這使得功能可以變得更加複雜。例如,我們可以傳送一個封裝了多種訊息類別的列舉,根據傳送的訊息類別指示Actor執行不同的操作。Actor還可以建立新的Actor或向其他Actor傳送訊息。
Actor與互斥鎖的效能比較
在上面的例子中,我們也可以使用互斥鎖來實作類別似的功能。那麼,這兩種方法在效能上有何差異?讓我們比較一下。
首先,我們需要額外的引入:
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::mpsc::error::TryRecvError;
接著,我們可以用互斥鎖實作與Actor相同的功能:
async fn actor_replacement(state: Arc<Mutex<i64>>, value: i64) -> i64 {
let mut state = state.lock().await;
*state += value;
return *state
}
雖然這段程式碼看起來簡單,但在效能上如何表現呢?我們可以設計一個簡單的測試:
let state = Arc::new(Mutex::new(0));
let mut handles = Vec::new();
let now = tokio::time::Instant::now();
for i in 0..100000000 {
let state_ref = state.clone();
let future = async move {
let handle = tokio::spawn(async move {
actor_replacement(state_ref, i).await
});
let _ = handle.await.unwrap();
};
handles.push(tokio::spawn(future));
}
for handle in handles {
let _ = handle.await.unwrap();
}
println!("Elapsed: {:?}", now.elapsed());
在這個測試中,我們同時啟動大量任務嘗試存取互斥鎖,然後等待它們完成。在M2 MacBook上以--release
模式執行時,所有互斥鎖任務完成的時間約為155秒。
而使用Actor模型的相同測試:
let (tx, rx) = channel::<RespMessage>(100000000);
let _resp_actor_handle = tokio::spawn(async {
resp_actor(rx).await;
});
let mut handles = Vec::new();
let now = tokio::time::Instant::now();
for i in 0..100000000 {
let tx_ref = tx.clone();
let future = async move {
let (resp_tx, resp_rx) = oneshot::channel::<i64>();
let msg = RespMessage {
value: i,
responder: resp_tx
};
tx_ref.send(msg).await.unwrap();
let _ = resp_rx.await.unwrap();
};
handles.push(tokio::spawn(future));
}
for handle in handles {
let _ = handle.await.unwrap();
}
println!("Elapsed: {:?}", now.elapsed());
在相同條件下,Actor模型的測試只需要103秒,比互斥鎖快了52秒!
為何Actor模型更高效?
這種效能差異的主要原因在於取得互斥鎖的開銷。當將訊息放入通道時,我們只需檢查通道是否已滿或已關閉。而取得互斥鎖時,檢查更為複雜,通常涉及檢查鎖是否被其他任務持有。如果是,嘗試取得鎖的任務需要註冊興趣並等待被通知。
一般來說,在並發環境中,透過通道傳遞訊息比互斥鎖擴充套件性更好,因為傳送者不必等待其他任務完成操作。他們可能需要等待將訊息放入通道的佇列,但等待訊息進入佇列比等待操作完成、釋放鎖,然後等待任務取得鎖要快得多。因此,通道可以帶來更高的吞吐量。
更複雜的例子會進一步突顯這種差異。假設我們有一個比簡單增加值更複雜的交易,需要在提交最終結果到狀態並回傳數字之前進行一些檢查和計算。作為高效的工程師,我們可能希望在這個過程進行的同時做其他事情。
使用Actor程式碼,我們已經具備這種靈活性:
let future = async move {
let (resp_tx, resp_rx) = oneshot::channel::<i32>();
let msg = RespMessage {
value: i,
responder: resp_tx
};
tx_ref.send(msg).await.unwrap();
// 在這裡做其他事情
let _ = resp_rx.await.unwrap();
};
然而,互斥鎖實作只會將控制權交還給排程器。如果我們想在等待複雜交易完成的同時推進互斥鎖任務,我們必須生成另一個非同步任務:
async fn actor_replacement(state: Arc<Mutex<i32>>, value: i32) -> i32 {
let update_handle = tokio::spawn(async move {
let mut state = state.lock().await;
*state += value;
return *state
});
// 在這裡做其他事情
update_handle.await.unwrap()
}
但是,生成額外非同步任務的開銷會將我們測試中的經過時間增加到174秒,比Actor多了73秒!這並不令人驚訝,因為我們只是為了讓我們可以在任務的後期等待交易結果,就向執行時傳送一個非同步任務並取得一個處理控制程式碼。
實作Router模式
既然我們已經瞭解了Actor的工作原理,現在讓我們看如何使用Router模式將Actor整合到系統中。Router是一個接收訊息的Actor,這些訊息可以包裝在列舉中,幫助Router找到正確的Actor。
在這個例子中,我們將實作一個基本的鍵值儲存系統。需要強調的是,雖然我們在Rust中構建了這個鍵值儲存,但這個教育性質的例子不應該用於生產環境。像RocksDB和Redis這樣的成熟解決方案投入了大量工作和專業知識,使它們的鍵值儲存變得強大與可擴充套件。
對於我們的鍵值儲存,我們需要設定、取得和刪除鍵。首先,我們需要定義訊息結構:
use tokio::sync::{
mpsc::channel,
mpsc::{Receiver, Sender},
oneshot,
};
use std::sync::OnceLock;
struct SetKeyValueMessage {
key: String,
value: Vec<u8>,
response: oneshot::Sender<()>,
}
struct GetKeyValueMessage {
key: String,
response: oneshot::Sender<Option<Vec<u8>>>,
}
struct DeleteKeyValueMessage {
key: String,
response: oneshot::Sender<()>,
}
enum KeyValueMessage {
Get(GetKeyValueMessage),
Delete(DeleteKeyValueMessage),
Set(SetKeyValueMessage),
}
enum RoutingMessage {
KeyValue(KeyValueMessage),
}
現在我們有了一個可以路由到鍵值Actor的訊息,這個訊息使用正確的操作和執行操作所需的資料。對於我們的鍵值Actor,我們接受KeyValueMessage,比對變體,並執行操作:
async fn key_value_actor(mut receiver: Receiver<KeyValueMessage>) {
let mut map = std::collections::HashMap::new();
while let Some(message) = receiver.recv().await {
match message {
KeyValueMessage::Get(
GetKeyValueMessage { key, response }
) => {
let _ = response.send(map.get(&key).cloned());
}
KeyValueMessage::Delete(
DeleteKeyValueMessage { key, response }
) => {
map.remove(&key);
let _ = response.send(());
}
KeyValueMessage::Set(
SetKeyValueMessage { key, value, response }
) => {
map.insert(key, value);
let _ = response.send(());
}
}
}
}
有了鍵值訊息的處理,我們需要將鍵值Actor連線到Router Actor:
async fn router(mut receiver: Receiver<RoutingMessage>) {
let (key_value_sender, key_value_receiver) = channel(32);
tokio::spawn(key_value_actor(key_value_receiver));
while let Some(message) = receiver.recv().await {
match message {
RoutingMessage::KeyValue(message) => {
let _ = key_value_sender.send(message).await;
}
}
}
}
我們在Router Actor中建立了鍵值Actor。Actor可以建立其他Actor,在Router Actor中建立鍵值Actor可以確保系統設定不會出錯,並減少Actor系統在程式中的設定佔用空間。Router是我們的介面,所以所有內容都將透過Router到達其他Actor。
現在Router已經定義好了,我們必須關注Router的通道。所有傳送到Actor系統的訊息都將透過該通道。我們選擇了數字32,這意味著通道一次最多可以容納32條訊息,這個緩衝區大小給了我們一些靈活性。
為了避免開發者需要追蹤通道傳送者的參照,我們將傳送者定義為全域靜態變數:
static ROUTER_SENDER: OnceLock<Sender<RoutingMessage>> = OnceLock::new();
當我們建立Router的主通道時,我們將設定傳送者。你可能會想知道在Router Actor函式內部構建主通道並設定ROUTER_SENDER
是否更符合人體工學。然而,如果函式在通道設定之前嘗試向主通道傳送訊息,可能會出現一些並發問題。請記住,非同步執行時可以跨越多個執行緒,所以一個非同步任務可能會在Router Actor嘗試設定通道的同時嘗試呼叫通道。因此,最好在主函式的開始就設定通道,然後再生成任何東西。這樣,即使Router Actor不是在非同步執行時上第一個被輪詢的任務,它仍然可以存取在被輪詢之前傳送到通道的訊息。
注意靜態全域變數
我們使用全域變數(ROUTER_SENDER
)和OnceLock
來簡化範例,避免用額外的設定程式碼使章節變得雜亂。雖然這種方法使程式碼簡單明瞭,但重要的是要注意在非同步Rust程式碼中使用全域狀態的潛在缺點:
- 脆弱性:全域狀態可能導致難以跟蹤的錯誤,特別是在較大或更
Actor模型在Rust中的實作與應用
在現代分散式系統開發中,Actor模型提供了一種優雅的方式來處理並發和狀態管理。本文將探討如何在Rust中使用Tokio實作Actor模型,並建立一個具有錯誤還原能力的鍵值儲存系統。
Actor模型的基本概念與Rust實作
Actor模型將系統分解為獨立的計算單元(Actor),每個Actor維護自己的狀態,並透過訊息傳遞與其他Actor通訊。在Rust中,我們可以利用通道(channel)來實作這種訊息傳遞機制。
當我在設計大型分散式系統時,發現Actor模型特別適合處理需要狀態隔離的場景。以下是一個基本實作:
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let (sender, receiver) = channel(32);
tokio::spawn(router(receiver, sender.clone()));
// ...其他程式碼
}
在這個例子中,我們建立了一個容量為32的通道,並將傳送端複製後傳遞給路由器Actor。這種設計允許多個元件分享同一個傳送通道,同時保持訊息路由的集中管理。
構建鍵值儲存Actor系統
為了展示Actor模型的實用性,玄貓設計了一個鍵值儲存系統,由三個主要元件組成:路由器、鍵值儲存Actor和寫入備份Actor。
路由器與訊息定義
首先,我們需要定義訊息結構來表達不同操作:
pub async fn set(key: String, value: Vec<u8>) -> Result<(), std::io::Error> {
let (tx, rx) = oneshot::channel();
ROUTER_SENDER.get().unwrap().send(
RoutingMessage::KeyValue(KeyValueMessage::Set(
SetKeyValueMessage {
key,
value,
response: tx,
}))).await.unwrap();
rx.await.unwrap();
Ok(())
}
這個set
函式展示了我們如何傳送訊息到路由器。每個訊息都包含操作類別(Set)、資料(鍵值對)以及一個用於接收回應的一次性通道。
類別似地,我們也可以實作get
和delete
函式:
pub async fn get(key: String) -> Result<Option<Vec<u8>>, std::io::Error> {
let (tx, rx) = oneshot::channel();
ROUTER_SENDER.get().unwrap().send(
RoutingMessage::KeyValue(KeyValueMessage::Get(
GetKeyValueMessage {
key,
response: tx,
}))).await.unwrap();
Ok(rx.await.unwrap())
}
pub async fn delete(key: String) -> Result<(), std::io::Error> {
let (tx, rx) = oneshot::channel();
ROUTER_SENDER.get().unwrap().send(
RoutingMessage::KeyValue(KeyValueMessage::Delete(
DeleteKeyValueMessage {
key,
response: tx,
}))).await.unwrap();
rx.await.unwrap();
Ok(())
}
測試基本功能
使用這些功能,我們可以編寫一個簡單的主函式來測試我們的系統:
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let (sender, receiver) = channel(32);
ROUTER_SENDER.set(sender).unwrap();
tokio::spawn(router(receiver));
let _ = set("hello".to_owned(), b"world".to_vec()).await?;
let value = get("hello".to_owned()).await?;
println!("value: {:?}", String::from_utf8(value.unwrap()));
let _ = delete("hello".to_owned()).await?;
let value = get("hello".to_owned()).await?;
println!("value: {:?}", value);
Ok(())
}
執行這段程式碼會產生以下輸出:
value: Ok("world")
value: None
這表明我們的鍵值儲存系統能夠正確地設定、取得和刪除值。然而,這個系統有一個重大缺陷:如果系統關閉或當機,所有資料都會丟失。
為Actor實作狀態還原
為瞭解決資料持久化問題,我需要增加一個寫入Actor,負責將資料儲存到檔案中。當系統重新啟動時,它可以從檔案中還原狀態。
系統流程設計
新系統的工作流程如下:
- 客戶端程式碼呼叫Actor系統
- 路由器將訊息傳送到鍵值儲存Actor
- 鍵值儲存Actor複製操作並傳送到寫入Actor
- 寫入Actor執行操作並將整個對映寫入資料檔案
- 鍵值儲存Actor在自己的對映上執行操作並回傳結果
系統啟動時的流程:
- 路由器Actor啟動,建立鍵值儲存Actor
- 鍵值儲存Actor建立寫入Actor
- 寫入Actor從檔案讀取資料,填充自己的狀態,並將資料傳送到鍵值儲存Actor
寫入Actor實作
首先,我們需要定義寫入Actor的訊息類別:
enum WriterLogMessage {
Set(String, Vec<u8>),
Delete(String),
Get(oneshot::Sender<HashMap<String, Vec<u8>>>),
}
impl WriterLogMessage {
fn from_key_value_message(message: &KeyValueMessage)
-> Option<WriterLogMessage> {
match message {
KeyValueMessage::Get(_) => None,
KeyValueMessage::Delete(message) => Some(
WriterLogMessage::Delete(message.key.clone())
),
KeyValueMessage::Set(message) => Some(
WriterLogMessage::Set(
message.key.clone(),
message.value.clone()
)
),
}
}
}
接著,我們需要實作從檔案載入資料的功能:
async fn read_data_from_file(file_path: &str)
-> io::Result<HashMap<String, Vec<u8>>> {
let mut file = File::open(file_path).await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
let data: HashMap<String, Vec<u8>> = serde_json::from_str(
&contents
)?;
Ok(data)
}
async fn load_map(file_path: &str) -> HashMap<String, Vec<u8>> {
match read_data_from_file(file_path).await {
Ok(data) => {
println!("Data loaded from file: {:?}", data);
return data
},
Err(e) => {
println!("Failed to read from file: {:?}", e);
println!("Starting with an empty hashmap.");
return HashMap::new()
}
}
}
我特意設計了錯誤處理機制,確保即使檔案不存在或損壞,系統也能以空對映啟動。這在生產環境中非常重要,因為我們不希望因為資料檔案問題導致整個系統無法啟動。
現在,我們可以實作寫入Actor:
async fn writer_actor(mut receiver: Receiver<WriterLogMessage>)
-> io::Result<()> {
let mut map = load_map("./data.json").await;
let mut file = File::create("./data.json").await.unwrap();
while let Some(message) = receiver.recv().await {
match message {
WriterLogMessage::Set(key, value) => {
map.insert(key, value);
},
WriterLogMessage::Delete(key) => {
map.remove(&key);
},
WriterLogMessage::Get(response) => {
let _ = response.send(map.clone());
}
}
let contents = serde_json::to_string(&map).unwrap();
file.set_len(0).await?;
file.seek(std::io::SeekFrom::Start(0)).await?;
file.write_all(contents.as_bytes()).await?;
file.flush().await?;
}
Ok(())
}
需要注意的是,這裡我們採用了一種簡單但非高效的方式:每次操作後都重寫整個檔案。在實際生產環境中,你可能需要更複雜的事務日誌機制來提高效能。
更新鍵值儲存Actor
我們需要修改鍵值儲存Actor,讓它在啟動時建立寫入Actor並從中取得初始狀態:
// 建立寫入Actor
let (writer_key_value_sender, writer_key_value_receiver) = channel(32);
tokio::spawn(writer_actor(writer_key_value_receiver));
// 從寫入Actor取得狀態
let (get_sender, get_receiver) = oneshot::channel();
let _ = writer_key_value_sender.send(WriterLogMessage::Get(
get_sender
)).await;
let mut map = get_receiver.await.unwrap();
然後,在處理訊息時,我們需要將操作同步到寫入Actor:
while let Some(message) = receiver.recv().await {
if let Some(
write_message
) = WriterLogMessage::from_key_value_message(
&message) {
let _ = writer_key_value_sender.send(
write_message
).await;
}
// 處理訊息...
}
實作Actor監督機制
隨著系統複雜度增加,我們需要監控Actor的健康狀態。如果某個Actor當機,系統應該能夠檢測到並重新建立它。
監督Actor設計
我們將增加一個監督Actor,負責跟蹤系統中的所有Actor。它透過定期接收Actor的心跳訊息來判斷Actor是否存活,如果某個Actor沒有傳送心跳,監督Actor會傳送重置命令。
首先,我們需要擴充套件路由訊息類別:
enum RoutingMessage {
KeyValue(KeyValueMessage),
Heartbeat(ActorType),
Reset(ActorType),
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum ActorType {
KeyValue,
Writer
}
然後,我們需要修改鍵值儲存Actor,使其儲存寫入Actor的控制程式碼:
let (writer_key_value_sender, writer_key_value_receiver) = channel(32);
let _writer_handle = tokio::spawn(
writer_actor(writer_key_value_receiver)
);
實作心跳機制
為了實作心跳機制,我們需要讓Actor定期向監督Actor傳送心跳訊息。同時,監督Actor需要檢查每個Actor的最後心跳時間,如果超過閾值,就傳送重置命令。
這種設計有幾個優點:
- 系統可以自動檢測和還原失敗的Actor
- 監督機制與業務邏輯分離,符合關注點分離原則
- 透過路由器傳送訊息,避免了複雜的參照管理
在我的實際專案經驗中,這種監督模式非常有效,特別是在長時間執行的服務中。它確保系統能夠在部分元件失敗時繼續執行,提高了整體可靠性。
設計考量與最佳實踐
在實作Actor系統時,有幾個重要的設計考量需要注意:
訊息設計:訊息應該是自描述的,包含足夠的訊息讓接收者知道如何處理。
錯誤處理:每個Actor應該能夠優雅地處理錯誤,並且不應該因為一個錯誤而當機整個系統。
狀態隔離:Actor的核心優勢之一是狀態隔離,確保每個Actor只管理自己的狀態。
資源管理:適當地管理資源,例如寫入Actor對檔案的獨佔存取可以避免並發問題。
監督策略:根據系統需求設計合適的監督策略,例如重啟失敗的Actor或者通知管理員。
Actor模型雖然增加了一些複雜性,但它提供了一種結構化的方式來處理並發和狀態管理,使系統更加健壯和可維護。
在Rust中實作Actor模型特別有優勢,因為Rust的所有權系統和async/await語法使得編寫安全的並發程式碼變得更加容易。透過Tokio的任務和通道,我們可以構建高效與類別安全的Actor系統。
透過本文介紹的技術,你可以構建一個具有狀態還原和監督能力的Actor系統,為你的分散式應用提供堅實的基礎。無論是處理高併發的Web服務還是複雜的資料處理管道,Actor模型都能提供一個清晰與可擴充套件的架構。
Rust中的Actor模型與設計模式:從容錯系統到優雅架構
探討Actor模型的容錯機制
在現代分散式系統設計中,Actor模型提供了一種優雅的方式來處理並發和錯誤還原。透過觀察範例中的KeyValue和Writer Actor,我們可以看到一個完整的容錯系統實作。
心跳機制的實作
Actor系統的心臟是其監督機制。範例中實作的心跳系統非常值得深入理解:
let timeout_duration = Duration::from_millis(200);
let router_sender = ROUTER_SENDER.get().unwrap().clone();
loop {
match time::timeout(timeout_duration, receiver.recv()).await {
Ok(Some(message)) => {
// 處理訊息...
},
Ok(None) => break,
Err(_) => {
router_sender.send(
RoutingMessage::Heartbeat(ActorType::KeyValue)
).await.unwrap();
}
};
}
這段程式碼優雅地展示了Actor的自我報告機制。當200毫秒內沒有收到訊息時,Actor會主動傳送心跳訊息給路由器,確保監督系統知道此Actor仍在運作。這是一個關鍵設計,因為在分散式系統中,區分「沒有工作要做」和「Actor已經死亡」是至關重要的。
Writer Actor的實作
相似的心跳邏輯也應用在Writer Actor上:
let timeout_duration = Duration::from_millis(200);
let router_sender = ROUTER_SENDER.get().unwrap().clone();
loop {
match time::timeout(timeout_duration, receiver.recv()).await {
Ok(Some(message)) => {
// 處理寫入訊息...
let contents = serde_json::to_string(&map).unwrap();
file.set_len(0).await?;
file.seek(std::io::SeekFrom::Start(0)).await?;
file.write_all(contents.as_bytes()).await?;
file.flush().await?;
},
Ok(None) => break,
Err(_) => {
router_sender.send(
RoutingMessage::Heartbeat(ActorType::Writer)
).await.unwrap();
}
};
}
這裡的Writer Actor負責將資料持久化到檔案系統,同時也維持著它的心跳訊息。在玄貓的經驗中,這種檔案IO操作特別容易出問題,因此心跳機制在此特別重要。
監督Actor的實作
監督Actor是整個容錯系統的核心:
async fn heartbeat_actor(mut receiver: Receiver<ActorType>) {
let mut map = HashMap::new();
let timeout_duration = Duration::from_millis(200);
loop {
match time::timeout(timeout_duration, receiver.recv()).await {
Ok(Some(actor_name)) => map.insert(
actor_name, Instant::now()
),
Ok(None) => break,
Err(_) => {
continue;
}
};
let half_second_ago = Instant::now() -
Duration::from_millis(500);
for (key, &value) in map.iter() {
// 檢查Actor是否還活著...
}
}
}
監督Actor維護一個HashMap,記錄每個Actor的最後心跳時間。如果某個Actor超過半秒沒有傳送心跳,監督Actor就會啟動重置流程。
Actor重置機制的深層解析
當監督Actor檢測到某個Actor失效時,會觸發重置流程:
if value < half_second_ago {
match key {
ActorType::KeyValue | ActorType::Writer => {
ROUTER_SENDER.get().unwrap().send(
RoutingMessage::Reset(ActorType::KeyValue)
).await.unwrap();
map.remove(&ActorType::KeyValue);
map.remove(&ActorType::Writer);
break;
}
}
}
這裡有一個精妙的設計:即使是Writer Actor失效,系統也會重置KeyValue Actor,因為KeyValue Actor會自動重啟Writer Actor。這種層級化的重置策略減少了重複重啟的可能性。
路由Actor的重置處理也非常優雅:
match message {
ActorType::KeyValue | ActorType::Writer => {
let (new_key_value_sender, new_key_value_receiver) = channel(32);
key_value_handle.abort();
key_value_sender = new_key_value_sender;
key_value_receiver = new_key_value_receiver;
key_value_handle = tokio::spawn(
key_value_actor(key_value_receiver)
);
time::sleep(Duration::from_millis(100)).await;
},
}
這段程式碼建立新的通道,中止現有的Actor任務,重新分配傳送者和接收者,然後產生新的Actor。重要的是,它還包含了一個短暫的休眠以確保新任務已經在非同步執行時啟動並執行。
設計模式在非同步系統中的應用
轉向第9章的內容,我們需要理解如何將非同步程式設計的設計模式應用於實際問題。
隔離模組的建構
在現有的同步程式碼函式庫合非同步功能時,保持互動的「爆炸半徑」較小是明智的。大規模重寫很少能按時完成,而與風險較高。相反,我們應該採用漸進式的方法。
在我的實踐中,我發現最有效的策略是先建立一個隔離的非同步模組,然後透過明確定義的介面與現有系統整合。這樣可以將風險限制在可控範圍內,同時獲得非同步程式設計的好處。
瀑布設計模式與裝飾器模式
瀑布設計模式允許我們構建可重用非同步元件的路徑。而裝飾器模式則讓我們能夠在不修改核心任務程式碼的情況下增加功能,如日誌記錄或效能監控。
這些模式在非同步系統中特別有用,因為它們允許我們以一種模組化和可維護的方式構建複雜的行為。例如,透過裝飾器模式,我們可以在編譯時透過標誌來啟用或停用特定功能,而不是硬編碼這些功能。
重試與斷路器模式
在分散式系統中,錯誤是不可避免的。重試模式允許我們在遇到暫時性故障時自動重試操作,而斷路器模式則可以防止系統在面對持續故障時不斷重試,從而避免資源浪費和系統當機。
這些模式共同工作,可以建立一個既有彈性又能優雅處理故障的系統。
從Actor模型到設計模式:綜合思考
將Actor模型與這些設計模式結合使用,我們可以構建出既能高效處理並發,又能優雅應對故障的系統。Actor模型提供了隔離和訊息傳遞的基礎架構,而設計模式則提供了構建在這個基礎上的高階行為。
在實際工作中,我發現這種組合特別強大:Actor模型處理「誰在做什麼」,而設計模式處理「如何做得更好」。例如,我們可以將重試邏輯作為Actor的一部分實作,或者作為裝飾器應用於Actor的訊息處理。
透過這種方式,我們可以構建出既模組化又靈活的系統,能夠適應不斷變化的需求和環境。
在非同步程式設計的世界中,這些模式和技術不僅是理論概念,更是實際問題的解決方案。透過理解並應用這些概念,我們可以建立出更加健壯、高效與易於維護的系統。
漸進式非同步整合:不必推倒重來的現代化策略
開發大型系統時,我經常遇到這樣的情境:現有的同步程式碼函式庫龐大與穩定,但新需求要求更高的效能和併發處理能力。完全重寫成非同步架構不僅風險高,還會延緩其他功能的開發進度。在多家企業的技術顧問經驗中,我發現漸進式整合非同步功能的策略往往更為實際。
理想的解決方案是從小處著手—建立獨立的非同步模組,同時提供同步入口點。這種方法有幾個關鍵優勢:
- 非同步模組可以在不影響現有程式碼的情況下獨立開發和測試
- 同步入口點讓其他開發者無需立即學習非同步程式設計即可使用新功能
- 團隊可以按照自己的節奏逐步理解和採用非同步程式設計
橋接同步與非同步世界的設計思路
那麼,如何在提供同步介面的同時獲得非同步程式設計的好處呢?核心思路是建立一個中間層,負責管理非同步任務並提供簡單的同步API。
這個設計的工作流程如下:
- 將非同步任務傳送到執行環境
- 將任務的處理器(handle)存入對映表
- 回傳一個唯一識別碼,對應對映表中的處理器
- 當需要結果時,開發者傳入唯一識別碼,同步等待結果
這種方法讓開發者可以將識別碼視為非同步處理器的代理,但無需直接與非同步程式碼互動。
實作隔離式非同步模組
讓我們開始實作這個隔離式非同步模組。首先,我們需要以下依賴:
[dependencies]
tokio = { version = "1.33.0", features = ["full"] }
uuid = { version = "1.5.0", features = ["v4"] }
接下來,建立async_mod.rs
檔案,其中包含我們的非同步模組程式碼。首先匯入必要的模組:
use std::sync::LazyLock;
use tokio::runtime::{Runtime, Builder};
use tokio::task::JoinHandle;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
pub type AddFutMap = LazyLock<Arc<Mutex<HashMap<String, JoinHandle<i32>>>>>;
設定執行環境
我們需要建立Tokio執行環境來處理非同步任務:
static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime")
});
定義非同步核心功能
接著定義我們的非同步加法函式,加入一個延遲來模擬實際的非同步任務:
async fn async_add(a: i32, b: i32) -> i32 {
println!("starting async_add");
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
println!("finished async_add");
a + b
}
注意,這個非同步函式不是公開的,它僅在模組內部使用。
構建處理器函式
現在我們需要建立一個處理器函式,作為同步和非同步程式碼之間的橋樑:
fn add_handler(a: Option<i32>, b: Option<i32>, id: Option<String>)
-> Result<(Option<i32>, Option<String>), String> {
static MAP: AddFutMap = LazyLock::new(|| Arc::new(
Mutex::new(HashMap::new())
));
match (a, b, id) {
(Some(a), Some(b), None) => {
// 處理新增任務的情況
let handle = TOKIO_RUNTIME.spawn(async_add(a, b));
let id = uuid::Uuid::new_v4().to_string();
MAP.lock().unwrap().insert(id.clone(), handle);
Ok((None, Some(id)))
},
(None, None, Some(id)) => {
// 處理取得結果的情況
let handle = match MAP.lock().unwrap().remove(&id) {
Some(handle) => handle,
None => return Err("No handle found".to_string())
};
let result: i32 = match TOKIO_RUNTIME.block_on(async {
handle.await
}){
Ok(result) => result,
Err(e) => return Err(e.to_string())
};
Ok((Some(result), None))
},
_ => Err(
"either a or b need to be provided or a handle_id".to_string()
)
}
}
這個處理器函式使用了惰性初始化的對映表來儲存非同步任務處理器。我們使用了Arc<Mutex<HashMap>>
來確保執行緒安全,即使在多執行緒環境中也能正確存取對映表。
關於多執行緒安全的思考
在實際專案中,我發現許多開發者對於執行緒安全性的考量不夠充分。即使目前只在主執行緒中呼叫處理器函式,也應該為未來的擴充套件做好準備。如果100%確定處理器只會在主執行緒中被呼叫,可以去掉Arc
和Mutex
,但這需要使用unsafe
程式碼,風險很高。
另一種選擇是使用thread_local
,只要開發者在相同執行緒中取得結果,這是安全的。但在我的經驗中,提前考慮執行緒安全往往能避免未來的棘手問題。
公開友好的介面
處理器函式的介面不夠人性化,開發者可能會傳入錯誤的參陣列合。因此,我們需要提供更友好的公開介面:
pub fn send_add(a: i32, b: i32) -> Result<String, String> {
match add_handler(Some(a), Some(b), None) {
Ok((None, Some(id))) => Ok(id),
Ok(_) => Err(
"Something went wrong, please contact author".to_string()
),
Err(e) => Err(e)
}
}
pub fn get_add(id: String) -> Result<i32, String> {
match add_handler(None, None, Some(id)) {
Ok((Some(result), None)) => Ok(result),
Ok(_) => Err(
"Something went wrong, please contact author".to_string()
),
Err(e) => Err(e)
}
}
這些公開函式提供了清晰的介面,強制開發者以正確的方式使用我們的模組。
在主程式中使用非同步模組
現在我們可以在main.rs
中使用我們的非同步模組:
mod async_mod;
fn main() {
println!("Hello, world!");
let id = async_mod::send_add(1, 2).unwrap();
println!("id: {}", id);
std::thread::sleep(std::time::Duration::from_secs(4));
println!("main sleep done");
let result = async_mod::get_add(id).unwrap();
println!("result: {}", result);
}
執行後,輸出應該類別似:
Hello, world!
starting async_add
id: e2a2f3e1-2a77-432c-b0b8-923483ae637f
finished async_add
main sleep done
result: 3
我們可以看到,非同步任務在主執行緒繼續執行時被處理,主執行緒可以在需要時取得結果。
隔離式設計的優勢
這種隔離式設計帶來了極大的靈活性。我們可以自由地實驗不同的執行環境和設定,甚至可以切換到本地集合(local sets)並開始使用本地執行緒狀態來快取最近計算的值,以提高計算效能。而與,由於我們的介面完全與非同步原語解耦,其他使用我們模組的開發者不會注意到這些變化。
瀑布流設計模式
瀑布流設計模式(也稱為責任鏈模式)是一種非同步任務鏈,其中每個任務將其結果直接傳給下一個任務。這種模式在處理需要多步驟處理的工作流程中特別有用。
基本實作
在Rust中實作瀑布流設計模式相當直接。我們可以利用Rust的錯誤處理系統來編寫安全與簡潔的程式碼:
type WaterFallResult = Result<String, Box<dyn std::error::Error>>;
async fn task1() -> WaterFallResult {
Ok("Task 1 completed".to_string())
}
async fn task2(input: String) -> WaterFallResult {
Ok(format!("{} then Task 2 completed", input))
}
async fn task3(input: String) -> WaterFallResult {
Ok(format!("{} and finally Task 3 completed", input))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let output1 = task1().await?;
let output2 = task2(output1).await?;
let result = task3(output2).await?;
println!("{}", result);
Ok(())
}
由於所有任務都回傳相同的錯誤類別,我們可以使用?
運算元將它們順序連線起來。
靈活的瀑布流
瀑布流方法簡單與可預測,它還允許我們重用非同步任務作為構建塊。例如,我們可以在這些非同步任務周圍增加邏輯:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let output1 = task1().await?;
let output2: i32;
if output1 > 10 {
output2 = task2(output1).await?;
} else {
output2 = task3(output1).await?;
}
println!("{}", output2);
Ok(())
}
考慮到我們可以使用邏輯來控制瀑布流的走向,這種實作方式對於構建略有不同但使用相同核心元件的工作流程非常有用。我們還可以在元件之間輕鬆插入指標或日誌記錄。
裝飾器模式之美
裝飾器模式是圍繞某個功能的包裝器,可以在不修改原始程式碼的情況下增加新功能或在主執行前後執行邏輯。典型的裝飾器應用包括:
- 測試夾具:在測試前設定資料儲存狀態,並在測試後銷毀狀態
- 日誌記錄:可以輕鬆關閉日誌記錄而無需更改核心邏輯
- 工作階段管理:處理使用者工作階段的建立和銷毀
基本裝飾器實作
讓我們看如何實作一個基本的裝飾器。首先定義一個特性(trait):
trait Greeting {
fn greet(&self) -> String;
}
然後定義一個實作該特性的結構體:
struct HelloWorld;
impl Greeting for HelloWorld {
fn greet(&self) -> String {
"Hello, World!".to_string()
}
}
現在我們可以定義一個裝飾器結構體,它包含一個實作了相同特性的內部元件:
struct ExcitedGreeting<T> {
inner: T,
}
impl<T> ExcitedGreeting<T>
where
T: Greeting
{
fn greet(&self) -> String {
let mut greeting = self.inner.greet();
greeting.push_str(" I'm so excited to be in Rust!");
greeting
}
}
這裡我們呼叫內部結構體的特性方法,增加到字串,然後回傳修改後的字串。測試我們的裝飾器模式很簡單:
fn main() {
let raw_one = HelloWorld;
let raw_two = HelloWorld;
let decorated = ExcitedGreeting { inner: raw_two };
println!("{}", raw_one.greet());
println!("{}", decorated.greet());
}
條件裝飾器
我們甚至可以使裝飾器模式的實作依賴於編譯特性。例如,在Cargo.toml
中增加一個特性:
[features]
logging_decorator = []
然後可以根據特性標誌重寫我們的主函式,以編譯帶有裝飾邏輯的版本(或不帶):
fn main() {
let greeter = HelloWorld;
#[cfg(feature = "logging_decorator")]
let greeter = ExcitedGreeting { inner: greeter };
println!("{}", greeter.