RESP 協定解析的實作核心在於準確地從緩衝區中提取資料並轉換成對應的資料結構。Bulk String 的解析流程包含長度提取和資料讀取,而 Array 型別的解析則需要遞迴處理內部的元素。伺服器端需要根據解析後的 RESP 資料執行對應的命令,例如 PING 命令的處理。完善的錯誤處理機制能提升伺服器的穩定性,例如針對解析錯誤和命令錯誤的處理。透過最佳化請求處理流程,可以提升伺服器的效能和反應速度。

RESP 協定解析實作

RESP Bulk String 解析實作

在 RESP 協定中,Bulk String 是一種重要的資料型別。解析 Bulk String 需要從緩衝區中提取資料長度,並根據該長度讀取相應的資料。

實作 resp_extract_length 函式

首先,我們需要實作 resp_extract_length 函式,用於從緩衝區中提取資料長度。

// src/resp.rs
pub fn resp_extract_length(buffer: &[u8], index: &mut usize) -> RESPResult<RESPLength> {
    let line: String = binary_extract_line_as_string(buffer, index)?;
    let length: RESPLength = line.parse()?;
    Ok(length)
}

此函式首先呼叫 binary_extract_line_as_string 從緩衝區中提取一行資料,並將其轉換為字串。然後,將該字串解析為 RESPLength 型別的值。

定義 RESPLength 型別

src/resp_result.rs 中定義 RESPLength 型別:

// src/resp_result.rs
type RESPLength = i32;

由於 Bulk String 的長度可能為 -1(表示 Null),因此使用 i32 型別來表示長度。

實作 parse_bulk_string 函式

接下來,實作 parse_bulk_string 函式,用於解析 Bulk String。

// src/resp.rs
fn parse_bulk_string(buffer: &[u8], index: &mut usize) -> RESPResult<RESP> {
    resp_remove_type('$', buffer, index)?;
    let length = resp_extract_length(buffer, index)?;
    if length == -1 {
        return Ok(RESP::Null);
    }
    if length < -1 {
        return Err(RESPError::IncorrectLength(length));
    }
    let bytes = binary_extract_bytes(buffer, index, length as usize)?;
    let data: String = String::from_utf8(bytes)?;
    // Increment the index to skip the \r\n
    *index += 2;
    Ok(RESP::BulkString(data))
}

此函式首先檢查 Bulk String 的型別標記,然後提取資料長度。根據長度的不同,分別處理 Null 和非 Null 的情況。

相關測試

parse_bulk_string 函式編寫相關測試:

// src/resp.rs
#[cfg(test)]
mod tests {
    // ...
    #[test]
    fn test_parse_bulk_string() {
        let buffer = "$2\r\nOK\r\n".as_bytes();
        let mut index: usize = 0;
        let output = parse_bulk_string(buffer, &mut index).unwrap();
        assert_eq!(output, RESP::BulkString(String::from("OK")));
        assert_eq!(index, 8);
    }

    #[test]
    fn test_parse_bulk_string_empty() {
        let buffer = "$-1\r\n".as_bytes();
        let mut index: usize = 0;
        let output = parse_bulk_string(buffer, &mut index).unwrap();
        assert_eq!(output, RESP::Null);
        assert_eq!(index, 5);
    }

    // ...
}

RESP Array 解析實作

RESP Array 是另一種重要的資料型別。解析 Array 需要遞迴處理其內部的其他 RESP 型別。

更新 RESP 列舉

首先,更新 RESP 列舉以支援 Array 型別:

// src/resp.rs
#[derive(Debug, PartialEq)]
pub enum RESP {
    Array(Vec<RESP>),
    BulkString(String),
    Null,
    SimpleString(String),
}

實作 parse_array 函式

接下來,實作 parse_array 函式,用於解析 RESP Array。

// src/resp.rs
fn parse_array(buffer: &[u8], index: &mut usize) -> RESPResult<RESP> {
    resp_remove_type('*', buffer, index)?;
    let length = resp_extract_length(buffer, index)?;
    if length == -1 {
        return Ok(RESP::Null);
    }
    if length < -1 {
        return Err(RESPError::IncorrectLength(length));
    }
    let mut array = Vec::new();
    for _ in 0..length {
        let elem = parser_router(buffer, index)?.call(buffer, index)?;
        array.push(elem);
    }
    Ok(RESP::Array(array))
}

此函式首先檢查 Array 的型別標記,然後提取 Array 的長度。根據長度的不同,分別處理 Null 和非 Null 的情況。

更新 parser_router 函式

更新 parser_router 函式,以支援 Array 型別的解析:

// src/resp.rs
fn parser_router(
    buffer: &[u8],
    index: &mut usize,
) -> Option<fn(&[u8], &mut usize) -> RESPResult<RESP>> {
    match buffer[*index] {
        b'+' => Some(parse_simple_string),
        b'$' => Some(parse_bulk_string),
        b'*' => Some(parse_array),
        _ => None,
    }
}

Redis 協定(RESP)實作與伺服器請求處理最佳化

在前一階段的基礎上,我們進一步完善了 RESP(Redis Serialization Protocol)協定的實作,並最佳化了伺服器對請求的處理流程。本章節將深入解析 parse_array 函式的實作、伺服器錯誤處理機制的建立,以及如何有效地處理 PING 命令。

RESP 陣列解析實作

parse_array 函式負責解析 RESP 協定中的陣列型別。該函式首先驗證輸入資料的型別是否正確,然後提取陣列的長度,並根據該長度逐一解析陣列中的每個元素。

fn parse_array(buffer: &[u8], index: &mut usize) -> RESPResult<RESP> {
    resp_remove_type('*', buffer, index)?;
    let length = resp_extract_length(buffer, index)?;
    if length < 0 {
        return Err(RESPError::IncorrectLength(length));
    }
    let mut data = Vec::new();
    for _ in 0..length {
        match parser_router(buffer, index) {
            Some(parse_func) => {
                let array_element: RESP = parse_func(buffer, index)?;
                data.push(array_element);
            }
            None => return Err(RESPError::Unknown),
        }
    }
    Ok(RESP::Array(data))
}

內容解密:

  1. resp_remove_type('*', buffer, index)?;:驗證輸入資料的第一個字元是否為 *,確認資料型別為陣列。
  2. let length = resp_extract_length(buffer, index)?;:提取陣列的長度資訊。
  3. if length < 0 { return Err(RESPError::IncorrectLength(length)); }:檢查陣列長度是否有效,無效則回傳錯誤。
  4. for _ in 0..length { ... }:迴圈解析陣列中的每個元素,並將結果存入 data 向量中。
  5. Ok(RESP::Array(data)):將解析完成的陣列以 RESP::Array 型別回傳。

相關測試案例確保了 parse_array 函式的正確性,例如:

#[test]
fn test_parse_array() {
    let buffer = "*2\r\n+OK\r\n$5\r\nVALUE\r\n".as_bytes();
    let mut index: usize = 0;
    let output = parse_array(buffer, &mut index).unwrap();
    assert_eq!(
        output,
        RESP::Array(vec![
            RESP::SimpleString(String::from("OK")),
            RESP::BulkString(String::from("VALUE"))
        ])
    );
    assert_eq!(index, 20);
}

伺服器錯誤處理機制

為提升伺服器的健壯性,我們在 src/server.rs 中定義了 ServerError 列舉和 ServerResult 型別,用於處理伺服器執行過程中可能出現的錯誤。

#[derive(Debug, PartialEq)]
pub enum ServerError {
    CommandError,
}

impl fmt::Display for ServerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ServerError::CommandError => write!(f, "Error while processing!"),
        }
    }
}

pub type ServerResult<T> = Result<T, ServerError>;

內容解密:

  1. ServerError 列舉:定義了伺服器可能遇到的錯誤型別,目前包含 CommandError
  2. ServerResult 型別:簡化錯誤處理的結果型別,包裝 Result 使其傳回 ServerError

請求處理流程最佳化

我們引入了 process_request 函式來統一處理客戶端請求。該函式首先解析 RESP 請求,提取命令並執行對應的操作。

pub fn process_request(request: RESP) -> ServerResult<RESP> {
    let elements = match request {
        RESP::Array(v) => v,
        _ => return Err(ServerError::CommandError),
    };
    let mut command = Vec::new();
    for elem in elements.iter() {
        match elem {
            RESP::BulkString(v) => command.push(v),
            _ => return Err(ServerError::CommandError),
        }
    }
    match command[0].to_lowercase().as_str() {
        "ping" => Ok(RESP::SimpleString(String::from("PONG"))),
        _ => Err(ServerError::CommandError),
    }
}

內容解密:

  1. let elements = match request { ... }:檢查請求是否為陣列型別,非陣列請求直接回傳錯誤。
  2. for elem in elements.iter() { ... }:遍歷陣列元素,將 RESP::BulkString 轉換為命令字串。
  3. match command[0].to_lowercase().as_str() { ... }:根據命令的第一個字串決定執行的操作,目前僅支援 PING 命令。

連線處理更新

最後,我們修改了 handle_connection 函式,使其呼叫 process_request 處理客戶端請求,並將結果回傳給客戶端。

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    loop {
        match stream.read(&mut buffer).await {
            Ok(size) if size != 0 => {
                let mut index: usize = 0;
                let request = match bytes_to_resp(&buffer[..size].to_vec(), &mut index) {
                    Ok(v) => v,
                    Err(e) => {
                        eprintln!("Error: {}", e);
                        return;
                    }
                };
                let response = match process_request(request) {
                    Ok(v) => v,
                    Err(e) => {
                        eprintln!("Error parsing command: {}", e);
                        return;
                    }
                };
                if let Err(e) = stream.write_all(response.to_string().as_bytes()).await {
                    eprintln!("Error writing to socket: {}", e);
                }
            }
            // 其他情況處理...
        }
    }
}

內容解密:

  1. bytes_to_resp 解析請求:將接收到的位元組流解析為 RESP 請求。
  2. process_request 處理請求:根據解析後的請求執行對應的操作。
  3. stream.write_all 回傳結果:將處理結果寫回客戶端連線。