Rust 的非同步特性為網路程式設計提供了強大的支援。本文示範如何利用 Tokio 和 Actor 模型重構 TCP 伺服器,以提升其平行處理能力。文章首先介紹了使用 select! 巨集實作非同步讀取操作,接著逐步引導讀者將伺服器轉換為 Actor,並透過通道進行訊息傳遞。過程中,我們也關注了程式碼的重構和最佳化,例如錯誤處理、資料封裝和模組化設計。最終,我們展望了系統未來的發展方向,包括複製、交易、持久化和資料流處理等技術。

第五章:使用 Actors 實作平行處理

步驟 3:重構連線處理器

本文將重構連線處理器,使用 select! 巨集代替直接等待 stream.read。由於這是一個重構步驟,我們不會改變現有的行為,但會為後續的多個非同步操作做好準備。當我們接收到伺服器傳送的訊息時,這將非常有用。

重構過程很簡單。原本的程式碼是:

match stream.read(&mut buffer).await {

現在將被替換為:

select! {
    result = stream.read(&mut buffer) => {
        match result {

其餘函式保持不變。

程式碼範例:

async fn handle_connection(mut stream: TcpStream, storage: Arc<Mutex<Storage>>) {
    let mut buffer = [0; 512];
    loop {
        select! {
            result = stream.read(&mut buffer) => {
                match result {
                    Ok(size) if size != 0 => {
                        // 處理讀取到的資料
                    }
                    Ok(_) => {
                        println!("Connection closed");
                        break;
                    }
                    Err(e) => {
                        println!("Error: {}", e);
                        break;
                    }
                }
            }
        }
    }
}

內容解密:

  1. select! 巨集允許我們同時等待多個非同步操作,並在其中一個完成時執行相應的程式碼。
  2. stream.read(&mut buffer) 是非同步讀取操作的範例,當資料可讀時,它將傳回讀取的位元組數。
  3. 使用 match 陳述式處理 stream.read 的結果,根據不同的結果進行不同的處理。
  4. 如果讀取到的位元組數不為零,則進一步處理讀取到的資料。
  5. 如果連線關閉或發生錯誤,則相應地處理這些情況。

步驟 4:向伺服器傳送請求

本步驟涉及比前幾個步驟更多的變更,但並沒有增加額外的複雜度。目前,連線處理器直接呼叫 process_request 並傳遞 RESP 請求。我們將對其進行修改,以便使用 Request

程式碼範例:

let request = Request {
    value: resp,
    sender: connection_sender.clone(),
};
let response = match process_request(request, storage.clone()) {
    Ok(v) => v,
    Err(e) => {
        eprintln!("Error parsing command: {}", e);
        return;
    }
};

內容解密:

  1. 建立一個 Request 物件,其中包含 RESP 請求和一個傳送者通道,用於稍後向伺服器傳送訊息。
  2. 呼叫 process_request 處理請求,並根據結果進行相應的處理。
  3. 如果處理成功,則將回應寫入 socket;如果發生錯誤,則輸出錯誤資訊。

測試變更

由於 process_request 的簽名已更改,測試程式碼也需要進行相應的修改。

程式碼範例:

#[test]
fn test_process_request_ping() {
    let (connection_sender, _) = mpsc::channel::<ServerMessage>(32);
    let request = Request {
        value: RESP::Array(vec![RESP::BulkString(String::from("PING"))]),
        sender: connection_sender,
    };
    let storage = Arc::new(Mutex::new(Storage::new()));
    let output = process_request(request, storage).unwrap();
    assert_eq!(output, RESP::SimpleString(String::from("PONG")));
}

內容解密:

  1. 在測試中建立一個 Request 物件,並建立一個 MPSC 通道,用於傳送 ServerMessage
  2. 呼叫 process_request 並驗證其輸出是否符合預期。
  3. 使用 assert_eq! 巨集檢查輸出是否正確。

透過這些變更,我們為後續使用 Actors 實作平行處理奠定了基礎。接下來的步驟將進一步完善這個系統,使其更加健壯和高效。

第五章:使用 Actors 實作平行處理

步驟 5 - 將伺服器轉換為 Actor

目前,我們可以將伺服器轉換為一個完整的 Actor。這明顯涉及連線處理程式,但在此步驟中,我們將重點關注伺服器。在下一步中,我們將修復連線處理程式,並確保通訊的兩端正常運作。因此,在本文結束時,程式碼將無法正常運作。

伺服器

讓我們建立一個結構來捕捉伺服器需要管理的資訊。

// src/server.rs
pub struct Server {
    pub storage: Option<Storage>,
}

正如你所看到的,這是 Actor 背後的核心思想:伺服器擁有儲存。

// src/server.rs
impl Server {
    pub fn new() -> Self {
        Self { storage: None }
    }

    pub fn set_storage(mut self, storage: Storage) -> Self {
        self.storage = Some(storage);
        self
    }
}

在此實作中,伺服器可以在沒有儲存的情況下初始化,但這只是一個風格選擇,不會影響 Actor 的運作方式。

下一個變更引入了 Actor 本身,它是一個名為 run_server 的函式。

// src/server.rs
use crate::connection::ConnectionMessage;
use tokio::sync::mpsc;

pub async fn run_server(mut server: Server, mut crx: mpsc::Receiver<ConnectionMessage>) {
    loop {
        tokio::select! {
            Some(message) = crx.recv() => {
                match message {
                    ConnectionMessage::Request(request) => {
                        process_request(request, &mut server).await;
                    }
                }
            }
        }
    }
}

正如你所看到的,這是使用 Server 和一個接收來自連線處理程式的訊息的通道進行初始化。主體是一個無限迴圈,提取訊息、提取請求,並在其上執行 process_request

說到這裡,函式 process_request 也需要更改,因為我們現在直接傳遞對 Server 的參照,而不是 Arc<Mutex<Storage>>

// src/server.rs
pub async fn process_request(request: Request, server: &mut Server) {
    let elements = match &request.value {
        RESP::Array(v) => v,
        _ => {
            panic!()
        }
    };

    let mut command = Vec::new();
    for elem in elements.iter() {
        match elem {
            RESP::BulkString(v) => command.push(v.clone()),
            _ => {
                panic!()
            }
        }
    }

    let storage = match server.storage.as_mut() {
        Some(storage) => storage,
        None => panic!(),
    };

    let response = storage.process_command(&command);
}

內容解密:

  1. process_request 函式的變更:不再需要鎖定儲存,因為伺服器是該資源的唯一擁有者。回應不再由函式傳回,因為我們需要在下一步中建立一個系統將其發送回連線處理程式。
  2. panic!() 的使用:由於尚未建立接收訊息的系統另一半,因此使用 panic!() 作為錯誤訊號的佔位符。
  3. 移除錯誤傳回:包含 StorageError::IncorrectRequest 的錯誤傳回程式碼已被刪除,並替換為 panic!()

由於我們建立了一個新的型別,因此最好進行測試。因此,我們將建立 test_create_newtest_set_storage。由於 process_request 目前無法以任何有意義的方式輸出回應,因此我們還應該註解掉剩餘的測試。

// src/server.rs
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_new() {
        let server: Server = Server::new();
        match server.storage {
            Some(_) => panic!(),
            None => (),
        };
    }

    #[test]
    fn test_set_storage() {
        let storage = Storage::new();
        let server: Server = Server::new().set_storage(storage);
        match server.storage {
            Some(_) => (),
            None => panic!(),
        };
    }

    // #[test]
    // fn test_process_request_ping() {
    //     ...
    // }

    // #[test]
    // fn test_process_request_echo() {
    //     ...
    // }

    // #[test]
    // fn test_process_request_not_array() {
    //     ...
    // }

    // #[test]
    // fn test_process_request_not_bulkstrings() {
    //     ...
    // }
}

連線處理程式

現在伺服器已經轉換完畢,我們需要更改連線處理程式,以便它使用訊息通道傳送請求。

函式 handle_connection 不再需要接收 Arc<Mutex<Storage>>,而是接收一個用於 ConnectionMessage 實體的傳送者。

// src/main.rs
async fn handle_connection(mut stream: TcpStream, server_sender: mpsc::Sender<ConnectionMessage>) {
    // ...
}

process_request 的呼叫被替換為對 server_sender.send 的呼叫,由於後者不直接傳回回應,因此管理錯誤的程式碼已被刪除。同樣,這將在下一步中完成。

// src/main.rs
async fn handle_connection(mut stream: TcpStream, server_sender: mpsc::Sender<ConnectionMessage>) {
    // ...
    let request = Request {
        value: resp,
        sender: connection_sender.clone(),
    };

    match server_sender.send(ConnectionMessage::Request(request)).await {
        Ok(()) => {},
        Err(e) => {
            eprintln!("Error sending request: {}", e);
            return;
        }
    }
    // ...
}

內容解密:

  1. handle_connection 的變更:不再接收 Arc<Mutex<Storage>>,而是接收一個用於 ConnectionMessage 實體的傳送者。
  2. 使用訊息通道:對 process_request 的呼叫被替換為對 server_sender.send 的呼叫,以透過訊息通道傳送請求。
  3. 錯誤處理:由於 server_sender.send 不直接傳回回應,因此刪除了管理錯誤的程式碼。

由於 handle_connection 更改了其原型,因此我們需要相應地更改 main 的程式碼。正如你所看到的,我們還建立了將用於向伺服器傳送訊息的通道。

// src/main.rs
use connection::ConnectionMessage;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // ...
    let (server_sender, _) = mpsc::channel::<ConnectionMessage>(32);

    loop {
        tokio::select! {
            connection = listener.accept() => {
                match connection {
                    Ok((stream, _)) => {
                        tokio::spawn(handle_connection(stream, server_sender.clone()));
                    }
                    Err(e) => {
                        println!("Error: {}", e);
                        continue;
                    }
                }
            }
            // ...
        }
    }
}

原始碼

https://github.com/lgiordani/sider/tree/ed1/step5.5

第五章:使用角色(Actors)實作並發

第七步:整理程式碼

在進行重大重構後,保持系統處於可運作狀態至關重要。重構過程會產生許多臨時程式碼和未使用的匯入,導致程式碼函式庫混亂。在本步驟中,我們將填補這些漏洞,確保程式碼寫得井井有條。

7.1 還原主動過期機制

首先,我們需要還原在前一步驟中被移除的主動過期機制,因為儲存(storage)不再能從 main 函式中直接存取。這裡的策略很簡單。

首先,從 main.rs 中移除 expire_keys,並將其重新建立為 Server 結構體的一個方法。

// src/server.rs
impl Server {
    pub fn new() -> Self {
        Self { storage: None }
    }

    pub fn set_storage(mut self, storage: Storage) -> Self {
        self.storage = Some(storage);
        self
    }

    /// #### 內容解密:
    /// `expire_keys` 方法用於處理鍵的過期邏輯。
    /// 首先檢查 `self.storage` 是否存在,如果存在,則呼叫其 `expire_keys` 方法。
    pub fn expire_keys(&mut self) {
        let storage = match self.storage.as_mut() {
            Some(storage) => storage,
            None => return,
        };
        storage.expire_keys();
    }
}

expire_keys 變成 Server 的方法純粹是出於偏好。這樣做可以使程式碼更具物件導向風格。

現在,由於伺服器角色 run_server 是一個 select! 迴圈,我們可以再次建立 interval_timer 並等待 interval_timer.tick()

// src/main.rs
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6379").await?;
    let storage = Storage::new();
    let mut server = Server::new();
    server = server.set_storage(storage);
    let (server_sender, server_receiver) = mpsc::channel::<ConnectionMessage>(32);
    
    tokio::spawn(run_server(server, server_receiver));
    
    let mut interval_timer = tokio::time::interval(std::time::Duration::from_millis(100));
    
    loop {
        select! {
            // ... 其他事件處理
            _ = interval_timer.tick() => {
                // #### 內容解密:
                // 在此處呼叫伺服器的 `expire_keys` 方法來處理鍵過期。
                // 但是,由於伺服器現在是作為一個角色執行,我們需要透過訊息來間接呼叫它。
                // 這裡需要設計一個訊息來觸發伺服器的過期處理。
            }
        }
    }
}

#### 內容解密:

上述程式碼片段展示瞭如何還原主動過期機制。主要步驟包括將 expire_keys 方法新增到 Server 結構體中,並在主迴圈中使用 interval_timer 定期觸發過期檢查。然而,由於伺服器現在是作為一個角色執行,我們需要設計一個適當的訊息來間接觸發伺服器的過期處理。

此圖示展示了主動過期機制的流程。首先,主迴圈等待 interval_timer.tick() 事件。當事件觸發時,主迴圈會傳送一個訊息給伺服器角色,通知它進行鍵過期檢查。伺服器角色接收到訊息後,執行相應的過期處理邏輯。

程式碼整理與最佳化

在完成了主要的功能實作後,我們需要對程式碼進行整理和最佳化。這包括移除未使用的匯入、最佳化錯誤處理邏輯、以及新增必要的單元測試等。

#### 內容解密:

程式碼整理的目的是為了提高程式碼的可讀性和可維護性。這涉及到多個方面,包括但不限於:

  1. 移除未使用的匯入:檢查程式碼中未被使用的匯入陳述式,並將其移除,以減少程式碼的冗餘。
  2. 最佳化錯誤處理:檢查現有的錯誤處理邏輯,確保它們能夠有效地捕捉和處理可能發生的錯誤。
  3. 新增單元測試:為關鍵的功能模組新增單元測試,以確保它們能夠正確地工作。

透過這些步驟,我們可以顯著提高程式碼的品質,使其更加健壯和易於維護。

第七步:最佳化伺服器與連線處理

在前面的章節中,我們已經成功將伺服器轉換為一個演員(actor),並且處理了連線請求。本步驟將進一步最佳化伺服器的實作,特別是在處理回應的型別、請求的處理邏輯以及連線處理的分離。

7.1 伺服器回應型別的定義

首先,我們需要定義伺服器回應的型別。之前的實作中,伺服器可以傳回任意型別的結果,但這並不夠明確。因此,我們定義了一個 ServerValue 列舉來包裝可能的回應型別,目前只有 RESP 型別。

// src/server_result.rs
#[derive(Debug)]
pub enum ServerValue {
    RESP(RESP),
}

pub type ServerResult = Result<ServerValue, ServerError>;

#[derive(Debug)]
pub enum ServerMessage {
    Data(ServerValue),
    Error(ServerError),
}

內容解密:

  • ServerValue 列舉定義了伺服器可能的回應值,目前僅包含 RESP 型別。
  • ServerResult 型別是 ServerValue 的結果型別,可能包含錯誤。
  • ServerMessage 列舉用於表示伺服器傳送給連線處理的訊息,可以是資料或錯誤。

7.2 請求處理的最佳化

接下來,我們最佳化了 process_request 函式,使其使用新定義的 ServerMessageServerValue。這樣可以簡化錯誤處理和資料傳送的邏輯。

// src/server.rs
pub async fn process_request(request: Request, server: &mut Server) {
    // ...
    match response {
        Ok(v) => {
            request.data(ServerValue::RESP(v)).await;
        }
        Err(e) => {
            request.error(ServerError::CommandError).await;
        }
    }
}

內容解密:

  • process_request 函式根據處理結果傳送 ServerMessage 給請求者。
  • 使用 request.datarequest.error 方法簡化了訊息傳送的邏輯。

7.3 請求結構的最佳化

我們為 Request 結構增加了輔助方法,以簡化錯誤和資料的傳送。

// src/request.rs
impl Request {
    pub async fn error(&self, e: ServerError) {
        self.sender.send(ServerMessage::Error(e)).await.unwrap();
    }

    pub async fn data(&self, d: ServerValue) {
        self.sender.send(ServerMessage::Data(d)).await.unwrap();
    }
}

內容解密:

  • error 方法用於傳送錯誤訊息。
  • data 方法用於傳送資料訊息。
  • 這兩個方法簡化了在 process_request 中的訊息傳送邏輯。

7.4 連線處理的分離

最後,我們將連線處理邏輯分離到單獨的檔案中,以提高程式碼的可讀性和可維護性。

// src/connection.rs
pub async fn run_listener(host: String, port: u16, server_sender: mpsc::Sender<ConnectionMessage>) {
    let listener = TcpListener::bind(format!("{}:{}", host, port)).await.unwrap();
    loop {
        tokio::select! {
            connection = listener.accept() => {
                match connection {
                    Ok((stream, _)) => {
                        tokio::spawn(handle_connection(stream, server_sender.clone()));
                    }
                    Err(e) => {
                        eprintln!("Error: {}", e);
                    }
                }
            }
        }
    }
}

內容解密:

  • run_listener 函式負責監聽 TCP 連線,並為每個連線產生一個處理任務。
  • 這樣可以提高伺服器的並發處理能力。

下一步:探索更多可能性

到目前為止,我們已經從一個簡單的TCP伺服器發展到一個具備基本遠端字典功能的豐富系統,並且這個系統具有聰明的架構,可以輕易地擴充套件。CodeCrafters的挑戰針對這些擴充套件功能,讓我們能夠探索以下主題:

  • 複製(Replication)
  • 交易(Transactions)
  • RDB持久化(RDB persistence)
  • 資料流(Streams)

正如我在介紹中提到的,這本文並不是100%完成的,因為我計劃逐一處理這些主題。前兩個主題的程式碼已經可以運作,但我仍然需要將它們分成步驟,並撰寫相關的說明。

結語

希望你們到目前為止喜歡這本文!如同我所說,我只是Rust的初學者,因此希望這本文能幫助其他人繼續他們在這個令人驚奇的語言上的旅程。任何形式的回饋、糾正或建議都是非常受歡迎的。

感謝你們閱讀我的書!

探索未來的技術藍圖

在未來的發展中,我們可以預見幾個關鍵的技術方向將會進一步拓展目前的系統。這些方向不僅能提升系統的功能性,也將為開發者提供更多實用的工具和技術。

1. 強化複製功能

複製是分散式系統中的一個重要議題,它能夠確保資料在多個節點之間保持一致。未來,我們可以期待看到更多關於複製的技術細節,包括如何處理衝突、如何最佳化複製的速度和效率等。

內容解密:

複製功能的核心在於資料同步。當多個節點需要保持資料一致時,系統需要有一套有效的機制來確保資料的正確性和一致性。這涉及到資料版本控制、衝突解決策略等技術。

2. 交易功能的深度整合

交易功能對於需要確保資料完整性的應用程式至關重要。未來的發展可能會著重於如何最佳化交易處理的效能,以及如何在分散式系統中實作可靠的交易。

內容解密:

交易功能的實作在於保證一系列操作的原子性。這意味著這些操作要麼全部成功,要麼全部失敗,不會出現部分成功、部分失敗的情況。這需要系統能夠有效地管理和還原資料狀態。

3. RDB持久化的改進

RDB持久化是一種將資料儲存到磁碟上的方法,它對於需要長期儲存資料的應用程式非常重要。未來的改進可能會集中在如何提高RDB持久化的效率和可靠性。

內容解密:

RDB持久化的核心是將資料以特定的格式儲存到磁碟上。這涉及到資料序列化、磁碟I/O最佳化等技術。有效的RDB持久化能夠確保資料的安全性和可還原性。

4. 資料流的高效處理

資料流技術能夠讓系統實時處理大量的資料流。未來的發展可能會關注如何提高資料流處理的效率和可擴充套件性。

內容解密:

資料流處理的核心在於能夠即時處理和分析連續的資料流。這需要系統具備高效的資料處理能力和靈活的擴充套件性,以便能夠處理日益增長的資料量。

@startuml
skinparam backgroundColor #FEFEFE
skinparam componentStyle rectangle

title Rust 非同步網路程式設計 Actor 模型

package "機器學習流程" {
    package "資料處理" {
        component [資料收集] as collect
        component [資料清洗] as clean
        component [特徵工程] as feature
    }

    package "模型訓練" {
        component [模型選擇] as select
        component [超參數調優] as tune
        component [交叉驗證] as cv
    }

    package "評估部署" {
        component [模型評估] as eval
        component [模型部署] as deploy
        component [監控維護] as monitor
    }
}

collect --> clean : 原始資料
clean --> feature : 乾淨資料
feature --> select : 特徵向量
select --> tune : 基礎模型
tune --> cv : 最佳參數
cv --> eval : 訓練模型
eval --> deploy : 驗證模型
deploy --> monitor : 生產模型

note right of feature
  特徵工程包含:
  - 特徵選擇
  - 特徵轉換
  - 降維處理
end note

note right of eval
  評估指標:
  - 準確率/召回率
  - F1 Score
  - AUC-ROC
end note

@enduml

此圖示展示了未來技術發展的方向和它們之間的關係。從複製、交易、RDB持久化到資料流,每一項技術都在推動著系統朝著更高效、更可靠的方向發展。