在 Rust 的非同步程式設計中,裝飾器(Decorator)狀態機(State Machine) 設計模式能夠提升程式的模組化與可讀性,特別是在處理複雜的非同步工作流程時。裝飾器模式允許我們動態地為非同步函式新增功能,而狀態機則是管理非同步流程的一種強大方式,可確保程式在不同狀態之間安全地轉換。

Rust中的裝飾器與狀態機設計模式:裝飾器模式的實際應用

在前面的程式碼中,我們看到了裝飾器模式的完整實作。這段程式碼首先完成了執行入口點:

fn main() {
    #[cfg(feature = "logging_decorator")]
    let hello = ExcitedGreeting { inner: HelloWorld };
    #[cfg(not(feature = "logging_decorator"))]
    let hello = HelloWorld;
    println!("{}", hello.greet());
}

這裡使用了條件編譯功能,根據是否啟用了logging_decorator特性來決定是直接使用HelloWorld結構體,還是使用裝飾器ExcitedGreeting包裝它。要啟用裝飾器,只需在編譯時加上特定特性標記:

cargo run --features "logging_decorator"

從結構體裝飾器到Future裝飾器

裝飾器模式的威力在於它能夠輕鬆擴充套件到Rust的非同步世界。接下來我將探討如何為Future實作裝飾器模式。首先,需要匯入必要的模組:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

我們定義一個日誌記錄特徵,這將成為我們裝飾器的核心功能:

trait Logging {
    fn log(&self);
}

然後實作一個裝飾器結構體來包裝任何實作了Future和Logging特徵的類別:

struct LoggingFuture<F: Future + Logging> {
    inner: F,
}

impl<F: Future + Logging> Future for LoggingFuture<F> {
    type Output = F::Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = unsafe { self.map_unchecked_mut(|s| &mut s.inner) };
        inner.log();
        inner.poll(cx)
    }
}

這裡用了unsafe程式碼塊,因為Rust編譯器無法檢查pin的投影。不過,我們可以透過修改結構體定義來避免使用unsafe:

struct LoggingFuture<F: Future + Logging> {
    inner: Pin<Box<F>>,
}

impl<F: Future + Logging> Future for LoggingFuture<F> {
    type Output = F::Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let inner = this.inner.as_mut();
        inner.log();
        inner.poll(cx)
    }
}

接著,我們為所有實作Future特徵的類別實作Logging特徵:

impl<F: Future> Logging for F {
    fn log(&self) {
        println!("Polling the future!");
    }
}

這樣,任何被我們裝飾器包裝的Future都會在每次被poll時輸出日誌。完整的應用範例如下:

async fn my_async_function() -> String {
    "Result of async computation".to_string()
}

#[tokio::main]
async fn main() {
    let logged_future = LoggingFuture { inner: my_async_function() };
    let result = logged_future.await;
    println!("{}", result);
}

執行結果會是:

Polling the future!
Result of async computation

狀態機模式

狀態機模式在非同步程式設計中尤其有用。一個簡單的狀態機可以用列舉來表示狀態,並定義狀態之間的轉換邏輯。

基本狀態機實作

首先定義狀態和事件:

enum State {
    On,
    Off,
}

enum Event {
    SwitchOn,
    SwitchOff,
}

然後實作狀態轉換邏輯:

impl State {
    async fn transition(self, event: Event) -> Self {
        match (&self, event) {
            (State::On, Event::SwitchOff) => {
                println!("Transitioning to the Off state");
                State::Off
            },
            (State::Off, Event::SwitchOn) => {
                println!("Transitioning to the On state");
                State::On
            },
            _ => {
                println!("No transition possible, staying in the current state");
                self
            },
        }
    }
}

測試這個狀態機:

#[tokio::main]
async fn main() {
    let mut state = State::On;
    state = state.transition(Event::SwitchOff).await;
    state = state.transition(Event::SwitchOn).await;
    state = state.transition(Event::SwitchOn).await;
    
    match state {
        State::On => println!("State machine is in the On state"),
        _ => println!("State machine is not in the expected state"),
    }
}

執行結果:

Transitioning to the Off state
Transitioning to the On state
No transition possible, staying in the current state
State machine is in the On state

根據狀態的Future選擇器

狀態機模式的一個強大應用是根據不同的狀態選擇不同的Future執行路徑。以下是一個根據開關狀態選擇不同Future的實作:

struct StateFuture<F: Future, X: Future> {
    pub state: State,
    pub on_future: F,
    pub off_future: X,
}

impl<F: Future, X: Future> Future for StateFuture<F, X> {
    type Output = State;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.state {
            State::On => {
                let inner = unsafe {
                    self.map_unchecked_mut(|s| &mut s.on_future)
                };
                let _ = inner.poll(cx);
                cx.waker().wake_by_ref();
                Poll::Pending
            },
            State::Off => {
                let inner = unsafe {
                    self.map_unchecked_mut(|s| &mut s.off_future)
                };
                let _ = inner.poll(cx);
                cx.waker().wake_by_ref();
                Poll::Pending
            },
        }
    }
}

這個Future會根據當前狀態選擇不同的內部Future進行輪詢,實作了根據狀態的非同步操作選擇。

重試模式

在非同步操作中,重試模式是處理暫時性故障的常用策略。特別是對於網路請求、資料函式庫等可能因為臨時故障而失敗的操作,合理的重試策略可以提高應用的穩定性。

首先定義一個始終回傳錯誤的測試函式:

async fn get_data() -> Result<String, Box<dyn std::error::Error>> {
    Err("Error".into())
}

然後實作重試邏輯:

async fn do_something() -> Result<(), Box<dyn std::error::Error>> {
    let mut miliseconds = 1000;
    let total_count = 5;
    let mut count = 0;
    let result: String;
    
    loop {
        match get_data().await {
            Ok(data) => {
                result = data;
                break;
            },
            Err(err) => {
                println!("Error: {}", err);
                count += 1;
                if count == total_count {
                    return Err(err);
                }
            }
        }
        
        tokio::time::sleep(
            tokio::time::Duration::from_millis(miliseconds)
        ).await;
        miliseconds *= 2;
    }
    
    Ok(())
}

這種實作使用指數退避策略,每次重試的等待時間都會翻倍,這有助於減輕對目標系統的壓力。

測試重試邏輯:

#[tokio::main]
async fn main() {
    let outcome = do_something().await;
    println!("Outcome: {:?}", outcome);
}

執行結果:

Error: Error
Error: Error
Error: Error
Error: Error
Error: Error
Outcome: Err("Error")

斷路器模式

斷路器模式是重試模式的進一步延伸,當系統檢測到持續性的錯誤時,會暫時停止嘗試,防止系統資源的浪費。

斷路器模式的核心是維護兩個原子值:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::future::Future;
use tokio::task::JoinHandle;

static OPEN: AtomicBool = AtomicBool::new(false);
static COUNT: AtomicUsize = AtomicUsize::new(0);

OPEN表示斷路器是否開啟,COUNT記錄錯誤次數。當錯誤次數超過閾值時,斷路器開啟,新的任務將不再被執行。

在實際工作中,玄貓經常將這些設計模式組合使用。例如,在開發一個高用性的微服務系統時,我會在服務間通訊層實作斷路器模式,同時在內部操作上使用重試模式,並透過狀態機模式管理整個服務的生命週期。這種組合使用能夠大幅提高系統的彈性和可靠性。

設計模式的選擇與應用

選擇合適的設計模式需要考慮以下因素:

  1. 問題特性:裝飾器模式適合在不改變原有程式碼的情況下新增功能;狀態機模式適合有明確狀態轉換邏輯的系統;重試和斷路器模式適合需要處理外部依賴不穩定情況的場景。

  2. 系統需求:高用性系統可能需要組合使用多種模式;效能敏感的系統需要謹慎使用可能增加延遲的模式。

  3. 維護成本:過度使用設計模式可能增加程式碼複雜度,應在實用性和可維護性之間找到平衡。

在Rust的非同步程式設計中,這些設計模式不僅提供瞭解決特定問題的方法,更重要的是它們能夠與Rust的所有權系統、錯誤處理機制和非同步模型無縫整合。正確應用這些模式,可以使我們的非同步程式碼更加健壯、可維護和高效。

