在分散式快取系統中,鍵值到期是一個重要的功能。本文將會詳細介紹如何在 Rust 語言中,根據已有的鍵值儲存系統,實作類別似 Redis 的鍵值到期機制。此機制包含兩種策略:被動到期,在存取鍵值時檢查是否過期;以及主動到期,定期掃描並刪除過期鍵值。實作上,我們使用 StorageData 結構體儲存鍵值資料,包含值、建立時間和到期時間等資訊,並使用 HashMap 來儲存具有到期時間的鍵值,以提升查詢效率。為了實作主動過期策略,我們使用 Tokio 的 time::interval 函式,定時觸發清理過期鍵值的函式。程式碼中也包含了詳細的測試案例,用於驗證實作的正確性,並確保在各種情況下都能正常運作。
第四章:鍵值到期機制
在 Redis 中,鍵值的到期機制是一個重要的功能。本章將探討如何實作鍵值的到期機制,包括被動到期和主動到期兩種方式。
被動到期與主動到期
Redis 支援兩種鍵值到期機制:被動到期和主動到期。被動到期是指在檢索鍵值時檢查其是否已到期,如果已到期則刪除該鍵值。主動到期則是指伺服器定期掃描鍵值並刪除已到期的鍵值。
從使用者的角度來看,這兩種機制的效果相同:具有明確到期時間的鍵值最終會被刪除。在本章結束時,我們將能夠傳送類別似 SET answer 42 EX 5 的命令,並在 5 秒後看到鍵值 answer 消失。
實作步驟
步驟 1:新增建立時間和到期時間
首先,我們需要在儲存的資料中新增建立時間和到期時間。為此,我們定義了一個名為 StorageData 的結構體,用於表示儲存的資料。
use std::time::{Duration, SystemTime};
#[derive(Debug)]
pub struct StorageData {
pub value: StorageValue,
pub creation_time: SystemTime,
pub expiry: Option<Duration>,
}
接下來,我們實作了 add_expiry 方法和從 String 建立 StorageData 的函式。
impl StorageData {
pub fn add_expiry(&mut self, expiry: Duration) {
self.expiry = Some(expiry);
}
}
impl From<String> for StorageData {
fn from(s: String) -> StorageData {
StorageData {
value: StorageValue::String(s),
creation_time: SystemTime::now(),
expiry: None,
}
}
}
步驟 2:新增到期支援
現在,我們需要在 Storage 結構體中新增對到期的支援。我們建立了一個名為 expire_keys 的函式,該函式會定期被觸發,以決定是否刪除已到期的鍵值。
為了加快該操作的執行速度,我們維護了一個單獨的 HashMap,用於記錄具有到期時間的鍵值。
pub struct Storage {
store: HashMap<String, StorageData>,
expiry: HashMap<String, SystemTime>,
active_expiry: bool,
}
程式碼解析
impl Storage {
pub fn new() -> Self {
let store: HashMap<String, StorageData> = HashMap::new();
Self {
store: store,
expiry: HashMap::<String, SystemTime>::new(),
active_expiry: true,
}
}
}
在上述程式碼中,我們初始化了 Storage 結構體中的新欄位。
#### 內容解密:
expiry欄位是用於記錄具有到期時間的鍵值及其對應的到期時間。active_expiry欄位是用於控制主動到期機制的開關。- 在
new方法中,我們初始化了store、expiry和active_expiry欄位。
實作鍵值過期機制
在前一章的基礎上,我們進一步實作了鍵值過期的功能。本章節將詳細介紹如何實作鍵值的過期機制,以及如何定期執行清理過期鍵值的任務。
鍵值過期機制的實作
首先,我們需要在 Storage 結構體中新增一個 expiry 欄位,用於儲存鍵值的過期時間。同時,我們也需要新增一個 active_expiry 欄位,用於控制是否啟用鍵值過期機制。
// src/storage.rs
pub struct Storage {
store: HashMap<String, String>,
expiry: HashMap<String, SystemTime>,
active_expiry: bool,
}
測試案例更新
為了驗證 Storage 結構體的新功能,我們需要更新測試案例。
// src/storage.rs
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_create_new() {
let storage: Storage = Storage::new();
assert_eq!(storage.store.len(), 0);
assert_eq!(storage.expiry.len(), 0);
assert_eq!(storage.expiry, HashMap::<String, SystemTime>::new());
assert!(storage.active_expiry);
}
}
實作鍵值過期清理功能
接下來,我們需要實作兩個方法:set_active_expiry 和 expire_keys。前者用於控制是否啟用鍵值過期機制,後者則是用於清理過期的鍵值。
// src/storage.rs
impl Storage {
pub fn set_active_expiry(&mut self, value: bool) {
self.active_expiry = value;
}
pub fn expire_keys(&mut self) {
if !self.active_expiry {
return;
}
let now = SystemTime::now();
let expired_keys: Vec<String> = self
.expiry
.iter()
.filter_map(|(key, &value)| if value < now { Some(key.clone()) } else { None })
.collect();
for k in expired_keys {
self.store.remove(&k);
self.expiry.remove(&k);
}
}
}
測試過期鍵值清理
為了驗證 expire_keys 方法的正確性,我們需要新增測試案例。
// src/storage.rs
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_expire_keys() {
let mut storage: Storage = Storage::new();
storage
.set(String::from("akey"), String::from("avalue"))
.unwrap();
storage.expiry.insert(
String::from("akey"),
SystemTime::now() - Duration::from_secs(5),
);
storage.expire_keys();
assert_eq!(storage.store.len(), 0);
}
#[test]
fn test_expire_keys_deactivated() {
let mut storage: Storage = Storage::new();
storage.set_active_expiry(false);
storage
.set(String::from("akey"), String::from("avalue"))
.unwrap();
storage.expiry.insert(
String::from("akey"),
SystemTime::now() - Duration::from_secs(5),
);
storage.expire_keys();
assert_eq!(storage.store.len(), 1);
}
}
定期執行過期鍵值清理
為了定期執行 expire_keys 方法,我們需要使用 Tokio 的 time::interval 函式建立一個定時器。
// src/main.rs
async fn expire_keys(storage: Arc<Mutex<Storage>>) {
let mut guard = storage.lock().unwrap();
guard.expire_keys();
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6379").await?;
let storage = Arc::new(Mutex::new(Storage::new()));
let mut interval_timer = tokio::time::interval(Duration::from_millis(10));
loop {
tokio::select! {
connection = listener.accept() => {
match connection {
Ok((stream, _)) => {
tokio::spawn(handle_connection(stream, storage.clone()));
}
Err(e) => {
println!("Error: {}", e);
continue;
}
}
}
_ = interval_timer.tick() => {
tokio::spawn(expire_keys(storage.clone()));
}
}
}
}
注意事項
值得注意的是,expire_keys 方法可能會執行很長時間,因此可能會阻塞整個非同步系統。在生產環境中,我們需要考慮最佳化這個問題,例如每次只清理一定數量的過期鍵值。
第四章:鍵值到期機制(Key Expiry)實作解析
在 Redis 中,鍵值的到期機制是一項重要的功能。本章節將探討如何為我們的鍵值儲存系統實作到期機制,主要聚焦於 SET 命令的引數解析。
步驟四:設定引數(SET Parameters)
為了支援鍵值的到期機制,我們需要修改 SET 命令的行為。由於 SET 命令支援多種引數,我們需要設計一個獨立的函式來解析這些引數。
設計引數解析結構
首先,我們在 src/set.rs 中定義了幾個列舉(enums)和結構(struct)來表示 SET 命令的引數選項:
#[derive(Debug, PartialEq)]
pub enum KeyExistence {
NX,
XX,
}
#[derive(Debug, PartialEq)]
pub enum KeyExpiry {
EX(u64),
PX(u64),
}
#[derive(Debug, PartialEq)]
pub struct SetArgs {
pub expiry: Option<KeyExpiry>,
pub existence: Option<KeyExistence>,
pub get: bool,
}
impl SetArgs {
pub fn new() -> Self {
SetArgs {
expiry: None,
existence: None,
get: false,
}
}
}
實作引數解析函式
接下來,我們實作了 parse_set_arguments 函式來解析 SET 命令的引數:
pub fn parse_set_arguments(arguments: &Vec<String>) -> StorageResult<SetArgs> {
let mut args = SetArgs::new();
let mut idx: usize = 0;
loop {
if idx >= arguments.len() {
break;
}
match arguments[idx].to_lowercase().as_str() {
"nx" => {
if args.existence == Some(KeyExistence::XX) {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
args.existence = Some(KeyExistence::NX);
idx += 1;
}
"xx" => {
if args.existence == Some(KeyExistence::NX) {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
args.existence = Some(KeyExistence::XX);
idx += 1;
}
"get" => {
args.get = true;
idx += 1;
}
_ => {
return Err(StorageError::CommandSyntaxError(arguments.join(" ")));
}
}
}
Ok(args)
}
解析邏輯詳細解說
- 迴圈遍歷引數:我們使用
loop來遍歷輸入的引數向量,並使用索引idx來追蹤目前的位置。 - 匹配引數:對於每個引數,我們使用
match陳述式來檢查它是否符合支援的選項(如NX、XX、GET)。 - 處理互斥選項:對於互斥的選項(如
NX和XX),我們會檢查是否已經設定了對方的選項,如果是,則傳回語法錯誤。 - 更新引數結構:根據匹配的結果,我們更新
SetArgs結構中的相應欄位。
測試案例驗證
為了確保 parse_set_arguments 函式的正確性,我們編寫了一系列的測試案例:
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_nx() {
let commands: Vec<String> = vec![String::from("NX")];
let args = parse_set_arguments(&commands).unwrap();
assert_eq!(args.existence, Some(KeyExistence::NX));
}
#[test]
fn test_parse_xx_and_nx() {
let commands: Vec<String> = vec![String::from("XX"), String::from("NX")];
assert!(matches!(
parse_set_arguments(&commands),
Err(StorageError::CommandSyntaxError(_))
));
}
#[test]
fn test_parse_get() {
let commands: Vec<String> = vec![String::from("GET")];
let args = parse_set_arguments(&commands).unwrap();
assert!(args.get);
}
}
這些測試案例涵蓋了不同的場景,包括單一選項、多個選項的組合、以及互斥選項的錯誤情況。