-
Notifications
You must be signed in to change notification settings - Fork 1
Open
Description
Overview
The Go implementation has a comprehensive connection management system built around the `Connection` and `ConnectionManager` interfaces. Together they provide connection pooling, automatic reconnection, and graceful shutdown. The Rust implementation needs equivalent abstractions for reliable network communication.
Background
Reference implementation: skipgraph-go/net/internal/connection.go
The connection layer provides:
- Connection pooling and reuse
- Automatic reconnection on failure
- Graceful connection shutdown
- Message framing and serialization
Requirements
1. Define Connection Traits
use std::io;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpSocket;
/// A framed, bidirectional connection to a single remote peer.
///
/// Connections are shared as `Arc<dyn Connection>` (see [`ConnectionManager`]), so every
/// method, including [`Connection::close`], takes `&self`: a by-value `self` receiver
/// cannot be invoked through an `Arc` of an unsized trait object.
#[async_trait]
pub trait Connection: Send + Sync {
    /// Returns the remote socket address, or `None` when it is not available
    /// (implementations may, for example, stop reporting it once closed).
    fn remote_addr(&self) -> Option<String>;

    /// Returns the identifier of the remote peer.
    fn remote_id(&self) -> &Identifier;

    /// Sends one message to the remote peer.
    ///
    /// # Errors
    /// Returns an error if the connection has already been closed, or if writing to the
    /// underlying transport fails. Implementations must report failures through the
    /// returned error and never panic on them.
    async fn send(&self, data: Vec<u8>) -> Result<(), ConnectionError>;

    /// Receives the next message from the remote peer.
    ///
    /// # Errors
    /// Returns an error if the connection has already been closed, if the peer closed
    /// its side of the stream, or if reading from the transport fails.
    async fn next(&self) -> Result<Vec<u8>, ConnectionError>;

    /// Returns `true` until the connection is closed or observed to have failed.
    fn is_alive(&self) -> bool;

    /// Gracefully closes the connection.
    ///
    /// Implementations should be idempotent: closing an already-closed connection
    /// should succeed without doing anything.
    async fn close(&self) -> Result<(), ConnectionError>;
}
/// Owns the set of connections to remote peers, keyed by peer identifier.
///
/// Invariant (the "cardinal rule"): at most one connection is held per remote peer.
#[async_trait]
pub trait ConnectionManager: Send + Sync {
    /// Returns a connection to `peer_id`, dialing only when none is already cached.
    ///
    /// Repeated calls for the same peer must reuse the cached connection instead of
    /// opening a second one.
    async fn connect(&self, peer_id: Identifier) -> Result<Arc<dyn Connection>, ConnectionError>;

    /// Looks up the cached connection to `peer_id` without dialing.
    async fn get_connection(&self, peer_id: &Identifier) -> Option<Arc<dyn Connection>>;

    /// Closes and forgets the connection to `peer_id`, if there is one.
    async fn disconnect(&self, peer_id: &Identifier) -> Result<(), ConnectionError>;

    /// Closes and forgets every held connection.
    async fn close_all(&self) -> Result<(), ConnectionError>;

    /// Returns how many connections are currently active.
    async fn connection_count(&self) -> usize;

    /// Returns the identifiers of all peers that currently have a connection.
    async fn connected_peers(&self) -> Vec<Identifier>;
}
2. Implement TCP Connection
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::io::{BufReader, BufWriter};
/// [`Connection`] implementation over a single TCP stream.
///
/// Each message is framed as a big-endian `u32` length prefix followed by the payload
/// bytes. The read and write halves sit behind separate locks, so a task waiting in
/// [`Connection::next`] does not block another task calling [`Connection::send`].
pub struct TcpConnection {
    remote_id: Identifier,
    remote_addr: String,
    reader: Arc<Mutex<BufReader<OwnedReadHalf>>>,
    writer: Arc<Mutex<BufWriter<OwnedWriteHalf>>>,
    /// Cleared by `close` and by any I/O failure, so holders of a cached connection
    /// (and the manager's health checker) can tell the link is dead.
    alive: Arc<AtomicBool>,
}

impl TcpConnection {
    /// Largest payload, in bytes, that `send` will write or `next` will accept (16 MiB).
    ///
    /// The length prefix on incoming frames is chosen by the remote peer; without a cap a
    /// corrupt or malicious header could force an allocation of up to 4 GiB.
    pub const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;

