Skip to content

Implement Engine Abstraction for Domain-Specific Functionality #57

@thep2p

Description

@thep2p

Overview

The Go implementation has an Engine interface that represents separate domains of functionality in a skip graph node. Engines are components that also process network messages, providing modular protocol handling.

Background

Reference implementation: skipgraph-go/engines/engines.go

An Engine:

  • Is a component (Startable + ReadyDoneAware)
  • Processes incoming network messages
  • Represents a specific aspect of the skip graph protocol
  • Acts independently with minimal shared state

Requirements

1. Define MessageProcessor Trait

use async_trait::async_trait;

/// Channel types for message routing
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Channel {
    Consensus,
    SyncCommittee,
    PushTransactions,
    RequestTransactions,
    // Add more as needed
}

/// Network message abstraction
pub trait Message: Send + Sync {
    fn payload(&self) -> &[u8];
    fn type_id(&self) -> &str;
}

/// Processes incoming network messages
#[async_trait]
pub trait MessageProcessor: Send + Sync {
    /// Process an incoming message from the network
    /// 
    /// # Arguments
    /// * `channel` - The channel on which the message was received
    /// * `origin_id` - The identifier of the sender
    /// * `message` - The message received
    /// 
    /// # Important
    /// - Errors must be handled internally
    /// - Must not panic (DoS vulnerability)
    /// - Should be non-blocking
    async fn process_incoming_message(
        &self,
        channel: Channel,
        origin_id: Identifier,
        message: Box<dyn Message>,
    );
}

2. Define Engine Trait

use async_trait::async_trait;

/// An engine that handles a specific domain of skip graph functionality
#[async_trait]
pub trait Engine: Component + MessageProcessor + Send + Sync {
    /// Get the name of this engine for logging/debugging
    fn name(&self) -> &str;
    
    /// Get the channels this engine is interested in
    fn channels(&self) -> Vec<Channel>;
}

// Blanket implementation
impl<T> Engine for T 
where 
    T: Component + MessageProcessor + Send + Sync
{
    // Default implementations if needed
}

3. Base Engine Implementation

use std::sync::Arc;
use tokio::sync::mpsc;

/// Base implementation for engines
pub struct BaseEngine {
    name: String,
    component: BaseComponent,
    message_queue: mpsc::Sender<(Channel, Identifier, Box<dyn Message>)>,
    channels: Vec<Channel>,
}

impl BaseEngine {
    pub fn new(name: String, channels: Vec<Channel>) -> (Self, mpsc::Receiver<(Channel, Identifier, Box<dyn Message>)>) {
        let (tx, rx) = mpsc::channel(1000);
        (
            Self {
                name,
                component: BaseComponent::new(),
                message_queue: tx,
                channels,
            },
            rx
        )
    }
}

#[async_trait]
impl MessageProcessor for BaseEngine {
    async fn process_incoming_message(
        &self,
        channel: Channel,
        origin_id: Identifier,
        message: Box<dyn Message>,
    ) {
        // Queue message for processing
        // Non-blocking to prevent DoS
        let _ = self.message_queue.try_send((channel, origin_id, message));
    }
}

impl ReadyDoneAware for BaseEngine {
    fn ready(&self) -> watch::Receiver<bool> {
        self.component.ready()
    }
    
    fn done(&self) -> watch::Receiver<bool> {
        self.component.done()
    }
}

4. Example Engine Implementation

pub struct ConsensusEngine {
    base: BaseEngine,
    // consensus-specific fields
}

impl ConsensusEngine {
    pub fn new() -> Self {
        let (base, mut rx) = BaseEngine::new(
            "consensus".to_string(),
            vec![Channel::Consensus],
        );
        
        let engine = Self { base };
        
        // Spawn message processor
        let base_component = engine.base.component.clone();
        tokio::spawn(async move {
            while let Some((channel, origin_id, message)) = rx.recv().await {
                // Process consensus messages
                match process_consensus_message(channel, origin_id, message).await {
                    Ok(_) => {},
                    Err(e) => {
                        // Log error but don't panic
                        log::error!("Consensus engine error: {}", e);
                    }
                }
            }
            base_component.signal_done();
        });
        
        engine
    }
}

#[async_trait]
impl Startable for ConsensusEngine {
    async fn start(&self, ctx: Arc<dyn ThrowableContext>) {
        if let Err(e) = self.base.component.ensure_start_once() {
            panic!("{}", e);
        }
        
        // Initialize consensus state
        match initialize_consensus().await {
            Ok(_) => {
                self.base.component.signal_ready();
                
                // Run until cancelled
                let mut cancelled = ctx.cancelled();
                cancelled.changed().await.ok();
            }
            Err(e) => {
                ctx.throw_irrecoverable(Box::new(e));
            }
        }
    }
}

impl Engine for ConsensusEngine {
    fn name(&self) -> &str {
        &self.base.name
    }
    
    fn channels(&self) -> Vec<Channel> {
        self.base.channels.clone()
    }
}

5. Integration with Node

pub struct SkipGraphNode {
    engines: Vec<Arc<dyn Engine>>,
    network: Arc<dyn Network>,
}

impl SkipGraphNode {
    pub fn new() -> Self {
        let mut node = Self {
            engines: Vec::new(),
            network: create_network(),
        };
        
        // Register engines
        node.register_engine(Arc::new(ConsensusEngine::new()));
        node.register_engine(Arc::new(SyncEngine::new()));
        node.register_engine(Arc::new(TransactionEngine::new()));
        
        node
    }
    
    fn register_engine(&mut self, engine: Arc<dyn Engine>) {
        // Register engine with network for its channels
        for channel in engine.channels() {
            self.network.register_processor(channel, engine.clone());
        }
        self.engines.push(engine);
    }
    
    pub async fn start(&self, ctx: Arc<dyn ThrowableContext>) {
        // Start all engines
        for engine in &self.engines {
            engine.start(ctx.clone()).await;
        }
        
        // Wait for all engines to be ready
        for engine in &self.engines {
            wait_ready(engine.as_ref()).await;
        }
    }
}

Design Principles

  1. Independence: Engines operate independently
  2. Minimal Shared State: State sharing via dependency injection
  3. Non-blocking: Message processing must not block
  4. Error Resilience: Never panic on message processing
  5. Modular: Easy to add/remove engines

Testing Requirements

  • Test engine lifecycle
  • Test message processing
  • Test error handling (no panics)
  • Test multiple engines
  • Test engine registration
  • Mock network layer for testing

Dependencies

Priority

Medium - Required for modular protocol implementation

Related Issues

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions