在分散式快取系統中,鍵值到期是一個重要的功能。本文將會詳細介紹如何在 Rust 語言中,根據已有的鍵值儲存系統,實作類別似 Redis 的鍵值到期機制。此機制包含兩種策略:被動到期,在存取鍵值時檢查是否過期;以及主動到期,定期掃描並刪除過期鍵值。實作上,我們使用 StorageData 結構體儲存鍵值資料,包含值、建立時間和到期時間等資訊,並使用 HashMap 來儲存具有到期時間的鍵值,以提升查詢效率。為了實作主動過期策略,我們使用 Tokio 的 time::interval 函式,定時觸發清理過期鍵值的函式。程式碼中也包含了詳細的測試案例,用於驗證實作的正確性,並確保在各種情況下都能正常運作。

第四章:鍵值到期機制

在 Redis 中,鍵值的到期機制是一個重要的功能。本章將探討如何實作鍵值的到期機制,包括被動到期和主動到期兩種方式。

被動到期與主動到期

Redis 支援兩種鍵值到期機制:被動到期和主動到期。被動到期是指在檢索鍵值時檢查其是否已到期,如果已到期則刪除該鍵值。主動到期則是指伺服器定期掃描鍵值並刪除已到期的鍵值。

從使用者的角度來看,這兩種機制的效果相同:具有明確到期時間的鍵值最終會被刪除。在本章結束時,我們將能夠傳送類別似 SET answer 42 EX 5 的命令,並在 5 秒後看到鍵值 answer 消失。

實作步驟

步驟 1:新增建立時間和到期時間

首先,我們需要在儲存的資料中新增建立時間和到期時間。為此,我們定義了一個名為 StorageData 的結構體,用於表示儲存的資料。

use std::time::{Duration, SystemTime};

#[derive(Debug)]
pub struct StorageData {
    pub value: StorageValue,
    pub creation_time: SystemTime,
    pub expiry: Option<Duration>,
}

接下來,我們實作了 add_expiry 方法和從 String 建立 StorageData 的函式。

impl StorageData {
    pub fn add_expiry(&mut self, expiry: Duration) {
        self.expiry = Some(expiry);
    }
}

impl From<String> for StorageData {
    fn from(s: String) -> StorageData {
        StorageData {
            value: StorageValue::String(s),
            creation_time: SystemTime::now(),
            expiry: None,
        }
    }
}

步驟 2:新增到期支援

現在,我們需要在 Storage 結構體中新增對到期的支援。我們建立了一個名為 expire_keys 的函式,該函式會定期被觸發,以決定是否刪除已到期的鍵值。

為了加快該操作的執行速度,我們維護了一個單獨的 HashMap,用於記錄具有到期時間的鍵值。

pub struct Storage {
    store: HashMap<String, StorageData>,
    expiry: HashMap<String, SystemTime>,
    active_expiry: bool,
}

程式碼解析

impl Storage {
    pub fn new() -> Self {
        let store: HashMap<String, StorageData> = HashMap::new();
        Self {
            store: store,
            expiry: HashMap::<String, SystemTime>::new(),
            active_expiry: true,
        }
    }
}

在上述程式碼中,我們初始化了 Storage 結構體中的新欄位。

#### 內容解密:

  1. expiry 欄位是用於記錄具有到期時間的鍵值及其對應的到期時間。
  2. active_expiry 欄位是用於控制主動到期機制的開關。
  3. new 方法中,我們初始化了 storeexpiryactive_expiry 欄位。

實作鍵值過期機制

在前一章的基礎上,我們進一步實作了鍵值過期的功能。本章節將詳細介紹如何實作鍵值的過期機制,以及如何定期執行清理過期鍵值的任務。

鍵值過期機制的實作

首先,我們需要在 Storage 結構體中新增一個 expiry 欄位,用於儲存鍵值的過期時間。同時,我們也需要新增一個 active_expiry 欄位,用於控制是否啟用鍵值過期機制。

// src/storage.rs
pub struct Storage {
    store: HashMap<String, String>,
    expiry: HashMap<String, SystemTime>,
    active_expiry: bool,
}

測試案例更新

為了驗證 Storage 結構體的新功能,我們需要更新測試案例。

// src/storage.rs
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_new() {
        let storage: Storage = Storage::new();
        assert_eq!(storage.store.len(), 0);
        assert_eq!(storage.expiry.len(), 0);
        assert_eq!(storage.expiry, HashMap::<String, SystemTime>::new());
        assert!(storage.active_expiry);
    }
}

實作鍵值過期清理功能

接下來,我們需要實作兩個方法:set_active_expiryexpire_keys。前者用於控制是否啟用鍵值過期機制,後者則是用於清理過期的鍵值。

// src/storage.rs
impl Storage {
    pub fn set_active_expiry(&mut self, value: bool) {
        self.active_expiry = value;
    }

    pub fn expire_keys(&mut self) {
        if !self.active_expiry {
            return;
        }
        let now = SystemTime::now();
        let expired_keys: Vec<String> = self
            .expiry
            .iter()
            .filter_map(|(key, &value)| if value < now { Some(key.clone()) } else { None })
            .collect();

        for k in expired_keys {
            self.store.remove(&k);
            self.expiry.remove(&k);
        }
    }
}

測試過期鍵值清理

為了驗證 expire_keys 方法的正確性,我們需要新增測試案例。

// src/storage.rs
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_expire_keys() {
        let mut storage: Storage = Storage::new();
        storage
            .set(String::from("akey"), String::from("avalue"))
            .unwrap();
        storage.expiry.insert(
            String::from("akey"),
            SystemTime::now() - Duration::from_secs(5),
        );
        storage.expire_keys();
        assert_eq!(storage.store.len(), 0);
    }

    #[test]
    fn test_expire_keys_deactivated() {
        let mut storage: Storage = Storage::new();
        storage.set_active_expiry(false);
        storage
            .set(String::from("akey"), String::from("avalue"))
            .unwrap();
        storage.expiry.insert(
            String::from("akey"),
            SystemTime::now() - Duration::from_secs(5),
        );
        storage.expire_keys();
        assert_eq!(storage.store.len(), 1);
    }
}

定期執行過期鍵值清理

為了定期執行 expire_keys 方法,我們需要使用 Tokio 的 time::interval 函式建立一個定時器。

// src/main.rs
async fn expire_keys(storage: Arc<Mutex<Storage>>) {
    let mut guard = storage.lock().unwrap();
    guard.expire_keys();
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6379").await?;
    let storage = Arc::new(Mutex::new(Storage::new()));
    let mut interval_timer = tokio::time::interval(Duration::from_millis(10));

    loop {
        tokio::select! {
            connection = listener.accept() => {
                match connection {
                    Ok((stream, _)) => {
                        tokio::spawn(handle_connection(stream, storage.clone()));
                    }
                    Err(e) => {
                        println!("Error: {}", e);
                        continue;
                    }
                }
            }
            _ = interval_timer.tick() => {
                tokio::spawn(expire_keys(storage.clone()));
            }
        }
    }
}

注意事項

值得注意的是,expire_keys 方法可能會執行很長時間,因此可能會阻塞整個非同步系統。在生產環境中,我們需要考慮最佳化這個問題,例如每次只清理一定數量的過期鍵值。

第四章:鍵值到期機制(Key Expiry)實作解析

在 Redis 中,鍵值的到期機制是一項重要的功能。本章節將探討如何為我們的鍵值儲存系統實作到期機制,主要聚焦於 SET 命令的引數解析。

步驟四:設定引數(SET Parameters)

為了支援鍵值的到期機制,我們需要修改 SET 命令的行為。由於 SET 命令支援多種引數,我們需要設計一個獨立的函式來解析這些引數。

設計引數解析結構

首先,我們在 src/set.rs 中定義了幾個列舉(enums)和結構(struct)來表示 SET 命令的引數選項:

#[derive(Debug, PartialEq)]
pub enum KeyExistence {
    NX,
    XX,
}

#[derive(Debug, PartialEq)]
pub enum KeyExpiry {
    EX(u64),
    PX(u64),
}

#[derive(Debug, PartialEq)]
pub struct SetArgs {
    pub expiry: Option<KeyExpiry>,
    pub existence: Option<KeyExistence>,
    pub get: bool,
}

impl SetArgs {
    pub fn new() -> Self {
        SetArgs {
            expiry: None,
            existence: None,
            get: false,
        }
    }
}

實作引數解析函式

接下來,我們實作了 parse_set_arguments 函式來解析 SET 命令的引數:

pub fn parse_set_arguments(arguments: &Vec<String>) -> StorageResult<SetArgs> {
    let mut args = SetArgs::new();
    let mut idx: usize = 0;
    
    loop {
        if idx >= arguments.len() {
            break;
        }
        
        match arguments[idx].to_lowercase().as_str() {
            "nx" => {
                if args.existence == Some(KeyExistence::XX) {
                    return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
                }
                args.existence = Some(KeyExistence::NX);
                idx += 1;
            }
            "xx" => {
                if args.existence == Some(KeyExistence::NX) {
                    return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
                }
                args.existence = Some(KeyExistence::XX);
                idx += 1;
            }
            "get" => {
                args.get = true;
                idx += 1;
            }
            _ => {
                return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
            }
        }
    }
    
    Ok(args)
}

解析邏輯詳細解說

  1. 迴圈遍歷引數:我們使用 loop 來遍歷輸入的引數向量,並使用索引 idx 來追蹤目前的位置。
  2. 匹配引數:對於每個引數,我們使用 match 陳述式來檢查它是否符合支援的選項(如 NXXXGET)。
  3. 處理互斥選項:對於互斥的選項(如 NXXX),我們會檢查是否已經設定了對方的選項,如果是,則傳回語法錯誤。
  4. 更新引數結構:根據匹配的結果,我們更新 SetArgs 結構中的相應欄位。

測試案例驗證

為了確保 parse_set_arguments 函式的正確性,我們編寫了一系列的測試案例:

#[cfg(test)]
mod tests {
    use super::*;
    
    #[test]
    fn test_parse_nx() {
        let commands: Vec<String> = vec![String::from("NX")];
        let args = parse_set_arguments(&commands).unwrap();
        assert_eq!(args.existence, Some(KeyExistence::NX));
    }
    
    #[test]
    fn test_parse_xx_and_nx() {
        let commands: Vec<String> = vec![String::from("XX"), String::from("NX")];
        assert!(matches!(
            parse_set_arguments(&commands),
            Err(StorageError::CommandSyntaxError(_))
        ));
    }
    
    #[test]
    fn test_parse_get() {
        let commands: Vec<String> = vec![String::from("GET")];
        let args = parse_set_arguments(&commands).unwrap();
        assert!(args.get);
    }
}

這些測試案例涵蓋了不同的場景,包括單一選項、多個選項的組合、以及互斥選項的錯誤情況。