Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 1 addition & 76 deletions duva/src/adapters/io/tokio_stream.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::domains::interface::{TRead, TWrite};
use crate::domains::peers::command::*;
use crate::domains::peers::connections::connection_types::{ReadConnected, WriteConnected};
use crate::domains::query_io::SERDE_CONFIG;
use crate::domains::{
IoError, TAsyncReadWrite, TReadBytes, TSerdeDynamicRead, TSerdeDynamicWrite, TSerdeRead,
TSerdeWrite,
};
use crate::domains::{QueryIO, deserialize};

use bytes::BytesMut;
use std::fmt::Debug;
use std::io::ErrorKind;
Expand All @@ -16,13 +15,6 @@ use tokio::net::TcpStream;
const BUFFER_SIZE: usize = 512;
const INITIAL_CAPACITY: usize = 1024;

#[async_trait::async_trait]
impl<T: AsyncWriteExt + std::marker::Unpin + Sync + Send + Debug + 'static> TWrite for T {
async fn write(&mut self, io: QueryIO) -> Result<(), IoError> {
self.write_all(&io.serialize()).await.map_err(|e| io_error_from_kind(e.kind()))
}
}

#[async_trait::async_trait]
impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TReadBytes for T {
// TCP doesn't inherently delimit messages.
Expand Down Expand Up @@ -58,30 +50,6 @@ impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TRead
}
}

#[async_trait::async_trait]
impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TRead for T {
async fn read_values(&mut self) -> Result<Vec<QueryIO>, IoError> {
let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY);
self.read_bytes(&mut buffer).await?;

let mut parsed_values = Vec::new();
let mut remaining_buffer = buffer;

while !remaining_buffer.is_empty() {
match deserialize(remaining_buffer.clone()) {
Ok((query_io, consumed)) => {
parsed_values.push(query_io);
remaining_buffer = remaining_buffer.split_off(consumed);
},
Err(e) => {
return Err(IoError::Custom(format!("Parsing error: {e:?}")));
},
}
}
Ok(parsed_values)
}
}

#[async_trait::async_trait]
impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSerdeDynamicRead for T {
async fn receive_peer_msgs(&mut self) -> Result<Vec<PeerMessage>, IoError> {
Expand Down Expand Up @@ -189,8 +157,6 @@ impl From<ErrorKind> for IoError {

#[cfg(test)]
pub mod test_tokio_stream_impl {
use crate::types::BinBytes;

use super::*;
#[derive(Debug, PartialEq, bincode::Encode, bincode::Decode)]
struct TestMessage {
Expand Down Expand Up @@ -250,47 +216,6 @@ pub mod test_tokio_stream_impl {
assert_eq!(socket.ip().to_string(), "127.0.0.1")
}

#[tokio::test]
async fn test_read_values() {
let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY);
// add a simple string to buffer
let sync_msg = "FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0";

buffer.extend_from_slice(
format!(
"${}\r\nFULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0\r\n",
sync_msg.len()
)
.as_bytes(),
);

let peer_info_msg = "PEERS 127.0.0.1:6378";
buffer.extend_from_slice(
format!("${}\r\nPEERS 127.0.0.1:6378\r\n", peer_info_msg.len()).as_bytes(),
);
// add an integer to buffer

let mut parsed_values = vec![];
while !buffer.is_empty() {
if let Ok((query_io, consumed)) = deserialize(buffer.clone()) {
parsed_values.push(query_io);

// * Remove the parsed portion from the buffer
buffer = buffer.split_off(consumed);
}
}

assert_eq!(parsed_values.len(), 2);
assert_eq!(
parsed_values[0],
QueryIO::BulkString(BinBytes::new(
"FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0"
))
);

assert_eq!(parsed_values[1], QueryIO::BulkString(BinBytes::new("PEERS 127.0.0.1:6378")));
}

#[tokio::test]
async fn test_deserialize_reads() {
// 1. Arrange: Single message in one chunk
Expand Down
19 changes: 1 addition & 18 deletions duva/src/domains/cluster_actors/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::CacheManager;
use crate::Replication;
use crate::ReplicationId;
use crate::adapters::op_logs::memory_based::MemoryOpLogs;
use crate::domains::QueryIO;
use crate::domains::TSerdeDynamicRead;
use crate::domains::TSerdeDynamicWrite;
use crate::domains::caches::actor::CacheCommandSender;
Expand All @@ -23,10 +22,7 @@ use crate::domains::peers::connections::inbound::stream::InboundStream;

use crate::domains::peers::service::PeerListener;
use crate::types::Callback;
use crate::{
domains::{IoError, TRead, TWrite},
make_smart_pointer,
};
use crate::{domains::IoError, make_smart_pointer};
use std::collections::VecDeque;
use std::fs::OpenOptions;
use std::sync::Arc;
Expand All @@ -47,12 +43,6 @@ impl FakeReadWrite {
Self(Arc::new(Mutex::new(VecDeque::new())))
}
}
#[async_trait::async_trait]
impl TWrite for FakeReadWrite {
async fn write(&mut self, _io: QueryIO) -> Result<(), IoError> {
panic!()
}
}

#[async_trait::async_trait]
impl TSerdeDynamicRead for FakeReadWrite {
Expand Down Expand Up @@ -80,13 +70,6 @@ impl TSerdeDynamicWrite for FakeReadWrite {
}
}

#[async_trait::async_trait]
impl TRead for FakeReadWrite {
async fn read_values(&mut self) -> Result<Vec<QueryIO>, IoError> {
panic!()
}
}

pub(crate) struct Helper;
impl Helper {
// Helper function to create cache manager with con_idx
Expand Down
16 changes: 3 additions & 13 deletions duva/src/domains/interface.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,31 @@
use std::fmt::Debug;

use crate::domains::{
IoError, QueryIO,
IoError,
peers::{
command::PeerMessage,
connections::connection_types::{ReadConnected, WriteConnected},
},
};
use bytes::BytesMut;

#[async_trait::async_trait]
pub trait TRead: Send + Sync + Debug + 'static {
async fn read_values(&mut self) -> Result<Vec<QueryIO>, IoError>;
}

#[async_trait::async_trait]
pub trait TReadBytes: Send + Sync + Debug + 'static {
async fn read_bytes(&mut self, buf: &mut BytesMut) -> Result<(), IoError>;
}

#[async_trait::async_trait]
pub(crate) trait TSerdeDynamicRead: TRead + Send + Sync + Debug + 'static {
pub(crate) trait TSerdeDynamicRead: Send + Sync + Debug + 'static {
async fn receive_peer_msgs(&mut self) -> Result<Vec<PeerMessage>, IoError>;
async fn receive_connection_msgs(&mut self) -> Result<String, IoError>;
}

#[async_trait::async_trait]
pub(crate) trait TSerdeDynamicWrite: TWrite + Send + Sync + Debug + 'static {
pub(crate) trait TSerdeDynamicWrite: Send + Sync + Debug + 'static {
async fn send(&mut self, msg: PeerMessage) -> Result<(), IoError>;
async fn send_connection_msg(&mut self, arg: &str) -> Result<(), IoError>;
}

#[async_trait::async_trait]
pub(crate) trait TWrite: Send + Sync + Debug + 'static {
async fn write(&mut self, io: QueryIO) -> Result<(), IoError>;
}

pub trait TSerdeWrite {
fn serialized_write(
&mut self,
Expand Down
1 change: 0 additions & 1 deletion duva/src/domains/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@ pub mod interface;
pub use interface::*;
pub mod query_io;
pub(crate) use query_io::QueryIO;
pub(crate) use query_io::deserialize;
Loading