設計模式不是萬能藥,它們是工具箱中的工具,需要根據具體問題選擇合適的工具。在實際開發中,我常發現簡單直接的解決方案比過度工程化的設計更有效。關鍵是理解每種模式的優缺點,並在適當的場景中應用它們。

實作無依賴的Rust非同步斷路器模式

在處理分散式系統時,我常發現錯誤處理機制的重要性往往被低估。特別是在非同步程式設計中,當系統的某部分發生故障時,如何避免連鎖反應導致整個系統當機是一個關鍵問題。這就是為什麼我特別欣賞斷路器模式(Circuit Breaker Pattern)在Rust非同步程式設計中的應用。

在本文中,我將分享如何使用純標準函式庫斷路器模式,並進一步展示如何從零開始建立完整的非同步TCP伺服器。不依賴第三方套件不僅能幫助我們深入理解非同步原理,也能讓系統更加輕量與可控。

斷路器模式的工作原理

斷路器模式的核心概念非常直觀:當系統中某個部分重複失敗達到特定閾值時,「斷路器」會開啟,阻止進一步的請求流向可能已經故障的部分。這樣可以:

  1. 防止系統資源被無效請求耗盡
  2. 讓故障部分有時間還原
  3. 快速失敗而非讓請求長時間等待

讓我們看以下的Rust實作,這是一個簡單但功能完整的斷路器:

fn spawn_task<F, T>(future: F) -> Result<JoinHandle<T>, String>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let open = OPEN.load(Ordering::SeqCst);
    if open == false {
        return Ok(tokio::task::spawn(future))
    }
    Err("Circuit Open".to_string())
}

這個函式是斷路器模式的核心。它接受一個非同步任務(future),並嘗試將其生成為一個獨立的任務。但在生成前,它會檢查斷路器的狀態。如果斷路器開啟(OPEN為true),則拒絕生成新任務並回傳錯誤;否則正常生成任務。

OPEN是一個原子布林值,使用SeqCst(順序一致性)排序來確保在多執行緒環境中的可見性和一致性。這確保了當一個執行緒將斷路器設定為開啟狀態時,其他執行緒能立即看到這一變化。

實作測試任務

為了測試我們的斷路器,讓我們定義兩個簡單的非同步任務:

async fn error_task() {
    println!("error task running");
    let count = COUNT.fetch_add(1, Ordering::SeqCst);
    if count == 2 {
        println!("opening circuit");
        OPEN.store(true, Ordering::SeqCst);
    }
}

async fn passing_task() {
    println!("passing task running");
}

error_task模擬一個可能失敗的操作,它計算被呼叫的次數,並在達到特定閾值(這裡是2)時開啟斷路器。passing_task則是一個始終成功的簡單任務。

COUNT是另一個原子計數器,用來追蹤錯誤任務的執行次數。當計數達到2時,我們將斷路器狀態設定為開啟(OPEN.store(true, Ordering::SeqCst))。

測試斷路器功能

現在讓我們在主函式中測試這個斷路器的功能:

#[tokio::main]
async fn main() -> Result<(), String> {
    let _ = spawn_task(passing_task())?.await;
    let _ = spawn_task(error_task())?.await;
    let _ = spawn_task(error_task())?.await;
    let _ = spawn_task(error_task())?.await;
    let _ = spawn_task(passing_task())?.await;
    Ok(())
}

執行這段程式碼會產生以下輸出:

passing task running
error task running
error task running
error task running
opening circuit
Error: "Circuit Open"

從輸出可以看出,第一個passing_task和前兩個error_task正常執行。當第二個error_task執行時,計數達到閾值,斷路器被開啟。因此,第三個error_task和最後一個passing_task都無法執行,系統回傳"Circuit Open"錯誤。

這正是斷路器模式的精髓:在系統偵測到連續失敗後,自動阻止新的請求,避免資源浪費和連鎖故障。

從零開始構建非同步執行器

瞭解了斷路器模式後,讓我們更進一步,看如何從零開始構建一個完整的非同步系統。在本文中,我將展示如何使用Rust標準函式庫一個無依賴的非同步TCP伺服器。

專案結構設定

首先,我們需要設定專案的基本結構。我採用了工作區(workspace)方式組織程式碼,將系統分為四個主要模組:

[workspace]
members = [
  "client",
  "server",
  "data_layer",
  "async_runtime"
]

這種結構有幾個優點:

  1. 關注點分離:每個模組負責系統的一個特定方面
  2. 程式碼重用async_runtimedata_layer可以被客戶端和伺服器共用
  3. 獨立開發:各模組可以獨立測試和開發

資料層實作

資料層負責定義通訊協定中使用的資料結構,以及序列化和反序列化方法。以下是data_layer/src/data.rs的基本結構:

use std::io::{self, Cursor, Read, Write};

#[derive(Debug)]
pub struct Data {
    pub field1: u32,
    pub field2: u16,
    pub field3: String,
}

impl Data {
    pub fn serialize(&self) -> io::Result<Vec<u8>> {
        let mut bytes = Vec::new();
        bytes.write(&self.field1.to_ne_bytes())?;
        bytes.write(&self.field2.to_ne_bytes())?;
        let field3_len = self.field3.len() as u32;
        bytes.write(&field3_len.to_ne_bytes())?;
        bytes.extend_from_slice(self.field3.as_bytes());
        Ok(bytes)
    }
    
    pub fn deserialize(cursor: &mut Cursor<&[u8]>) -> io::Result<Data> {
        // 初始化適當大小的緩衝區
        let mut field1_bytes = [0u8; 4];
        let mut field2_bytes = [0u8; 2];
        
        // 從遊標讀取資料到緩衝區
        cursor.read_exact(&mut field1_bytes)?;
        cursor.read_exact(&mut field2_bytes)?;
        
        // 將位元組陣列轉換為適當的資料類別
        let field1 = u32::from_ne_bytes(field1_bytes);
        let field2 = u16::from_ne_bytes(field2_bytes);
        
        // 讀取字元串長度
        let mut len_bytes = [0u8; 4];
        cursor.read_exact(&mut len_bytes)?;
        let len = u32::from_ne_bytes(len_bytes) as usize;
        
        // 讀取字元串內容
        let mut field3_bytes = vec![0u8; len];
        cursor.read_exact(&mut field3_bytes)?;
        let field3 = String::from_utf8(field3_bytes)
            .map_err(|_| io::Error::new(
                io::ErrorKind::InvalidData, "Invalid UTF-8"
            ))?;
            
        // 回傳結構化資料
        Ok(Data { field1, field2, field3 })
    }
}

這個實作雖然簡單,但展示瞭如何手動處理二進位序列化,這在網路通訊中非常重要:

  1. 序列化方法將結構體轉換為位元組陣列,適合網路傳輸
  2. 反序列化方法從位元組流重建結構體
  3. 字元串長度的處理保證了可變長度資料的正確解析

在實際專案中,我通常會使用Serde這樣的函式庫理序列化,但這裡的手動實作幫助我們理解底層原理。

構建非同步執行時

接下來是本文最核心的部分:從零開始構建非同步執行時。非同步執行時需要以下幾個關鍵元件:

  1. Waker:負責喚醒已準備好繼續執行的Future
  2. Executor:負責處理Future直到完成
  3. Sender/Receiver:提供非同步資料傳送和接收功能
  4. Sleeper:提供非同步休眠功能

實作自定義Waker

Waker是非同步Rust的核心元件之一,負責通知執行器某個Future已準備好被輪詢。以下是實作自定義Waker的關鍵程式碼:

use std::task::{RawWaker, RawWakerVTable};

// 定義虛擬函式表
static VTABLE: RawWakerVTable = RawWakerVTable::new(
    my_clone,
    my_wake,
    my_wake_by_ref,
    my_drop,
);

// 克隆函式實作
unsafe fn my_clone(raw_waker: *const ()) -> RawWaker {
    RawWaker::new(raw_waker, &VTABLE)
}

// 喚醒函式實作
unsafe fn my_wake(raw_waker: *const ()) {
    drop(Box::from_raw(raw_waker as *mut u32));
}

// 不消耗waker的喚醒函式
unsafe fn my_wake_by_ref(_raw_waker: *const ()) {
    // 在這個簡化實作中不做任何事
}

// 清理函式實作
unsafe fn my_drop(raw_waker: *const ()) {
    drop(Box::from_raw(raw_waker as *mut u32));
}

// 建立RawWaker的函式
pub fn create_raw_waker() -> RawWaker {
    let data = Box::into_raw(Box::new(42u32));
    RawWaker::new(data as *const (), &VTABLE)
}

這個實作雖然簡單,但展示了Waker的核心機制:

  1. RawWakerVTable定義了一組函式,用於處理Waker的生命週期和通知機制
  2. my_clone函式建立Waker的副本,通常在輪詢Future時使用
  3. my_wakemy_wake_by_ref函式在Future準備好時被呼叫
  4. my_drop函式處理Waker的資源清理

在實際應用中,我們可以擴充套件這個實作,例如:

  • 在wake函式中設定一個原子布林值,讓執行器知道有Future準備好了
  • 維護一個就緒任務佇列,wake函式將任務ID新增到佇列中
  • 將任務相關資料而不是簡單的數字42傳入Waker

這種自定義Waker的能力展示了Rust非同步系統的靈活性和可擴充套件性。

從標準函式庫完整非同步TCP伺服器

在理解了斷路器模式和非同步執行器的基礎上,我們可以進一步構建完整的非同步TCP伺服器。這個伺服器將具有以下特點:

  1. 多執行緒處理連線請求
  2. 非同步處理每個客戶端的通訊
  3. 使用我們自定義的非同步執行器
  4. 實作錯誤處理和優雅關閉

這種無依賴的實作方式特別適合:

  • 嵌入式系統或資源受限環境
  • 需要最小化依賴的安全關鍵系統
  • 學習和理解非同步Rust的底層機制
  • 需要完全控制非同步行為的特殊應用

透過從零構建這些元件,我們不僅能更深入理解非同步Rust的工作原理,還能根據特定需求定製和最佳化我們的非同步系統。

非同步系統設計的關鍵考量

在實作像斷路器這樣的非同步模式時,有幾個關鍵因素需要考慮:

錯誤處理與還原策略

斷路器模式的核心是錯誤處理,但僅檢測錯誤還不夠。我們還需要考慮:

  1. 半開狀態:在一段時間後嘗試還原,允許少量請求透過以測試系統是否已還原
  2. 錯誤區分:區分暫時性錯誤和永久性錯誤,對不同類別錯誤採取不同策略
  3. 降級服務:當斷路器開啟時,提供備用或簡化的服務而非完全拒絕

資源管理與隔離

從零開始開發非同步伺服器:無依賴的自訂執行器

實作喚醒器的後續步驟

雖然我們已經建立了喚醒器(waker)的基本框架,但這僅是開始。透過自建喚醒器,我們獲得了極大的彈性與客製化能力。現在,讓我們繼續前進,開發執行器(executor)—這是我們非同步執行環境的核心。

執行器的設計與實作

執行器扮演著關鍵角色,它負責接收Future、將它們轉換為任務(Task)、回傳處理結果的控制程式碼(handle),以及管理任務佇列。我們需要在async_runtime/src/executor.rs檔案中實作這套機制。

首先,我們需要匯入必要的模組:

use std::{
    future::Future,
    sync::{Arc, mpsc},
    task::{Context, Poll, Waker},
    pin::Pin,
    collections::VecDeque
};
use crate::waker::create_raw_waker;

在開始撰寫執行器前,我們需要定義Task結構體,作為執行器中傳遞的基本單位:

pub struct Task {
    future: Pin<Box<dyn Future<Output = ()> + Send>>,
    waker: Arc<Waker>,
}

你可能已經注意到,我們的Task結構中的future回傳值是()空元組。這看起來有些奇怪,因為在實際應用中,我們希望能處理各種不同回傳型別的任務。若要讓所有Future都回傳相同類別,會嚴重限制執行器的實用性。

你可能會想到使用泛型引數來解決這個問題:

pub struct Task<T> {
    future: Pin<Box<dyn Future<Output = T> + Send>>,
    waker: Arc<Waker>,
}

但這樣做會產生新的問題。編譯器會針對每個T的變體生成不同的Task結構體,而執行器也需要根據不同的T來處理Task,這會導致為每種T變體建立多個執行器,最終變得非常混亂。

我們的解決方案是:將future包裝在一個非同步區塊中,取得結果後透過通道(channel)傳送出去。這樣,所有任務的簽名都會回傳(),但我們仍能提取future的實際結果。

以下是我們執行器的基本架構:

pub struct Executor {
    pub polling: VecDeque<Task>,
}

impl Executor {
    pub fn new() -> Self {
        Executor {
            polling: VecDeque::new(),
        }
    }
    
    pub fn spawn<F, T>(&mut self, future: F) -> mpsc::Receiver<T>
    where
        F: Future<Output = T> + 'static + Send,
        T: Send + 'static,
    {
        // 實作細節將在後續說明
    }
    
    pub fn poll(&mut self) {
        // 實作細節將在後續說明
    }
    
    pub fn create_waker(&self) -> Arc<Waker> {
        Arc::new(unsafe{Waker::from_raw(create_raw_waker())})
    }
}

執行器中的polling欄位是我們存放待輪詢任務的地方。

值得注意的是,我們的create_waker函式在執行器中扮演著重要角色。由於執行器在單一執行緒上執行,一次只能處理一個future,所以如果我們的執行器包含一個資料集合,我們可以將參考傳遞給create_raw_waker函式(如果我們已設定create_raw_waker來處理它)。我們的waker可以安全地存取資料集合,因為同一時間只有一個future被處理,所以不會有多個來自future的可變參考同時存在。

當任務被輪詢後,如果仍處於Pending狀態,我們會將其放回輪詢佇列以便再次輪詢。要將任務初始放入佇列,我們使用spawn函式:

pub fn spawn<F, T>(&mut self, future: F) -> mpsc::Receiver<T>
where
    F: Future<Output = T> + 'static + Send,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let future: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(
        async move {
            let result = future.await;
            let _ = tx.send(result);
        }
    );
    let task = Task {
        future,
        waker: self.create_waker(),
    };
    self.polling.push_back(task);
    rx
}

我們使用通道來回傳控制程式碼,並將future的回傳值轉換為()

如果你不想直接暴露內部通道,可以建立以下的JoinHandle結構體作為spawn任務的回傳值:

pub struct JoinHandle<T> {
    receiver: mpsc::Receiver<T>,
}

impl<T> JoinHandle<T> {
    pub fn await(self) -> Result<T, mpsc::RecvError> {
        self.receiver.recv()
    }
}

這樣你可以使用更優雅的語法來等待結果:

match handle.await() {
    Ok(result) => println!("收到結果: {}", result),
    Err(e) => println!("接收結果時發生錯誤: {}", e),
}

現在我們的任務已經放在輪詢佇列中,可以透過執行器的poll函式來輪詢它:

pub fn poll(&mut self) {
    let mut task = match self.polling.pop_front() {
        Some(task) => task,
        None => return,
    };
    let waker = task.waker.clone();
    let context = &mut Context::from_waker(&waker);
    match task.future.as_mut().poll(context) {
        Poll::Ready(()) => {}
        Poll::Pending => {
            self.polling.push_back(task);
        }
    }
}

我們從佇列前端取出任務,將waker的參考包裝在context中,並將其傳遞給future的poll函式。如果future已準備好,我們不需要做任何事,因為結果已經透過通道傳送回去,future會被丟棄。如果future仍處於pending狀態,我們將其放回佇列。

現在,我們非同步執行環境的骨架已經完成,我們可以執行非同步程式碼了。在繼續建構其餘模組前,讓我們先測試一下執行器的運作情況。

執行我們的執行器

執行我們的非同步執行環境相當簡單。在async_runtime模組的main.rs檔案中,我們匯入以下內容:

use std::{
    future::Future,
    task::{Context, Poll},
    pin::Pin
};
mod executor;
mod waker;

我們需要一個基本的future來追蹤系統的執行情況。在本章中,我們一直使用計數future作為示範,因為它是一個簡單的future實作,會根據狀態回傳Pending或Ready。這個計數future的實作如下:

pub struct CountingFuture {
    pub count: i32,
}

impl Future for CountingFuture {
    type Output = i32;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.count += 1;
        if self.count == 4 {
            println!("CountingFuture 已完成!");
            Poll::Ready(self.count)
        } else {
            cx.waker().wake_by_ref();
            println!(
                "CountingFuture 尚未完成! {}",
                self.count
            );
            Poll::Pending
        }
    }
}

現在,我們可以定義future、建立執行器、產生任務,並執行它們:

fn main() {
    let counter = CountingFuture { count: 0 };
    let counter_two = CountingFuture { count: 0 };
    let mut executor = executor::Executor::new();
    let handle = executor.spawn(counter);
    let _handle_two = executor.spawn(counter_two);
    
    std::thread::spawn(move || {
        loop {
            executor.poll();
        }
    });
    
    let result = handle.recv().unwrap();
    println!("結果: {}", result);
}

我們產生一個執行緒,並在無限迴圈中輪詢執行器中的future。在這個過程中,我們等待其中一個future的結果。在我們的伺服器中,我們將正確地實作執行器,使其能夠在程式的整個生命週期中持續接收future。這個簡單的實作會輸出以下結果:

CountingFuture 尚未完成! 1
CountingFuture 尚未完成! 1
CountingFuture 尚未完成! 2
CountingFuture 尚未完成! 2
CountingFuture 尚未完成! 3
CountingFuture 尚未完成! 3
CountingFuture 已完成!
CountingFuture 已完成!
結果: 4

可以看到,它運作正常!我們僅使用標準函式庫作了一個非同步執行環境!

值得注意的是,我們的waker實際上並沒有做什麼特別的事情。無論如何,我們都會輪詢執行器佇列中的future。如果你在CountingFuturepoll函式中註解掉cx.waker().wake_by_ref();這行程式碼,你會得到完全相同的結果,這與在smol或Tokio等執行環境中的行為不同。這告訴我們,成熟的執行環境使用waker只輪詢需要被喚醒的future,使它們的輪詢更有效率。

建構傳送器

在處理TCP連線的資料傳送時,如果連線當前被阻塞,我們必須允許執行器切換到另一個非同步任務。如果連線沒有被阻塞,我們可以將資料寫入串流。在async_runtime/src/sender.rs檔案中,我們首先匯入以下內容:

use std::{
    future::Future,
    task::{Context, Poll},
    pin::Pin,
    net::TcpStream,
    io::{self, Write},
    sync::{Arc, Mutex}
};

我們的傳送器本質上是一個future。在poll函式中,如果串流正在阻塞,我們會回傳Pending;如果串流沒有阻塞,我們會將資料寫入串流。傳送器的結構定義如下:

pub struct TcpSender {
    pub stream: Arc<Mutex<TcpStream>>,
    pub buffer: Vec<u8>
}

impl Future for TcpSender {
    type Output = io::Result<()>;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 實作細節將在下面說明
    }
}

我們的TcpStream被包裝在Arc<Mutex<T>>中,這樣我們就可以將TcpStream傳遞給Sender和Receiver。一旦我們透過串流傳送了資料,我們會希望使用Receiver future來等待回應。

在TcpSender結構的poll函式中,我們首先嘗試取得串流的鎖:

let mut stream = match self.stream.try_lock() {
    Ok(stream) => stream,
    Err(_) => {
        cx.waker().wake_by_ref();
        return Poll::Pending;
    }
};

如果無法取得鎖,我們回傳Pending,這樣就不會阻塞執行器,任務會被放回佇列以便再次輪詢。一旦我們獲得了鎖,我們將串流設定為非阻塞模式:

stream.set_nonblocking(true)?;

set_nonblocking函式使串流從write、recv、read或send函式立即回傳結果。如果I/O操作成功,結果將是Ok。如果I/O操作回傳io::ErrorKind::WouldBlock錯誤,則I/O操作需要重試,因為串流正在阻塞。我們處理這些I/O操作結果如下:

match stream.write_all(&self.buffer) {
    Ok(_) => {
        Poll::Ready(Ok(()))
    },
    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
        cx.waker().wake_by_ref();
        Poll::Pending
    },
    Err(e) => Poll::Ready(Err(e))
}

現在我們已經定義了傳送器future,可以繼續開發接收器future了。