-
Notifications
You must be signed in to change notification settings - Fork 1
Open
Description
Overview
The Go implementation has a comprehensive Network interface that provides high-level abstractions for network communication. This includes the Conduit pattern for sending messages and channel-based message routing. The Rust implementation currently only has a mock network and needs the full network abstraction layer.
Background
Reference implementation: skipgraph-go/net/network.go
The Network layer provides:
- Component lifecycle management (Startable + ReadyDoneAware)
- Channel-based message routing
- MessageProcessor registration
- Conduit abstraction for sending messages
Requirements
1. Define Core Network Traits
use async_trait::async_trait;
use std::sync::Arc;
/// Channel types for message routing
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Channel {
Test,
Consensus,
SyncCommittee,
PushTransactions,
RequestTransactions,
// Add more as needed
}
/// High-level network abstraction
#[async_trait]
pub trait Network: Component + Send + Sync {
/// Register a MessageProcessor for a specific channel
///
/// # Errors
/// Returns error if a processor is already registered for the channel
async fn register(
&self,
channel: Channel,
processor: Arc<dyn MessageProcessor>
) -> Result<Arc<dyn Conduit>, NetworkError>;
/// Get the local node's identifier
fn local_id(&self) -> &Identifier;
/// Get network metrics
fn metrics(&self) -> NetworkMetrics;
}
/// Conduit for sending messages to other nodes
#[async_trait]
pub trait Conduit: Send + Sync {
/// Send a message to a specific node
///
/// # Arguments
/// * `destination` - The identifier of the destination node
/// * `message` - The message to send
///
/// # Errors
/// Returns benign errors that should not crash the node
async fn send(&self, destination: Identifier, message: Message) -> Result<(), ConduitError>;
/// Broadcast a message to multiple nodes
async fn broadcast(&self, destinations: Vec<Identifier>, message: Message) -> Vec<Result<(), ConduitError>>;
/// Get the channel this conduit is associated with
fn channel(&self) -> Channel;
}2. Implement Network Struct
use tokio::sync::RwLock;
use std::collections::HashMap;
pub struct NetworkImpl {
local_id: Identifier,
component: BaseComponent,
processors: Arc<RwLock<HashMap<Channel, Arc<dyn MessageProcessor>>>>,
connection_manager: Arc<dyn ConnectionManager>,
metrics: Arc<RwLock<NetworkMetrics>>,
}
impl NetworkImpl {
pub fn new(
local_id: Identifier,
connection_manager: Arc<dyn ConnectionManager>,
) -> Self {
Self {
local_id,
component: BaseComponent::new(),
processors: Arc::new(RwLock::new(HashMap::new())),
connection_manager,
metrics: Arc::new(RwLock::new(NetworkMetrics::default())),
}
}
/// Start the message receiver loop
async fn start_receiver(&self, ctx: Arc<dyn ThrowableContext>) {
// Listen for incoming connections
// Route messages to appropriate processors based on channel
}
}
#[async_trait]
impl Network for NetworkImpl {
async fn register(
&self,
channel: Channel,
processor: Arc<dyn MessageProcessor>
) -> Result<Arc<dyn Conduit>, NetworkError> {
let mut processors = self.processors.write().await;
if processors.contains_key(&channel) {
return Err(NetworkError::ProcessorAlreadyRegistered(channel));
}
processors.insert(channel, processor);
// Create and return a conduit for this channel
Ok(Arc::new(ConduitImpl {
channel,
network: Arc::new(self.clone()),
}))
}
fn local_id(&self) -> &Identifier {
&self.local_id
}
fn metrics(&self) -> NetworkMetrics {
self.metrics.read().await.clone()
}
}3. Implement Conduit
struct ConduitImpl {
channel: Channel,
network: Arc<NetworkImpl>,
}
#[async_trait]
impl Conduit for ConduitImpl {
async fn send(&self, destination: Identifier, message: Message) -> Result<(), ConduitError> {
// Get or establish connection
let connection = self.network.connection_manager
.connect(destination)
.await
.map_err(|e| ConduitError::ConnectionFailed(e))?;
// Serialize and send message
let envelope = MessageEnvelope {
channel: self.channel,
sender: self.network.local_id.clone(),
payload: message,
};
connection.send(envelope).await
.map_err(|e| ConduitError::SendFailed(e))?;
// Update metrics
self.network.metrics.write().await.messages_sent += 1;
Ok(())
}
async fn broadcast(&self, destinations: Vec<Identifier>, message: Message) -> Vec<Result<(), ConduitError>> {
let futures = destinations.into_iter().map(|dest| {
self.send(dest, message.clone())
});
futures::future::join_all(futures).await
}
fn channel(&self) -> Channel {
self.channel
}
}4. Message Types
/// Network message
#[derive(Clone, Debug)]
pub struct Message {
pub payload: Vec<u8>,
pub metadata: HashMap<String, String>,
}
/// Message envelope for network transmission
#[derive(Clone, Debug, Serialize, Deserialize)]
struct MessageEnvelope {
pub channel: Channel,
pub sender: Identifier,
pub payload: Message,
pub timestamp: u64,
}5. Usage Example
async fn setup_network_layer() -> Result<(), Box<dyn Error>> {
let network = Arc::new(NetworkImpl::new(
local_id,
connection_manager,
));
// Register processors for different channels
let consensus_engine = Arc::new(ConsensusEngine::new());
let consensus_conduit = network.register(
Channel::Consensus,
consensus_engine.clone()
).await?;
let sync_engine = Arc::new(SyncEngine::new());
let sync_conduit = network.register(
Channel::SyncCommittee,
sync_engine.clone()
).await?;
// Start network
network.start(ctx).await;
// Send messages
consensus_conduit.send(peer_id, consensus_message).await?;
Ok(())
}Design Principles
- Channel Isolation: Each channel has exactly one processor
- Error Resilience: Network errors are benign, don't crash the node
- Connection Reuse: Connections are cached and reused
- Metrics: Track network statistics for monitoring
- Async/Await: Fully async implementation using tokio
Testing Requirements
- Test processor registration
- Test message routing to correct processors
- Test connection establishment and reuse
- Test error handling
- Test concurrent message sending
- Mock implementations for testing
Dependencies
- tokio (async runtime)
- async-trait
- serde (for serialization)
- Depends on: Component system (Implement ThrowableContext for Error Propagation #52-Implement ComponentManager for Hierarchical Component Organization #56), Connection Management
Priority
High - Core infrastructure needed for network communication
Related Issues
- Depends on: Implement Component Trait Combining Startable and ReadyDoneAware #55 (Component trait)
- Blocks: Full network implementation
Metadata
Metadata
Assignees
Labels
No labels