    /// Wraps an already-connected `stream` to the peer identified by `remote_id`.
    ///
    /// # Errors
    /// Fails if the peer address of `stream` cannot be read.
    pub async fn new(stream: TcpStream, remote_id: Identifier) -> Result<Self, ConnectionError> {
        let remote_addr = stream.peer_addr()?.to_string();
        let (read_half, write_half) = stream.into_split();
        Ok(Self {
            remote_id,
            remote_addr,
            reader: Arc::new(Mutex::new(BufReader::new(read_half))),
            writer: Arc::new(Mutex::new(BufWriter::new(write_half))),
            alive: Arc::new(AtomicBool::new(true)),
        })
    }

    /// Builds the error reported for a frame longer than [`Self::MAX_FRAME_LEN`].
    fn frame_too_large(len: usize) -> io::Error {
        io::Error::new(
            io::ErrorKind::InvalidData,
            format!("frame of {len} bytes exceeds the {} byte limit", Self::MAX_FRAME_LEN),
        )
    }

    /// Marks the connection dead if `result` is an I/O failure, then returns it unchanged.
    fn mark_dead_on_error<T>(&self, result: io::Result<T>) -> io::Result<T> {
        if result.is_err() {
            self.alive.store(false, Ordering::SeqCst);
        }
        result
    }

    /// Writes one length-prefixed frame and flushes it to the socket.
    async fn write_frame(&self, len: u32, payload: &[u8]) -> io::Result<()> {
        let mut writer = self.writer.lock().await;
        writer.write_u32(len).await?;
        writer.write_all(payload).await?;
        writer.flush().await
    }

    /// Reads one length-prefixed frame, rejecting lengths above [`Self::MAX_FRAME_LEN`].
    async fn read_frame(&self) -> io::Result<Vec<u8>> {
        let mut reader = self.reader.lock().await;
        let len = reader.read_u32().await? as usize;
        if len > Self::MAX_FRAME_LEN {
            return Err(Self::frame_too_large(len));
        }
        let mut payload = vec![0u8; len];
        reader.read_exact(&mut payload).await?;
        Ok(payload)
    }
}

#[async_trait]
impl Connection for TcpConnection {
    /// Returns the peer's address while the connection is alive, `None` afterwards.
    fn remote_addr(&self) -> Option<String> {
        if self.is_alive() {
            Some(self.remote_addr.clone())
        } else {
            None
        }
    }

    fn remote_id(&self) -> &Identifier {
        &self.remote_id
    }

    /// Sends `data` as one frame.
    ///
    /// # Errors
    /// - `ConnectionError::ConnectionClosed` if the connection is no longer alive.
    /// - An I/O-derived error if `data` exceeds [`TcpConnection::MAX_FRAME_LEN`] (the
    ///   connection stays usable) or if writing fails (the connection is marked dead).
    async fn send(&self, data: Vec<u8>) -> Result<(), ConnectionError> {
        if !self.is_alive() {
            return Err(ConnectionError::ConnectionClosed);
        }
        if data.len() > Self::MAX_FRAME_LEN {
            return Err(Self::frame_too_large(data.len()).into());
        }
        // Lossless: MAX_FRAME_LEN is far below u32::MAX.
        let len = data.len() as u32;
        self.mark_dead_on_error(self.write_frame(len, &data).await)?;
        Ok(())
    }

    /// Receives the next frame.
    ///
    /// Not cancellation-safe: dropping this future mid-frame leaves the stream
    /// desynchronized, as with `read_exact`.
    ///
    /// # Errors
    /// `ConnectionError::ConnectionClosed` if the connection is no longer alive; otherwise
    /// an I/O-derived error (EOF, oversized frame, read failure), after which the
    /// connection is marked dead.
    async fn next(&self) -> Result<Vec<u8>, ConnectionError> {
        if !self.is_alive() {
            return Err(ConnectionError::ConnectionClosed);
        }
        let payload = self.mark_dead_on_error(self.read_frame().await)?;
        Ok(payload)
    }

    fn is_alive(&self) -> bool {
        self.alive.load(Ordering::SeqCst)
    }

    /// Marks the connection closed, flushes buffered output and half-closes the socket.
    ///
    /// Idempotent: only the call that flips the alive flag performs the shutdown.
    async fn close(&self) -> Result<(), ConnectionError> {
        if !self.alive.swap(false, Ordering::SeqCst) {
            return Ok(());
        }
        // BufWriter::shutdown flushes pending bytes before shutting down the write half,
        // so the peer observes EOF after the last complete frame.
        self.writer.lock().await.shutdown().await?;
        Ok(())
    }
}
3. Implement Connection Manager
use tokio::sync::RwLock;
use std::collections::HashMap;
/// Default [`ConnectionManager`]: dials peers over TCP and caches one connection per peer.
///
/// Each cached connection is watched by a background task that evicts it from the cache
/// once it stops being alive.
pub struct ConnectionManagerImpl {
    connections: Arc<RwLock<HashMap<Identifier, Arc<dyn Connection>>>>,
    address_resolver: Arc<dyn AddressResolver>,
    config: ConnectionConfig,
}

impl ConnectionManagerImpl {
    /// How often the background health checker polls a cached connection.
    const HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(30);

    /// Creates a manager with an empty connection cache.
    pub fn new(address_resolver: Arc<dyn AddressResolver>, config: ConnectionConfig) -> Self {
        Self {
            connections: Arc::new(RwLock::new(HashMap::new())),
            address_resolver,
            config,
        }
    }

    /// Dials `peer_id`. A failed attempt is retried up to `config.retry_attempts` more
    /// times, waiting `config.retry_delay` between attempts; the last error is returned
    /// if every attempt fails.
    async fn establish_connection(
        &self,
        peer_id: &Identifier,
    ) -> Result<Arc<dyn Connection>, ConnectionError> {
        let mut retries_left = self.config.retry_attempts;
        loop {
            // The match ends before the sleep, so no error value is held across `.await`.
            match self.dial(peer_id).await {
                Ok(connection) => return Ok(connection),
                Err(err) if retries_left == 0 => return Err(err),
                Err(_) => {}
            }
            retries_left -= 1;
            tokio::time::sleep(self.config.retry_delay).await;
        }
    }

    /// Makes one connection attempt: resolve the address, connect with a timeout, and
    /// configure the socket.
    async fn dial(&self, peer_id: &Identifier) -> Result<Arc<dyn Connection>, ConnectionError> {
        let address = self.address_resolver.resolve(peer_id).await?;

        let socket = if address.is_ipv4() {
            TcpSocket::new_v4()?
        } else {
            TcpSocket::new_v6()?
        };
        // tokio 1.x `TcpStream` has no keep-alive setter; `TcpSocket` can only switch
        // SO_KEEPALIVE on. NOTE(review): honouring `config.keepalive_interval` needs a
        // lower-level socket API (e.g. `socket2::SockRef::set_tcp_keepalive`).
        socket.set_keepalive(true)?;

        let stream =
            tokio::time::timeout(self.config.connect_timeout, socket.connect(address)).await??;
        stream.set_nodelay(true)?;

        let connection: Arc<dyn Connection> =
            Arc::new(TcpConnection::new(stream, peer_id.clone()).await?);
        Ok(connection)
    }

    /// Spawns a task that evicts `connection` from the cache once it is no longer alive.
    fn spawn_health_checker(&self, connection: Arc<dyn Connection>) {
        let connections = Arc::clone(&self.connections);
        tokio::spawn(async move {
            while connection.is_alive() {
                tokio::time::sleep(Self::HEALTH_CHECK_INTERVAL).await;
            }
            // Evict only if the cache still holds *this* connection: a reconnect may have
            // already replaced it with a fresh one for the same peer.
            let mut cache = connections.write().await;
            let peer_id = connection.remote_id();
            if matches!(cache.get(peer_id), Some(cached) if Arc::ptr_eq(cached, &connection)) {
                cache.remove(peer_id);
            }
        });
    }
}

#[async_trait]
impl ConnectionManager for ConnectionManagerImpl {
    /// Returns the cached live connection to `peer_id`, or dials a new one.
    ///
    /// If another task caches a connection to the same peer while this call is dialing,
    /// the freshly dialed connection is closed and the cached one is returned, preserving
    /// the one-connection-per-peer rule.
    ///
    /// NOTE(review): `config.max_connections` is not enforced here.
    async fn connect(&self, peer_id: Identifier) -> Result<Arc<dyn Connection>, ConnectionError> {
        if let Some(existing) = self.get_connection(&peer_id).await {
            return Ok(existing);
        }

        let connection = self.establish_connection(&peer_id).await?;

        // Re-check under the write lock: a concurrent `connect` may have won the race.
        let winner = {
            let mut connections = self.connections.write().await;
            let existing = connections
                .get(&peer_id)
                .filter(|cached| cached.is_alive())
                .cloned();
            if existing.is_none() {
                connections.insert(peer_id, Arc::clone(&connection));
            }
            existing
        };

        match winner {
            Some(existing) => {
                // Best effort: the surplus connection was never shared, so a failed
                // shutdown has no observer and does not affect the returned connection.
                let _ = connection.close().await;
                Ok(existing)
            }
            None => {
                self.spawn_health_checker(Arc::clone(&connection));
                Ok(connection)
            }
        }
    }

    /// Returns the cached connection to `peer_id` if it is still alive.
    async fn get_connection(&self, peer_id: &Identifier) -> Option<Arc<dyn Connection>> {
        self.connections
            .read()
            .await
            .get(peer_id)
            .filter(|conn| conn.is_alive())
            .cloned()
    }

    /// Removes and closes the connection to `peer_id`, if any.
    async fn disconnect(&self, peer_id: &Identifier) -> Result<(), ConnectionError> {
        // Separate statement so the write lock is released before `close().await`.
        let removed = self.connections.write().await.remove(peer_id);
        if let Some(conn) = removed {
            conn.close().await?;
        }
        Ok(())
    }

    /// Removes and closes every cached connection.
    ///
    /// All connections are closed even if some fail; the first error is returned.
    async fn close_all(&self) -> Result<(), ConnectionError> {
        let drained: Vec<Arc<dyn Connection>> = self
            .connections
            .write()
            .await
            .drain()
            .map(|(_, conn)| conn)
            .collect();

        let mut first_error = None;
        for conn in drained {
            if let Err(err) = conn.close().await {
                first_error.get_or_insert(err);
            }
        }
        first_error.map_or(Ok(()), Err)
    }

    /// Counts cached connections that are still alive.
    async fn connection_count(&self) -> usize {
        self.connections
            .read()
            .await
            .values()
            .filter(|conn| conn.is_alive())
            .count()
    }

    /// Lists peers whose cached connection is still alive.
    async fn connected_peers(&self) -> Vec<Identifier> {
        self.connections
            .read()
            .await
            .iter()
            .filter(|(_, conn)| conn.is_alive())
            .map(|(peer_id, _)| peer_id.clone())
            .collect()
    }
}
4. Configuration and Address Resolution
/// Tunables controlling how peer connections are dialed and maintained.
#[derive(Clone, Debug)]
pub struct ConnectionConfig {
    /// Maximum time allowed for a single TCP connect attempt.
    pub connect_timeout: Duration,
    /// Keep-alive interval requested for established TCP connections.
    pub keepalive_interval: Duration,
    /// Intended upper bound on the number of simultaneous connections.
    pub max_connections: usize,
    /// How many times a failed connection attempt may be retried.
    pub retry_attempts: u32,
    /// Pause between connection retries.
    pub retry_delay: Duration,
}

impl Default for ConnectionConfig {
    /// 10 s connect timeout, 60 s keep-alive, up to 1000 connections,
    /// 3 retries spaced 1 s apart.
    fn default() -> Self {
        let seconds = Duration::from_secs;
        let connect_timeout = seconds(10);
        let keepalive_interval = seconds(60);
        let retry_delay = seconds(1);
        Self {
            connect_timeout,
            keepalive_interval,
            max_connections: 1_000,
            retry_attempts: 3,
            retry_delay,
        }
    }
}
/// Maps a peer [`Identifier`] to the socket address used to dial it.
#[async_trait]
pub trait AddressResolver: Send + Sync {
    /// Looks up the network address currently associated with `peer_id`.
    ///
    /// # Errors
    /// Returns an [`AddressError`] when no usable address can be determined.
    async fn resolve(&self, peer_id: &Identifier) -> Result<SocketAddr, AddressError>;
}
Key Features
- Connection Pooling: Reuse existing connections
- Health Checking: Automatic detection of dead connections
- Graceful Shutdown: Clean connection closure
- Message Framing: Length-prefixed messages
- Retry Logic: Configurable retry on connection failure
Testing Requirements
- Test connection establishment
- Test connection pooling
- Test automatic reconnection
- Test graceful shutdown
- Test concurrent connections
- Test error scenarios
- Mock implementations for testing
Dependencies
- tokio (async runtime, networking)
- async-trait
- bytes (for efficient byte handling)
Priority
High - Required for network layer implementation
Related Issues
- Blocks: #58 — Implement Network Abstraction Layer with Conduit Pattern
Metadata
Metadata
Assignees
Labels
No labels