Skip to content

Implement Connection Management Layer #59

@thep2p

Description

@thep2p

Overview

The Go implementation has a comprehensive connection management system built on the Connection and ConnectionManager interfaces. Together they provide connection pooling, automatic reconnection, and graceful shutdown. The Rust implementation needs equivalent abstractions for reliable network communication.

Background

Reference implementation: skipgraph-go/net/internal/connection.go

The connection layer provides:

  • Connection pooling and reuse
  • Automatic reconnection on failure
  • Graceful connection shutdown
  • Message framing and serialization

Requirements

1. Define Connection Traits

use async_trait::async_trait;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};

/// A message-oriented connection to a single remote peer.
///
/// Implementations are `Send + Sync` so one connection can be shared across
/// tasks (for example behind an `Arc<dyn Connection>`).
#[async_trait]
pub trait Connection: Send + Sync {
    /// Returns the remote peer's network address, or `None` when it is not
    /// available (for example, once the connection is no longer alive).
    fn remote_addr(&self) -> Option<String>;

    /// Returns the identifier of the remote peer.
    fn remote_id(&self) -> &Identifier;

    /// Sends one message to the remote peer.
    ///
    /// # Errors
    /// - `ConnectionError::ConnectionClosed` if the connection is already closed.
    /// - Any other `ConnectionError` (e.g. a wrapped transport I/O failure) is
    ///   an ordinary, recoverable error: callers should handle it, not panic.
    async fn send(&self, data: Vec<u8>) -> Result<(), ConnectionError>;

    /// Receives the next message from the remote peer.
    ///
    /// # Errors
    /// - `ConnectionError::ConnectionClosed` if the connection is already closed.
    /// - A wrapped transport error (e.g. an unexpected end-of-stream I/O error)
    ///   if the peer goes away while a message is being read.
    async fn next(&self) -> Result<Vec<u8>, ConnectionError>;

    /// Returns `true` while the connection is still usable.
    fn is_alive(&self) -> bool;

    /// Gracefully closes the connection, consuming it.
    ///
    /// NOTE(review): because this takes `self` by value it cannot be invoked
    /// through `Arc<dyn Connection>` — the handle type `ConnectionManager`
    /// hands out — so a manager cannot close shared connections with it.
    /// Consider changing the receiver to `&self` (together with every
    /// implementation and caller).
    async fn close(self) -> Result<(), ConnectionError>;
}

/// Owns the set of connections to remote peers.
///
/// Implementations uphold one invariant: at most one connection per remote
/// peer, so repeated `connect` calls for the same peer reuse the cached one.
#[async_trait]
pub trait ConnectionManager: Send + Sync {
    /// Returns a connection to `peer_id`, reusing the cached one when the peer
    /// is already connected and establishing a new one otherwise.
    ///
    /// Cardinal rule: at most one connection per remote peer.
    async fn connect(&self, peer_id: Identifier) -> Result<Arc<dyn Connection>, ConnectionError>;

    /// Looks up an existing connection to `peer_id` without dialing.
    async fn get_connection(&self, peer_id: &Identifier) -> Option<Arc<dyn Connection>>;

    /// Returns the identifiers of every connected peer.
    async fn connected_peers(&self) -> Vec<Identifier>;

    /// Returns how many connections are currently held.
    async fn connection_count(&self) -> usize;

    /// Closes and forgets the connection to `peer_id`, if one exists.
    async fn disconnect(&self, peer_id: &Identifier) -> Result<(), ConnectionError>;

    /// Closes every held connection.
    async fn close_all(&self) -> Result<(), ConnectionError>;
}

2. Implement TCP Connection

use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::io::{BufReader, BufWriter};

/// A [`Connection`] over a TCP stream using length-prefixed message framing.
///
/// The stream is split into separately locked read and write halves, so a
/// send and a receive on the same connection do not contend for one lock.
pub struct TcpConnection {
    /// Liveness flag shared by all operations on this connection.
    alive: Arc<AtomicBool>,
    /// Identifier of the peer on the other end.
    remote_id: Identifier,
    /// Peer socket address, captured when the connection was created.
    remote_addr: String,
    /// Buffered write half; holding the lock keeps a frame's bytes together.
    writer: Arc<Mutex<BufWriter<OwnedWriteHalf>>>,
    /// Buffered read half; holding the lock keeps a frame's bytes together.
    reader: Arc<Mutex<BufReader<OwnedReadHalf>>>,
}

