Skip to content

Enhance Testing Infrastructure with Comprehensive Mock Network #62

@thep2p

Description

@thep2p

Overview

The Go implementation has a comprehensive testing infrastructure in the `unittest/mocknet` package that provides powerful tools for testing distributed skip graph behavior. The Rust implementation needs equivalent capabilities so that network interactions and distributed algorithms can be tested thoroughly.

Background

Reference implementation: skipgraph-go/unittest/mocknet

The testing infrastructure provides:

  • Network simulation without real sockets
  • Deterministic message delivery
  • Failure injection capabilities
  • Network topology simulation

Requirements

1. Mock Network Core

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use tokio::sync::{mpsc, RwLock};

/// Simulated network for testing.
///
/// Owns the registry of live [`MockNode`]s, keyed by [`Identifier`], and one
/// [`MockUnderlay`] that is shared by every node created through it.
pub struct MockNetwork {
    /// Live nodes. Stored as `Arc<MockNode>` because `create_node` returns the
    /// same node to the caller while keeping its own handle in this map.
    nodes: Arc<RwLock<HashMap<Identifier, Arc<MockNode>>>>,
    /// Transport shared by all nodes of this network.
    underlay: Arc<MockUnderlay>,
    /// Settings supplied to `MockNetwork::new`.
    /// NOTE(review): no visible method reads this yet (the underlay is built
    /// without it) — confirm how it is meant to reach the connections.
    config: MockNetworkConfig,
}

impl MockNetwork {
    /// Builds an empty network with the given `config` and a fresh underlay.
    pub fn new(config: MockNetworkConfig) -> Self {
        let nodes = Arc::new(RwLock::new(HashMap::new()));
        let underlay = Arc::new(MockUnderlay::new());
        Self {
            nodes,
            underlay,
            config,
        }
    }

    /// Creates a node called `id`, records it in the registry and returns a
    /// shared handle to it. A node already registered under `id` is replaced.
    pub async fn create_node(&self, id: Identifier) -> Arc<MockNode> {
        let shared_underlay = Arc::clone(&self.underlay);
        let node = Arc::new(MockNode::new(id.clone(), shared_underlay));
        {
            let mut registry = self.nodes.write().await;
            registry.insert(id, Arc::clone(&node));
        }
        node
    }

    /// Drops `id` from the registry, then tells the underlay to disconnect it.
    pub async fn remove_node(&self, id: &Identifier) {
        {
            let mut registry = self.nodes.write().await;
            registry.remove(id);
        }
        self.underlay.disconnect_node(id).await;
    }

    /// Splits the network so that `group1` and `group2` cannot reach each other.
    pub async fn partition(&self, group1: Vec<Identifier>, group2: Vec<Identifier>) {
        let underlay = &self.underlay;
        underlay.create_partition(group1, group2).await;
    }

    /// Removes every partition previously installed with `partition`.
    pub async fn heal_partition(&self) {
        let underlay = &self.underlay;
        underlay.clear_partitions().await;
    }

    /// Snapshot of node count plus the underlay's traffic counters.
    pub async fn stats(&self) -> NetworkStats {
        let nodes = self.nodes.read().await.len();
        let messages_sent = self.underlay.message_count().await;
        let bytes_transferred = self.underlay.bytes_transferred().await;
        NetworkStats {
            nodes,
            messages_sent,
            bytes_transferred,
        }
    }
}

/// Tunable conditions for a simulated network.
///
/// [`Default`] describes an ideal network: no added latency, no packet loss,
/// no bandwidth cap, and ordering enabled. This lets tests that only need
/// connectivity write `MockNetworkConfig::default()`.
#[derive(Debug, Clone)]
pub struct MockNetworkConfig {
    /// Simulated delay in milliseconds; `None` adds no delay.
    pub latency_ms: Option<u64>,
    /// Probability in `[0.0, 1.0]` that a message is dropped; `0.0` is lossless.
    pub packet_loss_rate: f64,
    /// Bandwidth cap; `None` means unlimited.
    /// NOTE(review): the unit (bytes/sec?) is not defined anywhere visible — confirm.
    pub bandwidth_limit: Option<usize>,
    /// Whether delivery order should be preserved.
    /// NOTE(review): no visible code reads this flag yet.
    pub enable_ordering: bool,
}

impl Default for MockNetworkConfig {
    fn default() -> Self {
        Self {
            latency_ms: None,
            packet_loss_rate: 0.0,
            bandwidth_limit: None,
            // Reproducible delivery is the purpose of the mock network, so
            // ordering is on unless a test explicitly opts out.
            enable_ordering: true,
        }
    }
}

2. Mock Underlay (Network Simulation Layer)

/// In-memory stand-in for the real network transport.
///
/// Every node of a mock network routes its traffic through one shared
/// instance of this type.
pub struct MockUnderlay {
    /// Active partitions, as pairs of node groups that must not reach each other.
    partitions: Arc<RwLock<Vec<(HashSet<Identifier>, HashSet<Identifier>)>>>,
    /// Connections keyed by `(sender, receiver)` identifier pairs.
    connections: Arc<RwLock<HashMap<(Identifier, Identifier), MockConnection>>>,
    /// Traffic counters updated by `send`.
    stats: Arc<RwLock<UnderlayStats>>,
}

impl MockUnderlay {
    /// Sends `data` from `from` to `to` over the simulated transport.
    ///
    /// The `messages_sent` counter is incremented for every message that passes
    /// the partition check. This includes messages later lost to simulated
    /// packet loss, because the sender did send them.
    ///
    /// # Errors
    ///
    /// Returns [`MockNetworkError::Partitioned`] if `from` and `to` are separated
    /// by an active partition. Any error from the connection lookup is propagated.
    /// A message dropped by simulated packet loss is *not* an error. As on a real
    /// lossy link, the sender observes success and the message never arrives.
    pub async fn send(
        &self,
        from: Identifier,
        to: Identifier,
        data: Vec<u8>,
    ) -> Result<(), MockNetworkError> {
        if self.is_partitioned(&from, &to).await {
            return Err(MockNetworkError::Partitioned);
        }

        // `from`/`to` are not used after this point, so move them rather than clone.
        let conn = self.get_connection(from, to).await?;

        // Only hand the message to the connection if it survived simulated loss.
        if self.apply_network_conditions(&conn).await {
            conn.deliver(data).await;
        }

        self.stats.write().await.messages_sent += 1;

        Ok(())
    }

    /// Applies the connection's simulated latency, then rolls for packet loss.
    ///
    /// Returns `true` if the message should be delivered and `false` if it was
    /// lost. Latency is simulated by delaying the caller before it delivers.
    async fn apply_network_conditions(&self, conn: &MockConnection) -> bool {
        if let Some(latency) = conn.config.latency_ms {
            tokio::time::sleep(std::time::Duration::from_millis(latency)).await;
        }

        let lost = rand::random::<f64>() < conn.config.packet_loss_rate;
        !lost
    }
}

3. Mock Node Implementation

pub struct MockNode {
    id: Identifier,
    underlay: Arc<MockUnderlay>,
    processors: Arc<RwLock<HashMap<Channel, Arc<dyn MessageProcessor>>>>,
    inbox: mpsc::Receiver<MockMessage>,
    outbox: mpsc::Sender<MockMessage>,
}

impl MockNode {
    pub fn new(id: Identifier, underlay: Arc<MockUnderlay>) -> Self {
        let (tx, rx) = mpsc::channel(1000);
        Self {
            id,
            underlay,
            processors: Arc::new(RwLock::new(HashMap::new())),
            inbox: rx,
            outbox: tx,
        }
    }
}

#[async_trait]
impl Network for MockNode {
    /// Attaches `processor` to `channel` and returns a conduit that sends on
    /// that channel on behalf of this node.
    ///
    /// # Errors
    ///
    /// [`NetworkError::ProcessorAlreadyRegistered`] if `channel` already has a processor.
    async fn register(
        &self,
        channel: Channel,
        processor: Arc<dyn MessageProcessor>,
    ) -> Result<Arc<dyn Conduit>, NetworkError> {
        {
            let mut registry = self.processors.write().await;
            if registry.contains_key(&channel) {
                return Err(NetworkError::ProcessorAlreadyRegistered(channel));
            }
            registry.insert(channel, processor);
        }

        let conduit = MockConduit {
            channel,
            node_id: self.id.clone(),
            underlay: Arc::clone(&self.underlay),
        };
        Ok(Arc::new(conduit))
    }

    /// Identifier this node was created with.
    fn local_id(&self) -> &Identifier {
        &self.id
    }
}

4. Mock Conduit

/// Conduit returned by `MockNode::register`. It sends on one channel on
/// behalf of one node through the shared underlay.
pub struct MockConduit {
    /// Node the conduit belongs to; used as the envelope's `from`.
    node_id: Identifier,
    /// Channel stamped on every outgoing envelope.
    channel: Channel,
    /// Transport that carries the serialized envelopes.
    underlay: Arc<MockUnderlay>,
}

#[async_trait]
impl Conduit for MockConduit {
    /// Wraps `message` in a [`MockMessage`] envelope addressed to `destination`.
    /// It then serializes the envelope with `bincode` and hands the bytes to the underlay.
    ///
    /// # Errors
    ///
    /// - [`ConduitError::SerializationError`] if the envelope cannot be encoded.
    /// - [`ConduitError::NetworkError`] if the underlay rejects the send
    ///   (for example when the two nodes are partitioned).
    async fn send(&self, destination: Identifier, message: Message) -> Result<(), ConduitError> {
        let envelope = MockMessage {
            from: self.node_id.clone(),
            // The envelope keeps its own copy; `destination` itself is moved
            // into the underlay call below.
            to: destination.clone(),
            channel: self.channel,
            payload: message,
            // NOTE(review): wall-clock timestamps differ between runs; tests that
            // compare whole envelopes should ignore this field.
            timestamp: std::time::SystemTime::now(),
        };

        let data = bincode::serialize(&envelope).map_err(ConduitError::SerializationError)?;

        self.underlay
            .send(self.node_id.clone(), destination, data)
            .await
            .map_err(ConduitError::NetworkError)
    }

    /// Channel this conduit was registered on.
    fn channel(&self) -> Channel {
        self.channel
    }
}

5. Test Utilities

pub mod test_utils {
    use super::*;

    /// Creates a network with the default configuration and `n` nodes.
    ///
    /// Node `i` is created from `Identifier::from_seed(i)`, and the nodes are
    /// returned in index order.
    pub async fn create_test_network(n: usize) -> (MockNetwork, Vec<Arc<MockNode>>) {
        let network = MockNetwork::new(MockNetworkConfig::default());
        let mut nodes = Vec::with_capacity(n);

        for seed in 0..n as u64 {
            nodes.push(network.create_node(Identifier::from_seed(seed)).await);
        }

        (network, nodes)
    }

    /// Waits until every node in `nodes` reports ready, one node at a time.
    ///
    /// # Panics
    ///
    /// Panics if a node drops its readiness sender before becoming ready. Once
    /// the sender is gone, `changed()` returns `Err` immediately on every call,
    /// so ignoring that error would make this loop spin forever.
    pub async fn wait_all_ready(nodes: &[Arc<MockNode>]) {
        for node in nodes {
            // NOTE(review): `MockNode::ready` is assumed to return a
            // `tokio::sync::watch::Receiver<bool>`; it is not defined in view.
            let mut ready = node.ready();
            while !*ready.borrow() {
                if ready.changed().await.is_err() {
                    // The last value may have been set just before the sender
                    // went away; only fail if it still says "not ready".
                    assert!(
                        *ready.borrow(),
                        "node dropped its readiness signal before becoming ready"
                    );
                }
            }
        }
    }

    /// Waits up to `timeout` for `node` to receive a message on `channel` and returns it.
    ///
    /// Messages on other channels that arrive first are consumed and discarded.
    ///
    /// # Panics
    ///
    /// Panics with "Timeout waiting for message" if no matching message arrives
    /// in time, or with "Channel closed" if the inbox closes first.
    pub async fn assert_message_received(
        node: &MockNode,
        channel: Channel,
        timeout: Duration,
    ) -> MockMessage {
        // NOTE(review): `inbox` is an `mpsc::Receiver`, whose `recv` takes `&mut self`;
        // reading it through `&MockNode` needs `MockNode` to offer a `&self` receive path.
        let wait_for_channel = async {
            loop {
                let msg = node.inbox.recv().await.expect("Channel closed");
                if msg.channel == channel {
                    return msg;
                }
            }
        };

        tokio::time::timeout(timeout, wait_for_channel)
            .await
            .expect("Timeout waiting for message")
    }

    /// Wraps a [`MockNode`] to inject Byzantine (adversarial) behaviour in tests.
    pub struct ByzantineNode {
        inner: Arc<MockNode>,
    }

    impl ByzantineNode {
        /// Wraps `node`. Nothing is injected until one of the methods below is called.
        pub fn new(node: Arc<MockNode>) -> Self {
            Self { inner: node }
        }

        /// Sends malformed messages (invalid signature, wrong format, ...) to the target.
        ///
        /// # Panics
        ///
        /// Not implemented yet. It panics instead of silently doing nothing, so a
        /// test that relies on this injection cannot pass vacuously.
        pub async fn send_invalid_message(&self, _target: Identifier) {
            todo!("ByzantineNode::send_invalid_message is not implemented")
        }

        /// Makes the wrapped node drop every message from the given source.
        ///
        /// # Panics
        ///
        /// Not implemented yet. It panics instead of silently doing nothing, so a
        /// test that relies on this injection cannot pass vacuously.
        pub async fn drop_messages_from(&self, _source: Identifier) {
            todo!("ByzantineNode::drop_messages_from is not implemented")
        }
    }
}

6. Test Scenarios

#[cfg(test)]
mod tests {
    use super::test_utils::*;
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn test_message_delivery() {
        let (_network, nodes) = create_test_network(3).await;

        // Register a processor so node 0 gets a conduit on the test channel.
        let processor = Arc::new(TestProcessor::new());
        let conduit = nodes[0].register(Channel::Test, processor).await.unwrap();

        conduit.send(nodes[1].local_id().clone(), test_message()).await.unwrap();

        let msg = assert_message_received(&nodes[1], Channel::Test, Duration::from_secs(1)).await;
        // `local_id` returns `&Identifier`, so compare by reference rather than
        // an owned `Identifier` against a borrowed one.
        assert_eq!(&msg.from, nodes[0].local_id());
    }

    #[tokio::test]
    async fn test_network_partition() {
        let (network, nodes) = create_test_network(4).await;
        let conduit = nodes[0]
            .register(Channel::Test, Arc::new(TestProcessor::new()))
            .await
            .unwrap();

        let group1 = vec![nodes[0].local_id().clone(), nodes[1].local_id().clone()];
        let group2 = vec![nodes[2].local_id().clone(), nodes[3].local_id().clone()];
        network.partition(group1, group2).await;

        // Messages within one side of the partition still go through.
        conduit
            .send(nodes[1].local_id().clone(), test_message())
            .await
            .expect("send within a partition should succeed");

        // Messages across the partition are rejected by the underlay.
        let across = conduit.send(nodes[2].local_id().clone(), test_message()).await;
        assert!(
            matches!(
                across,
                Err(ConduitError::NetworkError(MockNetworkError::Partitioned))
            ),
            "send across a partition should fail with Partitioned"
        );

        network.heal_partition().await;

        // Once healed, the same cross-partition send succeeds.
        conduit
            .send(nodes[2].local_id().clone(), test_message())
            .await
            .expect("send after healing the partition should succeed");
    }

    #[tokio::test]
    #[ignore = "ByzantineNode behaviours are not implemented yet; this test asserts nothing"]
    async fn test_byzantine_behavior() {
        let (_network, nodes) = create_test_network(5).await;

        // Make one node Byzantine.
        let _byzantine = ByzantineNode::new(nodes[2].clone());

        // TODO: run the protocol with the Byzantine node and assert it is tolerated.
    }
}

Key Features

  • Network Simulation: Test without real sockets
  • Failure Injection: Simulate partitions, delays, packet loss
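  • Deterministic Testing: Reproducible test scenarios (probabilistic failures such as packet loss should draw from a seedable RNG so that failing runs can be replayed)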
  • Byzantine Testing: Test protocol robustness
  • Performance Metrics: Track messages and bandwidth

Benefits

  • Fast, deterministic tests
  • Test edge cases easily
  • Simulate large networks
  • Test failure scenarios
  • No network setup required

Testing Requirements

  • Unit tests for mock components
  • Integration tests with skip graph
  • Performance benchmarks
  • Stress tests with many nodes

Dependencies

  • tokio (async runtime)
  • async-trait
  • bincode (serialization for tests)
  • rand (for probabilistic failures)

Priority

Medium - Essential for thorough testing

Related Issues

Metadata

Assignees

No one assigned

    Labels

    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions