-
Notifications
You must be signed in to change notification settings - Fork 1
Open
Description
Overview
The Go implementation has a comprehensive testing infrastructure in the unittest/mocknet package that provides powerful tools for testing distributed skip graph behavior. The Rust implementation needs similar testing capabilities for thorough testing of network interactions and distributed algorithms.
Background
Reference implementation: skipgraph-go/unittest/mocknet
The testing infrastructure provides:
- Network simulation without real sockets
- Deterministic message delivery
- Failure injection capabilities
- Network topology simulation
Requirements
1. Mock Network Core
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::{mpsc, RwLock};
/// Simulated network for testing.
///
/// Owns the registry of nodes created through `create_node`, the shared
/// `MockUnderlay` every node sends through, and the configuration passed to `new`.
pub struct MockNetwork {
    /// Registered nodes keyed by identifier.
    ///
    /// The values are `Arc<MockNode>` because `create_node` stores one handle here
    /// and returns a clone of the same handle to the caller.
    nodes: Arc<RwLock<HashMap<Identifier, Arc<MockNode>>>>,
    /// Transport simulation shared by every node of this network.
    underlay: Arc<MockUnderlay>,
    /// Configuration supplied at construction time.
    config: MockNetworkConfig,
}
impl MockNetwork {
    /// Builds an empty network: no nodes, a fresh underlay, and `config` stored as given.
    pub fn new(config: MockNetworkConfig) -> Self {
        let nodes = Arc::new(RwLock::new(HashMap::new()));
        let underlay = Arc::new(MockUnderlay::new());
        Self {
            nodes,
            underlay,
            config,
        }
    }

    /// Creates a node bound to this network's underlay, registers it under `id`,
    /// and returns a shared handle to it.
    pub async fn create_node(&self, id: Identifier) -> Arc<MockNode> {
        let handle = Arc::new(MockNode::new(id.clone(), Arc::clone(&self.underlay)));
        {
            // Keep the write lock only for the duration of the insert.
            let mut registry = self.nodes.write().await;
            registry.insert(id, Arc::clone(&handle));
        }
        handle
    }

    /// Removes the node registered under `id` and then disconnects it at the underlay.
    pub async fn remove_node(&self, id: &Identifier) {
        {
            let mut registry = self.nodes.write().await;
            registry.remove(id);
        }
        self.underlay.disconnect_node(id).await;
    }

    /// Asks the underlay to create a partition between `group1` and `group2`.
    pub async fn partition(&self, group1: Vec<Identifier>, group2: Vec<Identifier>) {
        let underlay = &self.underlay;
        underlay.create_partition(group1, group2).await;
    }

    /// Asks the underlay to clear every partition.
    pub async fn heal_partition(&self) {
        let underlay = &self.underlay;
        underlay.clear_partitions().await;
    }