impl TcpConnection {
    /// Wraps an already-connected `stream` as a connection to `remote_id`.
    ///
    /// The connection starts out alive, with buffered read and write halves.
    ///
    /// # Errors
    /// Fails if the stream's peer address cannot be read.
    pub async fn new(
        stream: TcpStream,
        remote_id: Identifier,
    ) -> Result<Self, ConnectionError> {
        let peer = stream.peer_addr()?;
        let (rx, tx) = stream.into_split();

        let reader = Arc::new(Mutex::new(BufReader::new(rx)));
        let writer = Arc::new(Mutex::new(BufWriter::new(tx)));
        let alive = Arc::new(AtomicBool::new(true));

        Ok(Self {
            remote_addr: peer.to_string(),
            remote_id,
            alive,
            reader,
            writer,
        })
    }
}

/// Largest payload accepted in a single frame, in bytes.
///
/// The length prefix is supplied by the remote peer, so without a cap one
/// corrupt or malicious header could make `next` allocate up to 4 GiB.
/// 16 MiB is a conservative default — tune it to the protocol's real needs.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;

/// Builds the I/O error reported when a frame of `len` bytes exceeds
/// [`MAX_FRAME_LEN`].
fn frame_too_large(len: usize, kind: std::io::ErrorKind) -> std::io::Error {
    std::io::Error::new(
        kind,
        format!("frame of {len} bytes exceeds the {MAX_FRAME_LEN}-byte limit"),
    )
}

#[async_trait]
impl Connection for TcpConnection {
    /// Returns the peer's socket address while the connection is alive.
    fn remote_addr(&self) -> Option<String> {
        if self.is_alive() {
            Some(self.remote_addr.clone())
        } else {
            None
        }
    }

    fn remote_id(&self) -> &Identifier {
        &self.remote_id
    }

    /// Writes `data` as one frame: a `u32` length prefix followed by the
    /// payload, then flushes the buffered writer.
    ///
    /// # Errors
    /// - `ConnectionError::ConnectionClosed` if the connection is not alive.
    /// - An `InvalidInput` I/O error if `data` is larger than `MAX_FRAME_LEN`
    ///   (nothing is written and the connection stays usable).
    /// - Any write/flush error. The connection is then marked dead, because a
    ///   partially written frame leaves the stream's framing corrupted.
    async fn send(&self, data: Vec<u8>) -> Result<(), ConnectionError> {
        if !self.is_alive() {
            return Err(ConnectionError::ConnectionClosed);
        }

        // Checked conversion: a bare `as u32` would silently truncate.
        let len = match u32::try_from(data.len()) {
            Ok(len) if data.len() <= MAX_FRAME_LEN => len,
            _ => {
                return Err(
                    frame_too_large(data.len(), std::io::ErrorKind::InvalidInput).into(),
                )
            }
        };

        let mut writer = self.writer.lock().await;
        let outcome: std::io::Result<()> = async {
            writer.write_u32(len).await?;
            writer.write_all(&data).await?;
            writer.flush().await
        }
        .await;

        if outcome.is_err() {
            self.alive.store(false, Ordering::SeqCst);
        }
        outcome.map_err(Into::into)
    }

    /// Reads one length-prefixed frame and returns its payload.
    ///
    /// Not cancellation-safe: dropping the future mid-frame desynchronises
    /// the stream.
    ///
    /// # Errors
    /// - `ConnectionError::ConnectionClosed` if the connection is not alive.
    /// - An `InvalidData` I/O error if the announced length exceeds
    ///   `MAX_FRAME_LEN`.
    /// - Any read error, including end-of-stream when the peer hangs up.
    ///
    /// In both of the last two cases the connection is marked dead: the
    /// stream can no longer be re-synchronised, and clearing the flag lets
    /// `is_alive` (and anything polling it) observe the failure.
    async fn next(&self) -> Result<Vec<u8>, ConnectionError> {
        if !self.is_alive() {
            return Err(ConnectionError::ConnectionClosed);
        }

        let mut reader = self.reader.lock().await;
        let outcome: std::io::Result<Vec<u8>> = async {
            let len = reader.read_u32().await? as usize;
            if len > MAX_FRAME_LEN {
                return Err(frame_too_large(len, std::io::ErrorKind::InvalidData));
            }
            let mut data = vec![0u8; len];
            reader.read_exact(&mut data).await?;
            Ok(data)
        }
        .await;

        if outcome.is_err() {
            self.alive.store(false, Ordering::SeqCst);
        }
        outcome.map_err(Into::into)
    }

