Skip to content

Implement Network Abstraction Layer with Conduit Pattern #58

@thep2p

Description

@thep2p

Overview

The Go implementation has a comprehensive Network interface that provides high-level abstractions for network communication. This includes the Conduit pattern for sending messages and channel-based message routing. The Rust implementation currently only has a mock network and needs the full network abstraction layer.

Background

Reference implementation: skipgraph-go/net/network.go

The Network layer provides:

  • Component lifecycle management (Startable + ReadyDoneAware)
  • Channel-based message routing
  • MessageProcessor registration
  • Conduit abstraction for sending messages

Requirements

1. Define Core Network Traits

use async_trait::async_trait;
use std::sync::Arc;

/// Channel types for message routing
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Channel {
    Test,
    Consensus,
    SyncCommittee,
    PushTransactions,
    RequestTransactions,
    // Add more as needed
}

/// Node-level networking component.
///
/// Engines bind a [`MessageProcessor`] to a [`Channel`] through
/// [`Network::register`] and receive a [`Conduit`] for sending on that channel.
#[async_trait]
pub trait Network: Component + Send + Sync {
    /// Identifier of the node this network instance belongs to.
    fn local_id(&self) -> &Identifier;

    /// Owned snapshot of this network's statistics.
    fn metrics(&self) -> NetworkMetrics;

    /// Binds `processor` to `channel` and hands back the conduit for that channel.
    ///
    /// # Errors
    ///
    /// Fails when some processor is already bound to `channel`.
    async fn register(
        &self,
        channel: Channel,
        processor: Arc<dyn MessageProcessor>,
    ) -> Result<Arc<dyn Conduit>, NetworkError>;
}

/// Sending handle tied to a single [`Channel`], obtained from `Network::register`.
#[async_trait]
pub trait Conduit: Send + Sync {
    /// The channel that every message sent through this conduit is tagged with.
    fn channel(&self) -> Channel;

    /// Delivers `message` to the node identified by `destination`.
    ///
    /// # Errors
    ///
    /// Errors returned here are benign: callers should handle or log them rather
    /// than treat them as fatal for the node.
    async fn send(&self, destination: Identifier, message: Message) -> Result<(), ConduitError>;

    /// Sends `message` to every node listed in `destinations`.
    ///
    /// Produces one result per destination, so individual failures are reported
    /// alongside successes instead of aborting the whole call.
    async fn broadcast(
        &self,
        destinations: Vec<Identifier>,
        message: Message,
    ) -> Vec<Result<(), ConduitError>>;
}

2. Implement Network Struct

use tokio::sync::RwLock;
use std::collections::HashMap;

/// Default [`Network`] implementation that reaches peers through a `ConnectionManager`.
pub struct NetworkImpl {
    /// Identifier of the local node.
    local_id: Identifier,
    /// Lifecycle state; presumably what backs the `Component` supertrait of `Network`.
    component: BaseComponent,
    /// Processor bound to each channel (at most one per channel).
    processors: Arc<RwLock<HashMap<Channel, Arc<dyn MessageProcessor>>>>,
    /// Used to obtain connections to remote nodes.
    connection_manager: Arc<dyn ConnectionManager>,
    /// Network statistics, starting from `NetworkMetrics::default()`.
    metrics: Arc<RwLock<NetworkMetrics>>,
}

impl NetworkImpl {
    /// Creates a network for `local_id` with no processors registered and
    /// default metrics.
    #[must_use]
    pub fn new(local_id: Identifier, connection_manager: Arc<dyn ConnectionManager>) -> Self {
        Self {
            local_id,
            component: BaseComponent::new(),
            processors: Arc::new(RwLock::new(HashMap::new())),
            connection_manager,
            metrics: Arc::new(RwLock::new(NetworkMetrics::default())),
        }
    }

    /// Inbound message loop: not implemented yet.
    ///
    /// The body is currently empty and nothing calls this method.
    #[allow(dead_code)] // placeholder until the receive path is implemented and wired up
    async fn start_receiver(&self, _ctx: Arc<dyn ThrowableContext>) {
        // TODO: listen for incoming connections and route each message to the
        // processor registered for its channel.
    }
}

#[async_trait]
impl Network for NetworkImpl {
    async fn register(
        &self,
        channel: Channel,
        processor: Arc<dyn MessageProcessor>
    ) -> Result<Arc<dyn Conduit>, NetworkError> {
        let mut processors = self.processors.write().await;
        
        if processors.contains_key(&channel) {
            return Err(NetworkError::ProcessorAlreadyRegistered(channel));
        }
        
        processors.insert(channel, processor);
        
        // Create and return a conduit for this channel
        Ok(Arc::new(ConduitImpl {
            channel,
            network: Arc::new(self.clone()),
        }))
    }
    