    /// Collects the registered node count plus the underlay's message and byte counters.
    pub async fn stats(&self) -> NetworkStats {
        // The read guard stays alive until the snapshot has been assembled.
        let registry = self.nodes.read().await;
        let nodes = registry.len();
        let messages_sent = self.underlay.message_count().await;
        let bytes_transferred = self.underlay.bytes_transferred().await;
        NetworkStats {
            nodes,
            messages_sent,
            bytes_transferred,
        }
    }
}
/// Tunable conditions for the simulated network.
///
/// `Default` is derived because `test_utils::create_test_network` builds its network
/// from `MockNetworkConfig::default()`. The derived default is: no latency,
/// a packet-loss rate of `0.0`, no bandwidth limit, and `enable_ordering == false`.
#[derive(Debug, Clone, Default)]
pub struct MockNetworkConfig {
    /// Artificial delay in milliseconds; `None` means no delay is configured.
    pub latency_ms: Option<u64>,
    /// Packet-loss rate. NOTE(review): presumably a probability in `0.0..=1.0` — confirm.
    pub packet_loss_rate: f64,
    /// Optional bandwidth cap. NOTE(review): the unit is not specified here — confirm.
    pub bandwidth_limit: Option<usize>,
    /// Ordering switch. NOTE(review): presumably enables in-order delivery — confirm.
    pub enable_ordering: bool,
}
// 2. Mock Underlay (Network Simulation Layer)
/// Simulates the underlying network transport
///
/// A single instance is shared (behind an `Arc`) by the `MockNetwork` and by every
/// `MockNode` / `MockConduit` created from it.
pub struct MockUnderlay {
/// Connections keyed by a pair of identifiers.
/// NOTE(review): presumably ordered as `(from, to)`, matching the argument order
/// that `send` passes to `get_connection` — confirm.
connections: Arc<RwLock<HashMap<(Identifier, Identifier), MockConnection>>>,
/// Partitions, each stored as a pair of identifier sets.
partitions: Arc<RwLock<Vec<(HashSet<Identifier>, HashSet<Identifier>)>>>,
/// Counters; `send` increments `messages_sent` on this value.
stats: Arc<RwLock<UnderlayStats>>,
}
impl MockUnderlay {
/// Send a message between nodes
pub async fn send(
&self,
from: Identifier,
to: Identifier,
data: Vec<u8>,
) -> Result<(), MockNetworkError> {
// Check for network partition
if self.is_partitioned(&from, &to).await {
return Err(MockNetworkError::Partitioned);
}
// Get or create connection
let conn = self.get_connection(from.clone(), to.clone()).await?;
// Simulate network conditions
self.apply_network_conditions(&conn).await;
// Deliver message
conn.deliver(data).await;
// Update stats
self.stats.write().await.messages_sent += 1;
Ok(())
}
async fn apply_network_conditions(&self, conn: &MockConnection) {
// Simulate latency
if let Some(latency) = conn.config.latency_ms {
tokio::time::sleep(Duration::from_millis(latency)).await;
}
// Simulate packet loss
if rand::random::<f64>() < conn.config.packet_loss_rate {
// Drop the packet
return;
}
}
}3. Mock Node Implementation
/// A node of the mock network; implements `Network` on top of a shared `MockUnderlay`.
pub struct MockNode {
/// Identifier of this node, returned by `local_id`.
id: Identifier,
/// Shared transport simulation; cloned into every `MockConduit` that `register` creates.
underlay: Arc<MockUnderlay>,
/// Message processors keyed by channel; `register` allows at most one per channel.
processors: Arc<RwLock<HashMap<Channel, Arc<dyn MessageProcessor>>>>,
/// Receiving half of the bounded channel created in `new`.
/// NOTE(review): tokio's `mpsc::Receiver::recv` takes `&mut self`, yet nodes are shared as
/// `Arc<MockNode>` and read through `&MockNode`; this presumably needs interior
/// mutability (e.g. an async mutex) — confirm.
inbox: mpsc::Receiver<MockMessage>,
/// Sending half of the same channel.
outbox: mpsc::Sender<MockMessage>,
}
impl MockNode {
    /// Creates a node with the given identifier, attached to `underlay`.
    ///
    /// The node starts with no registered processors and a bounded mailbox channel
    /// whose two halves become `inbox` and `outbox`.
    pub fn new(id: Identifier, underlay: Arc<MockUnderlay>) -> Self {
        /// Capacity of the bounded mailbox channel.
        const MAILBOX_CAPACITY: usize = 1000;

        let (outbox, inbox) = mpsc::channel(MAILBOX_CAPACITY);
        let processors = Arc::new(RwLock::new(HashMap::new()));

        Self {
            id,
            underlay,
            processors,
            inbox,
            outbox,
        }
    }
}
#[async_trait]
impl Network for MockNode {
async fn register(
&self,
channel: Channel,
processor: Arc<dyn MessageProcessor>
) -> Result<Arc<dyn Conduit>, NetworkError> {
let mut processors = self.processors.write().await;
if processors.contains_key(&channel) {
return Err(NetworkError::ProcessorAlreadyRegistered(channel));
}
processors.insert(channel, processor);
Ok(Arc::new(MockConduit {
channel,
node_id: self.id.clone(),
underlay: self.underlay.clone(),
}))
}
fn local_id(&self) -> &Identifier {
&self.id
}
}4. Mock Conduit
/// Sending handle returned by `MockNode::register`, bound to one channel of one node.
pub struct MockConduit {
/// Channel this conduit sends on; stamped on every envelope and returned by `channel()`.
channel: Channel,
/// Identifier of the owning node; used as the `from` field of every envelope.
node_id: Identifier,
/// Transport simulation that outgoing messages are handed to.
underlay: Arc<MockUnderlay>,
}
#[async_trait]
impl Conduit for MockConduit {
async fn send(&self, destination: Identifier, message: Message) -> Result<(), ConduitError> {
let envelope = MockMessage {
from: self.node_id.clone(),
to: destination.clone(),
channel: self.channel,
payload: message,
timestamp: SystemTime::now(),
};
let data = bincode::serialize(&envelope)
.map_err(|e| ConduitError::SerializationError(e))?;
self.underlay.send(self.node_id.clone(), destination, data).await
.map_err(|e| ConduitError::NetworkError(e))?;
Ok(())
}
fn channel(&self) -> Channel {
self.channel
}
}5. Test Utilities
pub mod test_utils {
use super::*;
/// Create a test network with N nodes
pub async fn create_test_network(n: usize) -> (MockNetwork, Vec<Arc<MockNode>>) {
let network = MockNetwork::new(MockNetworkConfig::default());
let mut nodes = Vec::new();
for i in 0..n {
let id = Identifier::from_seed(i as u64);
let node = network.create_node(id).await;
nodes.push(node);
}
(network, nodes)
}
/// Wait for all nodes to be ready
pub async fn wait_all_ready(nodes: &[Arc<MockNode>]) {
for node in nodes {
let mut ready = node.ready();
while !*ready.borrow() {
ready.changed().await.ok();
}
}
}
/// Assert that a message was received
pub async fn assert_message_received(
node: &MockNode,
channel: Channel,
timeout: Duration,
) -> MockMessage {
tokio::time::timeout(timeout, node.inbox.recv())
.await
.expect("Timeout waiting for message")
.expect("Channel closed")
}
/// Simulate Byzantine node behavior
pub struct ByzantineNode {
inner: Arc<MockNode>,
}
impl ByzantineNode {
pub fn new(node: Arc<MockNode>) -> Self {
Self { inner: node }
}
/// Send invalid messages
pub async fn send_invalid_message(&self, target: Identifier) {
// Send message with invalid signature, wrong format, etc.
}
/// Drop messages selectively
pub async fn drop_messages_from(&self, source: Identifier) {
// Configure node to drop messages from specific source
}
}
}6. Test Scenarios
#[cfg(test)]
mod tests {
    use super::*;
    // The helpers used below live in `test_utils`; `use super::*` alone only brings
    // the module name into scope, not its functions.
    use super::test_utils::*;

