async
和 .await
關鍵字是 Rust 中相對較新的功能,它們極大地簡化了非同步程式設計。
async 與 .await 的基本語法
async
: 可用於函式、閉包或程式碼區塊,會回傳一個Future
.await
: 用於等待Future
的結果,本質上是呼叫Future
特性的poll()
方法
使用這些關鍵字讓我們能夠編寫看起來像同步程式碼但實際執行方式是非同步的程式,大降低了使用 Future
的複雜性。
async 上下文
要使用 .await
,必須在非同步上下文味著包含 .await
的程式碼必須位於標記為 async
的區塊或函式中,與該非同步程式碼必須在非同步執行時上執行。
未執行的 Future 行為示範
考慮以下程式,它展示了不同方式處理非同步程式碼的行為差異:
#[tokio::main]
async fn main() {
async {
println!("這行會最先印出");
}
.await;
let _future = async {
println!("這行永遠不會印出");
};
tokio::task::spawn(async {
println!(
"這行有時會印出,取決於執行速度"
)
});
println!("這行一定會印出,但可能不是最後印出");
}
這個範例揭示了非同步程式設計中的幾個重要概念:
- 第一個
println!
一定會在其他輸出前執行,因為我們使用了.await
等待其完成。 - 第二個
println!
永遠不會執行,因為我們建立了Future
但從未執行它(既沒有.await
也沒有將其生成到執行時上)。 - 第三個
println!
被生成到 Tokio 執行時上執行,但由於我們沒有等待它完成,所以它可能執行也可能不執行順序也不確定。 - 最後一個
println!
總是會執行,但它可能是最後一個執行也可能不是,取決於非同步任務的執行速度。
這個範例生動展示了非同步程式設計中的一個核心概念:僅定義一個 Future
不會導致其執行,必須顯式地執行它(透過 .await
或將其生成到執行時)。
非同步任務生命週期管理
在非同步程式設計中,理解任務的生命週期至關重要。當使用 tokio::task::spawn
時,我們建立了獨立的任務,這些任務會在背景執行,而不會阻塞主程式流程。
等待與不等待的選擇
在設計非同步系統時,我經常需要決定是否等待特定任務完成。這個決定取決於任務的性質:
- 需要結果的任務:必須使用
.await
等待完成 - 背景處理任務:可以使用
spawn
啟動並讓其獨立執行 - 有限資源任務:即使不需要結果,也可能需要限制同時執行的數量
任務取消與資源釋放
值得注意的是,當包含非同步任務的範圍結束時,未完成的任務可能會被取消。這在資源管理方面具有重要影響,特別是當任務涉及檔案控制程式碼、網路連線或其他需要正確關閉的資源時。
非同步程式設計的最佳實踐
從我的經驗來看,以下是一些非同步程式設計的最佳實踐:
- 將長時間執行的 CPU 密集型任務移至專用執行緒池
- 避免在非同步上下文塞操作
- 適當使用超時機制防止任務無限等待
- 設計良好的錯誤處理策略,特別是針對可能失敗的非同步操作
- 謹慎管理分享資源,避免死鎖和資源競爭
理解 Rust 非同步模型的獨特性
Rust 的非同步模型與其他語言有顯著不同。它採用了根據輪詢(poll-based)而非根據回呼(callback-based)的方法,這使得它能夠在不犧牲效能的情況下提供更好的開發體驗。
零成本抽象的實作
Rust 的非同步模型是一個零成本抽象的絕佳例子。編譯器將 async
/.await
語法轉換為狀態機,這種方式避免了執行時開銷,同時保持了程式碼的可讀性和可維護性。
與其他語言的比較
與 JavaScript 或 Python 等語言相比,Rust 的非同步模型需要更多的顯式控制,但這種控制帶來了更好的效能和更少的意外行為。特別是,Rust 的非同步模型在處理大量並發任務時表現出色,同時保持了記憶體全和執行緒全。
非同步程式設計是現代高效能系統的關鍵組成部分。透過正確理解和應用 Rust 的非同步模型,我們可以構建既高效又可靠的系統,充分利用現代硬體的平行能力,同時避免傳統並發程式設計中的許多陷阱。
Rust 的 async
/.await
語法和 Tokio 執行時的結合,為開發者提供了強大的工具來處理複雜的非同步場景。雖然學習曲線可能較陡,但掌握這些概念後,能夠開發出在效能和可靠性方面都表現出色的應用程式。
Tokio任務的生命週期管理與執行控制
在探討Rust非同步程式設計時,理解任務的生命週期管理至關重要。當我們使用Tokio執行非同步程式時,有時會遇到一些看似簡單但實際上需要深入理解的問題。
非同步執行的時機挑戰
在使用Tokio時,我開發中曾遇到一個常見的問題:為什麼某些非同步程式碼似乎沒有按預期執行?考慮以下情況:
#[tokio::main]
async fn main() {
println!("First print statement");
tokio::task::spawn(async {
println!("Third print statement");
});
println!("Second print statement");
}
在這個例子中,第三個println!可能不會一致地顯示。這是因為程式可能在Tokio執行階段的排程器有機會執行該程式碼之前就結束了。這揭示了一個重要概念:非同步任務的生命週期與主程式的生命週期並不總是一致的。
任務等待與執行保證
如果要確保非同步任務在程式結束前執行完成,我們需要等待由tokio::task::spawn()
回傳的Future完成。這個函式有一個重要特性:它允許我們從非同步上下文步執行階段上啟動非同步任務,同時回傳一個可以像其他物件一樣傳遞的Future(具體是tokio::task::JoinHandle
)。
從非同步函式建立任務
use tokio::task::JoinHandle;
fn not_an_async_function() -> JoinHandle<()> {
tokio::task::spawn(async {
println!("Second print statement");
})
}
#[tokio::main]
async fn main() {
println!("First print statement");
not_an_async_function().await.ok();
}
這段程式碼展示瞭如何從普通函式回傳一個非同步任務。not_an_async_function()
雖然不是非同步函式(沒有標記為async
),但它回傳一個JoinHandle
(一種實作了Future
特徵的類別)。
關鍵在於tokio::task::spawn()
函式能在非同步執行階段上產生任務並回傳一個JoinHandle
,讓我們可以等待任務完成或取消任務。在main
函式中,我們使用.await
等待從函式回傳的Future完成,這確保了印出陳述式的執行。.ok()
則用於忽略可能的錯誤結果。
在非同步上下文await
在非同步上下文使用.await
是不可能的。但我們可以使用tokio::runtime::Handle::block_on()
方法來阻塞等待Future的結果。這需要取得執行階段的handle並將其移動到需要阻塞的執行緒:
use tokio::runtime::Handle;
fn not_an_async_function(handle: Handle) {
handle.block_on(async {
println!("Second print statement");
})
}
#[tokio::main]
async fn main() {
println!("First print statement");
let handle = Handle::current();
std::thread::spawn(move || {
not_an_async_function(handle);
});
}
這個例子展示瞭如何在非同步上下文普通執行緒中執行非同步程式碼Handle::current()
取得當前非同步執行階段的handle,這個handle可以被克隆和分享,提供從非同步上下文步執行階段的能力。
我們使用std::thread::spawn()
建立一個新執行緒並使用move
捕捉變數(包括handle)。在新執行緒,我們呼叫not_an_async_function()
並傳入handle,該函式使用block_on()
在當前執行緒阻塞直到非同步程式碼行完成。
雖然這種方法有效,但並不優雅,應該盡量避免使用。在大多數情況下,應該優先使用async
和.await
。
非同步程式設計的核心原則
簡而言之,當需要執行非同步任務或回傳Future時,應使用async
包裝程式碼塊(包括函式和閉包);當需要等待非同步任務時,應使用.await
。建立一個非同步塊並不會執行Future,它仍然需要在執行階段上執行(或使用tokio::task::spawn()
生成)。
非同步程式設計中的並發與平行處理
在非同步程式設計中,並發(concurrency)和平行處理(parallelism)是兩個不同但相關的概念。使用非同步時,這兩種特性都不是自動獲得的,需要合理構建程式碼才能充分利用。
Tokio中的並發與平行處理控制
Tokio對平行處理沒有明確控制(除了使用tokio::task::spawn_blocking()
啟動阻塞任務外,這總是在單獨的執行緒執行)。我們可以明確控制並發,但無法控制個別任務的平行度,這些細節由執行階段決定。Tokio允許我們設定工作執行緒數量,但執行階段會決定為每個任務使用哪些執行緒
引入並發的三種方式
在程式碼中引入並發可以透過以下三種方式實作:
- 使用
tokio::task::spawn()
生成任務 - 使用
tokio::join!(...)
或futures::future::join_all()
合併多個Future - 使用
tokio::select! { ... }
巨集它允許我們等待多個並發程式碼支
平行處理的實作
要引入平行處理,必須使用tokio::task::spawn()
,但這並不提供明確的平行處理控制。當生成任務時,我們告訴Tokio這個任務可以在任何執行緒執行,但Tokio仍然決定使用哪個執行緒如果我們的Tokio執行階段只有一個工作執行緒即使用tokio::task::spawn()
,所有任務也會在一個執行緒執行。
並發與平行處理的實際演示
讓我們透過一些範例程式碼來展示這種行為:
async fn sleep_1s_blocking(task: &str) {
use std::{thread, time::Duration};
println!("Entering sleep_1s_blocking({task})");
thread::sleep(Duration::from_secs(1));
println!("Returning from sleep_1s_blocking({task})");
}
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
println!("Test 1: Run 2 async tasks sequentially");
sleep_1s_blocking("Task 1").await;
sleep_1s_blocking("Task 2").await;
println!("Test 2: Run 2 async tasks concurrently (same thread)");
tokio::join!(
sleep_1s_blocking("Task 3"),
sleep_1s_blocking("Task 4")
);
println!("Test 3: Run 2 async tasks in parallel");
tokio::join!(
tokio::spawn(sleep_1s_blocking("Task 5")),
tokio::spawn(sleep_1s_blocking("Task 6"))
);
}
這個例子故意使用了阻塞的std::thread::sleep()
而不是非同步的睡眠函式,以展示平行處理的效果。我們明確設定Tokio使用兩個工作執行緒這允許我們平行執行任務。
- 測試1:連續呼叫
sleep_1s_blocking()
兩次,沒有並發或平行處理。 - 測試2:使用
tokio::join!()
同時呼叫sleep_1s_blocking()
兩次,引入了並發但非平行處理(因為是阻塞函式)。 - 測試3:使用
tokio::spawn()
生成任務然後合併結果,引入了平行處理。
執行這段程式碼會產生以下輸出:
Test 1: Run 2 async tasks sequentially
Entering sleep_1s_blocking(Task 1)
Returning from sleep_1s_blocking(Task 1)
Entering sleep_1s_blocking(Task 2)
Returning from sleep_1s_blocking(Task 2)
Test 2: Run 2 async tasks concurrently (same thread)
Entering sleep_1s_blocking(Task 3)
Returning from sleep_1s_blocking(Task 3)
Entering sleep_1s_blocking(Task 4)
Returning from sleep_1s_blocking(Task 4)
Test 3: Run 2 async tasks in parallel
Entering sleep_1s_blocking(Task 5)
Entering sleep_1s_blocking(Task 6)
Returning from sleep_1s_blocking(Task 5)
Returning from sleep_1s_blocking(Task 6)
從輸出可以看出,只有在第三次測試中,我們使用tokio::spawn()
(等同於tokio::task::spawn()
)啟動每個任務時,程式碼才會平行執行。我們可以看到兩個"Entering…“陳述式都在"Returning…“陳述式之前出現,這表明任務確實是平行執行的。
非阻塞睡眠的效果
雖然第二次測試確實是並發執行的,但由於我們使用了阻塞的睡眠函式,所以任務實際上是順序執行的。讓我們用非阻塞的睡眠函式更新程式碼:
async fn sleep_1s_nonblocking(task: &str) {
use tokio::time::{sleep, Duration};
println!("Entering sleep_1s_nonblocking({task})");
sleep(Duration::from_secs(1)).await;
println!("Returning from sleep_1s_nonblocking({task})");
}
這個版本使用了tokio::time::sleep
,這是一個非阻塞的睡眠函式,能夠在等待時釋放執行緒允許其他任務執行。當我們使用這個非阻塞版本時,即使在單個執行緒,並發任務也能交錯執行,因為在等待時控制權會回傳給執行階段。
使用這個非阻塞版本執行三個測試,我們得到以下輸出:
Test 4: Run 2 async tasks sequentially (non-blocking)
Entering sleep_1s_nonblocking(Task 7)
Returning from sleep_1s_nonblocking(Task 7)
Entering sleep_1s_nonblocking(Task 8)
Returning from sleep_1s_nonblocking(Task 8)
Test 5: Run 2 async tasks concurrently (same thread, non-blocking)
Entering sleep_1s_nonblocking(Task 9)
Entering sleep_1s_nonblocking(Task 10)
Returning from sleep_1s_nonblocking(Task 10)
Returning from sleep_1s_nonblocking(Task 9)
Test 6: Run 2 async tasks in parallel (non-blocking)
Entering sleep_1s_nonblocking(Task 11)
在第五次測試中,我們可以看到兩個任務確實是並發執行的,因為兩個"Entering…“陳述式都在任何"Returning…“陳述式之前出現。這表明,即使在同一個執行緒,非阻塞的非同步程式碼也能實作真正的並發。
並發與平行處理的選擇
在設計非同步系統時,我需要根據具體需求選擇合適的並發和平行處理策略:
- 對於I/O密集型任務(如網路請求),並發通常是一個好選擇,因為這些操作大部分時間都在等待外部資源。
- 對於CPU密集型任務,平行處理更為重要,因為這些任務需要實際的計算能力。
Tokio為這兩種情況都提供了工具,但關鍵是要理解何時使用spawn()
(引入潛在的平行處理)以及何時使用join!()
(僅引入並發)。
非同步程式設計的最佳實踐
在我多年的Rust非同步程式設計經驗中,發現以下幾點最佳實踐對構建高效的非同步系統非常有幫助:
理解任務生命週期:確保理解非同步任務的生命週期,特別是它們與主程式的關係。
選擇適當的並發模型:根據任務的性質(I/O密集或CPU密集)選擇適當的並發或平行處理策略。
避免阻塞執行階段:在非同步上下文用阻塞操作,如
std::thread::sleep()
或阻塞I/O。如果必須使用阻塞操作,請考慮使用tokio::task::spawn_blocking()
。合理使用資源:設定適當數量的工作執行緒
理解Rust非同步執行模型的關鍵差異
在非同步Rust程式設計中,理解阻塞與非阻塞操作的區別至關重要。透過觀察執行時序圖,我們可以清楚看到這兩種方法在實際執行時的行為差異。
阻塞與非阻塞執行的視覺化比較
當我們使用阻塞式睡眠時,任務會依序執行。從圖中可以看出,每個任務必須等待前一個任務完成後才能開始執行,這導致總執行時間變成所有任務時間的總和。
相反地,使用非阻塞睡眠時,我們可以觀察到任務能夠並發執行。雖然圖中顯示Test 5和Test 6看似平行執行,但實際上只有Test 6真正以平行方式執行,而Test 5是以並發方式執行。
這個差異在調整Tokio設定時變得更加明顯。如果將worker_threads
設為1並重新執行測試,阻塞版本的所有任務會按順序執行,但非阻塞版本仍然保持並發執行,即使只有一個執行緒。
// 非阻塞睡眠的實作範例
async fn sleep_1s_nonblocking(task_name: &str) {
println!("Entering sleep_1s_nonblocking({})", task_name);
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Returning from sleep_1s_nonblocking({})", task_name);
}
這段程式碼實作了一個非阻塞的睡眠函式。當呼叫此函式時,它會先輸出進入睡眠狀態的訊息,然後使用Tokio的sleep
函式進行非阻塞等待。關鍵在於.await
運算元,它允許函式暫停執行而不會阻塞整個執行緒,使其他任務能在這段等待時間內執行。當睡眠結束後,函式繼續執行並輸出完成訊息。這種實作方式使得多個任務能夠並發執行,即使在單執行緒環境中也能實作並發性。
如果你初次接觸Rust的非同步程式設計,並發與平行的概念可能會有些混淆。我建議親自嘗試實作範例並調整不同引數,以更好地理解其中的運作原理。在我剛開始使用非同步Rust時,也花了不少時間才真正掌握這些概念的精髓。
實作非同步觀察者模式在非同步程式設計中特別有用,但在Rust中實作非同步觀察者模式有一些挑戰。讓我們來看如何克服這些限制。
非同步特性的限制與解決方案
在撰寫本文時,Rust的非同步支援有一個重要限制:我們無法在特性(trait)中直接使用非同步方法。例如,以下程式碼是無效的:
trait MyAsyncTrait {
async fn do_thing();
}
這個限制使得實作非同步觀察者模式變得複雜。不過,我們可以透過理解Rust如何實作async fn
語法糖來找到解決方案。
實際上,async
和.await
關鍵字只是處理Future的便捷語法。當我們宣告一個async
函式或程式碼區塊時,編譯器會將該程式碼包裝在Future中。因此,我們仍然可以在特性中建立等效的async
函式,只是需要明確地做到這一點(不使用語法糖)。
從同步到非同步觀察者
讓我們從原始的同步觀察者特性開始:
pub trait Observer {
type Subject;
fn observe(&self, subject: &Self::Subject);
}
要將observe()
方法轉換為非同步函式,第一步是讓它回傳一個Future
。我們可以嘗試這樣做:
pub trait Observer {
type Subject;
type Output: Future<Output = ()>;
fn observe(&self, subject: &Self::Subject) -> Self::Output;
}
在這個第一次嘗試中,我們定義了一個關聯類別Output
,它有Future<Output = ()>
特性約束,並將observe
方法修改為回傳這個Output
類別。這看起來應該可行,程式碼能編譯透過。但當我們嘗試實作這個特性時,會遇到問題:因為Future
只是一個特性(不是具體類別),我們無法明確指定Output
的類別。這表明我們不能直接用關聯類別來解決這個問題。
使用特性物件解決問題
由於無法直接使用關聯類別,我們需要使用特性物件並將Future包裝在Box
中:
pub trait Observer {
type Subject;
type Output;
fn observe(
&self,
subject: &Self::Subject,
) -> Box<dyn Future<Output = Self::Output>>;
}
現在我們可以實作這個非同步觀察者特性:
struct Subject;
struct MyObserver;
impl Observer for MyObserver {
type Subject = Subject;
type Output = ();
fn observe(
&self,
_subject: &Self::Subject,
) -> Box<dyn Future<Output = Self::Output>> {
Box::new(async {
// 在這裡執行一些非同步操作
use tokio::time::{sleep, Duration};
sleep(Duration::from_millis(100)).await;
})
}
}
在這個實作中,我們定義了一個MyObserver
結構體,並為它實作了Observer
特性。我們指定關聯類別Subject
為Subject
結構體,Output
為()
(即無回傳值)。在observe
方法中,我們使用Box::new
將一個async
區塊包裝起來,這個區塊內部使用了Tokio的sleep
函式進行非同步等待。這個方法回傳的是一個裝箱的Future
特性物件。
處理Future的Pin問題
如果我們嘗試使用上面的實作,會遇到另一個問題:
#[tokio::main]
async fn main() {
let subject = Subject;
let observer = MyObserver;
observer.observe(&subject).await;
}
編譯器會報錯說dyn Future<Output = ()>
無法被unpinned。這是因為在Rust的標準函式庫Future
特性的poll
方法需要接收一個Pin<&mut Self>
類別的引數:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
這意味著在使用.await
(它會呼叫poll
方法)之前,我們需要將Future固定(pinned)。幸運的是,取得一個固定指標很簡單,我們只需要再次更新我們的Observer
特性:
pub trait Observer {
type Subject;
type Output;
fn observe(
&self,
subject: &Self::Subject,
) -> Pin<Box<dyn Future<Output = Self::Output>>>;
}
然後更新實作:
impl Observer for MyObserver {
type Subject = Subject;
type Output = ();
fn observe(
&self,
_subject: &Self::Subject,
) -> Pin<Box<dyn Future<Output = Self::Output>>> {
Box::pin(async {
// 在這裡執行一些非同步操作
use tokio::time::{sleep, Duration};
sleep(Duration::from_millis(100)).await;
})
}
}
在這個更新版本中,我們將回傳類別從Box<dyn Future<...>>
改為Pin<Box<dyn Future<...>>>
,並使用Box::pin()
函式而不是Box::new()
。Box::pin()
函式會方便地為我們回傳一個固定的Box。這解決了使用.await
時遇到的問題,因為現在我們的Future已經被固定,可以被安全地輪詢了。
完整的非同步觀察者模式實作
但我們還沒有完全解決問題。Observable
特性的實作更加複雜,特別是當我們需要在非同步方法中傳遞self
參照時。我們還需要確保每個Observer
例項都實作了Send
和Sync
,這樣我們才能並發地觀察更新。
以下是最終版本的Observer
特性:
pub trait Observer: Send + Sync {
type Subject;
type Output;
fn observe<'a>(
&'a self,
subject: &'a Self::Subject,
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>;
}
以及對應的Observable
特性:
pub trait Observable {
type Observer;
fn update<'a>(
&'a self,
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>;
fn attach(&mut self, observer: Self::Observer);
fn detach(&mut self, observer: Self::Observer);
}
在最終版本中,我們對Observer
特性做了幾個重要的改進:
- 增加了
Send + Sync
超特性約束,確保我們的觀察者可以在執行緒間安全地分享 - 增加了生命週期引數
'a
,允許我們在非同步方法中傳遞self
和subject
參照 - 在回傳類別中增加了
'a
和Send
約束,確保Future可以在執行緒間安全地移動
同樣,Observable
特性的update
方法現在回傳一個固定的、可傳送的Future,它的生命週期與self
參照相關聯。這些改進使得我們可以安全地在非同步環境下使用觀察者模式。