    fn local_id(&self) -> &Identifier {
        &self.local_id
    }
    
    fn metrics(&self) -> NetworkMetrics {
        self.metrics.read().await.clone()
    }
}

3. Implement Conduit

/// [`Conduit`] implementation bound to one [`Channel`] of a [`NetworkImpl`].
struct ConduitImpl {
    /// Channel stamped on every envelope sent through this conduit.
    channel: Channel,
    /// Owning network. It supplies the connection manager, the local identifier
    /// used as `sender`, and the metrics that count sent messages.
    network: Arc<NetworkImpl>,
}

#[async_trait]
impl Conduit for ConduitImpl {
    /// Wraps `message` in an envelope for this conduit's channel and sends it to
    /// `destination`.
    ///
    /// # Errors
    ///
    /// - `ConduitError::ConnectionFailed` if no connection to `destination` could
    ///   be obtained.
    /// - `ConduitError::SendFailed` if sending the envelope on that connection failed.
    async fn send(&self, destination: Identifier, message: Message) -> Result<(), ConduitError> {
        // Get or establish a connection; reuse is up to the connection manager.
        let connection = self
            .network
            .connection_manager
            .connect(destination)
            .await
            .map_err(ConduitError::ConnectionFailed)?;

        // Milliseconds since the Unix epoch. A clock set before 1970 yields 0, and
        // values that do not fit in a u64 saturate.
        // NOTE(review): the issue does not specify the unit of
        // `MessageEnvelope::timestamp`; confirm millis vs. seconds.
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |since_epoch| {
                u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
            });

        let envelope = MessageEnvelope {
            channel: self.channel,
            sender: self.network.local_id.clone(),
            payload: message,
            timestamp,
        };

        // The envelope is handed over unserialized; presumably the connection
        // encodes it for the wire.
        connection
            .send(envelope)
            .await
            .map_err(ConduitError::SendFailed)?;

        // Only successfully sent envelopes are counted.
        self.network.metrics.write().await.messages_sent += 1;

        Ok(())
    }

    /// Sends `message` to each destination concurrently.
    ///
    /// Results come back in the same order as `destinations`.
    async fn broadcast(
        &self,
        destinations: Vec<Identifier>,
        message: Message,
    ) -> Vec<Result<(), ConduitError>> {
        let sends = destinations
            .into_iter()
            .map(|destination| self.send(destination, message.clone()));

        futures::future::join_all(sends).await
    }

    fn channel(&self) -> Channel {
        self.channel
    }
}

4. Message Types

/// Application-level message carried over the network.
///
/// Derives `Serialize`/`Deserialize` because `MessageEnvelope` embeds it and
/// derives both traits itself.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message {
    /// Opaque message body.
    pub payload: Vec<u8>,
    /// Free-form string key/value annotations.
    pub metadata: HashMap<String, String>,
}

/// Message envelope for network transmission: a [`Message`] plus routing data.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct MessageEnvelope {
    /// Channel the message was sent on; selects the receiving processor.
    pub channel: Channel,
    /// Identifier of the sending node.
    pub sender: Identifier,
    /// The wrapped application message.
    pub payload: Message,
    /// Send time. NOTE(review): unit and epoch are not specified in this issue;
    /// confirm (e.g. milliseconds since the Unix epoch).
    pub timestamp: u64,
}

5. Usage Example

async fn setup_network_layer() -> Result<(), Box<dyn Error>> {
    let network = Arc::new(NetworkImpl::new(
        local_id,
        connection_manager,
    ));
    
    // Register processors for different channels
    let consensus_engine = Arc::new(ConsensusEngine::new());
    let consensus_conduit = network.register(
        Channel::Consensus,
        consensus_engine.clone()
    ).await?;
    
    let sync_engine = Arc::new(SyncEngine::new());
    let sync_conduit = network.register(
        Channel::SyncCommittee,
        sync_engine.clone()
    ).await?;
    
    // Start network
    network.start(ctx).await;
    
    // Send messages
    consensus_conduit.send(peer_id, consensus_message).await?;
    
    Ok(())
}

Design Principles

  1. Channel Isolation: Each channel has at most one registered processor; registering a second processor for the same channel is rejected
  2. Error Resilience: Network errors are treated as benign and must not crash the node
  3. Connection Reuse: Connections are cached and reused
  4. Metrics: Track network statistics for monitoring
  5. Async/Await: Fully async implementation using tokio

Testing Requirements

  • Test processor registration
  • Test message routing to correct processors
  • Test connection establishment and reuse
  • Test error handling
  • Test concurrent message sending
  • Mock implementations for testing

Dependencies

Priority

High - Core infrastructure needed for network communication

Related Issues

Metadata

Assignees

No one assigned

    Labels

    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions