在 Rust 的非同步程式設計中,接收器(Receiver) 扮演著關鍵角色,負責從非同步通道(async channel)或訊息佇列(message queue)接收資料,確保並發任務之間的有效通訊。無論是在分散式系統、資料流處理,還是事件驅動架構中,設計高效能與可靠的接收器,都是構建穩健非同步應用的核心要素。
此外,我們將示範如何在實務應用中使用接收器,例如監聽感測器資料、處理即時事件流,或管理併發請求。這篇文章將幫助你更深入理解 Rust 非同步通訊機制,並學會如何設計高效能、可擴充套件的接收器架構。
建構接收器
接收器會等待串流中的資料,如果串流中沒有可讀取的資料,則回傳Pending。在async_runtime/src/receiver.rs
檔案中,我們匯入以下內容:
use std::{
future::Future,
task::{Context, Poll},
pin::Pin,
net::TcpStream,
io::{self, Read},
sync::{Arc, Mutex}
};
由於我們要回傳位元組,所以接收器future的形式如下:
pub struct TcpReceiver {
pub stream: Arc<Mutex<TcpStream>>,
pub buffer: Vec<u8>
}
impl Future for TcpReceiver {
type Output = io::Result<Vec<u8>>;
fn poll(
開發自己的非同步執行器:零依賴非同步伺服器實作
在Rust的生態系統中,Tokio和async-std等成熟的非同步框架已經成為許多開發者的首選。但這些框架背後的非同步機制究竟如何運作?如何從零開始開發自己的非同步執行器?本文將帶你深入探索非同步伺服器的底層實作,完全不依賴任何第三方函式庫 多年來,玄貓一直對非同步系統的底層機制充滿好奇,所以決定深入研究這個領域。透過實作自己的非同步執行器,我不僅能更好地理解現有框架的設計決策,還能針對特定需求進行客製化。這段旅程讓我對Rust的非同步系統有了更深刻的理解。
自定義睡眠功能的實作
非同步系統的核心元素之一是能夠暫停執行而不阻塞執行緒。讓我們從實作一個簡單的Sleep
結構開始:
pub struct Sleep {
when: Instant,
}
impl Sleep {
pub fn new(duration: Duration) -Self {
Sleep {
when: Instant::now() + duration,
}
}
}
這個結構非常簡單,只記錄了睡眠應該結束的時間點。接下來,我們為其實作Future
特徵:
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -Poll<Self::Output{
let now = Instant::now();
if now >= self.when {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
這裡的實作包含了非同步程式設計的核心概念。每當poll
被呼叫時,我們檢查當前時間是否已經超過預定的結束時間:
- 如果已超過,我們回傳
Poll::Ready(())
,表示這個Future已完成 - 如果還沒到時間,我們通知喚醒器稍後再次喚醒我們,並回傳
Poll::Pending
這個簡單的模式是所有非同步操作的基礎:檢查條件、完成或請求再次喚醒。
構建我們的伺服器
現在我們已經準備好構建伺服器了。首先需要引入必要的模組:
use std::{
thread,
sync::{mpsc::channel, atomic::{AtomicBool, Ordering}},
io::{self, Read, Write, ErrorKind, Cursor},
net::{TcpListener, TcpStream}
};
use data_layer::data::Data;
use async_runtime::{
executor::Executor,
sleep::Sleep
};
為了處理並發請求,我們需要一種機制來分配工作並管理執行緒。我選擇使用原子布林值來追蹤執行緒狀態:
static FLAGS: [AtomicBool; 3] = [
AtomicBool::new(false),
AtomicBool::new(false),
AtomicBool::new(false),
];
這些原子布林值充當執行緒狀態的指示器。當值為false
時,執行緒處於活躍狀態;當值為true
時,執行緒已暫停等待工作。
工作執行緒的實作
為了高效地實作工作執行緒,我設計了一個巨集來避免重複程式碼:
macro_rules! spawn_worker {
($name:expr, $rx:expr, $flag:expr) ={
thread::spawn(move || {
let mut executor = Executor::new();
loop {
if let Ok(stream) = $rx.try_recv() {
println!(
"{} Received connection: {}",
$name,
stream.peer_addr().unwrap()
);
executor.spawn(handle_client(stream));
} else {
if executor.polling.len() == 0 {
println!("{} is sleeping", $name);
$flag.store(true, Ordering::SeqCst);
thread::park();
}
}
executor.poll();
}
})
};
}
這個巨集封裝了工作執行緒的核心邏輯:
- 建立一個新的執行器
- 嘗試從通道接收連線
- 如果有連線,將其轉換為非同步任務並加入執行器
- 如果沒有連線與沒有待處理的任務,則將執行緒標記為睡眠並暫停
- 在每個迴圈迭代中輪詢執行器以推進任務
這種設計允許我們在沒有工作時暫停執行緒,有效地利用系統資源。
主程式的實作
現在讓我們看如何在主函式中協調這一切:
fn main() -io::Result<(){
// 為工作執行緒建立通道
let (one_tx, one_rx) = channel::<TcpStream>();
let (two_tx, two_rx) = channel::<TcpStream>();
let (three_tx, three_rx) = channel::<TcpStream>();
// 啟動工作執行緒
let one = spawn_worker!("One", one_rx, &FLAGS[0]);
let two = spawn_worker!("Two", two_rx, &FLAGS[1]);
let three = spawn_worker!("Three", three_rx, &FLAGS[2]);
// 設定路由和參照
let router = [one_tx, two_tx, three_tx];
let threads = [one, two, three];
let mut index = 0;
// 啟動TCP監聽器
let listener = TcpListener::bind("127.0.0.1:7878")?;
println!("伺服器監聽於連線埠 7878");
// 處理傳入連線
for stream in listener.incoming() {
match stream {
Ok(stream) ={
// 將連線傳送到當前工作執行緒
let _ = router[index].send(stream);
// 如果執行緒已暫停,喚醒它
if FLAGS[index].load(Ordering::SeqCst) {
FLAGS[index].store(false, Ordering::SeqCst);
threads[index].thread().unpark();
}
// 迴圈到下一個工作執行緒
index += 1;
if index == 3 {
index = 0;
}
}
Err(e) ={
println!("連線失敗: {}", e);
}
}
}
Ok(())
}
這個實作建立了三個工作執行緒,每個執行緒都有自己的執行器和通道。主執行緒接受傳入的TCP連線並以輪詢方式將它們分發給工作執行緒。如果一個執行緒已經暫停,我們會在傳送連線之前喚醒它。
處理客戶端請求
當一個連線被傳送到工作執行緒時,它會被轉換為非同步任務並交給執行器。下面是處理客戶端請求的非同步函式:
async fn handle_client(mut stream: TcpStream) -std::io::Result<(){
stream.set_nonblocking(true)?;
let mut buffer = Vec::new();
let mut local_buf = [0; 1024];
loop {
match stream.read(&mut local_buf) {
Ok(0) ={
break;
},
Ok(len) ={
buffer.extend_from_slice(&local_buf[..len]);
},
Err(ref e) if e.kind() == ErrorKind::WouldBlock ={
if buffer.len() 0 {
break;
}
Sleep::new(std::time::Duration::from_millis(10)).await;
continue;
},
Err(e) ={
println!("讀取連線失敗: {}", e);
}
}
}
match Data::deserialize(&mut Cursor::new(buffer.as_slice())) {
Ok(message) ={
println!("收到訊息: {:?}", message);
},
Err(e) ={
println!("解碼訊息失敗: {}", e);
}
}
// 模擬處理工作
Sleep::new(std::time::Duration::from_secs(1)).await;
// 回應客戶端
stream.write_all(b"Hello, client!")?;
Ok(())
}
這個函式首先將TCP流設定為非阻塞模式,然後從流中讀取資料。當流暫時沒有更多資料時,它會暫停一小段時間,讓執行器有機會處理其他任務。一旦收到完整的訊息,它會進行處理(在這個例子中只是記錄訊息),等待1秒鐘(模擬工作),然後回應客戶端。
這個1秒的延遲是測試我們非同步系統的好方法。如果系統真的是非同步的,那麼即使我們傳送10個請求,總處理時間也應該接近1秒,而不是10秒。
構建非同步客戶端
為了測試我們的伺服器,我們還需要一個客戶端使用相同的非同步執行時,但具有傳送和接收功能:
async fn send_data(field1: u32, field2: u16, field3: String)
-io::Result<String{
let stream = Arc::new(Mutex::new(TcpStream::connect(
"127.0.0.1:7878")?
));
let message = Data {field1, field2, field3};
TcpSender {
stream: stream.clone(),
buffer: message.serialize()?
}.await?;
let receiver = TcpReceiver {
stream: stream.clone(),
buffer: Vec::new()
};
String::from_utf8(receiver.await?).map_err(|_|
io::Error::new(io::ErrorKind::InvalidData, "Invalid UTF-8")
)
}
這個函式建立TCP連線,序列化訊息並傳送,然後等待回應。現在我們可以在主函式中使用它:
fn main() -io::Result<(){
let mut executor = Executor::new();
let mut handles = Vec::new();
let start = Instant::now();
// 建立4000個非同步任務
for i in 0..4000 {
let handle = executor.spawn(send_data(
i, i as u16, format!("Hello, server! {}", i)
));
handles.push(handle);
}
// 在單獨的執行緒中執行器
std::thread::spawn(move || {
loop {
executor.poll();
}
});
println!("等待結果...");
// 收集所有任務的結果
for handle in handles {
match handle.recv().unwrap() {
Ok(result) =println!("結果: {}", result),
Err(e) =println!("錯誤: {}", e)
};
}
let duration = start.elapsed();
println!("耗時: {:?}", duration);
Ok(())
}
這個客戶端建立4000個任務,每個任務都會傳送訊息並等待回應。透過測量總執行時間,我們可以驗證我們的非同步系統是否正常工作。
自定義非同步執行時的優缺點
在實際開發中,我們究竟應該使用自定義的非同步執行時還是像Tokio這樣的成熟框架呢?讓我們來分析一下:
自定義執行時的優勢
- 深入理解:建立自己的非同步執行時能幫助你深入理解非同步系統的工作原理。
- 客製化:針對特定需求進行最佳化和調整。
- 零依賴:不需要引入任何外部依賴,可以減少二進位檔案大小。
- 學習機會:提供了寶貴的學習經驗和知識。
- 特殊場景應用:在某些非常特殊的場景下,客製化的解決方案可能更適合。
使用成熟框架的優勢
- 效能與穩定性:像Tokio這樣的框架經過了廣泛的測試和最佳化。
- 生態系統整合:許多第三方函式庫主流非同步執行時整合,使用自定義執行時可能失去這些整合。
- 維護成本:不需要自己維護和更新非同步執行時。
- 社群支援:遇到問題時可以從大型社群獲得幫助。
- 最佳實踐:框架通常實作了業界的最佳實踐和模式。
混合方法:兩全其美
有趣的是,我們可以採取混合方法,同時使用成熟框架和自定義執行時。例如,我們可以使用Tokio的通道將任務傳送到我們的自定義非同步執行時。
Rust 測試進階:從隔離測試到非同步程式驗證
在開發複雜系統時,測試是確保程式品質的關鍵環節。尤其對於 Rust 這樣注重安全與正確性的語言,良好的測試策略尤為重要。在我多年的 Rust 開發經驗中,發現許多工程師常忽略測試中的模擬(mock)和非同步測試,導致系統上線後出現難以預期的問題。
今天玄貓將分享如何在 Rust 中進行有效的單元測試,特別是如何處理非同步程式和檢測死鎖問題。這些技巧對於建立穩健的 Rust 應用程式至關重要。
測試的基本概念與相依性注入
在深入技術細節前,讓我們先理解一個核心概念:相依性注入(Dependency Injection)。這是一種設計模式,我們將一個結構體、物件或函式作為引數傳入另一個函式中,由這個傳入的引數來執行計算。
在 Rust 中,我們通常透過 trait 實作相依性注入。這種方法的強大之處在於它提供了極高的靈活性。例如,我們可以為資料函式庫和檔案讀取實作相同的 trait,然後根據需要選擇不同的實作。在測試中,我們可以建立模擬(mock)結構體來模擬這些行為,從而實作隔離測試。
準備環境:設定依賴與定義介面
首先,我們需要新增必要的依賴項到我們的 Cargo.toml
檔案中:
[dev-dependencies]
mockall = "0.11.4"
mockall
套件讓我們能夠模擬 trait 及其函式,這樣我們就可以檢查輸入並模擬輸出。
接下來,我們為我們的隔離模組定義一個介面。根據需求,我們需要兩個函式:spawn
(回傳一個鍵值)和 get_result
(回傳我們啟動的非同步任務結果)。我們可以定義以下 trait:
pub trait AsyncProcess<X, Y, Z{
fn spawn(&self, input: X) -Result<Y, String>;
fn get_result(&self, key: Y) -Result<Z, String>;
}
這裡我們使用泛型引數,以便可以變化輸入、輸出和使用的鍵值類別。
實作需要測試的函式
現在我們可以移到我們的非同步函式,該函式會生成一個任務,輸出一些內容,然後從非同步函式取得結果並處理:
fn do_something<T>(async_handle: T, input: i32) -Result<i32, String>
where T: AsyncProcess<i32, String, i32>
{
let key = async_handle.spawn(input)?;
println!("something is happening");
let result = async_handle.get_result(key)?;
if result 10 {
return Err("result is too big".to_string());
}
if result == 8 {
return Ok(result * 2)
}
Ok(result * 3)
}
這個函式依賴於一個實作了 AsyncProcess
trait 的處理器,它處理非同步任務的生成和結果取得。函式邏輯很簡單:生成任務,輸出資訊,取得結果,然後根據結果進行不同的處理。
使用 mockall 進行單元測試
現在讓我們來測試這個函式。我們將使用 mockall
建立一個模擬的 AsyncProcess
實作:
#[cfg(test)]
mod get_team_processes_tests {
use super::*;
use mockall::predicate::*;
use mockall::mock;
mock! {
DatabaseHandler {}
impl AsyncProcess<i32, String, i32for DatabaseHandler {
fn spawn(&self, input: i32) -Result<String, String>;
fn get_result(&self, key: String) -Result<i32, String>;
}
}
#[test]
fn do_something_fail() {
let mut handle = MockDatabaseHandler::new();
// 設定 spawn 函式的期望行為
handle.expect_spawn()
.with(eq(4))
.returning(|_| Ok("test_key".to_string()));
// 設定 get_result 函式的期望行為
handle.expect_get_result()
.with(eq("test_key".to_string()))
.returning(|_| Ok(11));
// 執行函式並驗證結果
let outcome = do_something(handle, 4);
assert_eq!(outcome, Err("result is too big".to_string()));
}
}
這個測試遵循了業界標準的「安排、行動、斷言」(Arrange-Act-Assert)測試模式:
- 安排(Arrange):我們設定測試環境並定義模擬物件的預期行為。
- 行動(Act):我們在安排好的條件下執行被測試的函式。
- 斷言(Assert):我們驗證結果是否符合我們的預期。
執行 cargo test
指令,我們會看到測試透過:
running 1 test
test get_team_processes_tests::do_something_fail ... ok
模擬的強大之處
模擬技術的強大之處在於它能夠隔離我們的邏輯。想象一下,如果 do_something
函式需要資料函式庫特定狀態才能執行。例如,如果它處理資料函式庫團隊成員數量,那麼這些團隊可能需要預先存在於資料函式庫
但如果我們使用模擬,就不需要設定這些前置條件。這帶來幾個好處:
- 我們不需要每次測試前都重新設定資料函式庫. 測試執行更快,因為不需要資料函式庫
- 測試變得更加原子化,可以獨立重複執行
- 我們可以更輕鬆地測試各種邊界情況
這就是為什麼採用測試驅動開發(TDD)的開發者通常能夠更快速地開發,與引入的錯誤更少。
模擬非同步程式碼
到目前為止,我們處理的都是同步程式碼。但在實際應用中,尤其是網頁開發,你很可能會使用大量的非同步函式。接下來,我們看如何測試非同步程式碼。
首先,我們需要在測試函式中有一個非同步執行時。在這個例子中,我們將使用 Tokio:
[dependencies]
tokio = { version = "1.34.0", features = ["full"] }
[dev-dependencies]
mockall = "0.11.4"
由於我們現在處理的是非同步程式碼,我們可以簡化 trait。我們不再需要兩個函式,因為非同步函式會回傳一個可以等待的處理器:
use std::future::Future;
pub trait AsyncProcess<X, Z{
fn get_result(&self, key: X) -impl Future<Output = Result<Z, String>+ Send + 'static;
}
get_result
函式回傳一個 future,而不是直接使用 async fn
。這種解構讓我們能夠更好地控制 future 實作的特性。
我們的 do_something
函式也需要重新定義:
async fn do_something<T>(async_handle: T, input: i32) -Result<i32, String>
where T: AsyncProcess<i32, i32+ Send + Sync + 'static
{
println!("something is happening");
let result: i32 = async_handle.get_result(input).await?;
if result 10 {
return Err("result is too big".to_string());
}
if result == 8 {
return Ok(result * 2)
}
Ok(result * 3)
}
現在讓我們來測試這個非同步函式。首先,我們需要調整模擬定義:
mock! {
DatabaseHandler {}
impl AsyncProcess<i32, i32for DatabaseHandler {
fn get_result(&self, key: i32) -impl Future<Output = Result<i32, String>+ Send + 'static;
}
}
然後在測試中,我們需要建立一個執行時並阻塞等待非同步函式完成:
#[test]
fn do_something_fail() {
let mut handle = MockDatabaseHandler::new();
// 設定 get_result 函式的期望行為
handle.expect_get_result()
.with(eq(4))
.returning(|_| {
Box::pin(async move { Ok(11) })
});
// 建立執行時並執行非同步函式
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let outcome = runtime.block_on(do_something(handle, 4));
assert_eq!(outcome, Err("result is too big".to_string()));
}
在這個測試中,我們建立了一個模擬的 DatabaseHandler
,設定了 get_result
函式的期望行為,然後建立了一個 Tokio 執行時來執行我們的非同步函式。
測試死鎖問題
非同步程式的另一個常見問題是死鎖。死鎖發生在非同步任務因鎖定而無法完成,沒有任何繼續執行的途徑。
一個典型的死鎖場景是:兩個非同步任務嘗試以相反的順序存取相同的兩個鎖。例如,任務一取得鎖一,任務二取得鎖二,然後任務一嘗試取得鎖二,任務二嘗試取得鎖一。這會導致兩個任務都永遠等待對方釋放鎖,從而形成死鎖。
這種死鎖不僅會阻塞這兩個任務,還會阻塞需要存取這些鎖的其他任務,最終可能導致整個系統癱瘓。因此,測試死鎖非常重要。
以下是一個測試死鎖的例子:
#[cfg(test)]
mod tests {
use tokio::sync::Mutex;
use std::sync::Arc;
use tokio::time::{sleep, Duration, timeout};
#[tokio::test]
async fn test_deadlock_detection() {
// 建立兩個互斥鎖
let resource1 = Arc::new(Mutex::new(0));
let resource2 = Arc::new(Mutex::new(0));
let resource1_clone = Arc::clone(&resource1);
let resource2_clone = Arc::clone(&resource2);
// 生成兩個任務,以不同順序取得鎖
let handle1 = tokio::spawn(async move {
let _lock1 = resource1.lock().await;
sleep(Duration::from_millis(100)).await;
let _lock2 = resource2.lock().await;
});
let handle2 = tokio::spawn(async move {
let _lock2 = resource2_clone.lock().await;
sleep(Duration::from_millis(100)).await;
let _lock1 = resource1_clone.lock().await;
});
// 設定超時,避免測試無限期掛起
let result = timeout(Duration::from_secs(5), async {
let _ = handle1.await;
let _ = handle2.await;
}).await;
// 檢查是否有死鎖
assert!(result.is_ok(), "A potential deadlock detected!");
}
}
在這個測試中,我們建立了兩個任務,分別以不同的順序取得兩個鎖。我們透過 sleep
函式確保兩個任務都有時間取得它們的第一個鎖,然後嘗試取得第二個鎖。為了避免測試無限期掛起,我們設定了一個超時。如果兩個非同步任務在 5 秒內沒有完成,我們可以斷定發生了死鎖。
執行測試後,我們會看到:
thread 'tests::test_deadlock_detection'
panicked at 'A potential deadlock detected!', src/main.rs:43:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
test tests::test_deadlock_detection ... FAILED
failures:
tests::test_deadlock_detection
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 5.01s
這表明我們成功檢測到了死鎖,並且能夠確定是哪個函式導致了死鎖。
測試最佳實踐
透過以
活鎖與競爭條件:並發程式設計的隱形殺手
活鎖問題:動卡死的並發陷阱
活鎖(Livelock)與死鎖(Deadlock)有著類別似的最終效果——它們都會讓系統陷入停滯。但兩者有著本質的區別:在死鎖中,多個非同步任務相互等待而完全停止;而活鎖則是多個非同步任務持續互相回應但無法取得實質性進展。
讓我舉個具體例子說明:活鎖就像兩個非同步任務不斷地互相回應相同的訊息,形成無限迴圈。這就像現實生活中的一個經典類別比:
- 死鎖:兩個人在走廊相遇,雙方都站著不動,等待對方讓路,但誰也不讓步
- 活鎖:兩個人在走廊相遇,雙方不斷地向同一側讓路,卻總是選擇相同的方向,結果依然無法透過
雖然我們應該不惜一切代價避免死鎖,但死鎖的發生通常很明顯,因為系統會完全停止運作。然而,更危險的是那些默產生錯誤而我們卻不知情的程式碼問題。這就是為什麼我們需要特別針對競爭條件進行測試。
競爭條件的測試
競爭條件(Race Condition)發生在資料狀態被改變,但對該狀態的參照已過時的情況。一個簡單的資料競爭例子如下圖所示:兩個非同步任務從資料儲存中取得數字並將其加一。由於第二個任務在第一個任務更新資料儲存之前就取得了資料,所以兩個任務都是從10開始增加,最終結果變成11,而正確結果應該是12。
在本章中,我們透過互斥鎖(mutex)或特定的原子操作(atomic operations)來防止資料競爭。比較和更新(compare and update)這類別原子操作是防止上述競爭條件的最簡單方法。然而,雖然預防競爭條件是最佳做法,但具體如何預防並不總是那麼明確,因此我們需要探索如何測試程式碼中的資料競爭。
讓我們來看一個測試的基本框架:
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::{sleep, Duration};
use tokio::runtime::Builder;
static COUNTER: AtomicUsize = AtomicUsize::new(0);
async fn unsafe_add() {
let value = COUNTER.load(Ordering::SeqCst);
COUNTER.store(value + 1, Ordering::SeqCst);
}
#[test]
fn test_data_race() {
// 測試實作會在這裡
}
}
我們沒有使用原子加法,而是取得數字、增加數字,然後設定新值,這樣就有了圖中所示的競爭條件可能。在測試函式中,我們可以建立一個單執行緒的執行環境並啟動10,000個unsafe_add
任務。完成這些任務後,我們可以斷言COUNTER的值為10,000:
let runtime = Builder::new_current_thread().enable_all()
.build()
.unwrap();
let mut handles = vec![];
let total = 100000;
for _ in 0..total {
let handle = runtime.spawn(unsafe_add());
handles.push(handle);
}
for handle in handles {
runtime.block_on(handle).unwrap();
}
assert_eq!(
COUNTER.load(Ordering::SeqCst),
total,
"race condition occurred!"
);
如果我們執行測試,會發現測試透過了。這是因為執行環境只有一個執行緒,而與在取得和設定操作之間沒有非同步操作。但如果我們將執行環境改為多執行緒:
let runtime = tokio::runtime::Runtime::new().unwrap();
我們會得到以下錯誤:
thread 'tests::test_data_race' panicked at
'assertion failed: `(left == right)`
left: `99410`,
right: `100000`: race condition occurred!'
部分任務成為了競爭條件的受害者。如果我們在讀取和寫入之間放入另一個非同步函式,例如睡眠函式:
let value = COUNTER.load(Ordering::SeqCst);
sleep(Duration::from_secs(1)).await;
COUNTER.store(value + 1, Ordering::SeqCst);
我們會得到更嚴重的錯誤:
thread 'tests::test_data_race' panicked at
'assertion failed: `(left == right)`
left: `1`,
right: `100000`: Race Condition occurred!'
幾乎所有任務都遭遇了競爭條件。這是因為所有任務最初都在任何任務寫入COUNTER之前讀取了COUNTER,非同步睡眠將控制權交還給執行器,讓其他非同步任務得以讀取COUNTER。
即使在單執行緒環境中,如果睡眠是非阻塞的,也會出現這種結果。這突顯了在測試競爭條件時使用多執行緒測試環境的必要性。我們還可以體會到改變任務引數的速度以及測試引數變化對任務的影響。
通道容量的測試
我們知道互斥鎖和原子值不是讓多個任務存取資料的唯一方式。我們可以使用通道(channel)在非同步任務之間傳送資料。因此,我們需要測試通道的容量。
在某些例子中我們使用了無界通道(unbound channels),但有時我們希望限制通道的最大小以防止過度消耗記憶體。然而,如果通道達到其最大限制,傳送者就無法向通道傳送更多訊息。我們可能有一個系統,如一組actors,需要檢視系統是否在傳送過多訊息時堵塞。根據需求,我們可能需要系統減速,直到所有訊息都被處理,但測試系統以瞭解它在我們的使用案例中的工作方式是很好的做法。
以下是我們的測試佈局:
#[cfg(test)]
mod tests {
use tokio::sync::mpsc;
use tokio::time::{Duration, timeout};
use tokio::runtime::Builder;
#[test]
fn test_channel_capacity() {
// 測試實作會在這裡
}
}
在測試函式中,我們定義了執行環境和容量為五的通道:
let runtime = Builder::new_current_thread().enable_all()
.build()
.unwrap();
let (sender, mut receiver) = mpsc::channel::<i32>(5);
然後我們啟動一個任務,該任務傳送的訊息數量超過通道的容量:
let sender = runtime.spawn(async move {
for i in 0..10 {
sender.send(i).await.expect("Failed to send message");
}
});
我們想透過超時測試來檢視我們的系統是否會當機:
let result = runtime.block_on(async {
timeout(Duration::from_secs(5), async {
sender.await.unwrap();
}).await
});
assert!(result.is_ok(), "A potential filled channel is not handled correctly");
現在,我們的測試會失敗,因為傳送者的future永遠不會完成,因此超過了我們設定的超時間。要使測試透過,我們需要在超時測試之前加入接收者的future:
let receiver = runtime.spawn(async move {
let mut i = 0;
while let Some(msg) = receiver.recv().await {
assert_eq!(msg, i);
i += 1;
println!("Got message: {}", msg);
}
});
現在執行測試,它會透過。雖然我們測試的只是一個簡單的傳送者和接收者系統,但必須認識到我們的測試揭示了系統會因為沒有正確處理訊息而陷入停滯。通道也可能導致死鎖,就像互斥鎖一樣,如果我們的測試足夠充分,超時測試也會揭示死鎖。
通道使我們能夠在系統中非同步地分享資料。當涉及到跨程式和電腦分享資料時,我們可以使用網路協定。在實際環境中,你肯定會編寫與使用協定的伺服器互動的非同步程式碼。這些互動也需要測試。
網路互動的測試
在開發過程中測試網路互動時,可能會想在本地啟動伺服器並依賴該伺服器。然而,這對測試來說可能不好。例如,如果我們有一個操作在伺服器上刪除一行,那麼在執行測試一次後,我們就無法立即再次執行它,因為該行已被刪除。我們可以建立一個插入該行的步驟,但如果該行有依賴關係,則會變得更複雜。此外,cargo test
命令在多個程式上執行。如果幾個測試同時存取同一伺服器,我們可能會在伺服器上遇到資料競爭條件。
這就是我們使用mockito的原因。這個crate使我們能夠直接在測試中模擬伺服器,並斷言伺服器端點是否被以某些引數呼叫。對於網路測試的例子,我們需要以下依賴:
[dependencies]
tokio = { version = "1.34.0", features = ["full"] }
reqwest = { version = "0.11.22", features = ["json"] }
[dev-dependencies]
mockito = "1.2.0"
我們的測試框架如下:
#[cfg(test)]
mod tests {
use tokio::runtime::Builder;
use mockito::Matcher;
use reqwest;
#[test]
fn test_networking() {
// 測試實作會在這裡
}
}
在測試函式中,我們啟動測試伺服器:
let mut server = mockito::Server::new();
let url = server.url();
在這裡,mockito會找到電腦上當前未使用的連線埠。如果我們向URL傳送請求,我們的模擬伺服器可以跟蹤它們。請記住,我們的伺服器在測試函式的範圍內,因此測試完成後,模擬伺服器將被終止。這樣我們的測試就真正保持了原子性。
然後我們為伺服器端點定義一個模擬:
let mock = server.mock("GET", "/my-endpoint")
.match_query(Matcher::AllOf(vec![
Matcher::UrlEncoded("param1".into(), "value1".into()),
Matcher::UrlEncoded("param2".into(), "value2".into()),
]))
.with_status(201)
.with_body("world")
.expect(5)
.create();
我們的模擬有端點/my-endpoint
。我們的模擬還預期URL中有某些引數。我們的模擬將回傳回應碼201和正文world。我們還預期我們的伺服器會被存取五次。我們可以新增更多端點,但在這個例子中我們只使用一個,以避免使章節過於臃腫。
現在我們的模擬伺服器已經建立,我們定義執行環境:
let runtime = Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();
let mut handles = vec![];
一切準備就緒,我們向執行環境傳送五個非同步任務:
for _ in 0..5 {
let url_clone = url.clone();
handles.push(runtime.spawn(async move {
let client = reqwest::Client::new();
client.get(&format!(
"{}/my-endpoint?param1=value1¶m2=value2",
url_clone)).send().await.unwrap()
}));
}
最後,我們可以阻塞執行緒等待非同步任務完成,然後斷言模擬:
for handle in handles {
runtime.block_on(handle).unwrap();
}
mock.assert();
我們可以斷言所有非同步任務都達成了存取伺服器的目標。如果其中一個任務失敗了,我們的測試會失敗。
在定義請求的URL時,最好將定義設為動態而非硬編碼。這使我們能夠根據是向活躍伺服器、本地伺服器還是模擬伺服器發出請求來改變URL的主機部分。使用環境變數很誘人,但在測試中,這可能會在cargo test
的多執行緒環境中造成問題。相反,最好定義一個提取設定變數的trait。然後我們傳遞實作此提取設定變數trait的結構。在將檢視函式繫結到活躍伺服器時,我們可以傳入一個從環境中提取設定變數的結構。然而,在測試呼叫其他伺服器的函式和檢視時,我們也可以將mockito URL傳入提取設定變數trait的實作中。
mockito還有更多功能
Rust 中的非同步測試:探索 Mutex 與 Future 行為
理解非同步互斥鎖的工作原理
在非同步 Rust 程式中,我們經常需要處理分享資源的並發存取題。下面的程式碼展示了一個非同步互斥鎖 (Mutex) 的使用範例:
async fn async_mutex_locker(mutex: Arc<Mutex<i32>>) -() {
let mut lock = mutex.lock().await;
*lock += 1;
sleep(Duration::from_millis(1)).await;
}
這個函式接收一個包裝在 Arc
中的 Mutex<i32>
,取得鎖,將值加 1,然後等待 1 毫秒。這個簡單的函式將幫助我們理解非同步互斥鎖的行為方式。
測試非同步互斥鎖的行為
在測試環境中,我們可以精確控制非同步任務的執行順序,這讓我們能夠深入探索互斥鎖的行為:
#[tokio::test]
async fn test_monitor_file_metadata() {
let mutex = Arc::new(Mutex::new(0));
let mutex_clone1 = mutex.clone();
let mutex_clone2 = mutex.clone();
let mut future1 = spawn(async_mutex_locker(mutex_clone1));
let mut future2 = spawn(async_mutex_locker(mutex_clone2));
assert_pending!(future1.poll());
assert_pending!(future2.poll());
// 測試互斥鎖的互斥行為
for _ in 0..10 {
assert_pending!(future2.poll());
sleep(Duration::from_millis(1)).await;
}
assert_eq!(future1.poll(), Poll::Ready(()));
// 即使等待足夠長的時間,第二個 future 仍然無法取得鎖
sleep(Duration::from_millis(3)).await;
assert_pending!(future2.poll());
// 釋放第一個 future 後,第二個 future 應該能夠完成
drop(future1);
sleep(Duration::from_millis(1)).await;
assert_eq!(future2.poll(), Poll::Ready(()));
// 確認最終值
let lock = mutex.lock().await;
assert_eq!(*lock, 2);
}
測試的關鍵概念
這個測試展示了幾個重要的非同步測試技巧:
控制執行順序:透過手動呼叫
poll()
方法,我們可以精確控制非同步任務的執行順序。這在測試中非常有價值,因為在實際系統中,執行順序通常是不確定的。觀察鎖的行為:我們可以確認第一個 future 取得鎖後,第二個 future 無法取得鎖,直到第一個 future 完成並釋放鎖。
驗證假設:透過斷言,我們可以驗證關於互斥鎖行為的假設,例如:
- 當一個 future 持有鎖時,其他 future 無法取得鎖
- 只有當持有鎖的 future 被丟棄時,其他 future 才能取得鎖
檢查最終狀態:測試最後,我們確認互斥鎖中的值已經被正確修改為 2。
凍結與檢查非同步系統的狀態
這種測試方法的強大之處在於它允許我們「凍結」非同步系統的狀態,逐步檢查系統行為,並在需要時改變執行順序。這給了我們極大的靈活性,可以測試各種邊緣情況和競爭條件。
例如,我們可以:
- 測試不同的 future 輪詢順序對系統行為的影響
- 檢查資源取得和釋放的正確性
- 驗證死鎖和活鎖的處理機制
- 測試錯誤條件下的系統還原能力
測試驅動開發的重要性
在玄貓的實踐經驗中,我發現在開發非同步 Rust 專案時,同步構建測試是非常重要的。這種方法可以幫助你:
- 及早發現並解決並發問題
- 確保系統在各種條件下都能正確執行
- 簡化複雜非同步邏輯的除錯過程
- 維持快速的開發節奏
非同步 Rust 的威力
Rust 的非同步程式設計模型提供了強大的工具,讓我們能夠構建高效、可靠的並發系統。透過掌握非同步 Rust 的基礎知識和測試技術,你可以將複雜的問題分解為非同步概念,並實作強大、高效的解決方案。
隨著你對非同步 Rust 的深入理解,你會發現它不僅是一個技術工具,更是一種思考問題的新方式。它讓我們能夠以更優雅、更高效的方式處理複雜的並發問題,真正體現 Rust 語言的美學和實用性。
非同步 Rust 的世界充滿活力和創新,我期待著看到你用它構建的下一代應用程式和系統。
深入探索 Rust 的非同步開發:從 Futures 到高效應用
在現代軟體開發中,非同步程式設計已成為構建高效能系統的關鍵技術。Rust 語言提供了強大的非同步機制,能夠幫助開發者建立既安全又高效的並發程式。本文將探討 Rust 非同步生態系統的核心元素,包括 Futures、協程、反應式系統以及非同步執行時等關鍵概念。
Futures 的核心概念與運作機制
Futures 是 Rust 非同步程式設計的基礎。它們代表尚未完成但最終會產生值的計算。我在處理大型分散式系統時,發現理解 Futures 的內部機制對於有效利用非同步程式設計至關重要。
Futures 的生命週期與狀態管理
Futures 在 Rust 中的實作依賴於輪詢(polling)機制。當一個 Future 被輪詢時,它要麼回傳「就緒」狀態並提供結果,要麼回傳「等待」狀態,表示尚未準備好。這種機制讓 Rust 能夠有效地管理大量並發任務而不依賴於傳統的執行緒模型。
// Future 的基本結構
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -Poll<Self::Output>;
}
// Poll 的兩種可能狀態
pub enum Poll<T{
Ready(T),
Pending,
}
這個簡單的介面讓 Rust 能夠建立複雜的非同步工作流程。當輪詢回傳 Pending
時,Future 會註冊一個 waker,以便在準備就緒時通知執行時系統再次進行輪詢。
釘住(Pinning)與記憶體安全
在處理 Futures 時,一個關鍵概念是「釘住(pinning)」。這是 Rust 確保記憶體安全的機制,特別是對於自參照結構。
// 將 Future 釘住到記憶體中
let mut future = Box::pin(async {
// 非同步操作
});
// 輪詢釘住的 Future
let waker = // ...
let mut context = Context::from_waker(&waker);
let result = future.as_mut().poll(&mut context);
釘住機制解決了非同步程式設計中的一個核心問題:如何安全地處理包含自參照的複雜資料結構。我在構建高效能網路服務時,這一機制幫助我避免了許多潛在的記憶體問題。
Waker 機制與遠端喚醒
Waker 是 Rust 非同步系統中的另一個關鍵元件,它允許執行時在 Future 準備好時得到通知。當 Future 從其他執行緒被喚醒時,這一機制尤為重要。
// 從另一個執行緒喚醒 Future
let waker_clone = waker.clone();
std::thread::spawn(move || {
// 執行某些操作...
waker_clone.wake(); // 通知 Future 已準備好
});
這種方式讓 Rust 能夠有效地處理 I/O 密集型任務,如網路通訊和檔案操作,而不會阻塞主執行緒。
在 Rust 中構建非同步佇列
非同步佇列是許多高效系統的基礎元件,用於處理工作負載並在不同元件間傳輸資料。以下是如何使用 Rust 構建一個基本的非同步佇列。
使用 futures-lite 實作非同步佇列
use futures_lite::future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
struct Queue<T{
queue: Mutex<Vec<T>>,
waker: Mutex<Option<Waker>>,
}
impl<TQueue<T{
fn new() -Self {
Queue {
queue: Mutex::new(Vec::new()),
waker: Mutex::new(None),
}
}
fn push(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
queue.push(item);
// 喚醒等待的消費者
if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}
}
async fn pop(&self) -T {
future::poll_fn(|cx| {
let mut queue = self.queue.lock().unwrap();
if let Some(item) = queue.pop() {
return Poll::Ready(item);
}
// 儲存 waker 以便稍後喚醒
let mut waker = self.waker.lock().unwrap();
*waker = Some(cx.waker().clone());
Poll::Pending
}).await
}
}
這個實作展示瞭如何使用 Rust 的基本同步原語和 Future 機制來建立一個非同步佇列使用 Mutex
來保護分享資料,並使用 Waker
來通知等待的消費者。
任務竊取與效能最佳化
在構建複雜系統時,任務竊取是提高處理效率的重要技術。這允許空閒的處理單元從忙碌的單元「竊取」工作,從而更均勻地分配負載。
// 實作任務竊取的簡化版本
struct WorkStealer<T{
local_queue: Queue<T>,
global_queue: Arc<Queue<T>>,
// 其他工作竊取邏輯...
}
任務竊取對於最大化系統資源利用率至關重要,特別是在處理不均勻工作負載時。我在最佳化一個大型資料處理系統時,透過實作工作竊取策略,將處理速度提高了近 40%。
協程(Coroutines)與流程控制
協程為 Rust 中的非同步程式設計提供了一種更自然的表達方式,特別是對於複雜的控制流。
協程與非同步任務的比較
協程與標準的非同步任務有著根本的區別。協程可以在執行過程中暫停並稍後還原,而保持其內部狀態,這使得它們特別適合於表達複雜的控制流程。
// 基本協程實作的概念示意
trait Coroutine {
type Yield;
type Return;
fn resume(&mut self) -CoroutineState<Self::Yield, Self::Return>;
}
enum CoroutineState<Y, R{
Yielded(Y),
Complete(R),
}
在處理需要交錯執行的任務時,協程提供了比標準非同步任務更自然的抽象。我曾經使用協程重構了一個複雜的資料處理管道,程式碼行數減少了 30%,同時提高了可讀性和可維護性。
實作生成器模式
生成器是協程的一種特殊形式,專注於產生一系列值。它們特別適用於處理大型資料流或實作懶惰評估。
// 使用協程實作簡單的生成器
fn generate_numbers(max: usize) -impl Coroutine<Yield=usize, Return=(){
let mut current = 0;
move || {
while current < max {
let value = current;
current += 1;
yield value;
}
}
}
這種模式在處理大型檔案傳輸或資料流時特別有用,因為它允許按需產生資料而不需要一次載入所有內容到記憶體中。
協程的實際應用
協程在實際應用中有許多用途,從檔案 I/O 到網路通訊。例如,我們可以使用協程來實作大型檔案的高效處理:
// 使用協程處理大型檔案
async fn process_large_file(path: &str) {
let file = File::open(path).await.unwrap();
let reader = BufReader::new(file);
let mut processor = process_chunks().into_coroutine();
let mut buffer = [0u8; 4096];
loop {
let bytes_read = reader.read(&mut buffer).await.unwrap();
if bytes_read == 0 { break; }
processor.resume_with(&buffer[..bytes_read]);
}
}
這種方法允許以小塊處理大型檔案,避免了將整個檔案載入記憶體的需要,同時保持程式碼的可讀性和簡潔性。
構建反應式系統
反應式系統設計模式允許應用程式對變化做出動態回應,非常適合於事件驅動的應用場景。
觀察者模式實作
觀察者模式是反應式系統的基礎。在這種模式中,「主題」維護對「觀察者」的參照,並在狀態變更時通知它們。
// 簡化的觀察者模式實作
struct Subject<T{
value: T,
observers: Vec<Box<dyn Fn(&T)>>,
}
impl<TSubject<T{
fn new(value: T) -Self {
Subject {
value,
observers: Vec::new(),
}
}
fn subscribe<F>(&mut self, observer: F)
where
F: Fn(&T) + 'static,
{
self.observers.push(Box::new(observer));
}
fn set_value(&mut self, value: T) {
self.value = value;
for observer in &self.observers {
observer(&self.value);
}
}
}
這種模式在構建互動式系統時特別有用,例如使用者介面或監控系統。
事件匯流排實作
事件匯流排為反應式系統提供了更加解耦的通訊機制,允許元件之間透過發布和訂閱事件進行互動。
// 簡化的事件匯流排實作
struct EventBus<T{
subscribers: HashMap<String, Vec<Box<dyn Fn(&T)>>>,
}
impl<T: CloneEventBus<T{
fn new() -Self {
EventBus {
subscribers: HashMap::new(),
}
}
fn subscribe<F>(&mut self, topic: &str, callback: F)
where
F: Fn(&T) + 'static,
{
let subscribers = self.subscribers.entry(topic.to_string())
.or_insert_with(Vec::new);
subscribers.push(Box::new(callback));
}
fn publish(&self, topic: &str, message: T) {
if let Some(subscribers) = self.subscribers.get(topic) {
for callback in subscribers {
callback(&message);
}
}
}
}
事件匯流排特別適合於大型系統,其中多個元件需要回應特定類別的事件而不需要直接知道其他元件的存在。
Actor 模型與訊息傳遞
Actor 模型是一種並發程式設計正規化,其中每個「actor」是一個獨立的計算單元,透過訊息傳遞與其他 actor 通訊。
Actor 模型基礎
在 Actor 模型中,每個 actor 維護自己的狀態,並透過處理接收到的訊息來改變這個狀態。這種方法消除了分享狀態帶來的許多並發問題。
// 簡化的 Actor 實作
struct Actor<T{
state: T,
mailbox: mpsc::Receiver<Box<dyn FnOnce(&mut T) + Send>>,
}
impl<T: Send + 'staticActor<T{
fn new(initial_state: T) -(Self, mpsc::Sender<Box<dyn FnOnce(&mut T) + Send>>) {
let (sender, receiver) = mpsc::channel();
(
Actor {
state: initial_state,
mailbox: receiver,
},
sender,
)
}
async fn run(&mut self) {
while let Some(message) = self.mailbox.recv().await {
message(&mut self.state);
}
}
}
這種模式使得編寫並發程式變得更加簡單,因為開發者可以專注於單個 actor 的行為而不必擔心同步問題。
路由器模式與監督
在更複雜的系統中,我們可能需要實作路由器模式來管理多個 actor,以及監督機制來處理錯誤。
// Actor 路由器範例
struct ActorRouter<K, T{
actors: HashMap<K, mpsc::Sender<Box<dyn FnOnce(&mut T) + Send>>>,
}
impl<K: Eq + Hash, T: Send + 'staticActorRouter<K, T{
fn new() -Self {
ActorRouter {
actors: HashMap::new(),
}
}
fn add_actor(&mut self, key: K, initial_state: T) {
let (actor, sender) = Actor::new(initial_state);
self.actors.insert(key, sender);
// 在背景執行 actor
tokio::spawn(async move {
actor.run().await;
});
}
async fn send<F>(&self, key: &K, message: F)
where
F: FnOnce(&mut T) + Send + 'static,
{
if let Some(sender) = self.actors.get(key) {
let message_box = Box::new(message);
sender.send(message_box).await.unwrap();
}
}
}
在實際系統中,我曾經使用這種模式來構建一個高度可擴充套件的微服務架構,其中每個服務是一個
非同步Rust的核心機制與應用
非同步程式設計已成為現代系統開發的根本,而Rust憑藉其記憶體安全和高效能特性,在非同步處理領域展現出獨特優勢。作為一位長期使用Rust開發分散式系統的工程師,我發現非同步Rust不僅提供了強大的並發處理能力,還能優雅地解決傳統多執行緒開發中的許多痛點。
非同步程式設計的Rust實踐
Rust的非同步程式設計核心在於其futures系統與await關鍵字。透過這套機制,開發者可以撰寫看似同步的程式碼,但實際執行時卻能充分利用非同步處理的效率優勢。
在Rust中,非同步程式的基本元素包括:
- Future特徵:代表尚未完成的計算,可以被輪詢(poll)以檢查完成狀態
- await關鍵字:暫停執行直到Future完成
- 任務(Tasks):可獨立排程的工作單元
- 執行時期(Runtime):管理和執行非同步任務的環境
以下是一個簡單的非同步函式範例:
async fn fetch_data() -Result<String, Error{
// 非同步I/O操作
let response = make_request().await?;
// 處理回應
Ok(process_response(response))
}
這段程式碼看起來像是同步執行的,但await
關鍵字讓程式可以在等待I/O操作時釋放執行緒,處理其他任務。這就是非同步Rust的魅力所在—寫起來像同步程式,執行效率卻是非同步的。
任務處理與執行模型
在非同步Rust中,任務(Tasks)是基本的執行單位。當我們使用spawn
函式建立新任務時,實際上是將一個Future提交給執行器(executor)處理:
// 建立一個新任務
tokio::spawn(async {
let result = fetch_data().await;
process_result(result);
});
每個任務可以獨立執行,並在需要等待資源時讓出執行權。這種模型特別適合I/O密集型應用,如網路伺服器或資料函式庫池。
Tokio作為Rust生態系統中最流行的非同步執行時期,提供了許多實用功能:
- spawn_blocking:用於執行CPU密集型任務,避免阻塞事件迴圈
- local pools:為特定任務類別最佳化的執行池
- join巨集:等待多個futures完成
非同步程式設計中的核心特徵與機制
Future特徵的深度解析
Future特徵是Rust非同步程式設計的核心。它代表一個可能尚未完成的值,並提供了poll
方法讓執行器檢查其完成狀態:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -Poll<Self::Output>;
}
當我們使用await
關鍵字時,Rust編譯器會自動生成狀態機程式碼,反覆呼叫poll
直到Future完成。這個機制賦予了Rust非同步程式極高的效能與靈活性。
在實際開發中,我們很少直接實作Future特徵,因為:
- 手動實作狀態機相當複雜
async
/await
語法提供了更簡潔的抽象- 大多數情況下,組合現有的futures更加實用
喚醒機制(Wakers)與遠端控制
非同步Rust的一個關鍵創新是其喚醒機制。當Future無法立即完成時,它會註冊一個waker,在準備好繼續執行時通知執行器:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -Poll<Self::Output{
// 檢查操作是否完成
if self.is_ready() {
Poll::Ready(self.get_result())
} else {
// 註冊waker,當資源準備好時通知執行器
self.register_waker(cx.waker().clone());
Poll::Pending
}
}
這種機制允許非同步操作在遠端完成時(例如網路請求收到回應)喚醒相關任務,而不需要持續輪詢或阻塞執行緒。
Unpin特徵與記憶體安全
Pin<&mut T>是Future特徵中的一個重要元素,它解決了自參照結構在非同步程式中的安全問題。當Future被await時,它可能會在堆積積疊上移動,這對於包含自參照的Future可能導致記憶體安全問題。
Pin提供了一種保證,確保被固定的值在記憶體中不會被移動。除非類別實作了Unpin特徵,否則無法安全地取得&mut T參照。
// 這個Future無法安全移動,因此需要Pin
struct SelfReferential {
data: String,
// 指向data內部的指標
ptr: *const u8,
}
// 這個Future可以安全移動,實作了Unpin
#[derive(Unpin)]
struct MovableFuture {
data: Vec<u8>,
}
建立自訂非同步系統
實作基礎非同步佇列
在某些專案中,我們需要更精細的控制,這時候實作自訂的非同步原語就變得必要。以下是一個簡化的非同步佇列實作:
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
struct AsyncQueue<T{
queue: Arc<Mutex<VecDeque<T>>>,
wakers: Arc<Mutex<Vec<Waker>>>,
}
impl<TAsyncQueue<T{
fn new() -Self {
Self {
queue: Arc::new(Mutex::new(VecDeque::new())),
wakers: Arc::new(Mutex::new(Vec::new())),
}
}
// 將專案推入佇列並喚醒等待的任務
fn push(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
// 喚醒所有等待的任務
let mut wakers = self.wakers.lock().unwrap();
for waker in wakers.drain(..) {
waker.wake();
}
}
// 建立一個Future來等待並取出專案
fn pop(&self) -PopFuture<T{
PopFuture {
queue: self.queue.clone(),
wakers: self.wakers.clone(),
}
}
}
// 代表從佇列中取出專案的Future
struct PopFuture<T{
queue: Arc<Mutex<VecDeque<T>>>,
wakers: Arc<Mutex<Vec<Waker>>>,
}
impl<TFuture for PopFuture<T{
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -Poll<Self::Output{
// 嘗試從佇列取出專案
let mut queue = self.queue.lock().unwrap();
if let Some(item) = queue.pop_front() {
return Poll::Ready(item);
}
// 如果佇列為空,註冊waker以便稍後通知
let mut wakers = self.wakers.lock().unwrap();
wakers.push(cx.waker().clone());
Poll::Pending
}
}
這個實作展示了非同步Rust的核心概念:
- 使用Arc<Mutex
>在多個任務間分享狀態 - 實作Future特徵來建立可等待的操作
- 使用Waker機制在資源可用時通知等待的任務
協程(Coroutines)與非同步Rust的關係
協程是非同步程式設計的另一種實作方式,與Rust的Future有許多相似之處。事實上,Rust的async/await語法糖底層就是根據協程的概念。
協程可以看作是可暫停和還原的函式,它們提供了一種更直觀的方式來表達非同步邏輯。在Rust中,協程透過Coroutine特徵表示:
trait Coroutine {
type Yield;
type Return;
fn resume(
self: Pin<&mut Self>,
) -CoroutineState<Self::Yield, Self::Return>;
}
這個特徵與Future非常相似,主要區別在於協程可以多次產生(yield)值,而Future只能回傳一次結果。
在Rust的演進過程中,協程的概念逐漸演變為現在的Future系統。早期的Generator特徵(後來重新命名為Coroutine)與現在的Future特徵有許多共通之處,這不是巧合,而是設計上的延續。
整合網路功能與自訂執行時期
根據mio的非同步網路處理
在開發自訂非同步執行時期時,mio(metal IO)函式庫個極為重要的基礎元件。它提供了跨平台的非阻塞I/O原語,是許多高效能Rust網路函式庫石。
以下是使用mio建立簡單TCP伺服器的範例:
use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interest, Poll, Token};
use std::collections::HashMap;
use std::io::{self, Read, Write};
const SERVER: Token = Token(0);
fn main() -io::Result<(){
// 建立輪詢器和事件容器
let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128);
// 設定TCP監聽器
let addr = "127.0.0.1:8080".parse().unwrap();
let mut server = TcpListener::bind(addr)?;
// 註冊伺服器以接收連線通知
poll.registry().register(&mut server, SERVER, Interest::READABLE)?;
// 追蹤連線
let mut connections = HashMap::new();
let mut unique_token = Token(SERVER.0 + 1);
// 事件迴圈
loop {
poll.poll(&mut events, None)?;
for event in events.iter() {
match event.token() {
SERVER ={
// 接受新連線
let (mut connection, address) = server.accept()?;
println!("接受來自 {} 的連線", address);
let token = unique_token;
unique_token = Token(unique_token.0 + 1);
// 註冊連線以接收讀取通知
poll.registry().register(
&mut connection,
token,
Interest::READABLE.add(Interest::WRITABLE),
)?;
connections.insert(token, connection);
}
token ={
// 處理連線事件
if let Some(connection) = connections.get_mut(&token) {
if event.is_readable() {
let mut buffer = [0; 1024];
match connection.read(&mut buffer) {
Ok(0) ={
// 連線關閉
connections.remove(&token);
continue;
}
Ok(n) ={
println!("讀取 {} 位元組", n);
// 這裡可以處理收到的資料
}
Err(e) ={
println!("讀取錯誤: {}", e);
connections.remove(&token);
continue;
}
}
}
if event.is_writable() {
// 可以寫入資料到連線
// ...
}
}
}
}
}
}
}
這個範例展示瞭如何使用mio建立事件驅動的網路伺服器,這是開發非同步執行時期的基礎。
AsyncRead特徵與非同步I/O
為了在非同步環境中處理I/O操作,Tokio提供了AsyncRead和AsyncWrite特徵,這些特徵是標準Read和Write特徵的非同步版本:
pub trait AsyncRead {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -Poll<io::Result<()>>;
}
實作這些特徵允許類別在非同步環境中進行I/O操作,不會阻塞執行緒。例如,TcpStream的非同步版本實作了AsyncRead,使我們可以使用await等待讀取操作完成:
async fn handle_connection(