From 38b166623f6c3421a4a497beda69e3cabe7055a5 Mon Sep 17 00:00:00 2001 From: Migo Date: Thu, 13 Nov 2025 20:48:13 +0400 Subject: [PATCH 1/2] remove twrite --- duva/src/adapters/io/tokio_stream.rs | 9 +-------- duva/src/domains/cluster_actors/actor/tests/mod.rs | 8 +------- duva/src/domains/interface.rs | 7 +------ 3 files changed, 3 insertions(+), 21 deletions(-) diff --git a/duva/src/adapters/io/tokio_stream.rs b/duva/src/adapters/io/tokio_stream.rs index 9c4139ee..2f1d9d6d 100644 --- a/duva/src/adapters/io/tokio_stream.rs +++ b/duva/src/adapters/io/tokio_stream.rs @@ -1,4 +1,4 @@ -use crate::domains::interface::{TRead, TWrite}; +use crate::domains::interface::TRead; use crate::domains::peers::command::*; use crate::domains::peers::connections::connection_types::{ReadConnected, WriteConnected}; use crate::domains::query_io::SERDE_CONFIG; @@ -16,13 +16,6 @@ use tokio::net::TcpStream; const BUFFER_SIZE: usize = 512; const INITIAL_CAPACITY: usize = 1024; -#[async_trait::async_trait] -impl TWrite for T { - async fn write(&mut self, io: QueryIO) -> Result<(), IoError> { - self.write_all(&io.serialize()).await.map_err(|e| io_error_from_kind(e.kind())) - } -} - #[async_trait::async_trait] impl TReadBytes for T { // TCP doesn't inherently delimit messages. diff --git a/duva/src/domains/cluster_actors/actor/tests/mod.rs b/duva/src/domains/cluster_actors/actor/tests/mod.rs index 3e1fb60e..61d99e6f 100644 --- a/duva/src/domains/cluster_actors/actor/tests/mod.rs +++ b/duva/src/domains/cluster_actors/actor/tests/mod.rs @@ -24,7 +24,7 @@ use crate::domains::peers::connections::inbound::stream::InboundStream; use crate::domains::peers::service::PeerListener; use crate::types::Callback; use crate::{ - domains::{IoError, TRead, TWrite}, + domains::{IoError, TRead}, make_smart_pointer, }; use std::collections::VecDeque; @@ -47,12 +47,6 @@ impl FakeReadWrite { Self(Arc::new(Mutex::new(VecDeque::new()))) } } -#[async_trait::async_trait] -impl TWrite for FakeReadWrite { - async fn write(&mut self, _io: QueryIO) -> Result<(), IoError> { - panic!() - } -} #[async_trait::async_trait] impl TSerdeDynamicRead for FakeReadWrite { diff --git a/duva/src/domains/interface.rs b/duva/src/domains/interface.rs index 7e8c3ec2..a57cb5ef 100644 --- a/duva/src/domains/interface.rs +++ b/duva/src/domains/interface.rs @@ -26,16 +26,11 @@ pub(crate) trait TSerdeDynamicRead: TRead + Send + Sync + Debug + 'static { } #[async_trait::async_trait] -pub(crate) trait TSerdeDynamicWrite: TWrite + Send + Sync + Debug + 'static { +pub(crate) trait TSerdeDynamicWrite: Send + Sync + Debug + 'static { async fn send(&mut self, msg: PeerMessage) -> Result<(), IoError>; async fn send_connection_msg(&mut self, arg: &str) -> Result<(), IoError>; } -#[async_trait::async_trait] -pub(crate) trait TWrite: Send + Sync + Debug + 'static { - async fn write(&mut self, io: QueryIO) -> Result<(), IoError>; -} - pub trait TSerdeWrite { fn serialized_write( &mut self, From a23aa0172cb891d4e9994b61a6af7c82b0a64056 Mon Sep 17 00:00:00 2001 From: Migo Date: Thu, 13 Nov 2025 20:51:46 +0400 Subject: [PATCH 2/2] read values --- duva/src/adapters/io/tokio_stream.rs | 70 +------------------ .../domains/cluster_actors/actor/tests/mod.rs | 13 +--- duva/src/domains/interface.rs | 9 +-- duva/src/domains/mod.rs | 1 - 4 files changed, 4 insertions(+), 89 deletions(-) diff --git a/duva/src/adapters/io/tokio_stream.rs b/duva/src/adapters/io/tokio_stream.rs index 2f1d9d6d..9debe2fd 100644 --- a/duva/src/adapters/io/tokio_stream.rs +++ b/duva/src/adapters/io/tokio_stream.rs @@ -1,4 +1,3 @@ -use crate::domains::interface::TRead; use crate::domains::peers::command::*; use crate::domains::peers::connections::connection_types::{ReadConnected, WriteConnected}; use crate::domains::query_io::SERDE_CONFIG; @@ -6,7 +5,7 @@ use crate::domains::{ IoError, TAsyncReadWrite, TReadBytes, TSerdeDynamicRead, TSerdeDynamicWrite, TSerdeRead, TSerdeWrite, }; -use crate::domains::{QueryIO, deserialize}; + use bytes::BytesMut; use std::fmt::Debug; use std::io::ErrorKind; @@ -51,30 +50,6 @@ impl TRead } } -#[async_trait::async_trait] -impl TRead for T { - async fn read_values(&mut self) -> Result, IoError> { - let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY); - self.read_bytes(&mut buffer).await?; - - let mut parsed_values = Vec::new(); - let mut remaining_buffer = buffer; - - while !remaining_buffer.is_empty() { - match deserialize(remaining_buffer.clone()) { - Ok((query_io, consumed)) => { - parsed_values.push(query_io); - remaining_buffer = remaining_buffer.split_off(consumed); - }, - Err(e) => { - return Err(IoError::Custom(format!("Parsing error: {e:?}"))); - }, - } - } - Ok(parsed_values) - } -} - #[async_trait::async_trait] impl TSerdeDynamicRead for T { async fn receive_peer_msgs(&mut self) -> Result, IoError> { @@ -182,8 +157,6 @@ impl From for IoError { #[cfg(test)] pub mod test_tokio_stream_impl { - use crate::types::BinBytes; - use super::*; #[derive(Debug, PartialEq, bincode::Encode, bincode::Decode)] struct TestMessage { @@ -243,47 +216,6 @@ pub mod test_tokio_stream_impl { assert_eq!(socket.ip().to_string(), "127.0.0.1") } - #[tokio::test] - async fn test_read_values() { - let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY); - // add a simple string to buffer - let sync_msg = "FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0"; - - buffer.extend_from_slice( - format!( - "${}\r\nFULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0\r\n", - sync_msg.len() - ) - .as_bytes(), - ); - - let peer_info_msg = "PEERS 127.0.0.1:6378"; - buffer.extend_from_slice( - format!("${}\r\nPEERS 127.0.0.1:6378\r\n", peer_info_msg.len()).as_bytes(), - ); - // add an integer to buffer - - let mut parsed_values = vec![]; - while !buffer.is_empty() { - if let Ok((query_io, consumed)) = deserialize(buffer.clone()) { - parsed_values.push(query_io); - - // * Remove the parsed portion from the buffer - buffer = buffer.split_off(consumed); - } - } - - assert_eq!(parsed_values.len(), 2); - assert_eq!( - parsed_values[0], - QueryIO::BulkString(BinBytes::new( - "FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0" - )) - ); - - assert_eq!(parsed_values[1], QueryIO::BulkString(BinBytes::new("PEERS 127.0.0.1:6378"))); - } - #[tokio::test] async fn test_deserialize_reads() { // 1. Arrange: Single message in one chunk diff --git a/duva/src/domains/cluster_actors/actor/tests/mod.rs b/duva/src/domains/cluster_actors/actor/tests/mod.rs index 61d99e6f..95708004 100644 --- a/duva/src/domains/cluster_actors/actor/tests/mod.rs +++ b/duva/src/domains/cluster_actors/actor/tests/mod.rs @@ -9,7 +9,6 @@ use crate::CacheManager; use crate::Replication; use crate::ReplicationId; use crate::adapters::op_logs::memory_based::MemoryOpLogs; -use crate::domains::QueryIO; use crate::domains::TSerdeDynamicRead; use crate::domains::TSerdeDynamicWrite; use crate::domains::caches::actor::CacheCommandSender; @@ -23,10 +22,7 @@ use crate::domains::peers::connections::inbound::stream::InboundStream; use crate::domains::peers::service::PeerListener; use crate::types::Callback; -use crate::{ - domains::{IoError, TRead}, - make_smart_pointer, -}; +use crate::{domains::IoError, make_smart_pointer}; use std::collections::VecDeque; use std::fs::OpenOptions; use std::sync::Arc; @@ -74,13 +70,6 @@ impl TSerdeDynamicWrite for FakeReadWrite { } } -#[async_trait::async_trait] -impl TRead for FakeReadWrite { - async fn read_values(&mut self) -> Result, IoError> { - panic!() - } -} - pub(crate) struct Helper; impl Helper { // Helper function to create cache manager with con_idx diff --git a/duva/src/domains/interface.rs b/duva/src/domains/interface.rs index a57cb5ef..4f2b8bd6 100644 --- a/duva/src/domains/interface.rs +++ b/duva/src/domains/interface.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use crate::domains::{ - IoError, QueryIO, + IoError, peers::{ command::PeerMessage, connections::connection_types::{ReadConnected, WriteConnected}, @@ -9,18 +9,13 @@ use crate::domains::{ }; use bytes::BytesMut; -#[async_trait::async_trait] -pub trait TRead: Send + Sync + Debug + 'static { - async fn read_values(&mut self) -> Result, IoError>; -} - #[async_trait::async_trait] pub trait TReadBytes: Send + Sync + Debug + 'static { async fn read_bytes(&mut self, buf: &mut BytesMut) -> Result<(), IoError>; } #[async_trait::async_trait] -pub(crate) trait TSerdeDynamicRead: TRead + Send + Sync + Debug + 'static { +pub(crate) trait TSerdeDynamicRead: Send + Sync + Debug + 'static { async fn receive_peer_msgs(&mut self) -> Result, IoError>; async fn receive_connection_msgs(&mut self) -> Result; } diff --git a/duva/src/domains/mod.rs b/duva/src/domains/mod.rs index a879bb7c..d7facb5b 100644 --- a/duva/src/domains/mod.rs +++ b/duva/src/domains/mod.rs @@ -11,4 +11,3 @@ pub mod interface; pub use interface::*; pub mod query_io; pub(crate) use query_io::QueryIO; -pub(crate) use query_io::deserialize;