Rust 的非同步程式設計模型以 Future trait 為核心,掌握其運作機制對於開發高效能網路應用至關重要。本文首先示範如何建構自定義的 Future,並模擬檔案讀取與非同步計時器,解析 Poll::Ready 和 Poll::Pending 的狀態切換與 waker 的喚醒機制。接著,文章介紹 P2P 網路的基本概念,比較其與客戶端/伺服器模式的差異,並說明 P2P 的特性,如無需許可、容錯和抗審查。最後,文章以 libp2p 這個 Rust 的 P2P 函式庫為例,示範如何建構 P2P 應用程式,包含節點的建立、ping 命令交換和根據 Kademlia 協定的對等節點發現。
自定義非同步Future的實作與解析
在探討Rust中的非同步程式設計時,實作自定義的Future是理解其底層運作機制的關鍵步驟。本文將詳細介紹如何建立一個自定義的非同步Future,並對其核心程式碼進行深入解析。
初始程式碼修改與執行結果分析
首先,我們對之前的程式進行修改,使其poll()函式傳回一個有效的Poll::Ready值。修改後的src/main.rs程式碼如下:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::Duration;
struct ReadFileFuture {}
impl Future for ReadFileFuture {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("Tokio! Stop polling me");
cx.waker().wake_by_ref();
Poll::Ready(String::from("Hello, there from file 1")) // 傳回有效的字串值
}
}
#[tokio::main]
async fn main() {
println!("Hello before reading file!");
let h1 = tokio::spawn(async {
let future1 = ReadFileFuture {};
println!("{:?}", future1.await);
});
let h2 = tokio::spawn(async {
let file2_contents = read_from_file2().await;
println!("{:?}", file2_contents);
});
let _ = tokio::join!(h1, h2);
}
// 模擬從檔案讀取內容的函式
fn read_from_file2() -> impl Future<Output = String> {
async {
sleep(Duration::new(2, 0)); // 修改為正確的Duration建立方式
String::from("Hello, there from file 2")
}
}
程式碼解密:
ReadFileFuture結構體實作了Future特性,其poll方法會被非同步執行器呼叫以檢查是否準備好傳回結果。- 直接傳回
Poll::Ready,表示該Future已經準備好並傳回一個字串值。 read_from_file2函式模擬了一個非同步的檔案讀取操作,透過sleep函式模擬耗時操作。tokio::spawn用於啟動非同步任務,而tokio::join!則用於等待多個非同步任務完成。
執行上述程式後,終端輸出如下:
Hello before reading file!
Tokio! Stop polling me
"Hello, there from file 1"
"Hello, there from file 2"
程式不再掛起,成功執行並完成了兩個非同步任務。
實作自定義非同步計時器Future
接下來,我們將實作一個代表非同步計時器的自定義Future。該計時器接受一個到期時間,並在到期時傳回一個字串值。
程式碼實作:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::{Duration, Instant};
struct AsyncTimer {
expiration_time: Instant,
}
impl Future for AsyncTimer {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.expiration_time {
println!("Hello, it's time for Future 1");
Poll::Ready(String::from("Future 1 has completed"))
} else {
println!("Hello, it's not yet time for Future 1. Going to sleep");
let waker = cx.waker().clone();
let expiration_time = self.expiration_time;
std::thread::spawn(move || {
let current_time = Instant::now();
if current_time < expiration_time {
std::thread::sleep(expiration_time - current_time);
}
waker.wake(); // 到期後喚醒執行器
});
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let h1 = tokio::spawn(async {
let future1 = AsyncTimer {
expiration_time: Instant::now() + Duration::from_millis(4000), // 設定4秒後到期
};
println!("{:?}", future1.await);
});
let h2 = tokio::spawn(async {
let file2_contents = read_from_file2().await;
println!("{:?}", file2_contents);
});
let _ = tokio::join!(h1, h2);
}
// 模擬從檔案讀取內容的函式
fn read_from_file2() -> impl Future<Output = String> {
async {
sleep(Duration::new(2, 0));
String::from("Future 2 has completed")
}
}
程式碼解密:
AsyncTimer結構體儲存了到期時間,並在poll方法中檢查是否到期。- 若未到期,則啟動一個新執行緒進行睡眠,並在到期後呼叫
waker.wake()通知執行器重新排程該任務。 poll方法傳回Poll::Pending,表示任務尚未完成,允許執行器排程其他任務。- 當計時器到期並傳回
Poll::Ready,表示任務完成並傳回結果。
執行上述程式後,輸出如下:
Hello, it's not yet time for Future 1. Going to sleep
"Future 2 has completed"
Hello, it's time for Future 1
"Future 1 has completed"
此輸出展示了兩個非同步任務的執行順序和結果。
建構P2P節點與非同步Rust程式設計
在前一章中,我們探討了非同步程式設計的基礎以及如何使用Rust撰寫非同步程式碼。本章將利用低階P2P網路函式庫和非同步程式設計技術,建構幾個簡單的點對點(P2P)應用程式範例。
為何學習P2P技術?
P2P是一種網路技術,能夠在不同電腦之間分享各種運算資源,如CPU、網路頻寬和儲存空間。這種技術常見於線上檔案分享,例如音樂、圖片和其他數位媒體。像BitTorrent和Gnutella等流行的檔案分享P2P應用程式,不依賴中央伺服器或中介來連線多個客戶端。最重要的是,它們利用使用者的電腦同時作為客戶端和伺服器,從而將運算負擔從中央伺服器上解除安裝。
P2P網路的基本概念
傳統的分散式系統通常採用客戶端/伺服器模式。一個典型的例子是網頁瀏覽器和網頁伺服器之間的互動,其中瀏覽器(客戶端)向伺服器請求資訊或運算。伺服器檢查客戶端的授權,並在許可的情況下完成請求。
P2P網路是另一類別分散式系統,其中一組節點(或對等節點)直接相互作用,共同提供某種服務,而無需中央協調者或管理員。IPFS、BitTorrent和區塊鏈網路(如比特幣和以太坊)都是典型的P2P系統。在P2P系統中,每個節點既可以是客戶端(向其他節點請求資訊),也可以是伺服器(儲存和檢索資料並回應客戶端請求)。
與客戶端/伺服器網路相比,P2P網路能夠支援一類別不同的應用,這些應用具有無需許可、容錯和抗審查的特性。
無需許可
在P2P網路中,由於資料和狀態在多個節點上複製,沒有伺服器能夠切斷客戶端對資訊的存取。
容錯
由於沒有單一的故障點(如中央伺服器),P2P網路具有更高的容錯能力。
抗審查
由於資料在多個節點上複製,相比於集中式伺服器上的資料,P2P網路中的資料更難被審查。
客戶端/伺服器與P2P網路的比較
圖11.1展示了客戶端/伺服器與P2P網路之間的差異。在P2P網路的上下文中,我們將交替使用「節點」和「對等節點」這兩個術語。
建構P2P系統的技術需求
建構P2P系統比建構傳統的客戶端/伺服器系統更為複雜。以下是建構P2P系統的一些關鍵技術需求:
傳輸層
每個P2P網路中的節點可以支援不同的協定,如HTTP(s)、TCP、UDP等。
使用libp2p建構P2P應用程式
本章將介紹如何使用libp2p,一個低階的P2P網路函式庫,來建構簡單的P2P應用程式。我們將探討libp2p的核心架構,並實作節點之間的ping命令交換和對等節點的發現。
libp2p簡介
libp2p是一個用於建構P2P應用程式的網路函式庫。它提供了一套完整的工具和協定,用於支援節點之間的通訊和資料交換。
libp2p的核心架構
libp2p的核心架構包括以下幾個關鍵元件:
傳輸層
libp2p支援多種傳輸協定,如TCP、UDP和WebSockets。
多路復用
libp2p支援多路復用技術,能夠在單一連線上同時處理多個資料流。
安全
libp2p提供了加密和身份驗證機制,以確保資料的安全性和完整性。
實作節點之間的ping命令交換
在本文中,我們將實作一個簡單的ping命令交換範例,以展示libp2p的基本用法。
use libp2p::{
identity,
PeerId,
Swarm,
NetworkBehaviour,
SwarmEvent,
};
use libp2p::ping::{Ping, PingConfig};
use libp2p::swarm::NetworkBehaviourEventProcess;
// 定義一個自定義的NetworkBehaviour
#[derive(NetworkBehaviour)]
struct MyBehaviour {
ping: Ping,
}
impl NetworkBehaviourEventProcess<PingEvent> for MyBehaviour {
fn inject_event(&mut self, event: PingEvent) {
match event {
PingEvent::PingSuccess { peer, .. } => {
println!("Ping成功:{:?}", peer);
}
PingEvent::PingFailure { peer, .. } => {
println!("Ping失敗:{:?}", peer);
}
}
}
}
fn main() {
// 建立一個新的Swarm例項
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
let mut swarm = Swarm::new(MyBehaviour {
ping: Ping::new(PingConfig::new()),
});
// 監聽本地地址
let listen_addr = "/ip4/0.0.0.0/tcp/0".parse().unwrap();
Swarm::listen_on(&mut swarm, listen_addr).unwrap();
// 連線到遠端節點
let remote_addr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
Swarm::dial(&mut swarm, remote_addr).unwrap();
// 處理Swarm事件
loop {
match swarm.next_event().await {
SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping_event)) => {
// 處理Ping事件
}
_ => {}
}
}
}
程式碼解析
上述範例展示瞭如何使用libp2p建立一個簡單的P2P應用程式,並實作節點之間的ping命令交換。以下是程式碼的關鍵部分:
- 我們定義了一個自定義的
NetworkBehaviour,其中包含了Ping行為。 - 我們建立了一個新的
Swarm例項,並將其組態為監聽本地地址。 - 我們連線到一個遠端節點,並處理來自
Swarm的事件。
發現對等節點
在本文中,我們將探討如何在P2P網路中發現對等節點。
對等節點發現機制
libp2p提供了多種對等節點發現機制,包括:
Kademlia
Kademlia是一種分散式的雜湊表(DHT),用於在P2P網路中發現對等節點。
mDNS
mDNS是一種根據DNS的對等節點發現機制,用於在本地網路中發現對等節點。
實作對等節點發現
use libp2p::{
identity,
PeerId,
Swarm,
NetworkBehaviour,
SwarmEvent,
};
use libp2p::kademlia::{Kademlia, KademliaConfig, KademliaEvent};
use libp2p::swarm::NetworkBehaviourEventProcess;
// 定義一個自定義的NetworkBehaviour
#[derive(NetworkBehaviour)]
struct MyBehaviour {
kademlia: Kademlia<MemoryStore>,
}
impl NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour {
fn inject_event(&mut self, event: KademliaEvent) {
match event {
KademliaEvent::QueryResult { result, .. } => {
match result {
QueryResult::GetClosestPeers(result) => {
println!("取得最近的對等節點:{:?}", result);
}
_ => {}
}
}
_ => {}
}
}
}
fn main() {
// 建立一個新的Swarm例項
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
let store = MemoryStore::new(local_peer_id.clone());
let mut swarm = Swarm::new(MyBehaviour {
kademlia: Kademlia::with_config(local_peer_id, store, KademliaConfig::default()),
});
// 啟動Kademlia查詢
swarm.kademlia.bootstrap().unwrap();
// 處理Swarm事件
loop {
match swarm.next_event().await {
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(kademlia_event)) => {
// 處理Kademlia事件
}
_ => {}
}
}
}
程式碼解析
上述範例展示瞭如何使用libp2p的Kademlia機制來發現對等節點。以下是程式碼的關鍵部分:
- 我們定義了一個自定義的
NetworkBehaviour,其中包含了Kademlia行為。 - 我們建立了一個新的
Swarm例項,並將其組態為使用Kademlia機制。 - 我們啟動了一個Kademlia查詢,以取得最近的對等節點。