diff --git a/duva/src/adapters/io/tokio_stream.rs b/duva/src/adapters/io/tokio_stream.rs index 9c4139ee..9debe2fd 100644 --- a/duva/src/adapters/io/tokio_stream.rs +++ b/duva/src/adapters/io/tokio_stream.rs @@ -1,4 +1,3 @@ -use crate::domains::interface::{TRead, TWrite}; use crate::domains::peers::command::*; use crate::domains::peers::connections::connection_types::{ReadConnected, WriteConnected}; use crate::domains::query_io::SERDE_CONFIG; @@ -6,7 +5,7 @@ use crate::domains::{ IoError, TAsyncReadWrite, TReadBytes, TSerdeDynamicRead, TSerdeDynamicWrite, TSerdeRead, TSerdeWrite, }; -use crate::domains::{QueryIO, deserialize}; + use bytes::BytesMut; use std::fmt::Debug; use std::io::ErrorKind; @@ -16,13 +15,6 @@ use tokio::net::TcpStream; const BUFFER_SIZE: usize = 512; const INITIAL_CAPACITY: usize = 1024; -#[async_trait::async_trait] -impl TWrite for T { - async fn write(&mut self, io: QueryIO) -> Result<(), IoError> { - self.write_all(&io.serialize()).await.map_err(|e| io_error_from_kind(e.kind())) - } -} - #[async_trait::async_trait] impl TReadBytes for T { // TCP doesn't inherently delimit messages. @@ -58,30 +50,6 @@ impl TRead } } -#[async_trait::async_trait] -impl TRead for T { - async fn read_values(&mut self) -> Result, IoError> { - let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY); - self.read_bytes(&mut buffer).await?; - - let mut parsed_values = Vec::new(); - let mut remaining_buffer = buffer; - - while !remaining_buffer.is_empty() { - match deserialize(remaining_buffer.clone()) { - Ok((query_io, consumed)) => { - parsed_values.push(query_io); - remaining_buffer = remaining_buffer.split_off(consumed); - }, - Err(e) => { - return Err(IoError::Custom(format!("Parsing error: {e:?}"))); - }, - } - } - Ok(parsed_values) - } -} - #[async_trait::async_trait] impl TSerdeDynamicRead for T { async fn receive_peer_msgs(&mut self) -> Result, IoError> { @@ -189,8 +157,6 @@ impl From for IoError { #[cfg(test)] pub mod test_tokio_stream_impl { - use crate::types::BinBytes; - use super::*; #[derive(Debug, PartialEq, bincode::Encode, bincode::Decode)] struct TestMessage { @@ -250,47 +216,6 @@ pub mod test_tokio_stream_impl { assert_eq!(socket.ip().to_string(), "127.0.0.1") } - #[tokio::test] - async fn test_read_values() { - let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY); - // add a simple string to buffer - let sync_msg = "FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0"; - - buffer.extend_from_slice( - format!( - "${}\r\nFULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0\r\n", - sync_msg.len() - ) - .as_bytes(), - ); - - let peer_info_msg = "PEERS 127.0.0.1:6378"; - buffer.extend_from_slice( - format!("${}\r\nPEERS 127.0.0.1:6378\r\n", peer_info_msg.len()).as_bytes(), - ); - // add an integer to buffer - - let mut parsed_values = vec![]; - while !buffer.is_empty() { - if let Ok((query_io, consumed)) = deserialize(buffer.clone()) { - parsed_values.push(query_io); - - // * Remove the parsed portion from the buffer - buffer = buffer.split_off(consumed); - } - } - - assert_eq!(parsed_values.len(), 2); - assert_eq!( - parsed_values[0], - QueryIO::BulkString(BinBytes::new( - "FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0" - )) - ); - - assert_eq!(parsed_values[1], QueryIO::BulkString(BinBytes::new("PEERS 127.0.0.1:6378"))); - } - #[tokio::test] async fn test_deserialize_reads() { // 1. Arrange: Single message in one chunk diff --git a/duva/src/domains/cluster_actors/actor/tests/mod.rs b/duva/src/domains/cluster_actors/actor/tests/mod.rs index 3e1fb60e..95708004 100644 --- a/duva/src/domains/cluster_actors/actor/tests/mod.rs +++ b/duva/src/domains/cluster_actors/actor/tests/mod.rs @@ -9,7 +9,6 @@ use crate::CacheManager; use crate::Replication; use crate::ReplicationId; use crate::adapters::op_logs::memory_based::MemoryOpLogs; -use crate::domains::QueryIO; use crate::domains::TSerdeDynamicRead; use crate::domains::TSerdeDynamicWrite; use crate::domains::caches::actor::CacheCommandSender; @@ -23,10 +22,7 @@ use crate::domains::peers::connections::inbound::stream::InboundStream; use crate::domains::peers::service::PeerListener; use crate::types::Callback; -use crate::{ - domains::{IoError, TRead, TWrite}, - make_smart_pointer, -}; +use crate::{domains::IoError, make_smart_pointer}; use std::collections::VecDeque; use std::fs::OpenOptions; use std::sync::Arc; @@ -47,12 +43,6 @@ impl FakeReadWrite { Self(Arc::new(Mutex::new(VecDeque::new()))) } } -#[async_trait::async_trait] -impl TWrite for FakeReadWrite { - async fn write(&mut self, _io: QueryIO) -> Result<(), IoError> { - panic!() - } -} #[async_trait::async_trait] impl TSerdeDynamicRead for FakeReadWrite { @@ -80,13 +70,6 @@ impl TSerdeDynamicWrite for FakeReadWrite { } } -#[async_trait::async_trait] -impl TRead for FakeReadWrite { - async fn read_values(&mut self) -> Result, IoError> { - panic!() - } -} - pub(crate) struct Helper; impl Helper { // Helper function to create cache manager with con_idx diff --git a/duva/src/domains/interface.rs b/duva/src/domains/interface.rs index 7e8c3ec2..4f2b8bd6 100644 --- a/duva/src/domains/interface.rs +++ b/duva/src/domains/interface.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use crate::domains::{ - IoError, QueryIO, + IoError, peers::{ command::PeerMessage, connections::connection_types::{ReadConnected, WriteConnected}, @@ -9,33 +9,23 @@ use crate::domains::{ }; use bytes::BytesMut; -#[async_trait::async_trait] -pub trait TRead: Send + Sync + Debug + 'static { - async fn read_values(&mut self) -> Result, IoError>; -} - #[async_trait::async_trait] pub trait TReadBytes: Send + Sync + Debug + 'static { async fn read_bytes(&mut self, buf: &mut BytesMut) -> Result<(), IoError>; } #[async_trait::async_trait] -pub(crate) trait TSerdeDynamicRead: TRead + Send + Sync + Debug + 'static { +pub(crate) trait TSerdeDynamicRead: Send + Sync + Debug + 'static { async fn receive_peer_msgs(&mut self) -> Result, IoError>; async fn receive_connection_msgs(&mut self) -> Result; } #[async_trait::async_trait] -pub(crate) trait TSerdeDynamicWrite: TWrite + Send + Sync + Debug + 'static { +pub(crate) trait TSerdeDynamicWrite: Send + Sync + Debug + 'static { async fn send(&mut self, msg: PeerMessage) -> Result<(), IoError>; async fn send_connection_msg(&mut self, arg: &str) -> Result<(), IoError>; } -#[async_trait::async_trait] -pub(crate) trait TWrite: Send + Sync + Debug + 'static { - async fn write(&mut self, io: QueryIO) -> Result<(), IoError>; -} - pub trait TSerdeWrite { fn serialized_write( &mut self, diff --git a/duva/src/domains/mod.rs b/duva/src/domains/mod.rs index a879bb7c..d7facb5b 100644 --- a/duva/src/domains/mod.rs +++ b/duva/src/domains/mod.rs @@ -11,4 +11,3 @@ pub mod interface; pub use interface::*; pub mod query_io; pub(crate) use query_io::QueryIO; -pub(crate) use query_io::deserialize;