Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions duva-client/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use duva::prelude::tokio::sync::mpsc::Receiver;
use duva::prelude::tokio::sync::mpsc::Sender;
use duva::prelude::uuid::Uuid;
use duva::prelude::{
ConnectionRequest, ConnectionRequests, ConnectionResponse, ConnectionResponses, ReplicationId,
BytesMut, ConnectionRequest, ConnectionRequests, ConnectionResponse, ConnectionResponses,
ReplicationId,
};
use duva::prelude::{PeerIdentifier, tokio};
use duva::prelude::{Topology, anyhow};
Expand Down Expand Up @@ -115,8 +116,10 @@ impl Broker {
conn_req: ConnectionRequest,
) -> anyhow::Result<(ServerStreamReader, ServerStreamWriter, ConnectionResponse)> {
stream.serialized_write(ConnectionRequests::Authenticate(conn_req)).await?; // client_id not exist

let ConnectionResponses::Authenticated(response) = stream.deserialized_read().await? else {
let mut buffer = BytesMut::new();
let ConnectionResponses::Authenticated(response) =
stream.deserialized_read(&mut buffer).await?
else {
bail!("Authentication failed");
};

Expand Down Expand Up @@ -161,7 +164,10 @@ impl Broker {
async fn discover_leader_from(&mut self, follower: PeerIdentifier) -> anyhow::Result<()> {
let mut stream = TcpStream::connect(follower.as_str()).await?;
stream.serialized_write(ConnectionRequests::Discovery).await?;
let ConnectionResponses::Discovery { leader_id } = stream.deserialized_read().await? else {
let mut buffer = BytesMut::new();
let ConnectionResponses::Discovery { leader_id } =
stream.deserialized_read(&mut buffer).await?
else {
bail!("Discovery failed!");
};

Expand Down
6 changes: 3 additions & 3 deletions duva-client/src/broker/read_stream.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::broker::BrokerMessage;
use duva::domains::TSerdeRead;

use duva::prelude::ReplicationId;
use duva::prelude::tokio::{self, net::tcp::OwnedReadHalf, sync::oneshot};
use duva::prelude::{BytesMut, ReplicationId};

pub struct ServerStreamReader(pub(crate) OwnedReadHalf);
impl ServerStreamReader {
Expand All @@ -15,9 +15,9 @@ impl ServerStreamReader {

let future = async move {
let controller_sender = controller_sender.clone();

loop {
match self.0.deserialized_reads().await {
let mut buffer = BytesMut::new();
match self.0.deserialized_reads(&mut buffer).await {
Ok(server_responses) => {
for res in server_responses {
if controller_sender
Expand Down
61 changes: 42 additions & 19 deletions duva/src/adapters/io/tokio_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use crate::domains::{
TSerdeWrite,
};

use async_trait::async_trait;
use bincode::BorrowDecode;

use bytes::BytesMut;
use std::fmt::Debug;
use std::io::ErrorKind;
Expand Down Expand Up @@ -65,7 +68,8 @@ impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSerd
Ok(parsed_values)
}
async fn receive_connection_msgs(&mut self) -> Result<String, IoError> {
self.deserialized_read().await
let mut buffer = BytesMut::new();
self.deserialized_read(&mut buffer).await
}
}

Expand All @@ -92,35 +96,52 @@ impl<T: AsyncWriteExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSer
}
}

impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSerdeRead for T {
async fn deserialized_read<U>(&mut self) -> Result<U, IoError>
#[async_trait]
impl<T> TSerdeRead for T
where
T: AsyncReadExt + Unpin + Sync + Send + Debug + 'static,
{
async fn deserialized_read<'a, U>(&mut self, buffer: &'a mut BytesMut) -> Result<U, IoError>
where
U: bincode::Decode<()>,
U: BorrowDecode<'a, ()> + Send,
{
let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY);
self.read_bytes(&mut buffer).await?;
self.read_bytes(buffer).await?;

let (request, _) = bincode::decode_from_slice(&buffer, SERDE_CONFIG)
.map_err(|e| IoError::Custom(e.to_string()))?;
let (request, _) = bincode::borrow_decode_from_slice(&buffer[..], SERDE_CONFIG).unwrap();

Ok(request)
}

async fn deserialized_reads<U>(&mut self) -> Result<Vec<U>, IoError>
async fn deserialized_reads<'a, U>(
&mut self,
buffer: &'a mut BytesMut,
) -> Result<Vec<U>, IoError>
where
U: bincode::Decode<()>,
U: BorrowDecode<'a, ()> + Send,
{
let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY);
self.read_bytes(&mut buffer).await?;
// Read data from socket into buffer
self.read_bytes(buffer).await?;

let mut parsed_values = Vec::new();

while !buffer.is_empty() {
let (request, size) = bincode::decode_from_slice(&buffer, SERDE_CONFIG)
.map_err(|e| IoError::Custom(e.to_string()))?;
parsed_values.push(request);
buffer = buffer.split_off(size);
// Zero-copy slicing logic
let mut slice = &buffer[..];

// Note: In a real protocol, you need a loop that checks if
// there is enough data for a full frame before decoding.
// This simple loop assumes the buffer contains perfect packets.
while !slice.is_empty() {
match bincode::borrow_decode_from_slice(slice, SERDE_CONFIG) {
Ok((request, size)) => {
parsed_values.push(request);
slice = &slice[size..];
},
Err(_) => {
break;
},
}
}

Ok(parsed_values)
}
}
Expand Down Expand Up @@ -226,7 +247,8 @@ pub mod test_tokio_stream_impl {
let mut mock = MockAsyncStream::new(vec![encoded_msg.into()]);

// 2. Act
let result: Result<Vec<TestMessage>, IoError> = mock.deserialized_reads().await;
let mut buffer = BytesMut::new();
let result: Result<Vec<TestMessage>, IoError> = mock.deserialized_reads(&mut buffer).await;

// 3. Assert
let deserialized = result.unwrap();
Expand All @@ -247,7 +269,8 @@ pub mod test_tokio_stream_impl {
let mut mock = MockAsyncStream::new(vec![raw_data]);

// 2. Act
let result: Result<Vec<TestMessage>, IoError> = mock.deserialized_reads().await;
let mut buffer = BytesMut::new();
let result: Result<Vec<TestMessage>, IoError> = mock.deserialized_reads(&mut buffer).await;

// 3. Assert
let deserialized = result.unwrap();
Expand Down
15 changes: 10 additions & 5 deletions duva/src/domains/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::domains::{
connections::connection_types::{ReadConnected, WriteConnected},
},
};
use bincode::BorrowDecode;
use bytes::BytesMut;

#[async_trait::async_trait]
Expand All @@ -33,14 +34,18 @@ pub trait TSerdeWrite {
) -> impl std::future::Future<Output = Result<(), IoError>> + Send;
}

#[async_trait::async_trait]
pub trait TSerdeRead {
fn deserialized_read<U: bincode::Decode<()>>(
&mut self,
) -> impl std::future::Future<Output = Result<U, IoError>> + Send;
async fn deserialized_read<'a, U>(&mut self, buffer: &'a mut BytesMut) -> Result<U, IoError>
where
U: BorrowDecode<'a, ()> + Send; // 'U' lives as long as 'buffer'

fn deserialized_reads<U: bincode::Decode<()>>(
async fn deserialized_reads<'a, U>(
&mut self,
) -> impl std::future::Future<Output = Result<Vec<U>, IoError>> + Send;
buffer: &'a mut BytesMut,
) -> Result<Vec<U>, IoError>
where
U: BorrowDecode<'a, ()> + Send;
}

pub(crate) trait TAsyncReadWrite {
Expand Down
4 changes: 3 additions & 1 deletion duva/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::domains::replications::*;
use crate::domains::{TSerdeRead, TSerdeWrite};
use crate::signals::SignalHandler;
use anyhow::{Context, Result};
use bytes::BytesMut;
pub use config::Environment;
use domains::IoError;
use domains::caches::cache_manager::CacheManager;
Expand Down Expand Up @@ -235,7 +236,8 @@ impl StartUpFacade {
async fn handle_client_stream(&self, stream: tokio::net::TcpStream) -> anyhow::Result<()> {
let (mut read_half, write_half) = stream.into_split();
let mut writer = ClientStreamWriter(write_half);
let request = read_half.deserialized_read().await?;
let mut buffer = BytesMut::new();
let request = read_half.deserialized_read(&mut buffer).await?;

match request {
ConnectionRequests::Discovery => {
Expand Down
6 changes: 3 additions & 3 deletions duva/src/presentation/clients/authenticate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ impl ConnectionRequest {
Ok((client_id.to_string(), self.request_id))
}
}
#[derive(Debug, Clone, PartialEq, Eq, bincode::Decode, bincode::Encode)]
#[derive(Debug, Clone, PartialEq, Eq, bincode::BorrowDecode, bincode::Encode)]
pub enum ConnectionRequests {
Discovery,
Authenticate(ConnectionRequest),
}
#[derive(Debug, Clone, bincode::Decode, bincode::Encode)]
#[derive(Debug, Clone, bincode::BorrowDecode, bincode::Encode)]
pub enum ConnectionResponses {
Discovery { leader_id: PeerIdentifier },
Authenticated(ConnectionResponse),
}

#[derive(Debug, Clone, Default, bincode::Decode, bincode::Encode)]
#[derive(Debug, Clone, Default, bincode::BorrowDecode, bincode::Encode)]
pub struct ConnectionResponse {
pub client_id: String,
pub request_id: u64,
Expand Down
8 changes: 4 additions & 4 deletions duva/src/presentation/clients/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ use anyhow::Context;
use chrono::{DateTime, Utc};
use std::str::FromStr;

#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)]
#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::BorrowDecode)]
pub struct SessionRequest {
pub conn_offset: u64,
pub action: ClientAction,
}

#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::Decode)]
#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::BorrowDecode)]
pub enum ClientAction {
NonMutating(NonMutatingAction),
Mutating(LogEntry),
}

#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::Decode)]
#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::BorrowDecode)]
pub enum NonMutatingAction {
Ping,
Echo(String),
Expand Down Expand Up @@ -306,7 +306,7 @@ pub struct ClientRequest {
pub(crate) conn_id: String,
}

#[derive(Clone, Debug, bincode::Decode, bincode::Encode)]
#[derive(Clone, Debug, bincode::BorrowDecode, bincode::Encode)]
pub enum ServerResponse {
WriteRes { res: QueryIO, log_index: u64, conn_offset: u64 },
ReadRes { res: QueryIO, conn_offset: u64 },
Expand Down
4 changes: 3 additions & 1 deletion duva/src/presentation/clients/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::prelude::ConnectionResponse;
use crate::prelude::ConnectionResponses;
use crate::presentation::clients::request::{ClientAction, ServerResponse, SessionRequest};

use bytes::BytesMut;
use tokio::{
net::tcp::{OwnedReadHalf, OwnedWriteHalf},
sync::mpsc::Sender,
Expand All @@ -31,7 +32,8 @@ impl ClientStreamReader {
) {
loop {
// * extract queries
let query_ios = self.r.deserialized_reads::<SessionRequest>().await;
let mut buffer = BytesMut::new();
let query_ios = self.r.deserialized_reads::<SessionRequest>(&mut buffer).await;
if let Err(err) = query_ios {
info!("{}", err);
if err.should_break() {
Expand Down
Loading