    /// A message sent through a conduit on node 0 reaches node 1's inbox.
    #[tokio::test]
    async fn test_message_delivery() {
        // The network is kept alive for the test but not used directly.
        let (_network, nodes) = create_test_network(3).await;
        // Register processors
        let processor = Arc::new(TestProcessor::new());
        let conduit = nodes[0].register(Channel::Test, processor.clone()).await.unwrap();
        // Send message
        conduit.send(nodes[1].local_id().clone(), test_message()).await.unwrap();
        // Verify delivery
        let msg = assert_message_received(&nodes[1], Channel::Test, Duration::from_secs(1)).await;
        // `local_id` returns a reference, so compare reference to reference.
        assert_eq!(&msg.from, nodes[0].local_id());
    }

    /// Skeleton for partition behavior: nodes {0, 1} are split from nodes {2, 3}.
    #[tokio::test]
    async fn test_network_partition() {
        let (network, nodes) = create_test_network(4).await;
        // Create partition
        let group1 = vec![nodes[0].local_id().clone(), nodes[1].local_id().clone()];
        let group2 = vec![nodes[2].local_id().clone(), nodes[3].local_id().clone()];
        network.partition(group1, group2).await;
        // TODO: Messages within partition succeed
        // TODO: Messages across partition fail
        // Heal partition
        network.heal_partition().await;
        // TODO: Messages across former partition succeed
    }

    /// Skeleton for Byzantine behavior: node 2 of 5 is wrapped as a Byzantine node.
    #[tokio::test]
    async fn test_byzantine_behavior() {
        let (_network, nodes) = create_test_network(5).await;
        // Make one node Byzantine
        let _byzantine = ByzantineNode::new(nodes[2].clone());
        // TODO: Test consensus with Byzantine node
        // TODO: Verify protocol handles Byzantine behavior correctly
    }
}
// Key Features
- Network Simulation: Test without real sockets
- Failure Injection: Simulate partitions, delays, packet loss
- Deterministic Testing: Reproducible test scenarios
- Byzantine Testing: Test protocol robustness
- Performance Metrics: Track messages and bandwidth
Benefits
- Fast, deterministic tests
- Test edge cases easily
- Simulate large networks
- Test failure scenarios
- No network setup required
Testing Requirements
- Unit tests for mock components
- Integration tests with skip graph
- Performance benchmarks
- Stress tests with many nodes
Dependencies
- tokio (async runtime)
- async-trait
- bincode (serialization for tests)
- rand (for probabilistic failures)
Priority
Medium - Essential for thorough testing
Related Issues
- Provides test coverage for the network components tracked in #58 (Implement Network Abstraction Layer with Conduit Pattern) and #59 (Implement Connection Management Layer)
- Enables distributed algorithm testing
Metadata
Metadata
Assignees
Labels
No labels