Rust 的所有權系統和生命週期管理,使其成為構建可靠高效網路應用程式的理想選擇。本文將示範如何利用 Rust 的非同步能力結合 Tokio 執行環境,打造一個根據 RESP 協定的非同步 Redis 訂單系統。此係統將能有效處理大量併發請求,並展現 Rust 在網路程式設計領域的優勢。我們將從訂單資料結構的定義開始,逐步實作 Redis 命令的解析與處理,最後整合 Tokio 進行非同步網路通訊。過程中,我們會使用 chrono 處理時間戳記,lazy_static 進行全域資料初始化,以及 resp crate 進行 RESP 訊息的編碼和解碼。透過這些技術的整合,我們將建構一個具備高效率和穩定性的非同步 Redis 訂單系統。

匯入必要的 crate

首先,我們需要匯入必要的 crate,包括 chronolazy_staticresp。這些 crate 將幫助我們處理時間、懶惰初始化和 Redis 命令的編碼和解碼。

$ cargo add chrono lazy_static

定義訂單結構

接下來,我們定義了訂單的結構,包括 OrderItemOrder 結構包含了訂單的物品和建立時間,而 Item 是一個列舉,代表了不同的 Intel 處理器型號。

// Represents an order from a user
#[derive(Debug, Clone)]
pub struct Order {
    item: Item,
    created_at: String,
}

// Represents a store's or manufacturer's items
// We will do intel processors using msrp USD
#[derive(Debug, Copy, Clone)]
pub enum Item {
    I5, // I5-12600
    I7, // I7-12700
    I9, // I9-12900
    Nil, // if product doesn't exist
}

實作訂單的方法

我們實作了 Ordernew 方法和 to_string 方法。new 方法建立了一個新的訂單,包含了指定的物品和當前的時間。to_string 方法傳回了訂單的字串表示,包括了物品的名稱、價格和建立時間。

impl ToString for Order {
    fn to_string(&self) -> String {
        format!("Item: {} [${}] -- Created at: {}", &self.item.to_string(), &self.item.get_price(), &self.created_at)
    }
}

impl Order {
    pub fn new(item: &str) -> Self {
        Self {
            item: Item::new(item),
            created_at: TIME_NOW.to_string(),
        }
    }
}

實作物品的方法

我們實作了 Itemnew 方法、get_price 方法和 to_string 方法。new 方法建立了一個新的物品,根據指定的名稱。get_price 方法傳回了物品的價格。to_string 方法傳回了物品的字串表示。

impl Item {
    pub fn new(item: &str) -> Self {
        // ...
    }

    pub fn get_price(&self) -> u32 {
        // ...
    }
}

impl ToString for Item {
    fn to_string(&self) -> String {
        // ...
    }
}

使用懶惰初始化的資料函式庫

我們使用了懶惰初始化的方式來建立了一個簡單的資料函式庫,儲存了訂單的資訊。

lazy_static! {
    static ref TIME_NOW: DateTime<Utc> = Utc::now();
    pub static ref ORDERS: Mutex<HashMap<String, Order>> = Mutex::new(HashMap::new());
}

這個資料函式庫使用了 HashMap 來儲存訂單的資訊,鍵是訂單的 ID,值是 Order 的例項。資料函式庫使用了 Mutex 來確保執行緒安全。

實作 Redis 風格的訂單系統

專案定義

首先,我們需要定義一個 Item 列舉,代表不同的 CPU 型號。這個列舉需要實作 ToString 介面,以便於將其轉換為字串。

impl ToString for Item {
    fn to_string(&self) -> String {
        match &self {
            Item::I5 => "i5-12600".to_string(),
            Item::I7 => "i7-12700".to_string(),
            Item::I9 => "i9-12900".to_string(),
            Item::Nil => "Item doesn't exist".to_string(),
        }
    }
}

專案建立和價格查詢

