Rust 的非同步程式設計模型以 Future trait 為核心,掌握其運作機制對於開發高效能網路應用至關重要。本文首先示範如何建構自定義的 Future,並模擬檔案讀取與非同步計時器,解析 Poll::Ready 和 Poll::Pending 的狀態切換與 waker 的喚醒機制。接著,文章介紹 P2P 網路的基本概念,比較其與客戶端/伺服器模式的差異,並說明 P2P 的特性,如無需許可、容錯和抗審查。最後,文章以 libp2p 這個 Rust 的 P2P 函式庫為例,示範如何建構 P2P 應用程式,包含節點的建立、ping 命令交換和根據 Kademlia 協定的對等節點發現。

自定義非同步Future的實作與解析

在探討Rust中的非同步程式設計時,實作自定義的Future是理解其底層運作機制的關鍵步驟。本文將詳細介紹如何建立一個自定義的非同步Future,並對其核心程式碼進行深入解析。

初始程式碼修改與執行結果分析

首先,我們對之前的程式進行修改,使其poll()函式傳回一個有效的Poll::Ready值。修改後的src/main.rs程式碼如下:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::Duration;

struct ReadFileFuture {}

impl Future for ReadFileFuture {
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("Tokio! Stop polling me");
        cx.waker().wake_by_ref();
        Poll::Ready(String::from("Hello, there from file 1")) // 傳回有效的字串值
    }
}

#[tokio::main]
async fn main() {
    println!("Hello before reading file!");
    let h1 = tokio::spawn(async {
        let future1 = ReadFileFuture {};
        println!("{:?}", future1.await);
    });
    let h2 = tokio::spawn(async {
        let file2_contents = read_from_file2().await;
        println!("{:?}", file2_contents);
    });
    let _ = tokio::join!(h1, h2);
}

// 模擬從檔案讀取內容的函式
fn read_from_file2() -> impl Future<Output = String> {
    async {
        sleep(Duration::new(2, 0)); // 修改為正確的Duration建立方式
        String::from("Hello, there from file 2")
    }
}

程式碼解密:

  1. ReadFileFuture結構體實作了Future特性,其poll方法會被非同步執行器呼叫以檢查是否準備好傳回結果。
  2. 直接傳回Poll::Ready,表示該Future已經準備好並傳回一個字串值。
  3. read_from_file2函式模擬了一個非同步的檔案讀取操作,透過sleep函式模擬耗時操作。
  4. tokio::spawn用於啟動非同步任務,而tokio::join!則用於等待多個非同步任務完成。

執行上述程式後,終端輸出如下:

Hello before reading file!
Tokio! Stop polling me
"Hello, there from file 1"
"Hello, there from file 2"

程式不再掛起,成功執行並完成了兩個非同步任務。

實作自定義非同步計時器Future

接下來,我們將實作一個代表非同步計時器的自定義Future。該計時器接受一個到期時間,並在到期時傳回一個字串值。

程式碼實作:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::{Duration, Instant};

struct AsyncTimer {
    expiration_time: Instant,
}

impl Future for AsyncTimer {
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Instant::now() >= self.expiration_time {
            println!("Hello, it's time for Future 1");
            Poll::Ready(String::from("Future 1 has completed"))
        } else {
            println!("Hello, it's not yet time for Future 1. Going to sleep");
            let waker = cx.waker().clone();
            let expiration_time = self.expiration_time;
            std::thread::spawn(move || {
                let current_time = Instant::now();
                if current_time < expiration_time {
                    std::thread::sleep(expiration_time - current_time);
                }
                waker.wake(); // 到期後喚醒執行器
            });
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let h1 = tokio::spawn(async {
        let future1 = AsyncTimer {
            expiration_time: Instant::now() + Duration::from_millis(4000), // 設定4秒後到期
        };
        println!("{:?}", future1.await);
    });
    let h2 = tokio::spawn(async {
        let file2_contents = read_from_file2().await;
        println!("{:?}", file2_contents);
    });
    let _ = tokio::join!(h1, h2);
}

// 模擬從檔案讀取內容的函式
fn read_from_file2() -> impl Future<Output = String> {
    async {
        sleep(Duration::new(2, 0));
        String::from("Future 2 has completed")
    }
}

程式碼解密:

  1. AsyncTimer結構體儲存了到期時間,並在poll方法中檢查是否到期。
  2. 若未到期,則啟動一個新執行緒進行睡眠,並在到期後呼叫waker.wake()通知執行器重新排程該任務。
  3. poll方法傳回Poll::Pending,表示任務尚未完成,允許執行器排程其他任務。
  4. 當計時器到期並傳回Poll::Ready,表示任務完成並傳回結果。

執行上述程式後,輸出如下:

Hello, it's not yet time for Future 1. Going to sleep
"Future 2 has completed"
Hello, it's time for Future 1
"Future 1 has completed"

此輸出展示了兩個非同步任務的執行順序和結果。

建構P2P節點與非同步Rust程式設計

在前一章中,我們探討了非同步程式設計的基礎以及如何使用Rust撰寫非同步程式碼。本章將利用低階P2P網路函式庫和非同步程式設計技術,建構幾個簡單的點對點(P2P)應用程式範例。

為何學習P2P技術?

P2P是一種網路技術,能夠在不同電腦之間分享各種運算資源,如CPU、網路頻寬和儲存空間。這種技術常見於線上檔案分享,例如音樂、圖片和其他數位媒體。像BitTorrent和Gnutella等流行的檔案分享P2P應用程式,不依賴中央伺服器或中介來連線多個客戶端。最重要的是,它們利用使用者的電腦同時作為客戶端和伺服器,從而將運算負擔從中央伺服器上解除安裝。

P2P網路的基本概念

傳統的分散式系統通常採用客戶端/伺服器模式。一個典型的例子是網頁瀏覽器和網頁伺服器之間的互動,其中瀏覽器(客戶端)向伺服器請求資訊或運算。伺服器檢查客戶端的授權,並在許可的情況下完成請求。

P2P網路是另一類別分散式系統,其中一組節點(或對等節點)直接相互作用,共同提供某種服務,而無需中央協調者或管理員。IPFS、BitTorrent和區塊鏈網路(如比特幣和以太坊)都是典型的P2P系統。在P2P系統中,每個節點既可以是客戶端(向其他節點請求資訊),也可以是伺服器(儲存和檢索資料並回應客戶端請求)。

與客戶端/伺服器網路相比,P2P網路能夠支援一類別不同的應用,這些應用具有無需許可、容錯和抗審查的特性。

無需許可

在P2P網路中,由於資料和狀態在多個節點上複製,沒有伺服器能夠切斷客戶端對資訊的存取。

容錯

由於沒有單一的故障點(如中央伺服器),P2P網路具有更高的容錯能力。

抗審查

由於資料在多個節點上複製,相比於集中式伺服器上的資料,P2P網路中的資料更難被審查。

客戶端/伺服器與P2P網路的比較

圖11.1展示了客戶端/伺服器與P2P網路之間的差異。在P2P網路的上下文中,我們將交替使用「節點」和「對等節點」這兩個術語。

建構P2P系統的技術需求

建構P2P系統比建構傳統的客戶端/伺服器系統更為複雜。以下是建構P2P系統的一些關鍵技術需求:

  • 傳輸層

    每個P2P網路中的節點可以支援不同的協定,如HTTP(s)、TCP、UDP等。

使用libp2p建構P2P應用程式

本章將介紹如何使用libp2p,一個低階的P2P網路函式庫,來建構簡單的P2P應用程式。我們將探討libp2p的核心架構,並實作節點之間的ping命令交換和對等節點的發現。

libp2p簡介

libp2p是一個用於建構P2P應用程式的網路函式庫。它提供了一套完整的工具和協定,用於支援節點之間的通訊和資料交換。

libp2p的核心架構

libp2p的核心架構包括以下幾個關鍵元件:

  • 傳輸層

    libp2p支援多種傳輸協定,如TCP、UDP和WebSockets。

  • 多路復用

    libp2p支援多路復用技術,能夠在單一連線上同時處理多個資料流。

  • 安全

    libp2p提供了加密和身份驗證機制,以確保資料的安全性和完整性。

實作節點之間的ping命令交換

在本文中,我們將實作一個簡單的ping命令交換範例,以展示libp2p的基本用法。

use libp2p::{
    identity,
    PeerId,
    Swarm,
    NetworkBehaviour,
    SwarmEvent,
};
use libp2p::ping::{Ping, PingConfig};
use libp2p::swarm::NetworkBehaviourEventProcess;

// 定義一個自定義的NetworkBehaviour
#[derive(NetworkBehaviour)]
struct MyBehaviour {
    ping: Ping,
}

impl NetworkBehaviourEventProcess<PingEvent> for MyBehaviour {
    fn inject_event(&mut self, event: PingEvent) {
        match event {
            PingEvent::PingSuccess { peer, .. } => {
                println!("Ping成功:{:?}", peer);
            }
            PingEvent::PingFailure { peer, .. } => {
                println!("Ping失敗:{:?}", peer);
            }
        }
    }
}

fn main() {
    // 建立一個新的Swarm例項
    let local_key = identity::Keypair::generate_ed25519();
    let local_peer_id = PeerId::from(local_key.public());
    let mut swarm = Swarm::new(MyBehaviour {
        ping: Ping::new(PingConfig::new()),
    });

    // 監聽本地地址
    let listen_addr = "/ip4/0.0.0.0/tcp/0".parse().unwrap();
    Swarm::listen_on(&mut swarm, listen_addr).unwrap();

    // 連線到遠端節點
    let remote_addr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap();
    Swarm::dial(&mut swarm, remote_addr).unwrap();

    // 處理Swarm事件
    loop {
        match swarm.next_event().await {
            SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping_event)) => {
                // 處理Ping事件
            }
            _ => {}
        }
    }
}

程式碼解析

上述範例展示瞭如何使用libp2p建立一個簡單的P2P應用程式,並實作節點之間的ping命令交換。以下是程式碼的關鍵部分:

  • 我們定義了一個自定義的NetworkBehaviour,其中包含了Ping行為。
  • 我們建立了一個新的Swarm例項,並將其組態為監聽本地地址。
  • 我們連線到一個遠端節點,並處理來自Swarm的事件。

發現對等節點

在本文中,我們將探討如何在P2P網路中發現對等節點。

對等節點發現機制

libp2p提供了多種對等節點發現機制,包括:

  • Kademlia

    Kademlia是一種分散式的雜湊表(DHT),用於在P2P網路中發現對等節點。

  • mDNS

    mDNS是一種根據DNS的對等節點發現機制,用於在本地網路中發現對等節點。

實作對等節點發現

use libp2p::{
    identity,
    PeerId,
    Swarm,
    NetworkBehaviour,
    SwarmEvent,
};
use libp2p::kademlia::{Kademlia, KademliaConfig, KademliaEvent};
use libp2p::swarm::NetworkBehaviourEventProcess;

// 定義一個自定義的NetworkBehaviour
#[derive(NetworkBehaviour)]
struct MyBehaviour {
    kademlia: Kademlia<MemoryStore>,
}

impl NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour {
    fn inject_event(&mut self, event: KademliaEvent) {
        match event {
            KademliaEvent::QueryResult { result, .. } => {
                match result {
                    QueryResult::GetClosestPeers(result) => {
                        println!("取得最近的對等節點:{:?}", result);
                    }
                    _ => {}
                }
            }
            _ => {}
        }
    }
}

fn main() {
    // 建立一個新的Swarm例項
    let local_key = identity::Keypair::generate_ed25519();
    let local_peer_id = PeerId::from(local_key.public());
    let store = MemoryStore::new(local_peer_id.clone());
    let mut swarm = Swarm::new(MyBehaviour {
        kademlia: Kademlia::with_config(local_peer_id, store, KademliaConfig::default()),
    });

    // 啟動Kademlia查詢
    swarm.kademlia.bootstrap().unwrap();

    // 處理Swarm事件
    loop {
        match swarm.next_event().await {
            SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(kademlia_event)) => {
                // 處理Kademlia事件
            }
            _ => {}
        }
    }
}

程式碼解析

上述範例展示瞭如何使用libp2p的Kademlia機制來發現對等節點。以下是程式碼的關鍵部分:

  • 我們定義了一個自定義的NetworkBehaviour,其中包含了Kademlia行為。
  • 我們建立了一個新的Swarm例項,並將其組態為使用Kademlia機制。
  • 我們啟動了一個Kademlia查詢,以取得最近的對等節點。