-
Notifications
You must be signed in to change notification settings - Fork 1
Open
Description
Overview
The Go implementation uses Protocol Buffers for network message serialization. This provides efficient, language-agnostic serialization with strong typing and backward compatibility. The Rust implementation needs protobuf support for compatibility and efficiency.
Background
Reference implementation: skipgraph-go/net/internal/connection/message.proto
Protocol Buffers provide:
- Efficient binary serialization
- Cross-language compatibility
- Schema evolution support
- Strong typing
Requirements
1. Define Proto Schema
Create src/network/proto/message.proto:
syntax = "proto3";

package skipgraph.network;

// Generic envelope wrapping every payload sent over the network.
message Message {
  // Opaque payload bytes.
  bytes data = 1;
  // Sender identifier as a hex string (noted as "32 bytes" -- TODO confirm
  // whether that is the raw identifier length or the encoded string length).
  string sender = 2;
  // Unix timestamp, in nanoseconds.
  uint64 timestamp = 3;
  // Identifier of the message type carried in `data`.
  string type_id = 4;
  // Identifier of the channel the message belongs to.
  string channel = 5;
  // Free-form key/value metadata; may be empty.
  map<string, string> metadata = 6;
}

// Exchanged when a connection is established.
message Handshake {
  // Identifier of the node sending the handshake.
  string node_id = 1;
  // Protocol version spoken by the sender.
  uint32 version = 2;
  // Channels the sender supports.
  repeated string channels = 3;
  // Capabilities advertised by the sender, as key/value pairs.
  map<string, string> capabilities = 4;
}

// Skip-graph-specific messages; exactly one payload is set at a time.
message SkipGraphMessage {
  oneof message {
    SearchRequest search_request = 1;
    SearchResponse search_response = 2;
    JoinRequest join_request = 3;
    JoinResponse join_response = 4;
    LeaveNotification leave_notification = 5;
    UpdateNeighbors update_neighbors = 6;
  }
}

// Search request payload.
message SearchRequest {
  bytes target_id = 1;
  uint32 level = 2;
  // Presumably echoed back in SearchResponse.request_id -- verify.
  string request_id = 3;
}

// Search response payload.
message SearchResponse {
  string request_id = 1;
  bool found = 2;
  bytes node_id = 3;
  string address = 4;
}

// Join request payload.
message JoinRequest {
  bytes node_id = 1;
  bytes membership_vector = 2;
  string address = 3;
}

// Join response payload.
message JoinResponse {
  bool accepted = 1;
  repeated Neighbor neighbors = 2;
}

// Leave notification payload.
message LeaveNotification {
  bytes node_id = 1;
  uint32 level = 2;
}

// Neighbor update payload.
message UpdateNeighbors {
  uint32 level = 1;
  repeated Neighbor neighbors = 2;
}

// Description of a single neighbor entry.
message Neighbor {
  bytes node_id = 1;
  string address = 2;
  uint32 level = 3;
  bool is_left = 4;
}
2. Build Configuration
Update Cargo.toml:
[dependencies]
prost = "0.12"
prost-types = "0.12"
bytes = "1.5"
# Needed because build.rs attaches serde derives to every generated type.
serde = { version = "1", features = ["derive"] }
# Needed because the sender identifier is hex-encoded when converting to protobuf.
hex = "0.4"

[build-dependencies]
prost-build = "0.12"
Create build.rs:
use std::io::Result;
/// Build-script entry point: generates Rust types from the protobuf schema.
///
/// Every generated type additionally derives serde's `Serialize` and
/// `Deserialize`. Any error reported by the code generator is propagated as
/// this function's return value.
fn main() -> Result<()> {
    const PROTO_FILES: &[&str] = &["src/network/proto/message.proto"];
    const INCLUDE_DIRS: &[&str] = &["src/network/proto/"];

    let mut config = prost_build::Config::new();
    config.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]");
    config.compile_protos(PROTO_FILES, INCLUDE_DIRS)
}
3. Message Serialization Layer
use prost::Message as ProstMessage;
use bytes::{Bytes, BytesMut};
/// Rust types generated from the `skipgraph.network` protobuf package.
pub mod proto {
    // Pulls in the generated source file from the build's OUT_DIR.
    include!(concat!(env!("OUT_DIR"), "/skipgraph.network.rs"));
}
/// Bridge between a domain type and its generated protobuf representation.
///
/// Implementors supply the two conversions (`to_proto` / `from_proto`);
/// binary encoding and decoding are provided on top of them.
pub trait ProtoMessage: Sized {
    /// Generated protobuf type this message maps to.
    type Proto: ProstMessage + Default;

    /// Builds the protobuf representation of `self`.
    fn to_proto(&self) -> Self::Proto;

    /// Rebuilds the domain value from its protobuf representation.
    ///
    /// # Errors
    /// Returns an error when the protobuf value cannot be converted.
    fn from_proto(proto: Self::Proto) -> Result<Self, DecodeError>;

    /// Serializes `self` to protobuf wire-format bytes.
    ///
    /// # Errors
    /// The default implementation never fails; the `Result` is kept so the
    /// signature stays stable for callers and for overriding implementations.
    fn encode(&self) -> Result<Vec<u8>, EncodeError> {
        // `encode_to_vec` sizes its own buffer and is infallible, which avoids
        // the intermediate `BytesMut` and the extra copy into a `Vec`.
        Ok(self.to_proto().encode_to_vec())
    }

    /// Deserializes a value from protobuf wire-format bytes.
    ///
    /// # Errors
    /// Returns an error when `data` is not a valid encoding of `Self::Proto`
    /// or when `from_proto` rejects the decoded value.
    fn decode(data: &[u8]) -> Result<Self, DecodeError> {
        let proto = Self::Proto::decode(data)?;
        Self::from_proto(proto)
    }
}
4. Implement Message Types
use crate::core::model::Identifier;
use std::time::{SystemTime, UNIX_EPOCH};
/// Domain-level network message; converted to and from `proto::Message`
/// through its `ProtoMessage` implementation.
#[derive(Debug, Clone)]
pub struct NetworkMessage {
    /// Message payload bytes.
    pub data: Vec<u8>,
    /// Identifier of the sender (hex-encoded in the protobuf form).
    pub sender: Identifier,
    /// Timestamp; `NetworkMessage::new` sets it to nanoseconds since the Unix epoch.
    pub timestamp: u64,
    /// Message type identifier; `NetworkMessage::new` leaves it empty.
    pub type_id: String,
    /// Channel the message belongs to (stored as a string in the protobuf form).
    pub channel: Channel,
    /// Key/value metadata; `NetworkMessage::new` starts with none.
    pub metadata: HashMap<String, String>,
}
impl ProtoMessage for NetworkMessage {
    type Proto = proto::Message;

    /// Builds the protobuf envelope: the sender is hex-encoded and the
    /// channel is rendered as a string; the other fields are copied as-is.
    fn to_proto(&self) -> Self::Proto {
        let NetworkMessage {
            data,
            sender,
            timestamp,
            type_id,
            channel,
            metadata,
        } = self;

        proto::Message {
            data: data.clone(),
            sender: hex::encode(sender),
            timestamp: *timestamp,
            type_id: type_id.clone(),
            channel: channel.to_string(),
            metadata: metadata.clone(),
        }
    }

    /// Rebuilds the message from a protobuf envelope.
    ///
    /// # Errors
    /// Fails when the sender is not a valid hex identifier or, after that
    /// check, when the channel string cannot be parsed.
    fn from_proto(proto: Self::Proto) -> Result<Self, DecodeError> {
        let proto::Message {
            data,
            sender,
            timestamp,
            type_id,
            channel,
            metadata,
        } = proto;

        // The sender is validated before the channel, so a bad sender is
        // the error reported when both are invalid.
        let sender = Identifier::from_hex(&sender)?;
        let channel = Channel::from_str(&channel)?;

        Ok(Self {
            data,
            sender,
            timestamp,
            type_id,
            channel,
            metadata,
        })
    }
}
impl NetworkMessage {
    /// Creates a message stamped with the current wall-clock time.
    ///
    /// `type_id` starts empty and `metadata` starts with no entries.
    ///
    /// The timestamp is the number of nanoseconds since the Unix epoch. It
    /// saturates instead of panicking or wrapping: `0` when the system clock
    /// reports a time before the epoch, `u64::MAX` when the nanosecond count
    /// does not fit in a `u64`.
    pub fn new(data: Vec<u8>, sender: Identifier, channel: Channel) -> Self {
        // A misconfigured clock must not crash the node, and a plain `as u64`
        // cast would silently truncate the `u128` nanosecond count.
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| {
                u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX)
            });

        Self {
            data,
            sender,
            timestamp,
            type_id: String::new(),
            channel,
            metadata: HashMap::new(),
        }
    }
}
5. Integration with Connection Layer
impl TcpConnection {
    /// Encodes `msg` as protobuf and sends the resulting bytes.
    ///
    /// # Errors
    /// Returns `ConnectionError::SerializationError` when encoding fails,
    /// otherwise whatever `send` returns.
    pub async fn send_proto<T: ProtoMessage>(&self, msg: &T) -> Result<(), ConnectionError> {
        let encoded = msg
            .encode()
            .map_err(ConnectionError::SerializationError)?;
        self.send(encoded).await
    }

    /// Waits for the next incoming payload and decodes it as `T`.
    ///
    /// # Errors
    /// Propagates the error from `next`, or returns
    /// `ConnectionError::DeserializationError` when decoding fails.
    pub async fn recv_proto<T: ProtoMessage>(&self) -> Result<T, ConnectionError> {
        let frame = self.next().await?;
        T::decode(&frame).map_err(ConnectionError::DeserializationError)
    }
}
6. Skip Graph Message Handling
/// Domain-level skip graph message; one variant per payload kind of the
/// `SkipGraphMessage` protobuf oneof.
#[derive(Debug, Clone)]
pub enum SkipGraphMsg {
    /// Carries a search request payload.
    SearchRequest(SearchRequest),
    /// Carries a search response payload.
    SearchResponse(SearchResponse),
    /// Carries a join request payload.
    JoinRequest(JoinRequest),
    /// Carries a join response payload.
    JoinResponse(JoinResponse),
    /// Carries a leave notification payload.
    LeaveNotification(LeaveNotification),
    /// Carries a neighbor update payload.
    UpdateNeighbors(UpdateNeighbors),
}
impl ProtoMessage for SkipGraphMsg {
    type Proto = proto::SkipGraphMessage;

    /// Wraps the payload's protobuf form in the `SkipGraphMessage` oneof.
    ///
    /// Assumes every payload type implements `ProtoMessage` with the
    /// matching generated type as its `Proto`.
    fn to_proto(&self) -> Self::Proto {
        use proto::skip_graph_message::Message as ProtoMsg;

        // Deliberately exhaustive (no catch-all arm) so that adding a variant
        // to `SkipGraphMsg` is a compile error until it is handled here.
        let message = match self {
            SkipGraphMsg::SearchRequest(req) => ProtoMsg::SearchRequest(req.to_proto()),
            SkipGraphMsg::SearchResponse(res) => ProtoMsg::SearchResponse(res.to_proto()),
            SkipGraphMsg::JoinRequest(req) => ProtoMsg::JoinRequest(req.to_proto()),
            SkipGraphMsg::JoinResponse(res) => ProtoMsg::JoinResponse(res.to_proto()),
            SkipGraphMsg::LeaveNotification(note) => {
                ProtoMsg::LeaveNotification(note.to_proto())
            }
            SkipGraphMsg::UpdateNeighbors(update) => {
                ProtoMsg::UpdateNeighbors(update.to_proto())
            }
        };

        proto::SkipGraphMessage {
            message: Some(message),
        }
    }

    /// Converts a decoded `SkipGraphMessage` back into the domain enum.
    ///
    /// # Errors
    /// Returns `DecodeError::MissingField("message")` when the oneof is
    /// unset, or the payload's own conversion error.
    fn from_proto(proto: Self::Proto) -> Result<Self, DecodeError> {
        use proto::skip_graph_message::Message as ProtoMsg;

        match proto.message {
            Some(ProtoMsg::SearchRequest(req)) => {
                SearchRequest::from_proto(req).map(SkipGraphMsg::SearchRequest)
            }
            Some(ProtoMsg::SearchResponse(res)) => {
                SearchResponse::from_proto(res).map(SkipGraphMsg::SearchResponse)
            }
            Some(ProtoMsg::JoinRequest(req)) => {
                JoinRequest::from_proto(req).map(SkipGraphMsg::JoinRequest)
            }
            Some(ProtoMsg::JoinResponse(res)) => {
                JoinResponse::from_proto(res).map(SkipGraphMsg::JoinResponse)
            }
            Some(ProtoMsg::LeaveNotification(note)) => {
                LeaveNotification::from_proto(note).map(SkipGraphMsg::LeaveNotification)
            }
            Some(ProtoMsg::UpdateNeighbors(update)) => {
                UpdateNeighbors::from_proto(update).map(SkipGraphMsg::UpdateNeighbors)
            }
            None => Err(DecodeError::MissingField("message")),
        }
    }
}
Benefits
- Efficiency: Binary format is compact and fast
- Type Safety: Generated code provides compile-time type checking
- Compatibility: Same wire format as Go implementation
- Evolution: Can add fields without breaking compatibility
- Documentation: Proto files serve as API documentation
Testing Requirements
- Test serialization/deserialization roundtrip
- Test compatibility with Go implementation
- Test message size efficiency
- Test schema evolution
- Benchmark serialization performance
Dependencies
- prost (protobuf implementation)
- prost-build (code generation)
- bytes (efficient byte handling)
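- serde (for JSON conversion; required as long as build.rs derives Serialize/Deserialize on the generated types)
- hex (hex encoding of the sender identifier)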
Priority
High - Required for network compatibility with Go implementation
Related Issues
- Required by: Implement Network Abstraction Layer with Conduit Pattern #58 (Network abstraction), Implement Connection Management Layer #59 (Connection management)
Metadata
Metadata
Assignees
Labels
No labels