接下來,我們需要實作 Itemnew 方法,根據給定的字串建立對應的 Item 例項。另外,我們還需要實作 get_price 方法,根據 Item 例項傳回其對應的價格。

impl Item {
    pub fn new(item: &str) -> Self {
        match item {
            "i5" => Self::I5,
            "i7" => Self::I7,
            "i9" => Self::I9,
            _ => Self::Nil,
        }
    }

    pub fn get_price(&self) -> u32 {
        match &self {
            Item::I5 => 240,
            Item::I7 => 350,
            Item::I9 => 590,
            Item::Nil => 0,
        }
    }
}

主函式和 TCP 連線

main.rs 中,我們需要定義主函式 main,並建立一個 TCP 聯結器。使用者可以透過命令列引數指定要繫結的地址,否則將使用預設地址 127.0.0.1:6378

fn main() {
    let address = env::args()
        .skip(1)
        .next()
        .unwrap_or("127.0.0.1:6378".to_owned());

    let listener = TcpListener::bind(&address).unwrap();
    println!("orders_sync listening on: {}", &address);

    for tcp_stream in listener.incoming() {
        let tcp_stream = tcp_stream.unwrap();
        println!("New incoming connection from: {:?}", tcp_stream);
        handle_incoming(tcp_stream);
    }
}

處理傳入的 TCP 連線

handle_incoming 函式中,我們需要解碼傳入的 TCP 連線,並根據解碼的結果進行相應的處理。

fn handle_incoming(stream: TcpStream) {
    let mut tcp_stream = BufReader::new(stream);
    let decoder = Decoder::new(&mut tcp_stream).decode();

    match decoder {
        Ok(value) => {
            let reply = process_order(value);

            match tcp_stream.get_mut().write_all(&reply) {
                Ok(()) => (),
                Err(e) => eprintln!("Error: {:?}", e),
            }
        }
        Err(e) => {
            eprintln!("Invalid command: {:?}", e);
        }
    };
}

處理訂單

process_order 函式中,我們需要根據傳入的 Value 例項進行相應的處理,包括 GETSETDEL 命令。

fn process_order(value: Value) -> Vec<u8> {
    // 根據 value 例項進行相應的處理
    // ...
}

命令處理

根據 Redis 的官方檔案,GET 命令用於取得指定鍵的值,SET 命令用於設定指定鍵的值,DEL 命令用於刪除指定鍵。

fn process_order(value: Value) -> Vec<u8> {
    match value {
        Value::Array(ref array) => {
            if array.len() < 2 {
                return "Invalid command".as_bytes().to_vec();
            }

            let command = array[0].as_str().unwrap();
            let key = array[1].as_str().unwrap();

            match command {
                "GET" => {
                    // 處理 GET 命令
                    // ...
                }
                "SET" => {
                    // 處理 SET 命令
                    // ...
                }
                "DEL" => {
                    // 處理 DEL 命令
                    // ...
                }
                _ => {
                    return "Invalid command".as_bytes().to_vec();
                }
            }
        }
        _ => {
            return "Invalid command".as_bytes().to_vec();
        }
    }
}

專案價格查詢

process_order 函式中,我們需要根據傳入的 Value 例項查詢對應的專案價格。

fn process_order(value: Value) -> Vec<u8> {
    // ...

    let item = Item::new(key);
    let price = item.get_price();

    // ...
}

完整的 process_order 函式

以下是完整的 process_order 函式實作:

fn process_order(value: Value) -> Vec<u8> {
    match value {
        Value::Array(ref array) => {
            if array.len() < 2 {
                return "Invalid command".as_bytes().to_vec();
            }

            let command = array[0].as_str().unwrap();
            let key = array[1].as_str().unwrap();

            match command {
                "GET" => {
                    let item = Item::new(key);
                    let price = item.get_price();

                    format!("{}: {}", key, price).as_bytes().to_vec()
                }
                "SET" => {
                    // 處理 SET 命令
                    // ...
                }
                "DEL" => {
                    // 處理 DEL 命令
                    // ...
                }
                _ => {
                    return "Invalid command".as_bytes().to_vec();
                }
            }
        }
        _ => {
            return "Invalid command".as_bytes().to_vec();
        }
    }
}

處理 Redis 訊息的 Rust 函式

以下是處理 Redis 訊息的 Rust 函式,該函式會根據接收到的訊息執行相應的命令。

處理訂單函式

pub fn process_order(msg: Value) -> Vec<u8> {
    let reply: Result<Value, Value> = if let Value::Array(v) = msg {
        match &v[0] {
            Value::Bulk(ref s) if &s.to_lowercase() == "get" => handle_get(v),
            Value::Bulk(ref s) if &s.to_lowercase() == "set" => handle_set(v),
            Value::Bulk(ref s) if &s.to_lowercase() == "del" => handle_del(v),
            _ => Err(Value::Error("Invalid command".to_string())),
        }
    } else {
        Err(Value::Error("Invalid Command".to_string()))
    };
    match reply {
        Ok(r) | Err(r) => r.encode(),
    }
}

處理 GET 命令函式

pub fn handle_get(values: Vec<Value>) -> Result<Value, Value> {
    let v = values.iter().skip(1).collect::<Vec<_>>();
    if v.is_empty() {
        return Err(Value::Error("Expected Name for GET command".to_string()));
    }
    let orders = ORDERS.lock().unwrap();
    let reply = if let Value::Bulk(ref s) = &v[0] {
        orders
            .get(s)
            .map(|o| Value::Bulk(o.to_string()))
            .unwrap_or(Value::Null)
    } else {
        Value::Null
    };
    Ok(reply)
}

處理 SET 命令函式

pub fn handle_set(values: Vec<Value>) -> Result<Value, Value> {
    let v = values.iter().skip(1).collect::<Vec<_>>();
    if v.is_empty() || v.len() != 2 {
        return Err(Value::Error("Expected Name and Item for SET command".to_string()));
    }
    // 處理 SET 命令的邏輯
    unimplemented!()
}

處理 DEL 命令函式

pub fn handle_del(values: Vec<Value>) -> Result<Value, Value> {
    let v = values.iter().skip(1).collect::<Vec<_>>();
    if v.is_empty() {
        return Err(Value::Error("Expected Name for DEL command".to_string()));
    }
    // 處理 DEL 命令的邏輯
    unimplemented!()
}

圖表翻譯:

  flowchart TD
    A[接收 Redis 訊息] --> B[處理訂單]
    B --> C[執行 GET 命令]
    B --> D[執行 SET 命令]
    B --> E[執行 DEL 命令]
    C --> F[傳回訂單詳情]
    D --> G[設定訂單]
    E --> H[刪除訂單]

這個流程圖展示了處理 Redis 訊息的流程,包括接收訊息、處理訂單、執行 GET、SET 和 DEL 命令等。

訂單系統的實作

在這個章節中,我們將實作一個簡單的訂單系統,使用 Rust 和 Redis 來儲存和管理訂單。

訂單系統的需求

我們的訂單系統需要支援以下功能:

  • 訂單的建立:使用 SET 命令建立一個新的訂單。
  • 訂單的刪除:使用 DEL 命令刪除一個訂單。
  • 訂單的查詢:使用 GET 命令查詢一個訂單。

訂單系統的實作

我們使用 Rust 來實作訂單系統,並使用 Redis 來儲存和管理訂單。以下是實作的程式碼:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// 定義訂單結構
struct Order {
    name: String,
    item: String,
}

// 定義訂單系統
struct OrderSystem {
    orders: Arc<Mutex<HashMap<String, Order>>>,
}

impl OrderSystem {
    // 建立一個新的訂單系統
    fn new() -> Self {
        OrderSystem {
            orders: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    // 處理 SET 命令
    fn handle_set(&self, values: Vec<String>) -> Result<String, String> {
        // 檢查輸入引數
        if values.len() != 2 {
            return Err("Expected Name and Item for SET command".to_string());
        }

        // 取得訂單名稱和專案
        let name = values[0].clone();
        let item = values[1].clone();

        // 建立一個新的訂單
        let order = Order { name, item };

        // 儲存訂單
        self.orders.lock().unwrap().insert(name, order);

        // 傳回成功訊息
        Ok("Order Created!".to_string())
    }

    // 處理 DEL 命令
    fn handle_del(&self, values: Vec<String>) -> Result<String, String> {
        // 檢查輸入引數
        if values.len() != 1 {
            return Err("Expected Name for DEL command".to_string());
        }

        // 取得訂單名稱
        let name = values[0].clone();

        // 刪除訂單
        self.orders.lock().unwrap().remove(&name);

        // 傳回成功訊息
        Ok("Order Deleted!".to_string())
    }
}

fn main() {
    // 建立一個新的訂單系統
    let order_system = OrderSystem::new();

    // 處理 SET 命令
    let result = order_system.handle_set(vec!["bob".to_string(), "i7".to_string()]);
    println!("{}", result.unwrap());

    // 處理 DEL 命令
    let result = order_system.handle_del(vec!["bob".to_string()]);
    println!("{}", result.unwrap());
}

測試訂單系統

我們可以使用 Redis 來測試訂單系統。首先,編譯和執行訂單系統:

$ cargo run
orders_sync listening on: 127.0.0.1:6378

然後,使用 Redis 客戶端連線到訂單系統:

$ redis-cli -p 6378
127.0.0.1:6378> set bob i7
Order Created!

最後,刪除訂單:

$ redis-cli -p 6378
127.0.0.1:6378> del bob
Order Deleted!

這樣,我們就完成了訂單系統的實作和測試。

使用 Redis 進行資料儲存和刪除

Redis 是一個高效的資料儲存系統,允許使用者儲存和刪除資料。以下是使用 Redis 進行資料儲存和刪除的範例。

儲存資料

使用 get 命令可以儲存資料。例如,儲存一筆資料,包含商品名稱、價格和建立時間:

$ redis-cli -p 6378
127.0.0.1:6378> get bob
"Item: i7-12700 [$350] -- Created at: 2022-09-04 00:04:03.260111010 UTC"

刪除資料

使用 del 命令可以刪除資料。例如,刪除一筆資料:

$ redis-cli -p 6378
127.0.0.1:6378> del tim
Order Deleted!

檢查資料是否存在

使用 get 命令可以檢查資料是否存在。例如,檢查一筆資料是否存在:

$ redis-cli -p 6378
127.0.0.1:6378> get tim
(nil)

如果資料不存在,則會傳回 (nil)

Redis 連線

Redis 伺服器可以接受多個連線。以下是 Redis 伺服器接受連線的範例:

orders_sync listening on: 127.0.0.1:6378
New incoming connection from: TcpStream { addr: 127.0.0.1:6378, peer: 127.0.0.1:49240, fd: 4 }
New incoming connection from: TcpStream { addr: 127.0.0.1:6378, peer: 127.0.0.1:33990, fd: 4 }
New incoming connection from: TcpStream { addr: 127.0.0.1:6378, peer: 127.0.0.1:39752, fd: 4 }
New incoming connection from: TcpStream { addr: 127.0.0.1:6378, peer: 127.0.0.1:49696, fd: 4 }
New incoming connection from: TcpStream { addr: 127.0.0.1:6378, peer: 127.0.0.1:34414, fd: 4 }
New incoming connection from: TcpStream { addr: 127.0.0.1:6378, peer: 127.0.0.1:34420, fd: 4 }
New incoming connection from: TcpStream { addr: 127.0.0.1:6378, peer: 127.0.0.1:35464, fd: 4 }
New incoming connection from: TcpStream { addr: 127.0.0.1:6378, peer: 127.0.0.1:35472, fd: 4 }

內容解密:

以上範例展示瞭如何使用 Redis 進行資料儲存和刪除。使用 get 命令可以儲存資料,使用 del 命令可以刪除資料。Redis 伺服器可以接受多個連線,允許多個使用者同時存取資料。

圖表翻譯:

  graph LR
    A[使用者] -->|連線|> B[Redis 伺服器]
    B -->|儲存資料|> C[資料函式庫]
    C -->|刪除資料|> D[資料函式庫]
    D -->|檢查資料|> E[使用者]

以上圖表展示了使用者與 Redis 伺服器之間的互動。使用者可以連線到 Redis 伺服器,儲存和刪除資料,然後檢查資料是否存在。

非同步IO模型與Rust

非同步(Asynchronous)是一種不會在同一時間或速度下發生或完成的事件。非同步模型相較於同步模型(Synchronous)具有並發執行的優勢,可以使用非阻塞的執行緒。這是透過作業系統的輪詢(Polling)機制實作的。輪詢是一種連續檢查程式和其狀態的機制,但這種方法效率不高。因此,Linux等作業系統使用epoll,當程式狀態改變時會通知應用程式。

Rust透過Future特性實作非同步任務。Future代表了一個將來會完成的值,但不會立即完成。這是透過Future特性實作的:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

poll函式嘗試解析Future到最終值。如果值尚未可用,它會註冊當前任務以便在值可用時喚醒。這個函式傳回以下結果:

  • Poll::Pending:如果Future尚未準備好。
  • Poll::Ready(val):如果Future已經完成,傳回結果val

Future特性通常與futures函式庫一起使用,當我們需要使用它和其他與Future相關的特性,如StreamSink時,我們會使用這個函式庫。

從Mio到Tokio

作為開發人員,管理低階別的輪詢和從作業系統讀取事件很困難,幸好我們不需要自己實作這些功能。mio函式庫是一個低階別的IO函式庫,關注非阻塞API和事件通知,以構建高效能的IO應用程式。tokio函式庫結合了futuremio函式庫,是一個提供所有非同步功能的函式庫,包括工作竊取任務排程器、非阻塞API、TCP/UDP通訊端等。如果您正在進行非同步任務,很可能會使用tokio

從效能最佳化視角來看,本文探討了以 Rust 語言建構類 Redis 訂單系統的過程,涵蓋了 crate 的匯入、資料結構定義、核心方法實作、TCP 連線處理、Redis 命令解析及非同步 IO 模型的應用。透過分析程式碼片段,我們可以看到,利用 chronolazy_staticresp 等 crate 有效地管理了時間、資源初始化和 Redis 命令的編碼與解碼,提升了系統的效率。然而,目前的程式碼範例主要聚焦於 GET 命令的實作,SET 和 DEL 命令的處理邏輯尚未完整,這限制了系統的完整功能。此外,雖然使用了 Mutex 確保執行緒安全,但在高併發場景下,Mutex 的效能瓶頸仍需考量,可以考慮更細粒度的鎖或非鎖機制來最佳化。

展望未來,Rust 的非同步特性 Future 結合 tokio 這樣的執行期,為構建高效能網路應用提供了強大的工具。藉由 tokio 提供的工作竊取任務排程器和非阻塞 API,可以充分發揮 Rust 在非同步程式設計方面的優勢,進一步提升訂單系統的吞吐量和回應速度。隨著 Rust 生態的持續發展,預計會有更多高效的非同步函式庫和工具出現,為開發者提供更便捷的選擇。玄貓認為,Rust 在構建高效能、高可靠性的後端系統方面具有顯著優勢,值得技術團隊深入研究並應用於實際專案中,尤其是在對效能和穩定性要求嚴苛的場景下。