Overview
The Go implementation defines an Engine interface. Each engine represents a separate domain of functionality in a skip graph node. Engines are components that also process network messages, which keeps protocol handling modular.
Background
Reference implementation: skipgraph-go/engines/engines.go
An Engine:
- Is a component (Startable + ReadyDoneAware)
- Processes incoming network messages
- Represents a specific aspect of the skip graph protocol
- Acts independently with minimal shared state
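The composition described in the list above can be sketched with Rust supertraits. This is a minimal, synchronous sketch rather than the project's actual definitions: `Startable`, `ReadyDoneAware`, and `Component` are specified in other issues, so the method signatures used here (plain `bool` state instead of async notification) and the `EchoEngine` type are assumptions made only for illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// Simplified stand-ins for the lifecycle traits (assumed signatures).
pub trait Startable {
    fn start(&self);
}

pub trait ReadyDoneAware {
    fn is_ready(&self) -> bool;
    fn is_done(&self) -> bool;
}

/// A component is anything that is both Startable and ReadyDoneAware.
/// It adds no methods, so a blanket implementation is sufficient.
pub trait Component: Startable + ReadyDoneAware {}
impl<T: Startable + ReadyDoneAware> Component for T {}

/// Simplified message processor: takes raw bytes, returns nothing.
pub trait MessageProcessor {
    fn process_incoming_message(&self, payload: &[u8]);
}

/// An engine is a component that also processes messages.
/// It has a required method, so each engine implements it explicitly.
pub trait Engine: Component + MessageProcessor {
    fn name(&self) -> &str;
}

/// Hypothetical engine that only counts the messages it receives.
#[derive(Default)]
pub struct EchoEngine {
    ready: AtomicBool,
    received: AtomicUsize,
}

impl EchoEngine {
    pub fn received(&self) -> usize {
        self.received.load(Ordering::SeqCst)
    }
}

impl Startable for EchoEngine {
    fn start(&self) {
        self.ready.store(true, Ordering::SeqCst);
    }
}

impl ReadyDoneAware for EchoEngine {
    fn is_ready(&self) -> bool {
        self.ready.load(Ordering::SeqCst)
    }
    fn is_done(&self) -> bool {
        false
    }
}

impl MessageProcessor for EchoEngine {
    fn process_incoming_message(&self, _payload: &[u8]) {
        self.received.fetch_add(1, Ordering::SeqCst);
    }
}

impl Engine for EchoEngine {
    fn name(&self) -> &str {
        "echo"
    }
}
```

Because every method takes `&self`, `Engine` stays object-safe, so a node can hold engines as `Arc<dyn Engine>` and call both lifecycle and message methods through the same handle.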
Requirements
1. Define MessageProcessor Trait
use async_trait::async_trait;

/// Channel types for message routing
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Channel {
    Consensus,
    SyncCommittee,
    PushTransactions,
    RequestTransactions,
    // Add more as needed
}

/// Network message abstraction
pub trait Message: Send + Sync {
    fn payload(&self) -> &[u8];
    fn type_id(&self) -> &str;
}

/// Processes incoming network messages
#[async_trait]
pub trait MessageProcessor: Send + Sync {
    /// Process an incoming message from the network
    ///
    /// # Arguments
    /// * `channel` - The channel on which the message was received
    /// * `origin_id` - The identifier of the sender
    /// * `message` - The message received
    ///
    /// # Important
    /// - Errors must be handled internally
    /// - Must not panic (DoS vulnerability)
    /// - Should be non-blocking
    async fn process_incoming_message(
        &self,
        channel: Channel,
        origin_id: Identifier,
        message: Box<dyn Message>,
    );
}

2. Define Engine Trait
/// An engine that handles a specific domain of skip graph functionality
pub trait Engine: Component + MessageProcessor + Send + Sync {
    /// Get the name of this engine for logging/debugging
    fn name(&self) -> &str;

    /// Get the channels this engine is interested in
    fn channels(&self) -> Vec<Channel>;
}

// Each engine implements `Engine` explicitly: a blanket
// `impl<T> Engine for T` cannot supply `name` or `channels`,
// and it would conflict with the per-engine implementations.

3. Base Engine Implementation
use std::sync::Arc;
use tokio::sync::{mpsc, watch};

/// Base implementation for engines
pub struct BaseEngine {
    name: String,
    component: BaseComponent,
    message_queue: mpsc::Sender<(Channel, Identifier, Box<dyn Message>)>,
    channels: Vec<Channel>,
}

impl BaseEngine {
    pub fn new(
        name: String,
        channels: Vec<Channel>,
    ) -> (Self, mpsc::Receiver<(Channel, Identifier, Box<dyn Message>)>) {
        // Bounded queue: messages are rejected once 1000 are pending
        let (tx, rx) = mpsc::channel(1000);
        (
            Self {
                name,
                component: BaseComponent::new(),
                message_queue: tx,
                channels,
            },
            rx,
        )
    }
}

#[async_trait]
impl MessageProcessor for BaseEngine {
    async fn process_incoming_message(
        &self,
        channel: Channel,
        origin_id: Identifier,
        message: Box<dyn Message>,
    ) {
        // Queue the message without blocking to prevent DoS.
        // If the queue is full or closed, the message is dropped and logged.
        if let Err(e) = self.message_queue.try_send((channel, origin_id, message)) {
            log::warn!("{} engine dropped message on {:?}: {}", self.name, channel, e);
        }
    }
}

impl ReadyDoneAware for BaseEngine {
    fn ready(&self) -> watch::Receiver<bool> {
        self.component.ready()
    }

    fn done(&self) -> watch::Receiver<bool> {
        self.component.done()
    }
}

4. Example Engine Implementation
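pub struct ConsensusEngine {
    base: BaseEngine,
    // consensus-specific fields
}

impl ConsensusEngine {
    /// Must be called from within a Tokio runtime, because it spawns a task.
    pub fn new() -> Self {
        let (base, mut rx) = BaseEngine::new(
            "consensus".to_string(),
            vec![Channel::Consensus],
        );
        let engine = Self { base };

        // Spawn message processor
        let base_component = engine.base.component.clone();
        tokio::spawn(async move {
            while let Some((channel, origin_id, message)) = rx.recv().await {
                // Process consensus messages; log errors but don't panic
                if let Err(e) = process_consensus_message(channel, origin_id, message).await {
                    log::error!("Consensus engine error: {}", e);
                }
            }
            // The queue closes once every sender has been dropped
            base_component.signal_done();
        });

        engine
    }
}

// Delegate message intake and lifecycle queries to the base engine
// so that ConsensusEngine satisfies the Engine supertraits.
#[async_trait]
impl MessageProcessor for ConsensusEngine {
    async fn process_incoming_message(
        &self,
        channel: Channel,
        origin_id: Identifier,
        message: Box<dyn Message>,
    ) {
        self.base.process_incoming_message(channel, origin_id, message).await;
    }
}

impl ReadyDoneAware for ConsensusEngine {
    fn ready(&self) -> watch::Receiver<bool> {
        self.base.ready()
    }

    fn done(&self) -> watch::Receiver<bool> {
        self.base.done()
    }
}

#[async_trait]
impl Startable for ConsensusEngine {
    async fn start(&self, ctx: Arc<dyn ThrowableContext>) {
        // Starting twice is a programming error, not a network-triggered one
        if let Err(e) = self.base.component.ensure_start_once() {
            panic!("{}", e);
        }

        // Initialize consensus state
        match initialize_consensus().await {
            Ok(_) => {
                self.base.component.signal_ready();
                // Run until cancelled
                let mut cancelled = ctx.cancelled();
                cancelled.changed().await.ok();
            }
            Err(e) => {
                ctx.throw_irrecoverable(Box::new(e));
            }
        }
    }
}

impl Engine for ConsensusEngine {
    fn name(&self) -> &str {
        &self.base.name
    }

    fn channels(&self) -> Vec<Channel> {
        self.base.channels.clone()
    }
}

5. Integration with Node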
pub struct SkipGraphNode {
    engines: Vec<Arc<dyn Engine>>,
    network: Arc<dyn Network>,
}

impl SkipGraphNode {
    pub fn new() -> Self {
        let mut node = Self {
            engines: Vec::new(),
            network: create_network(),
        };

        // Register engines
        node.register_engine(Arc::new(ConsensusEngine::new()));
        node.register_engine(Arc::new(SyncEngine::new()));
        node.register_engine(Arc::new(TransactionEngine::new()));
        node
    }

    fn register_engine(&mut self, engine: Arc<dyn Engine>) {
        // Register engine with network for its channels
        for channel in engine.channels() {
            self.network.register_processor(channel, engine.clone());
        }
        self.engines.push(engine);
    }

    pub async fn start(&self, ctx: Arc<dyn ThrowableContext>) {
        // Start each engine on its own task: `start` runs until the context
        // is cancelled, so awaiting it inline would block the remaining engines.
        // Assumes `ThrowableContext: Send + Sync`.
        for engine in &self.engines {
            let engine = Arc::clone(engine);
            let ctx = Arc::clone(&ctx);
            tokio::spawn(async move { engine.start(ctx).await });
        }

        // Wait for all engines to be ready
        for engine in &self.engines {
            wait_ready(engine.as_ref()).await;
        }
    }
}

Design Principles
- Independence: Engines operate independently
- Minimal Shared State: State sharing via dependency injection
- Non-blocking: Message processing must not block
- Error Resilience: Never panic on message processing
- Modular: Easy to add/remove engines
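The non-blocking and error-resilience principles above can be illustrated with a bounded queue that rejects messages instead of waiting. This is a minimal sketch, not the proposed implementation: it uses the standard library's `sync_channel` rather than `tokio::sync::mpsc` so that it is self-contained, and the `Inbox` type, its `offer` method, and the dropped-message counter are hypothetical names introduced here.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical inbox: a bounded queue that never blocks the sender.
pub struct Inbox {
    tx: SyncSender<Vec<u8>>,
    dropped: AtomicU64,
}

impl Inbox {
    /// `capacity` should be greater than zero; a zero-capacity
    /// `sync_channel` is a rendezvous channel with no buffer.
    pub fn new(capacity: usize) -> (Self, Receiver<Vec<u8>>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx, dropped: AtomicU64::new(0) }, rx)
    }

    /// Enqueue without blocking. Returns true if the message was accepted.
    /// A full or disconnected queue counts as a drop; it never panics.
    pub fn offer(&self, payload: Vec<u8>) -> bool {
        match self.tx.try_send(payload) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                self.dropped.fetch_add(1, Ordering::Relaxed);
                false
            }
        }
    }

    /// Number of messages rejected so far.
    pub fn dropped(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }
}
```

Counting drops rather than discarding the error silently gives operators a signal that a peer is flooding the engine or that the consumer task has fallen behind.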
Testing Requirements
- Test engine lifecycle
- Test message processing
- Test error handling (no panics)
- Test multiple engines
- Test engine registration
- Mock network layer for testing
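For the registration and mock-network items above, one possible shape for a test double is sketched below. Everything here is an assumption made for illustration: the `Network` trait is not defined in this issue, so `MockNetwork`, `deliver`, and `RecordingProcessor` are hypothetical, and the processor trait is a simplified synchronous version of the async one proposed earlier.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Channel {
    Consensus,
    SyncCommittee,
}

/// Simplified synchronous processor (the proposed trait is async).
pub trait MessageProcessor: Send + Sync {
    fn process_incoming_message(&self, channel: Channel, payload: &[u8]);
}

/// Hypothetical in-memory network that routes payloads by channel.
#[derive(Default)]
pub struct MockNetwork {
    processors: Mutex<HashMap<Channel, Vec<Arc<dyn MessageProcessor>>>>,
}

impl MockNetwork {
    pub fn register_processor(&self, channel: Channel, p: Arc<dyn MessageProcessor>) {
        self.processors.lock().unwrap().entry(channel).or_default().push(p);
    }

    /// Deliver a payload to every processor registered on `channel`.
    /// Returns how many processors received it.
    pub fn deliver(&self, channel: Channel, payload: &[u8]) -> usize {
        // Clone the target list so processors run without the lock held.
        let targets = self
            .processors
            .lock()
            .unwrap()
            .get(&channel)
            .cloned()
            .unwrap_or_default();
        for p in &targets {
            p.process_incoming_message(channel, payload);
        }
        targets.len()
    }
}

/// Test double that records every message it receives.
#[derive(Default)]
pub struct RecordingProcessor {
    seen: Mutex<Vec<(Channel, Vec<u8>)>>,
}

impl RecordingProcessor {
    pub fn seen(&self) -> Vec<(Channel, Vec<u8>)> {
        self.seen.lock().unwrap().clone()
    }
}

impl MessageProcessor for RecordingProcessor {
    fn process_incoming_message(&self, channel: Channel, payload: &[u8]) {
        self.seen.lock().unwrap().push((channel, payload.to_vec()));
    }
}
```

A registration test can then register a `RecordingProcessor` on one channel, deliver on two channels, and assert that only the registered channel's payload was recorded.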
Dependencies
- async-trait
- tokio
- Depends on: Component system, issues #52 (Implement ThrowableContext for Error Propagation) through #56 (Implement ComponentManager for Hierarchical Component Organization)
Priority
Medium - Required for modular protocol implementation
Related Issues
- Depends on: #55 (Implement Component Trait Combining Startable and ReadyDoneAware)
- Enables: Modular skip graph protocol implementation