Add Protocol Buffers Support for Network Messages #60

@thep2p

Overview

The Go implementation uses Protocol Buffers for network message serialization. This provides efficient, language-agnostic serialization with strong typing and backward compatibility. The Rust implementation needs protobuf support so that it can exchange messages with Go nodes over the same wire format.

Background

Reference implementation: skipgraph-go/net/internal/connection/message.proto

Protocol Buffers provide:

  • Efficient binary serialization
  • Cross-language compatibility
  • Schema evolution support
  • Strong typing
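To make "efficient binary serialization" concrete, here is a minimal sketch of how the protobuf wire format encodes a single `uint32` field, such as `level = 2` in the schema below. It is hand-written for illustration only; the helper names `put_varint` and `encode_uint32_field` are hypothetical, and in the real implementation prost produces these bytes.

```rust
/// Append `value` as a base-128 varint: 7 payload bits per byte,
/// least significant group first, high bit set on every byte but the last.
fn put_varint(mut value: u64, out: &mut Vec<u8>) {
    while value >= 0x80 {
        out.push(((value as u8) & 0x7f) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}

/// Encode one `uint32` field as key + value.
/// The key is `(field_number << 3) | wire_type`, and wire type 0 means varint.
fn encode_uint32_field(field_number: u32, value: u32) -> Vec<u8> {
    let mut out = Vec::new();
    put_varint(u64::from(field_number << 3), &mut out); // wire type 0
    put_varint(u64::from(value), &mut out);
    out
}
```

With this sketch, `encode_uint32_field(2, 150)` yields the three bytes `0x10 0x96 0x01`: one byte for the key and two for the value.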

Requirements

1. Define Proto Schema

Create src/network/proto/message.proto:

syntax = "proto3";

package skipgraph.network;

// Base message envelope
message Message {
    // Message payload
    bytes data = 1;
    
    // Sender identifier (32-byte identifier, hex-encoded as a string)
    string sender = 2;
    
    // Unix timestamp in nanoseconds
    uint64 timestamp = 3;
    
    // Message type identifier
    string type_id = 4;
    
    // Channel identifier
    string channel = 5;
    
    // Optional metadata
    map<string, string> metadata = 6;
}

// Connection handshake
message Handshake {
    // Node identifier
    string node_id = 1;
    
    // Protocol version
    uint32 version = 2;
    
    // Supported channels
    repeated string channels = 3;
    
    // Node capabilities
    map<string, string> capabilities = 4;
}

// Skip graph specific messages
message SkipGraphMessage {
    oneof message {
        SearchRequest search_request = 1;
        SearchResponse search_response = 2;
        JoinRequest join_request = 3;
        JoinResponse join_response = 4;
        LeaveNotification leave_notification = 5;
        UpdateNeighbors update_neighbors = 6;
    }
}

message SearchRequest {
    bytes target_id = 1;
    uint32 level = 2;
    string request_id = 3;
}

message SearchResponse {
    string request_id = 1;
    bool found = 2;
    bytes node_id = 3;
    string address = 4;
}

message JoinRequest {
    bytes node_id = 1;
    bytes membership_vector = 2;
    string address = 3;
}

message JoinResponse {
    bool accepted = 1;
    repeated Neighbor neighbors = 2;
}

message LeaveNotification {
    bytes node_id = 1;
    uint32 level = 2;
}

message UpdateNeighbors {
    uint32 level = 1;
    repeated Neighbor neighbors = 2;
}

message Neighbor {
    bytes node_id = 1;
    string address = 2;
    uint32 level = 3;
    bool is_left = 4;
}
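The `sender` field of `Message` carries an identifier as a hex string. The sketch below shows one way to convert between a 32-byte identifier and that string using only the standard library. The function names are hypothetical, and the 32-byte length is taken from the schema comment; the actual `Identifier` type may expose its own conversion.

```rust
/// Hex-encode bytes as lowercase, two characters per byte.
fn to_hex(bytes: &[u8]) -> String {
    let mut s = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        s.push_str(&format!("{:02x}", b));
    }
    s
}

/// Value of a single ASCII hex digit, or None if it is not one.
fn nibble(c: u8) -> Option<u8> {
    match c {
        b'0'..=b'9' => Some(c - b'0'),
        b'a'..=b'f' => Some(c - b'a' + 10),
        b'A'..=b'F' => Some(c - b'A' + 10),
        _ => None,
    }
}

/// Decode exactly 64 hex characters into a 32-byte identifier.
fn from_hex_32(s: &str) -> Result<[u8; 32], String> {
    let bytes = s.as_bytes();
    if bytes.len() != 64 {
        return Err(format!("expected 64 hex characters, got {}", bytes.len()));
    }
    let mut out = [0u8; 32];
    for (i, pair) in bytes.chunks_exact(2).enumerate() {
        let hi = nibble(pair[0]).ok_or_else(|| format!("invalid hex digit at {}", 2 * i))?;
        let lo = nibble(pair[1]).ok_or_else(|| format!("invalid hex digit at {}", 2 * i + 1))?;
        out[i] = (hi << 4) | lo;
    }
    Ok(out)
}
```

Rejecting strings of the wrong length at the boundary keeps malformed sender values from reaching the routing logic.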

2. Build Configuration

Update Cargo.toml:

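[dependencies]
prost = "0.12"
prost-types = "0.12"
bytes = "1.5"
# Needed by the serde derives that build.rs adds to the generated types
serde = { version = "1", features = ["derive"] }
# Needed for hex::encode on the sender identifier
hex = "0.4"

[build-dependencies]
prost-build = "0.12"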

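Create build.rs (prost-build 0.12 runs protoc, so protoc must be installed and on PATH, or located through the PROTOC environment variable):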

use std::io::Result;

fn main() -> Result<()> {
    prost_build::Config::new()
        .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
        .compile_protos(
            &["src/network/proto/message.proto"],
            &["src/network/proto/"],
        )?;
    Ok(())
}

3. Message Serialization Layer

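use prost::{EncodeError, Message as ProstMessage};

pub mod proto {
    include!(concat!(env!("OUT_DIR"), "/skipgraph.network.rs"));
}

/// Errors produced while decoding a message.
/// `MissingField` is the variant used by the skip graph message handling;
/// `Wire` is added here to carry prost's own decode error.
#[derive(Debug)]
pub enum DecodeError {
    /// The bytes are not a valid protobuf encoding
    Wire(prost::DecodeError),
    /// A required field (for example a oneof) was not set
    MissingField(&'static str),
}

impl From<prost::DecodeError> for DecodeError {
    fn from(err: prost::DecodeError) -> Self {
        DecodeError::Wire(err)
    }
}

/// Trait for domain types that convert to and from a protobuf message
pub trait ProtoMessage: Sized {
    type Proto: ProstMessage + Default;
    
    /// Convert to protobuf
    fn to_proto(&self) -> Self::Proto;
    
    /// Convert from protobuf
    fn from_proto(proto: Self::Proto) -> Result<Self, DecodeError>;
    
    /// Serialize to bytes
    fn encode(&self) -> Result<Vec<u8>, EncodeError> {
        // Encoding into a growable Vec cannot run out of capacity;
        // the Result is kept so callers do not change.
        Ok(self.to_proto().encode_to_vec())
    }
    
    /// Deserialize from bytes
    fn decode(data: &[u8]) -> Result<Self, DecodeError> {
        // `?` converts prost::DecodeError through the From impl above
        let proto = Self::Proto::decode(data)?;
        Self::from_proto(proto)
    }
}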

4. Implement Message Types

use std::collections::HashMap;
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};

use crate::core::model::Identifier;
// `Channel`, `DecodeError`, `ProtoMessage`, and `proto` are assumed to be
// in scope from the surrounding network module.

#[derive(Debug, Clone)]
pub struct NetworkMessage {
    pub data: Vec<u8>,
    pub sender: Identifier,
    pub timestamp: u64,
    pub type_id: String,
    pub channel: Channel,
    pub metadata: HashMap<String, String>,
}

impl ProtoMessage for NetworkMessage {
    type Proto = proto::Message;
    
    fn to_proto(&self) -> Self::Proto {
        proto::Message {
            data: self.data.clone(),
            // Requires `Identifier: AsRef<[u8]>`
            sender: hex::encode(&self.sender),
            timestamp: self.timestamp,
            type_id: self.type_id.clone(),
            channel: self.channel.to_string(),
            metadata: self.metadata.clone(),
        }
    }
    
    fn from_proto(proto: Self::Proto) -> Result<Self, DecodeError> {
        // Both `?` operators rely on `From` conversions into `DecodeError`
        // for the errors of `Identifier::from_hex` and `Channel::from_str`.
        Ok(NetworkMessage {
            data: proto.data,
            sender: Identifier::from_hex(&proto.sender)?,
            timestamp: proto.timestamp,
            type_id: proto.type_id,
            channel: Channel::from_str(&proto.channel)?,
            metadata: proto.metadata,
        })
    }
}

impl NetworkMessage {
    pub fn new(data: Vec<u8>, sender: Identifier, channel: Channel) -> Self {
        // Nanoseconds since the epoch fit in a u64 until the year 2554,
        // so the u128 -> u64 cast does not truncate in practice.
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is set before the Unix epoch")
            .as_nanos() as u64;
        
        Self {
            data,
            sender,
            timestamp,
            type_id: String::new(),
            channel,
            metadata: HashMap::new(),
        }
    }
}
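The constructor above panics if the clock reads earlier than the Unix epoch and silently truncates on the cast. If that is undesirable, a checked variant could look like the sketch below. The function name `unix_nanos` is hypothetical, and taking the time as a parameter is only there to make the function testable.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Nanoseconds between the Unix epoch and `now`.
/// Returns None if `now` is before the epoch or the value does not fit in a u64.
fn unix_nanos(now: SystemTime) -> Option<u64> {
    let elapsed = now.duration_since(UNIX_EPOCH).ok()?;
    let nanos = elapsed.as_nanos();
    if nanos > u128::from(u64::MAX) {
        return None;
    }
    Some(nanos as u64)
}
```

A caller would pass `SystemTime::now()` and decide how to handle `None`, for example by returning an error from the constructor instead of panicking.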

5. Integration with Connection Layer

impl TcpConnection {
    /// Send a protobuf message
    pub async fn send_proto<T: ProtoMessage>(&self, msg: &T) -> Result<(), ConnectionError> {
        let data = msg
            .encode()
            .map_err(ConnectionError::SerializationError)?;
        self.send(data).await
    }
    
    /// Receive a protobuf message
    pub async fn recv_proto<T: ProtoMessage>(&self) -> Result<T, ConnectionError> {
        let data = self.next().await?;
        T::decode(&data).map_err(ConnectionError::DeserializationError)
    }
}
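Protobuf messages are not self-delimiting, so a byte stream such as TCP needs some framing to mark where one message ends. If the existing `send` and `next` do not already delimit messages, a length prefix is one common option. The sketch below is synchronous and uses only the standard library; the 4-byte big-endian prefix, the `MAX_FRAME_LEN` cap, and the function names are assumptions for illustration, and the framing actually used by the Go implementation would have to be matched instead.

```rust
use std::io::{self, Read, Write};

/// Hypothetical upper bound on a single frame, to avoid huge allocations.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;

/// Write one frame: a 4-byte big-endian length followed by the payload.
fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large"));
    }
    let len = payload.len() as u32; // safe: bounded by MAX_FRAME_LEN
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)
}

/// Read one frame written by `write_frame`.
fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf) as usize;
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok(payload)
}
```

The payload passed to `write_frame` would be the bytes returned by `ProtoMessage::encode`, and the bytes returned by `read_frame` would go to `ProtoMessage::decode`.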

6. Skip Graph Message Handling

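// Each variant wraps a domain type that itself implements `ProtoMessage`
// with the matching generated type as its `Proto`.
#[derive(Debug, Clone)]
pub enum SkipGraphMsg {
    SearchRequest(SearchRequest),
    SearchResponse(SearchResponse),
    JoinRequest(JoinRequest),
    JoinResponse(JoinResponse),
    LeaveNotification(LeaveNotification),
    UpdateNeighbors(UpdateNeighbors),
}

impl ProtoMessage for SkipGraphMsg {
    type Proto = proto::SkipGraphMessage;
    
    fn to_proto(&self) -> Self::Proto {
        use proto::skip_graph_message::Message as ProtoMsg;
        
        let message = match self {
            SkipGraphMsg::SearchRequest(req) => ProtoMsg::SearchRequest(req.to_proto()),
            SkipGraphMsg::SearchResponse(res) => ProtoMsg::SearchResponse(res.to_proto()),
            SkipGraphMsg::JoinRequest(req) => ProtoMsg::JoinRequest(req.to_proto()),
            SkipGraphMsg::JoinResponse(res) => ProtoMsg::JoinResponse(res.to_proto()),
            SkipGraphMsg::LeaveNotification(n) => ProtoMsg::LeaveNotification(n.to_proto()),
            SkipGraphMsg::UpdateNeighbors(u) => ProtoMsg::UpdateNeighbors(u.to_proto()),
        };
        
        proto::SkipGraphMessage {
            message: Some(message),
        }
    }
    
    fn from_proto(proto: Self::Proto) -> Result<Self, DecodeError> {
        use proto::skip_graph_message::Message as ProtoMsg;
        
        // prost represents a oneof as an Option; None means no variant was set.
        match proto.message {
            Some(ProtoMsg::SearchRequest(req)) => {
                Ok(SkipGraphMsg::SearchRequest(SearchRequest::from_proto(req)?))
            }
            Some(ProtoMsg::SearchResponse(res)) => {
                Ok(SkipGraphMsg::SearchResponse(SearchResponse::from_proto(res)?))
            }
            Some(ProtoMsg::JoinRequest(req)) => {
                Ok(SkipGraphMsg::JoinRequest(JoinRequest::from_proto(req)?))
            }
            Some(ProtoMsg::JoinResponse(res)) => {
                Ok(SkipGraphMsg::JoinResponse(JoinResponse::from_proto(res)?))
            }
            Some(ProtoMsg::LeaveNotification(n)) => {
                Ok(SkipGraphMsg::LeaveNotification(LeaveNotification::from_proto(n)?))
            }
            Some(ProtoMsg::UpdateNeighbors(u)) => {
                Ok(SkipGraphMsg::UpdateNeighbors(UpdateNeighbors::from_proto(u)?))
            }
            None => Err(DecodeError::MissingField("message")),
        }
    }
}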
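The enum above delegates to `to_proto` and `from_proto` on each variant, which the issue does not spell out. As a rough sketch of what one of them might look like, the code below converts a search request between a wire struct and a domain struct, validating the identifier length on the way in. `WireSearchRequest` is a hand-written stand-in for the generated `proto::SearchRequest`, the method names and `String` error are placeholders, and the fixed 32-byte `target_id` is an assumption based on the identifier size mentioned in the schema.

```rust
/// Stand-in for the prost-generated `proto::SearchRequest`
/// (field types follow the schema: bytes, uint32, string).
#[derive(Debug, Clone, PartialEq)]
pub struct WireSearchRequest {
    pub target_id: Vec<u8>,
    pub level: u32,
    pub request_id: String,
}

/// Hypothetical domain type with a fixed-size identifier.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchRequest {
    pub target_id: [u8; 32],
    pub level: u32,
    pub request_id: String,
}

impl SearchRequest {
    /// Domain -> wire: always succeeds.
    pub fn to_wire(&self) -> WireSearchRequest {
        WireSearchRequest {
            target_id: self.target_id.to_vec(),
            level: self.level,
            request_id: self.request_id.clone(),
        }
    }

    /// Wire -> domain: rejects identifiers that are not exactly 32 bytes.
    pub fn from_wire(wire: WireSearchRequest) -> Result<Self, String> {
        if wire.target_id.len() != 32 {
            return Err(format!(
                "target_id must be 32 bytes, got {}",
                wire.target_id.len()
            ));
        }
        let mut target_id = [0u8; 32];
        target_id.copy_from_slice(&wire.target_id);
        Ok(SearchRequest {
            target_id,
            level: wire.level,
            request_id: wire.request_id,
        })
    }
}
```

Keeping validation in the conversion means the rest of the code can rely on the domain type's invariants, while the generated types stay a plain mirror of the schema.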

Benefits

  • Efficiency: Binary format is compact and fast
  • Type Safety: Generated code provides compile-time type checking
  • Compatibility: Same wire format as Go implementation
  • Evolution: Can add fields without breaking compatibility
  • Documentation: Proto files serve as API documentation

Testing Requirements

  • Test serialization/deserialization roundtrip
  • Test compatibility with Go implementation
  • Test message size efficiency
  • Test schema evolution
  • Benchmark serialization performance
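For the schema evolution requirement, the property to test is that an older reader still decodes bytes produced by a newer schema with extra fields. The sketch below demonstrates that property with a small hand-rolled reader for `LeaveNotification` that skips unknown fields. It is an illustration of the wire-format rule only; prost-generated decoders already skip unknown fields, so a real test would encode with an extended message type and decode with the current one. Fixed-width and group wire types are left out for brevity.

```rust
#[derive(Debug, Default, PartialEq)]
struct LeaveNotification {
    node_id: Vec<u8>, // field 1, bytes
    level: u32,       // field 2, uint32
}

/// Read a base-128 varint starting at `*pos`, advancing `*pos` past it.
fn read_varint(buf: &[u8], pos: &mut usize) -> Option<u64> {
    let mut value = 0u64;
    for shift in (0..64).step_by(7) {
        let byte = *buf.get(*pos)?;
        *pos += 1;
        value |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return Some(value);
        }
    }
    None // longer than 10 bytes: malformed
}

/// Read a length-delimited payload (varint length, then that many bytes).
fn read_bytes<'a>(buf: &'a [u8], pos: &mut usize) -> Option<&'a [u8]> {
    let len = read_varint(buf, pos)? as usize;
    let end = (*pos).checked_add(len)?;
    let slice = buf.get(*pos..end)?;
    *pos = end;
    Some(slice)
}

/// Decode the two known fields and skip any field this reader does not know.
fn decode_leave(buf: &[u8]) -> Option<LeaveNotification> {
    let mut msg = LeaveNotification::default();
    let mut pos = 0;
    while pos < buf.len() {
        let key = read_varint(buf, &mut pos)?;
        let (field, wire_type) = (key >> 3, key & 0x7);
        match (field, wire_type) {
            (1, 2) => msg.node_id = read_bytes(buf, &mut pos)?.to_vec(),
            (2, 0) => msg.level = read_varint(buf, &mut pos)? as u32,
            // Unknown fields: consume the value and ignore it.
            (_, 0) => {
                read_varint(buf, &mut pos)?;
            }
            (_, 2) => {
                read_bytes(buf, &mut pos)?;
            }
            _ => return None, // other wire types are not handled in this sketch
        }
    }
    Some(msg)
}
```

Feeding this reader bytes that contain fields 1 and 2 plus two hypothetical newer fields 3 and 4 returns the same `LeaveNotification` as if the extra fields were absent, which is the behavior the schema evolution test should assert.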

Dependencies

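  • prost (protobuf implementation)
  • prost-build (code generation; requires protoc at build time)
  • bytes (efficient byte handling)
  • serde (required by the derive attribute in build.rs; also enables JSON conversion)
  • hex (hex encoding of the sender identifier)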

Priority

High - Required for network compatibility with Go implementation

Related Issues
