From e2273d1ed67757dbf3e2b05d9c6445f49067f64e Mon Sep 17 00:00:00 2001 From: Migo Date: Sat, 29 Nov 2025 21:30:02 +0400 Subject: [PATCH 1/2] borrow decode --- duva-client/src/broker/mod.rs | 14 +++-- duva-client/src/broker/read_stream.rs | 6 +- duva/src/adapters/io/tokio_stream.rs | 62 +++++++++++++------ duva/src/domains/interface.rs | 16 +++-- duva/src/lib.rs | 4 +- duva/src/presentation/clients/authenticate.rs | 6 +- duva/src/presentation/clients/request.rs | 8 +-- duva/src/presentation/clients/stream.rs | 4 +- 8 files changed, 80 insertions(+), 40 deletions(-) diff --git a/duva-client/src/broker/mod.rs b/duva-client/src/broker/mod.rs index 648b7414..582bc46b 100644 --- a/duva-client/src/broker/mod.rs +++ b/duva-client/src/broker/mod.rs @@ -14,7 +14,8 @@ use duva::prelude::tokio::sync::mpsc::Receiver; use duva::prelude::tokio::sync::mpsc::Sender; use duva::prelude::uuid::Uuid; use duva::prelude::{ - ConnectionRequest, ConnectionRequests, ConnectionResponse, ConnectionResponses, ReplicationId, + BytesMut, ConnectionRequest, ConnectionRequests, ConnectionResponse, ConnectionResponses, + ReplicationId, }; use duva::prelude::{PeerIdentifier, tokio}; use duva::prelude::{Topology, anyhow}; @@ -115,8 +116,10 @@ impl Broker { conn_req: ConnectionRequest, ) -> anyhow::Result<(ServerStreamReader, ServerStreamWriter, ConnectionResponse)> { stream.serialized_write(ConnectionRequests::Authenticate(conn_req)).await?; // client_id not exist - - let ConnectionResponses::Authenticated(response) = stream.deserialized_read().await? else { + let mut buffer = BytesMut::new(); + let ConnectionResponses::Authenticated(response) = + stream.deserialized_read(&mut buffer).await? + else { bail!("Authentication failed"); }; @@ -161,7 +164,10 @@ impl Broker { async fn discover_leader_from(&mut self, follower: PeerIdentifier) -> anyhow::Result<()> { let mut stream = TcpStream::connect(follower.as_str()).await?; stream.serialized_write(ConnectionRequests::Discovery).await?; - let ConnectionResponses::Discovery { leader_id } = stream.deserialized_read().await? else { + let mut buffer = BytesMut::new(); + let ConnectionResponses::Discovery { leader_id } = + stream.deserialized_read(&mut buffer).await? + else { bail!("Discovery failed!"); }; diff --git a/duva-client/src/broker/read_stream.rs b/duva-client/src/broker/read_stream.rs index 0e15af24..69363900 100644 --- a/duva-client/src/broker/read_stream.rs +++ b/duva-client/src/broker/read_stream.rs @@ -1,8 +1,8 @@ use crate::broker::BrokerMessage; use duva::domains::TSerdeRead; -use duva::prelude::ReplicationId; use duva::prelude::tokio::{self, net::tcp::OwnedReadHalf, sync::oneshot}; +use duva::prelude::{BytesMut, ReplicationId}; pub struct ServerStreamReader(pub(crate) OwnedReadHalf); impl ServerStreamReader { @@ -15,9 +15,9 @@ impl ServerStreamReader { let future = async move { let controller_sender = controller_sender.clone(); - loop { - match self.0.deserialized_reads().await { + let mut buffer = BytesMut::new(); + match self.0.deserialized_reads(&mut buffer).await { Ok(server_responses) => { for res in server_responses { if controller_sender diff --git a/duva/src/adapters/io/tokio_stream.rs b/duva/src/adapters/io/tokio_stream.rs index 9debe2fd..bfc502f7 100644 --- a/duva/src/adapters/io/tokio_stream.rs +++ b/duva/src/adapters/io/tokio_stream.rs @@ -6,6 +6,9 @@ use crate::domains::{ TSerdeWrite, }; +use async_trait::async_trait; +use bincode::BorrowDecode; + use bytes::BytesMut; use std::fmt::Debug; use std::io::ErrorKind; @@ -65,7 +68,8 @@ impl TSerd Ok(parsed_values) } async fn receive_connection_msgs(&mut self) -> Result { - self.deserialized_read().await + let mut buffer = BytesMut::new(); + self.deserialized_read(&mut buffer).await } } @@ -92,35 +96,53 @@ impl TSer } } -impl TSerdeRead for T { - async fn deserialized_read(&mut self) -> Result +#[async_trait] +impl TSerdeRead for T +where + T: AsyncReadExt + Unpin + Sync + Send + Debug + 'static, +{ + async fn deserialized_read<'a, U>(&mut self, buffer: &'a mut BytesMut) -> Result where - U: bincode::Decode<()>, + U: BorrowDecode<'a, ()> + Send, { - let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY); - self.read_bytes(&mut buffer).await?; + self.read_bytes(buffer).await?; - let (request, _) = bincode::decode_from_slice(&buffer, SERDE_CONFIG) - .map_err(|e| IoError::Custom(e.to_string()))?; + let (request, _) = bincode::borrow_decode_from_slice(&buffer[..], SERDE_CONFIG).unwrap(); Ok(request) } - async fn deserialized_reads(&mut self) -> Result, IoError> + async fn deserialized_reads<'a, U>( + &mut self, + buffer: &'a mut BytesMut, + ) -> Result, IoError> where - U: bincode::Decode<()>, + U: BorrowDecode<'a, ()> + Send, { - let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY); - self.read_bytes(&mut buffer).await?; + // Read data from socket into buffer + self.read_bytes(buffer).await?; let mut parsed_values = Vec::new(); - while !buffer.is_empty() { - let (request, size) = bincode::decode_from_slice(&buffer, SERDE_CONFIG) - .map_err(|e| IoError::Custom(e.to_string()))?; - parsed_values.push(request); - buffer = buffer.split_off(size); + // Zero-copy slicing logic + let mut slice = &buffer[..]; + + // Note: In a real protocol, you need a loop that checks if + // there is enough data for a full frame before decoding. + // This simple loop assumes the buffer contains perfect packets. + while !slice.is_empty() { + match bincode::borrow_decode_from_slice(slice, SERDE_CONFIG) { + Ok((request, size)) => { + parsed_values.push(request); + slice = &slice[size..]; + }, + Err(_) => { + // Stop if we can't decode anymore (partial packet) + break; + }, + } } + Ok(parsed_values) } } @@ -226,7 +248,8 @@ pub mod test_tokio_stream_impl { let mut mock = MockAsyncStream::new(vec![encoded_msg.into()]); // 2. Act - let result: Result, IoError> = mock.deserialized_reads().await; + let mut buffer = BytesMut::new(); + let result: Result, IoError> = mock.deserialized_reads(&mut buffer).await; // 3. Assert let deserialized = result.unwrap(); @@ -247,7 +270,8 @@ pub mod test_tokio_stream_impl { let mut mock = MockAsyncStream::new(vec![raw_data]); // 2. Act - let result: Result, IoError> = mock.deserialized_reads().await; + let mut buffer = BytesMut::new(); + let result: Result, IoError> = mock.deserialized_reads(&mut buffer).await; // 3. Assert let deserialized = result.unwrap(); diff --git a/duva/src/domains/interface.rs b/duva/src/domains/interface.rs index 4f2b8bd6..b36cac0f 100644 --- a/duva/src/domains/interface.rs +++ b/duva/src/domains/interface.rs @@ -7,6 +7,8 @@ use crate::domains::{ connections::connection_types::{ReadConnected, WriteConnected}, }, }; +use bincode::BorrowDecode; +use bincode::config::Configuration; use bytes::BytesMut; #[async_trait::async_trait] @@ -33,14 +35,18 @@ pub trait TSerdeWrite { ) -> impl std::future::Future> + Send; } +#[async_trait::async_trait] pub trait TSerdeRead { - fn deserialized_read>( - &mut self, - ) -> impl std::future::Future> + Send; + async fn deserialized_read<'a, U>(&mut self, buffer: &'a mut BytesMut) -> Result + where + U: BorrowDecode<'a, ()> + Send; // 'U' lives as long as 'buffer' - fn deserialized_reads>( + async fn deserialized_reads<'a, U>( &mut self, - ) -> impl std::future::Future, IoError>> + Send; + buffer: &'a mut BytesMut, + ) -> Result, IoError> + where + U: BorrowDecode<'a, ()> + Send; } pub(crate) trait TAsyncReadWrite { diff --git a/duva/src/lib.rs b/duva/src/lib.rs index fbcf3dd3..116ab14b 100644 --- a/duva/src/lib.rs +++ b/duva/src/lib.rs @@ -12,6 +12,7 @@ use crate::domains::replications::*; use crate::domains::{TSerdeRead, TSerdeWrite}; use crate::signals::SignalHandler; use anyhow::{Context, Result}; +use bytes::BytesMut; pub use config::Environment; use domains::IoError; use domains::caches::cache_manager::CacheManager; @@ -235,7 +236,8 @@ impl StartUpFacade { async fn handle_client_stream(&self, stream: tokio::net::TcpStream) -> anyhow::Result<()> { let (mut read_half, write_half) = stream.into_split(); let mut writer = ClientStreamWriter(write_half); - let request = read_half.deserialized_read().await?; + let mut buffer = BytesMut::new(); + let request = read_half.deserialized_read(&mut buffer).await?; match request { ConnectionRequests::Discovery => { diff --git a/duva/src/presentation/clients/authenticate.rs b/duva/src/presentation/clients/authenticate.rs index c551ef3f..a3cd7744 100644 --- a/duva/src/presentation/clients/authenticate.rs +++ b/duva/src/presentation/clients/authenticate.rs @@ -19,18 +19,18 @@ impl ConnectionRequest { Ok((client_id.to_string(), self.request_id)) } } -#[derive(Debug, Clone, PartialEq, Eq, bincode::Decode, bincode::Encode)] +#[derive(Debug, Clone, PartialEq, Eq, bincode::BorrowDecode, bincode::Encode)] pub enum ConnectionRequests { Discovery, Authenticate(ConnectionRequest), } -#[derive(Debug, Clone, bincode::Decode, bincode::Encode)] +#[derive(Debug, Clone, bincode::BorrowDecode, bincode::Encode)] pub enum ConnectionResponses { Discovery { leader_id: PeerIdentifier }, Authenticated(ConnectionResponse), } -#[derive(Debug, Clone, Default, bincode::Decode, bincode::Encode)] +#[derive(Debug, Clone, Default, bincode::BorrowDecode, bincode::Encode)] pub struct ConnectionResponse { pub client_id: String, pub request_id: u64, diff --git a/duva/src/presentation/clients/request.rs b/duva/src/presentation/clients/request.rs index 6ca02df4..5394f9d3 100644 --- a/duva/src/presentation/clients/request.rs +++ b/duva/src/presentation/clients/request.rs @@ -12,19 +12,19 @@ use anyhow::Context; use chrono::{DateTime, Utc}; use std::str::FromStr; -#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)] +#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::BorrowDecode)] pub struct SessionRequest { pub conn_offset: u64, pub action: ClientAction, } -#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::Decode)] +#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::BorrowDecode)] pub enum ClientAction { NonMutating(NonMutatingAction), Mutating(LogEntry), } -#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::Decode)] +#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::BorrowDecode)] pub enum NonMutatingAction { Ping, Echo(String), @@ -306,7 +306,7 @@ pub struct ClientRequest { pub(crate) conn_id: String, } -#[derive(Clone, Debug, bincode::Decode, bincode::Encode)] +#[derive(Clone, Debug, bincode::BorrowDecode, bincode::Encode)] pub enum ServerResponse { WriteRes { res: QueryIO, log_index: u64, conn_offset: u64 }, ReadRes { res: QueryIO, conn_offset: u64 }, diff --git a/duva/src/presentation/clients/stream.rs b/duva/src/presentation/clients/stream.rs index 1ef46040..f6baf183 100644 --- a/duva/src/presentation/clients/stream.rs +++ b/duva/src/presentation/clients/stream.rs @@ -11,6 +11,7 @@ use crate::prelude::ConnectionResponse; use crate::prelude::ConnectionResponses; use crate::presentation::clients::request::{ClientAction, ServerResponse, SessionRequest}; +use bytes::BytesMut; use tokio::{ net::tcp::{OwnedReadHalf, OwnedWriteHalf}, sync::mpsc::Sender, @@ -31,7 +32,8 @@ impl ClientStreamReader { ) { loop { // * extract queries - let query_ios = self.r.deserialized_reads::().await; + let mut buffer = BytesMut::new(); + let query_ios = self.r.deserialized_reads::(&mut buffer).await; if let Err(err) = query_ios { info!("{}", err); if err.should_break() { From ed0e9ef1cf3a4a1ca3dbdfb1bf9643baccb49f6c Mon Sep 17 00:00:00 2001 From: Migo Date: Sat, 29 Nov 2025 22:54:07 +0400 Subject: [PATCH 2/2] cleanup --- duva/src/adapters/io/tokio_stream.rs | 1 - duva/src/domains/interface.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/duva/src/adapters/io/tokio_stream.rs b/duva/src/adapters/io/tokio_stream.rs index bfc502f7..5818b68c 100644 --- a/duva/src/adapters/io/tokio_stream.rs +++ b/duva/src/adapters/io/tokio_stream.rs @@ -137,7 +137,6 @@ where slice = &slice[size..]; }, Err(_) => { - // Stop if we can't decode anymore (partial packet) break; }, } diff --git a/duva/src/domains/interface.rs b/duva/src/domains/interface.rs index b36cac0f..36c8bd93 100644 --- a/duva/src/domains/interface.rs +++ b/duva/src/domains/interface.rs @@ -8,7 +8,6 @@ use crate::domains::{ }, }; use bincode::BorrowDecode; -use bincode::config::Configuration; use bytes::BytesMut; #[async_trait::async_trait]