    fn is_alive(&self) -> bool {
        self.alive.load(Ordering::SeqCst)
    }

    /// Marks the connection dead and drops both stream halves, which closes
    /// the underlying socket.
    async fn close(self) -> Result<(), ConnectionError> {
        self.alive.store(false, Ordering::SeqCst);
        // Writer and reader are dropped together with `self`.
        Ok(())
    }
}

3. Implement Connection Manager

use tokio::sync::RwLock;
use std::collections::HashMap;

/// Default [`ConnectionManager`]: dials peers over TCP and caches the
/// resulting connections keyed by peer identifier.
pub struct ConnectionManagerImpl {
    /// Dial timeout, keepalive and related settings.
    config: ConnectionConfig,
    /// Maps peer identifiers to network addresses before dialing.
    address_resolver: Arc<dyn AddressResolver>,
    /// Connection table, also shared with background health-checker tasks.
    connections: Arc<RwLock<HashMap<Identifier, Arc<dyn Connection>>>>,
}

impl ConnectionManagerImpl {
    pub fn new(
        address_resolver: Arc<dyn AddressResolver>,
        config: ConnectionConfig,
    ) -> Self {
        Self {
            connections: Arc::new(RwLock::new(HashMap::new())),
            address_resolver,
            config,
        }
    }
    
    async fn establish_connection(&self, peer_id: &Identifier) -> Result<Arc<dyn Connection>, ConnectionError> {
        // Resolve peer address
        let address = self.address_resolver.resolve(peer_id).await?;
        
        // Connect with timeout
        let stream = tokio::time::timeout(
            self.config.connect_timeout,
            TcpStream::connect(&address)
        ).await??;
        
        // Configure TCP options
        stream.set_nodelay(true)?;
        stream.set_keepalive(Some(self.config.keepalive_interval))?;
        
        // Create connection
        let connection = Arc::new(TcpConnection::new(stream, peer_id.clone()).await?);
        
        // Spawn health checker
        self.spawn_health_checker(connection.clone());
        
        Ok(connection)
    }
    
    fn spawn_health_checker(&self, connection: Arc<dyn Connection>) {
        let connections = self.connections.clone();
        let peer_id = connection.remote_id().clone();
        
        tokio::spawn(async move {
            while connection.is_alive() {
                tokio::time::sleep(Duration::from_secs(30)).await;
                
                // Remove dead connections
                if !connection.is_alive() {
                    connections.write().await.remove(&peer_id);
                    break;
                }
            }
        });
    }
}

#[async_trait]
impl ConnectionManager for ConnectionManagerImpl {
    /// Returns the cached live connection to `peer_id`, dialing a new one if
    /// there is none.
    ///
    /// Two concurrent callers may both dial. Only one connection is ever
    /// published, though: after dialing, the table is re-checked under the
    /// write lock. If a live connection won the race, it is returned and the
    /// freshly dialed one is dropped.
    ///
    /// NOTE(review): `config.max_connections` is not enforced here.
    async fn connect(&self, peer_id: Identifier) -> Result<Arc<dyn Connection>, ConnectionError> {
        // Fast path: reuse a live cached connection under the read lock.
        if let Some(existing) = self.get_connection(&peer_id).await {
            return Ok(existing);
        }

        let fresh = self.establish_connection(&peer_id).await?;

        let mut table = self.connections.write().await;
        if let Some(winner) = table.get(&peer_id).filter(|conn| conn.is_alive()) {
            // Another task connected first; keep one connection per peer.
            // NOTE(review): `fresh` is just dropped (closing its socket)
            // because `Connection::close` takes `self` by value and cannot be
            // called through an `Arc<dyn Connection>`.
            return Ok(Arc::clone(winner));
        }
        // Replaces any dead entry for this peer.
        table.insert(peer_id, Arc::clone(&fresh));
        Ok(fresh)
    }

    /// Returns the cached connection to `peer_id` if it is still alive.
    ///
    /// Dead entries that the health checker has not yet reaped are treated as
    /// absent, matching what `connect` is willing to reuse.
    async fn get_connection(&self, peer_id: &Identifier) -> Option<Arc<dyn Connection>> {
        self.connections
            .read()
            .await
            .get(peer_id)
            .filter(|conn| conn.is_alive())
            .cloned()
    }

    /// Removes the connection to `peer_id` from the table and closes it.
    ///
    /// The table lock is released before closing, so other peers' lookups are
    /// not blocked while the connection shuts down.
    async fn disconnect(&self, peer_id: &Identifier) -> Result<(), ConnectionError> {
        // Bind first: a lock guard created in an `if let` scrutinee would stay
        // alive, and keep the table locked, for the whole `if let` body.
        let removed = self.connections.write().await.remove(peer_id);
        if let Some(conn) = removed {
            // NOTE(review): `Connection::close` takes `self` by value, which
            // cannot be invoked through `Arc<dyn Connection>`; this needs
            // `close(&self)` on the trait to compile.
            conn.close().await?;
        }
        Ok(())
    }

    /// Drains the table and closes every connection.
    ///
    /// All connections are closed even if some fail; the first error
    /// encountered is returned.
    async fn close_all(&self) -> Result<(), ConnectionError> {
        let drained: Vec<Arc<dyn Connection>> = self
            .connections
            .write()
            .await
            .drain()
            .map(|(_, conn)| conn)
            .collect();

        let mut first_error = None;
        for conn in drained {
            if let Err(err) = conn.close().await {
                if first_error.is_none() {
                    first_error = Some(err);
                }
            }
        }
        first_error.map_or(Ok(()), Err)
    }

    /// Number of table entries, including dead ones not yet reaped.
    async fn connection_count(&self) -> usize {
        self.connections.read().await.len()
    }

    /// Peers with a table entry, including dead ones not yet reaped.
    async fn connected_peers(&self) -> Vec<Identifier> {
        self.connections.read().await.keys().cloned().collect()
    }
}

4. Configuration and Address Resolution

/// Tunables for dialing and maintaining peer connections.
#[derive(Clone, Debug)]
pub struct ConnectionConfig {
    /// Upper bound on how long a TCP connect attempt may take.
    pub connect_timeout: Duration,
    /// Interval intended for the socket's TCP keepalive setting.
    pub keepalive_interval: Duration,
    /// Intended cap on simultaneously held connections.
    pub max_connections: usize,
    /// How many times a failed connection attempt should be retried.
    pub retry_attempts: u32,
    /// Pause between consecutive connection attempts.
    pub retry_delay: Duration,
}

impl Default for ConnectionConfig {
    /// 10 s connect timeout, 60 s keepalive, up to 1000 connections, and
    /// 3 retries spaced 1 s apart.
    fn default() -> Self {
        ConnectionConfig {
            retry_delay: Duration::new(1, 0),
            retry_attempts: 3,
            max_connections: 1000,
            keepalive_interval: Duration::new(60, 0),
            connect_timeout: Duration::new(10, 0),
        }
    }
}

/// Resolves peer identifiers to network addresses.
///
/// `ConnectionManagerImpl` calls this before dialing a new connection, so an
/// implementation decides where a given `Identifier` is reachable.
#[async_trait]
pub trait AddressResolver: Send + Sync {
    /// Returns the socket address at which `peer_id` can be dialed.
    ///
    /// # Errors
    /// Returns an `AddressError` if the peer cannot be resolved.
    /// NOTE(review): the manager propagates this with `?` into a
    /// `ConnectionError`, which presumably implements `From<AddressError>` — confirm.
    async fn resolve(&self, peer_id: &Identifier) -> Result<SocketAddr, AddressError>;
}

Key Features

  • Connection Pooling: Reuse existing connections
  • Health Checking: Automatic detection of dead connections
  • Graceful Shutdown: Clean connection closure
  • Message Framing: Length-prefixed messages
  • Retry Logic: Configurable retry on connection failure

Testing Requirements

  • Test connection establishment
  • Test connection pooling
  • Test automatic reconnection
  • Test graceful shutdown
  • Test concurrent connections
  • Test error scenarios
  • Mock implementations for testing

Dependencies

  • tokio (async runtime, networking)
  • async-trait
  • bytes (for efficient byte handling)

Priority

High - Required for network layer implementation

Related Issues

Metadata

Assignees

No one assigned

    Labels

    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions