diff --git a/duva-client/src/broker/mod.rs b/duva-client/src/broker/mod.rs
index baf4ffbf..648b7414 100644
--- a/duva-client/src/broker/mod.rs
+++ b/duva-client/src/broker/mod.rs
@@ -102,7 +102,7 @@ impl Broker {
             } else {
                 context.callback(ServerResponse::Err {
                     reason: "Failed to route command. Try again after ttl time".to_string(),
-                    request_id: 0,
+                    conn_offset: 0,
                 })
             };
         },
@@ -292,7 +292,10 @@ impl Broker {
         // ! otherwise, server will not be able to process the next command
         match res {
-            ServerResponse::ReadRes { res: QueryIO::BulkString(..), request_id } => {
+            ServerResponse::ReadRes {
+                res: QueryIO::BulkString(..),
+                conn_offset: request_id,
+            } => {
                 connection.request_id = connection.request_id.max(*request_id);
             },
             ServerResponse::WriteRes { res: QueryIO::BulkString(..), log_index, .. } => {
diff --git a/duva-client/src/broker/node_connections.rs b/duva-client/src/broker/node_connections.rs
index 5ad142a3..e938e88a 100644
--- a/duva-client/src/broker/node_connections.rs
+++ b/duva-client/src/broker/node_connections.rs
@@ -97,7 +97,8 @@ impl NodeConnection {
     }
 
     pub(crate) async fn send(&self, client_action: ClientAction) -> anyhow::Result<()> {
-        let session_request = SessionRequest { request_id: self.request_id, action: client_action };
+        let session_request =
+            SessionRequest { conn_offset: self.request_id, action: client_action };
         self.writer
             .send(MsgToServer::Command(bincode::encode_to_vec(session_request, SERDE_CONFIG)?))
             .await
diff --git a/duva-client/src/command.rs b/duva-client/src/command.rs
index 98827ee0..26a7926d 100644
--- a/duva-client/src/command.rs
+++ b/duva-client/src/command.rs
@@ -35,7 +35,7 @@ impl CommandQueue {
 
         let result = context
             .get_result()
-            .unwrap_or_else(|err| ServerResponse::Err { reason: err.to_string(), request_id: 0 });
+            .unwrap_or_else(|err| ServerResponse::Err { reason: err.to_string(), conn_offset: 0 });
         context.callback(result);
     }
 }
@@ -84,11 +84,13 @@ impl InputContext {
             ClientAction::NonMutating(Keys { pattern: _ } | MGet { keys: _ }) => {
                 let mut init = QueryIO::Array(Vec::with_capacity(iterator.len()));
-                while let Some(ServerResponse::ReadRes { res, request_id }) = iterator.next() {
+                while let Some(ServerResponse::ReadRes { res, conn_offset: request_id }) =
+                    iterator.next()
+                {
                     init = init.merge(res)?;
                     highest_req_id = highest_req_id.max(request_id);
                 }
-                Ok(ServerResponse::ReadRes { res: init, request_id: highest_req_id })
+                Ok(ServerResponse::ReadRes { res: init, conn_offset: highest_req_id })
             },
             ClientAction::NonMutating(Exists { keys: _ }) => {
@@ -96,7 +98,7 @@ impl InputContext {
                 while let Some(ServerResponse::ReadRes {
                     res: QueryIO::BulkString(byte),
-                    request_id,
+                    conn_offset: request_id,
                 }) = iterator.next()
                 {
                     let num = String::from_utf8(byte.to_vec())
@@ -109,7 +111,7 @@ impl InputContext {
                 Ok(ServerResponse::ReadRes {
                     res: QueryIO::BulkString(BinBytes::new(count.to_string())),
-                    request_id: highest_req_id,
+                    conn_offset: highest_req_id,
                 })
             },
             ClientAction::Mutating(LogEntry::Delete { keys: _ }) => {
@@ -117,7 +119,7 @@ impl InputContext {
                 while let Some(ServerResponse::WriteRes {
                     res: QueryIO::BulkString(value),
-                    request_id,
+                    conn_offset: request_id,
                     ..
                 }) = iterator.next()
                 {
@@ -128,7 +130,7 @@ impl InputContext {
                 Ok(ServerResponse::WriteRes {
                     res: QueryIO::BulkString(BinBytes::new(count.to_string())),
                     log_index: 0, // TODO
-                    request_id: highest_req_id,
+                    conn_offset: highest_req_id,
                 })
             },
             _ => iterator.next().ok_or(anyhow::anyhow!("Expected exactly one result")),
diff --git a/duva/src/adapters/op_logs/disk_based.rs b/duva/src/adapters/op_logs/disk_based.rs
index 6d398dc9..157b62c1 100644
--- a/duva/src/adapters/op_logs/disk_based.rs
+++ b/duva/src/adapters/op_logs/disk_based.rs
@@ -547,7 +547,7 @@ mod tests {
             entry: LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") },
             log_index: index,
             term,
-            session_req: None,
+            conn_offset: None,
         }
    }
 
@@ -576,7 +576,7 @@ mod tests {
         let request = LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") };
         let write_op =
-            WriteOperation { entry: request.clone(), log_index: 0, term: 0, session_req: None };
+            WriteOperation { entry: request.clone(), log_index: 0, term: 0, conn_offset: None };
 
         // WHEN
         op_logs.write_many(vec![write_op]).unwrap();
@@ -743,7 +743,7 @@ mod tests {
                 },
                 log_index: i as u64,
                 term: 1,
-                session_req: None,
+                conn_offset: None,
             }])?;
         }
         // Force rotation
@@ -753,7 +753,7 @@ mod tests {
             entry: LogEntry::Set { entry: CacheEntry::new("new".to_string(), "value") },
             log_index: 100,
             term: 1,
-            session_req: None,
+            conn_offset: None,
         }])?;
 
         // WHEN
@@ -826,7 +826,7 @@ mod tests {
                 },
                 log_index: start_index + i as u64,
                 term,
-                session_req: None,
+                conn_offset: None,
             })
             .collect()
     }
@@ -1065,7 +1065,7 @@ mod tests {
                     },
                     log_index: i as u64,
                     term: 1,
-                    session_req: None,
+                    conn_offset: None,
                 })
                 .collect(),
         )?;
@@ -1084,7 +1084,7 @@ mod tests {
             entry: LogEntry::Set { entry: CacheEntry::new("new".to_string(), "value") },
             log_index: 100,
             term: 1,
-            session_req: None,
+            conn_offset: None,
         }])?;
 
         // Verify index data in active segment
@@ -1110,7 +1110,7 @@ mod tests {
                     },
                     log_index: i as u64,
                     term: 1,
-                    session_req: None,
+                    conn_offset: None,
                 })
                 .collect(),
         )?;
diff --git a/duva/src/domains/cluster_actors/actor.rs b/duva/src/domains/cluster_actors/actor.rs
index 3ae867b4..6ac872ef 100644
--- a/duva/src/domains/cluster_actors/actor.rs
+++ b/duva/src/domains/cluster_actors/actor.rs
@@ -1,6 +1,6 @@
 use super::ClusterCommand;
-use super::ConsensusClientResponse;
 use super::ConsensusRequest;
+use super::ConsensusResponse;
 use super::LazyOption;
 use super::hash_ring::HashRing;
 pub mod client_sessions;
@@ -327,20 +327,20 @@ impl ClusterActor {
     pub(crate) async fn leader_req_consensus(&mut self, req: ConsensusRequest) {
         if !self.replication.is_leader() {
-            req.callback.send(ConsensusClientResponse::Result {
+            req.callback.send(ConsensusResponse::Result {
                 res: Err(anyhow::anyhow!("Write given to follower")),
                 log_index: self.replication.last_log_index(),
             });
             return;
         }
 
-        if self.client_sessions.is_processed(&req.session_req) {
+        if self.client_sessions.is_processed(&req.conn_offset) {
             // mapping between early returned values to client result
             let key = req.entry.all_keys().into_iter().map(String::from).collect();
-            req.callback.send(ConsensusClientResponse::AlreadyProcessed {
+            req.callback.send(ConsensusResponse::AlreadyProcessed {
                 key,
                 // TODO : remove unwrap
-                request_id: req.session_req.unwrap().request_id,
+                request_id: req.conn_offset.unwrap().offset,
             });
             return;
         };
@@ -358,14 +358,14 @@
             // To notify client's of what keys have been moved.
             // ! Still, client won't know where the key has been moved.
             // The assumption here is client SHOULD have correct hashring information.
             let moved_keys = replids.except(&self.log_state().replid).join(" ");
-            req.callback.send(ConsensusClientResponse::Result {
+            req.callback.send(ConsensusResponse::Result {
                 res: Err(anyhow::anyhow!("Moved! {moved_keys}")),
                 log_index: self.replication.last_log_index(),
             })
         },
         Err(err) => {
             err!("{}", err);
-            req.callback.send(ConsensusClientResponse::Result {
+            req.callback.send(ConsensusResponse::Result {
                 res: Err(anyhow::anyhow!(err)),
                 log_index: self.replication.last_log_index(),
             });
@@ -377,7 +377,7 @@ impl ClusterActor {
         let log_index = self.replication.write_single_entry(
             req.entry,
             self.log_state().term,
-            req.session_req.clone(),
+            req.conn_offset.clone(),
         );
 
         let repl_cnt = self.replicas().count();
@@ -387,11 +387,11 @@
         self.replication.increase_con_idx_by(1);
         let _ = self.replication.flush();
         let res = self.commit_entry(entry.entry, log_index).await;
-            req.callback.send(ConsensusClientResponse::Result { res, log_index });
+            req.callback.send(ConsensusResponse::Result { res, log_index });
             return;
         }
         self.consensus_tracker
-            .insert(log_index, LogConsensusVoting::new(req.callback, repl_cnt, req.session_req));
+            .insert(log_index, LogConsensusVoting::new(req.callback, repl_cnt, req.conn_offset));
 
         if let Some(send_in_mills) = send_in_mills {
             tokio::spawn({
@@ -954,12 +954,12 @@ impl ClusterActor {
         }
         self.replication.increase_con_idx_by(1);
-        self.client_sessions.set_response(voting.session_req.take());
+        self.client_sessions.set_response(voting.conn_offset.take());
 
         let log_entry = self.replication.read_at(log_index).unwrap();
         let res = self.commit_entry(log_entry.entry, log_index).await;
         let _ = self.replication.flush();
-        voting.callback.send(ConsensusClientResponse::Result { res, log_index });
+        voting.callback.send(ConsensusResponse::Result { res, log_index });
     }
 
     async fn commit_entry(&mut self, entry: LogEntry, index: u64) -> anyhow::Result<QueryIO> {
@@ -1343,7 +1343,7 @@ impl ClusterActor {
         let req = ConsensusRequest {
             entry: LogEntry::MSet { entries: migrate_batch.entries.clone() },
             callback,
-            session_req: None,
+            conn_offset: None,
         };
 
         self.make_consensus_in_batch(req).await;
@@ -1387,7 +1387,7 @@ impl ClusterActor {
         let req = ConsensusRequest {
             entry: LogEntry::Delete { keys: pending_migration_batch.keys.clone() },
             callback,
-            session_req: None,
+            conn_offset: None,
         };
 
         self.make_consensus_in_batch(req).await;
@@ -1428,7 +1428,7 @@ impl ClusterActor {
         while let Some(req) = pending_reqs.pop_front() {
             if let Err(err) = handler
-                .send(ClusterCommand::Client(ClientMessage::LeaderReqConsensus(req)))
+                .send(ClusterCommand::Client(ClusterClientRequest::MakeConsensus(req)))
                 .await
             {
                 error!("{}", err)
diff --git a/duva/src/domains/cluster_actors/actor/client_sessions.rs b/duva/src/domains/cluster_actors/actor/client_sessions.rs
index d8ff7625..065048bc 100644
--- a/duva/src/domains/cluster_actors/actor/client_sessions.rs
+++ b/duva/src/domains/cluster_actors/actor/client_sessions.rs
@@ -1,4 +1,4 @@
-use crate::{make_smart_pointer, presentation::clients::request::ClientReq};
+use crate::{domains::cluster_actors::ConnectionOffset, make_smart_pointer};
 use chrono::{DateTime, Utc};
 use std::collections::HashMap;
@@ -13,22 +13,22 @@ pub(crate) struct Session {
 }
 
 impl ClientSessions {
-    pub(crate) fn is_processed(&self, req: &Option<ClientReq>) -> bool {
+    pub(crate) fn is_processed(&self, req: &Option<ConnectionOffset>) -> bool {
         if let Some(session_req) = req
-            && let Some(session) = self.get(&session_req.client_id)
+            && let Some(session) = self.get(&session_req.conn_id)
             && let Some(res) = session.processed_req_id.as_ref()
         {
-            return *res == session_req.request_id;
+            return *res == session_req.offset;
         }
         false
     }
 
-    pub(crate) fn set_response(&mut self, session_req: Option<ClientReq>) {
-        let Some(session_req) = session_req else { return };
+    pub(crate) fn set_response(&mut self, conn_offset: Option<ConnectionOffset>) {
+        let Some(conn_offset) = conn_offset else { return };
         let entry = self
-            .entry(session_req.client_id)
+            .entry(conn_offset.conn_id)
             .or_insert(Session { last_accessed: Default::default(), processed_req_id: None });
         entry.last_accessed = Utc::now();
-        entry.processed_req_id = Some(session_req.request_id);
+        entry.processed_req_id = Some(conn_offset.offset);
     }
 }
diff --git a/duva/src/domains/cluster_actors/actor/tests/elections.rs b/duva/src/domains/cluster_actors/actor/tests/elections.rs
index 7192c564..e4f76f8f 100644
--- a/duva/src/domains/cluster_actors/actor/tests/elections.rs
+++ b/duva/src/domains/cluster_actors/actor/tests/elections.rs
@@ -99,7 +99,7 @@ async fn test_vote_election_deny_vote_older_log() {
             log_index: initial_term + 2,
             term: initial_term,
             entry: LogEntry::Set { entry: CacheEntry::new("k".to_string(), "v") },
-            session_req: None,
+            conn_offset: None,
         }])
         .unwrap(); // Follower log: idx 2, term 2
@@ -189,7 +189,7 @@ async fn test_receive_election_vote_candidate_wins_election() {
             entry: LogEntry::NoOp,
             log_index: candidate_actor.replication.last_log_index(),
             term: candidate_actor.replication.last_log_term(),
-            session_req: None,
+            conn_offset: None,
         }],
         ..Default::default()
     };
@@ -231,12 +231,12 @@ async fn test_become_candidate_not_allow_write_request_processing() {
     // GIVEN: A candidate actor
     let mut candidate_actor = Helper::cluster_actor(ReplicationRole::Follower).await;
     let (tx, rx) = Callback::create();
-    let session_req = ClientReq::new(1, "client1".to_string());
+    let session_req = ConnectionOffset::new(1, "client1".to_string());
 
     let consensus_request = ConsensusRequest {
         entry: LogEntry::Set { entry: CacheEntry::new("key".to_string(), "value") },
         callback: tx,
-        session_req: Some(session_req),
+        conn_offset: Some(session_req),
     };
     // WHEN: A write request is received
     candidate_actor.become_candidate();
@@ -248,5 +248,5 @@ async fn test_become_candidate_not_allow_write_request_processing() {
     assert!(value.is_null());
 
     let res = rx.0.await.unwrap();
-    assert!(matches!(res, ConsensusClientResponse::Result { res: Err(_), log_index: _ }))
+    assert!(matches!(res, ConsensusResponse::Result { res: Err(_), log_index: _ }))
 }
diff --git a/duva/src/domains/cluster_actors/actor/tests/mod.rs b/duva/src/domains/cluster_actors/actor/tests/mod.rs
index 6d6f5be7..3e1fb60e 100644
--- a/duva/src/domains/cluster_actors/actor/tests/mod.rs
+++ b/duva/src/domains/cluster_actors/actor/tests/mod.rs
@@ -15,7 +15,6 @@ use crate::domains::TSerdeDynamicWrite;
 use crate::domains::caches::actor::CacheCommandSender;
 use crate::domains::caches::cache_objects::CacheEntry;
 use crate::domains::caches::command::CacheCommand;
-use crate::presentation::clients::request::ClientReq;
 
 use crate::domains::peers::command::HeartBeat;
@@ -50,7 +49,7 @@ impl FakeReadWrite {
     }
 #[async_trait::async_trait]
 impl TWrite for FakeReadWrite {
-    async fn write(&mut self, io: QueryIO) -> Result<(), IoError> {
+    async fn write(&mut self, _io: QueryIO) -> Result<(), IoError> {
         panic!()
     }
 }
@@ -76,7 +75,7 @@ impl TSerdeDynamicWrite for FakeReadWrite {
         Ok(())
     }
 
-    async fn send_connection_msg(&mut self, arg: &str) -> Result<(), IoError> {
+    async fn send_connection_msg(&mut self, _arg: &str) -> Result<(), IoError> {
         Ok(())
     }
 }
@@ -139,7 +138,7 @@ impl Helper {
             log_index: index_num,
             entry: LogEntry::Set { entry: CacheEntry::new(key, value) },
             term,
-            session_req: None,
+            conn_offset: None,
         }
     }
     pub(crate) fn session_write(
@@ -147,13 +146,13 @@
         term: u64,
         key: &str,
         value: &str,
-        session_req: ClientReq,
+        session_req: ConnectionOffset,
     ) -> WriteOperation {
         WriteOperation {
             log_index: index_num,
             entry: LogEntry::Set { entry: CacheEntry::new(key, value) },
             term,
-            session_req: Some(session_req),
+            conn_offset: Some(session_req),
         }
     }
 
@@ -238,13 +237,13 @@
 }
 
 fn consensus_request(
-    callback: Callback<ConsensusClientResponse>,
-    session_req: Option<ClientReq>,
+    callback: Callback<ConsensusResponse>,
+    session_req: Option<ConnectionOffset>,
 ) -> ConsensusRequest {
     ConsensusRequest {
         entry: LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") },
         callback,
-        session_req,
+        conn_offset: session_req,
     }
 }
 }
diff --git a/duva/src/domains/cluster_actors/actor/tests/replications.rs b/duva/src/domains/cluster_actors/actor/tests/replications.rs
index 996b3671..7b0761d6 100644
--- a/duva/src/domains/cluster_actors/actor/tests/replications.rs
+++ b/duva/src/domains/cluster_actors/actor/tests/replications.rs
@@ -1,5 +1,3 @@
-use crate::presentation::clients::request::ClientReq;
-
 use super::*;
 
 #[test]
@@ -156,10 +154,10 @@ async fn replicate_stores_only_latest_session_per_client() {
     let mut cluster_actor = Helper::cluster_actor(ReplicationRole::Follower).await;
 
     let target_client = uuid::Uuid::now_v7().to_string();
-    let session1 = ClientReq::new(1, uuid::Uuid::now_v7().to_string());
-    let session2 = ClientReq::new(1, target_client.clone());
+    let session1 = ConnectionOffset::new(1, uuid::Uuid::now_v7().to_string());
+    let session2 = ConnectionOffset::new(1, target_client.clone());
     // ! For the same client, hold only one request
-    let session3 = ClientReq::new(2, target_client);
+    let session3 = ConnectionOffset::new(2, target_client);
 
     let heartbeat = Helper::heartbeat(
         0,
@@ -432,12 +430,12 @@ async fn req_consensus_inserts_consensus_voting() {
 
     let (callback, _) = Callback::create();
     let client_id = Uuid::now_v7().to_string();
-    let session_request = ClientReq::new(1, client_id);
+    let session_request = ConnectionOffset::new(1, client_id);
     let w_req = LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") };
     let consensus_request = ConsensusRequest {
         entry: w_req.clone(),
         callback,
-        session_req: Some(session_request.clone()),
+        conn_offset: Some(session_request.clone()),
     };
 
     // WHEN
@@ -448,7 +446,7 @@ async fn req_consensus_inserts_consensus_voting() {
     assert_eq!(leader_c_actor.log_state().last_log_index, 0); // * buffer
     assert_eq!(
-        leader_c_actor.consensus_tracker.get(&1).unwrap().session_req.as_ref().unwrap().clone(), //* session_request_is_saved_on_tracker
+        leader_c_actor.consensus_tracker.get(&1).unwrap().conn_offset.as_ref().unwrap().clone(), //* session_request_is_saved_on_tracker
         session_request
     );
 }
@@ -459,7 +457,7 @@ async fn test_leader_req_consensus_early_return_when_already_processed_session_r
     let mut cluster_actor = Helper::cluster_actor(ReplicationRole::Leader).await;
 
     let client_id = Uuid::now_v7().to_string();
-    let client_req = ClientReq::new(1, client_id);
+    let client_req = ConnectionOffset::new(1, client_id);
 
     // WHEN - session request is already processed
     cluster_actor.client_sessions.set_response(Some(client_req.clone()));
@@ -467,10 +465,10 @@
     tokio::spawn(cluster_actor.handle());
     let (callback, rx) = Callback::create();
     handler
-        .send(ClusterCommand::Client(ClientMessage::LeaderReqConsensus(ConsensusRequest {
+        .send(ClusterCommand::Client(ClusterClientRequest::MakeConsensus(ConsensusRequest {
             entry: LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") },
             callback,
-            session_req: Some(client_req),
+            conn_offset: Some(client_req),
         })))
         .await
         .unwrap();
@@ -502,7 +500,7 @@ async fn test_consensus_voting_deleted_when_consensus_reached() {
 
     let (client_request_sender, client_wait) = Callback::create();
     let client_id = Uuid::now_v7().to_string();
-    let client_request = ClientReq::new(3, client_id);
+    let client_request = ConnectionOffset::new(3, client_id);
     let consensus_request =
         Helper::consensus_request(client_request_sender, Some(client_request.clone()));
@@ -617,7 +615,7 @@ async fn test_leader_req_consensus_with_processed_session() {
     let mut cluster_actor = Helper::cluster_actor(ReplicationRole::Leader).await;
 
     let client_id = Uuid::now_v7().to_string();
-    let session_req = ClientReq::new(1, client_id);
+    let session_req = ConnectionOffset::new(1, client_id);
 
     // Mark the session as already processed
     cluster_actor.client_sessions.set_response(Some(session_req.clone()));
@@ -627,7 +625,7 @@
     let consensus_request = ConsensusRequest {
         entry: LogEntry::Set { entry: CacheEntry::new("test_key".to_string(), "test_value") },
         callback: tx,
-        session_req: Some(session_req),
+        conn_offset: Some(session_req),
     };
     cluster_actor.leader_req_consensus(consensus_request).await;
@@ -637,7 +635,7 @@
     assert_eq!(cluster_actor.log_state().last_log_index, 0);
 
     // Verify the response indicates already processed
-    let ConsensusClientResponse::AlreadyProcessed { key, request_id: 1 } = rx.recv().await else {
+    let ConsensusResponse::AlreadyProcessed { key, request_id: 1 } = rx.recv().await else {
         panic!("Expected AlreadyProcessed response");
     };
     assert_eq!(key, vec!["test_key".to_string()]);
diff --git a/duva/src/domains/cluster_actors/command.rs b/duva/src/domains/cluster_actors/command.rs
index 4bd2a7f5..69bbc60b 100644
--- a/duva/src/domains/cluster_actors/command.rs
+++ b/duva/src/domains/cluster_actors/command.rs
@@ -7,7 +7,6 @@ use crate::domains::peers::peer::Peer;
 use crate::domains::replications::*;
 use crate::prelude::PeerIdentifier;
-use crate::presentation::clients::request::ClientReq;
 use crate::types::{Callback, CallbackAwaiter};
 
 use std::str::FromStr;
@@ -15,7 +14,7 @@
 pub enum ClusterCommand {
     ConnectionReq(ConnectionMessage),
     Scheduler(SchedulerMessage),
-    Client(ClientMessage),
+    Client(ClusterClientRequest),
     Peer(PeerCommand),
     ShutdownGracefully(Callback<()>),
 }
@@ -53,12 +52,12 @@ impl From for ClusterCommand {
 }
 
 #[derive(Debug, PartialEq, Eq)]
-pub enum ClientMessage {
+pub enum ClusterClientRequest {
     GetPeers(Callback<Vec<PeerIdentifier>>),
     ReplicationState(Callback<ReplicationState>),
     Forget(PeerIdentifier, Callback<Option<()>>),
     ReplicaOf(PeerIdentifier, Callback<anyhow::Result<()>>),
-    LeaderReqConsensus(ConsensusRequest),
+    MakeConsensus(ConsensusRequest),
     ClusterNodes(Callback<Vec<String>>),
     GetRoles(Callback<Vec<(PeerIdentifier, ReplicationRole)>>),
     SubscribeToTopologyChange(Callback<tokio::sync::broadcast::Receiver<Topology>>),
@@ -69,23 +68,33 @@
     GetLeaderId(Callback<Option<PeerIdentifier>>),
 }
 
-impl From<ClientMessage> for ClusterCommand {
-    fn from(msg: ClientMessage) -> Self {
+impl From<ClusterClientRequest> for ClusterCommand {
+    fn from(msg: ClusterClientRequest) -> Self {
         ClusterCommand::Client(msg)
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)]
+pub struct ConnectionOffset {
+    pub(crate) offset: u64,
+    pub(crate) conn_id: String,
+}
+impl ConnectionOffset {
+    pub(crate) fn new(offset: u64, conn_id: String) -> Self {
+        Self { offset, conn_id }
+    }
+}
+
 #[derive(Debug, PartialEq, Eq)]
 pub(crate) struct ConsensusRequest {
     pub(crate) entry: LogEntry,
-    pub(crate) callback: Callback<ConsensusClientResponse>,
-    pub(crate) session_req: Option<ClientReq>,
+    pub(crate) callback: Callback<ConsensusResponse>,
+    pub(crate) conn_offset: Option<ConnectionOffset>,
 }
 
 #[derive(Debug)]
-pub(crate) enum ConsensusClientResponse {
+pub(crate) enum ConsensusResponse {
     AlreadyProcessed { key: Vec<String>, request_id: u64 },
-    Result { res: anyhow::Result<QueryIO>, log_index: u64 },
 }
diff --git a/duva/src/domains/cluster_actors/queue.rs b/duva/src/domains/cluster_actors/queue.rs
index 3aa0579f..4b79a7fc 100644
--- a/duva/src/domains/cluster_actors/queue.rs
+++ b/duva/src/domains/cluster_actors/queue.rs
@@ -1,6 +1,6 @@
 use crate::{
     domains::{
-        cluster_actors::{ClientMessage, ClusterCommand, ConnectionMessage, LazyOption},
+        cluster_actors::{ClusterClientRequest, ClusterCommand, ConnectionMessage, LazyOption},
         replications::state::ReplicationState,
     },
     prelude::{PeerIdentifier, Topology},
@@ -52,19 +52,19 @@ impl ClusterActorSender {
 
     pub(crate) async fn wait_for_acceptance(&self) {
         let (tx, rx) = Callback::create();
-        let _ = self.send(ClientMessage::CanEnter(tx)).await;
+        let _ = self.send(ClusterClientRequest::CanEnter(tx)).await;
         rx.wait().await;
     }
 
     pub(crate) async fn route_get_peers(&self) -> anyhow::Result<Vec<PeerIdentifier>> {
         let (tx, rx) = Callback::create();
-        self.send(ClientMessage::GetPeers(tx)).await?;
+        self.send(ClusterClientRequest::GetPeers(tx)).await?;
         let peers = rx.recv().await;
         Ok(peers)
     }
 
     pub(crate) async fn route_get_topology(&self) -> anyhow::Result<Topology> {
         let (tx, rx) = Callback::create();
-        self.send(ClientMessage::GetTopology(tx)).await?;
+        self.send(ClusterClientRequest::GetTopology(tx)).await?;
         let peers = rx.recv().await;
         Ok(peers)
     }
@@ -81,13 +81,13 @@ impl ClusterActorSender {
 
     pub(crate) async fn route_get_node_state(&self) -> anyhow::Result<ReplicationState> {
         let (tx, rx) = Callback::create();
-        self.send(ClientMessage::ReplicationState(tx)).await?;
+        self.send(ClusterClientRequest::ReplicationState(tx)).await?;
         Ok(rx.recv().await)
     }
 
     pub(crate) async fn route_get_leader_id(&self) -> anyhow::Result<Option<PeerIdentifier>> {
         let (tx, rx) = Callback::create();
-        self.send(ClientMessage::GetLeaderId(tx)).await?;
+        self.send(ClusterClientRequest::GetLeaderId(tx)).await?;
         Ok(rx.recv().await)
     }
 
@@ -113,7 +113,7 @@ impl ClusterActorSender {
         peer_identifier: PeerIdentifier,
     ) -> anyhow::Result<bool> {
         let (tx, rx) = Callback::create();
-        self.send(ClientMessage::Forget(peer_identifier, tx)).await?;
+        self.send(ClusterClientRequest::Forget(peer_identifier, tx)).await?;
         let Some(_) = rx.recv().await else { return Ok(false) };
         Ok(true)
     }
@@ -123,7 +123,7 @@ impl ClusterActorSender {
         peer_identifier: PeerIdentifier,
     ) -> anyhow::Result<()> {
         let (tx, rx) = Callback::create();
-        self.send(ClientMessage::ReplicaOf(peer_identifier, tx)).await?;
+        self.send(ClusterClientRequest::ReplicaOf(peer_identifier, tx)).await?;
         rx.recv().await
     }
 
@@ -133,24 +133,24 @@ impl ClusterActorSender {
         lazy_option: LazyOption,
     ) -> anyhow::Result<()> {
         let (tx, rx) = Callback::create();
-        self.send(ClientMessage::ClusterMeet(peer_identifier, lazy_option, tx)).await?;
+        self.send(ClusterClientRequest::ClusterMeet(peer_identifier, lazy_option, tx)).await?;
         rx.recv().await
     }
 
     pub(crate) async fn route_cluster_reshard(&self) -> anyhow::Result<()> {
         let (tx, rx) = Callback::create();
-        self.send(ClientMessage::ClusterReshard(tx)).await?;
+        self.send(ClusterClientRequest::ClusterReshard(tx)).await?;
         rx.recv().await
     }
 
     pub(crate) async fn route_cluster_nodes(&self) -> anyhow::Result<Vec<String>> {
         let (tx, rx) = Callback::create();
-        self.send(ClientMessage::ClusterNodes(tx)).await?;
+        self.send(ClusterClientRequest::ClusterNodes(tx)).await?;
         Ok(rx.recv().await)
     }
 
     pub(crate) async fn route_get_roles(&self) -> anyhow::Result<Vec<String>> {
         let (tx, rx) = Callback::create();
-        self.send(ClientMessage::GetRoles(tx)).await?;
+        self.send(ClusterClientRequest::GetRoles(tx)).await?;
         Ok(rx.recv().await.into_iter().map(|(id, role)| format!("{}:{}", id.0, role)).collect())
     }
 
@@ -158,7 +158,7 @@
         &self,
     ) -> tokio::sync::broadcast::Receiver<Topology> {
         let (tx, rx) = Callback::create();
-        let _ = self.send(ClientMessage::SubscribeToTopologyChange(tx)).await;
+        let _ = self.send(ClusterClientRequest::SubscribeToTopologyChange(tx)).await;
         rx.recv().await
     }
 }
diff --git a/duva/src/domains/cluster_actors/service.rs b/duva/src/domains/cluster_actors/service.rs
index 0535bcef..5b961a39 100644
--- a/duva/src/domains/cluster_actors/service.rs
+++ b/duva/src/domains/cluster_actors/service.rs
@@ -1,5 +1,5 @@
-use crate::domains::cluster_actors::ClientMessage;
 use crate::domains::cluster_actors::ClusterActor;
+use crate::domains::cluster_actors::ClusterClientRequest;
 use crate::domains::cluster_actors::ClusterCommand;
 use crate::domains::cluster_actors::ConnectionMessage;
 use crate::domains::cluster_actors::SchedulerMessage;
@@ -73,8 +73,8 @@ impl ClusterActor {
     }
 
     #[instrument(level = tracing::Level::DEBUG, skip(self, client_message))]
-    async fn process_client_message(&mut self, client_message: ClientMessage) {
-        use ClientMessage::*;
+    async fn process_client_message(&mut self, client_message: ClusterClientRequest) {
+        use ClusterClientRequest::*;
 
         match client_message {
             CanEnter(callback) => {
@@ -96,7 +96,7 @@
                 callback.send(None);
                 }
             },
-            LeaderReqConsensus(req) => {
+            MakeConsensus(req) => {
                 self.leader_req_consensus(req).await;
             },
             ReplicaOf(peer_addr, callback) => {
diff --git a/duva/src/domains/replications/consensus/log.rs b/duva/src/domains/replications/consensus/log.rs
index bb08e90b..7bfe086f 100644
--- a/duva/src/domains/replications/consensus/log.rs
+++ b/duva/src/domains/replications/consensus/log.rs
@@ -1,7 +1,9 @@
 use crate::{
-    domains::{cluster_actors::ConsensusClientResponse, peers::identifier::PeerIdentifier},
+    domains::{
+        cluster_actors::{ConnectionOffset, ConsensusResponse},
+        peers::identifier::PeerIdentifier,
+    },
     make_smart_pointer,
-    presentation::clients::request::ClientReq,
     types::Callback,
 };
 use std::collections::HashMap;
@@ -14,17 +16,17 @@ make_smart_pointer!(LogConsensusTracker, HashMap<u64, LogConsensusVoting>);
 
 #[derive(Debug)]
 pub struct LogConsensusVoting {
     pub(crate) voters: Vec<PeerIdentifier>,
-    pub(crate) callback: Callback<ConsensusClientResponse>,
+    pub(crate) callback: Callback<ConsensusResponse>,
     pub(crate) cnt: u8,
-    pub(crate) session_req: Option<ClientReq>,
+    pub(crate) conn_offset: Option<ConnectionOffset>,
 }
 impl LogConsensusVoting {
     pub(crate) fn new(
-        callback: Callback<ConsensusClientResponse>,
+        callback: Callback<ConsensusResponse>,
         replica_count: usize,
-        session_req: Option<ClientReq>,
+        conn_offset: Option<ConnectionOffset>,
     ) -> Self {
-        Self { callback, cnt: 1, voters: Vec::with_capacity(replica_count), session_req }
+        Self { callback, cnt: 1, voters: Vec::with_capacity(replica_count), conn_offset }
     }
 
     pub(crate) fn increase_vote(&mut self, voter: PeerIdentifier) {
diff --git a/duva/src/domains/replications/operation.rs b/duva/src/domains/replications/operation.rs
index 64df4a10..3e799fce 100644
--- a/duva/src/domains/replications/operation.rs
+++ b/duva/src/domains/replications/operation.rs
@@ -1,6 +1,5 @@
-use crate::{
-    domains::{caches::cache_objects::CacheEntry, query_io::SERDE_CONFIG},
-    presentation::clients::request::ClientReq,
+use crate::domains::{
+    caches::cache_objects::CacheEntry, cluster_actors::ConnectionOffset, query_io::SERDE_CONFIG,
 };
 use bytes::Bytes;
@@ -9,7 +8,7 @@ pub struct WriteOperation {
     pub(crate) entry: LogEntry,
     pub(crate) log_index: u64,
     pub(crate) term: u64,
-    pub(crate) session_req: Option<ClientReq>,
+    pub(crate) conn_offset: Option<ConnectionOffset>,
 }
 
 /// Operations that appear in the Append-Only File (WAL).
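Reviewer note on the semantics behind the rename: a `ConnectionOffset` pairs a client connection id (`conn_id`) with a per-connection, monotonically increasing offset, and the leader's `ClientSessions` remembers the last offset committed for each connection, so a retried write is answered early (`AlreadyProcessed`, which `handle_mutating` then converts into an `MGet`) rather than applied twice. A minimal, self-contained sketch of that dedup contract — the types below are simplified stand-ins for `ClientSessions`/`Session`, not the crate's actual definitions:

```rust
use std::collections::HashMap;

/// Stand-in for `ConnectionOffset`: a connection id plus its request offset.
#[derive(Clone, PartialEq, Eq)]
struct ConnectionOffset {
    offset: u64,
    conn_id: String,
}

/// Stand-in for `ClientSessions`: last processed offset per connection.
#[derive(Default)]
struct Sessions(HashMap<String, u64>);

impl Sessions {
    /// A write whose offset matches the recorded one is a client retry.
    fn is_processed(&self, req: &Option<ConnectionOffset>) -> bool {
        match req {
            Some(r) => self.0.get(&r.conn_id) == Some(&r.offset),
            None => false, // internal writes carry no offset and are never deduplicated
        }
    }

    /// Record the offset once the entry has been committed.
    fn set_response(&mut self, req: Option<ConnectionOffset>) {
        if let Some(r) = req {
            self.0.insert(r.conn_id, r.offset);
        }
    }
}

fn main() {
    let mut sessions = Sessions::default();
    let req = Some(ConnectionOffset { offset: 1, conn_id: "conn-1".into() });

    assert!(!sessions.is_processed(&req)); // first delivery: run consensus, commit
    sessions.set_response(req.clone());
    assert!(sessions.is_processed(&req)); // redelivery: short-circuit, do not re-apply
}
```

This is also why `conn_offset` is `None` on the migration and election paths above: those entries originate inside the cluster, so there is no client session to deduplicate against.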
diff --git a/duva/src/domains/replications/replication.rs b/duva/src/domains/replications/replication.rs
index 26e741eb..aeef6b22 100644
--- a/duva/src/domains/replications/replication.rs
+++ b/duva/src/domains/replications/replication.rs
@@ -1,5 +1,6 @@
 use super::*;
 
+use crate::domains::cluster_actors::ConnectionOffset;
 use crate::domains::peers::command::HeartBeat;
 use crate::domains::peers::command::RejectionReason;
 use crate::domains::peers::command::ReplicationAck;
@@ -7,7 +8,7 @@ use crate::domains::peers::command::ReplicationAck;
 use crate::domains::peers::command::RequestVote;
 use crate::domains::peers::identifier::PeerIdentifier;
 use crate::err;
-use crate::presentation::clients::request::ClientReq;
+
 use std::fmt::Display;
 use std::sync::Arc;
 use std::sync::atomic::AtomicU64;
@@ -175,13 +176,13 @@ impl Replication {
         &mut self,
         entry: LogEntry,
         current_term: u64,
-        session_req: Option<ClientReq>,
+        conn_offset: Option<ConnectionOffset>,
     ) -> u64 {
         let op = WriteOperation {
             entry,
             log_index: self.last_log_index() + 1,
             term: current_term,
-            session_req,
+            conn_offset,
         };
         self.in_mem_buffer.push(op);
@@ -294,15 +295,15 @@ impl Replication {
         operations: Vec<WriteOperation>,
         prev_log_index: u64,
         prev_log_term: u64,
-        session_reqs: &mut Vec<ClientReq>,
+        conn_offsets: &mut Vec<ConnectionOffset>,
     ) -> Result {
         let mut entries = Vec::with_capacity(operations.len());
         let last_log_index = self.state.last_log_index;
 
         for mut log in operations {
             if log.log_index > last_log_index {
-                if let Some(session_req) = log.session_req.take() {
-                    session_reqs.push(session_req);
+                if let Some(session_req) = log.conn_offset.take() {
+                    conn_offsets.push(session_req);
                 }
                 entries.push(log);
             }
diff --git a/duva/src/lib.rs b/duva/src/lib.rs
index bed8d815..fd72780c 100644
--- a/duva/src/lib.rs
+++ b/duva/src/lib.rs
@@ -255,7 +255,7 @@ impl StartUpFacade {
                     cache_manager: self.cache_manager.clone(),
                 };
                 tokio::spawn(
-                    ClientStreamReader { client_id, r: read_half }
+                    ClientStreamReader { conn_id: client_id, r: read_half }
                         .handle_client_stream(client_controller, writer.run(observer)),
                 );
             },
diff --git a/duva/src/presentation/clients/controller.rs b/duva/src/presentation/clients/controller.rs
index 58cb2f9e..0767406d 100644
--- a/duva/src/presentation/clients/controller.rs
+++ b/duva/src/presentation/clients/controller.rs
@@ -3,11 +3,13 @@ use crate::domains::QueryIO;
 use crate::domains::caches::cache_manager::CacheManager;
 use crate::domains::caches::cache_objects::{CacheEntry, CacheValue, TypedValue};
 use crate::domains::cluster_actors::queue::ClusterActorSender;
-use crate::domains::cluster_actors::{ClientMessage, ConsensusClientResponse, ConsensusRequest};
+use crate::domains::cluster_actors::{
+    ClusterClientRequest, ConnectionOffset, ConsensusRequest, ConsensusResponse,
+};
 use crate::domains::replications::LogEntry;
 use crate::domains::saves::actor::SaveTarget;
 use crate::prelude::PeerIdentifier;
-use crate::presentation::clients::request::{ClientReq, NonMutatingAction, ServerResponse};
+use crate::presentation::clients::request::{NonMutatingAction, ServerResponse};
 use crate::types::{BinBytes, Callback};
 
 use tracing::info;
@@ -133,31 +135,31 @@ impl ClientController {
         };
 
         info!("{response:?}");
-        Ok(ServerResponse::ReadRes { res: response, request_id })
+        Ok(ServerResponse::ReadRes { res: response, conn_offset: request_id })
     }
 
     pub(crate) async fn handle_mutating(
         &self,
-        session_req: ClientReq,
+        conn_offset: u64,
+        conn_id: String,
         entry: LogEntry,
     ) -> anyhow::Result<ServerResponse> {
         // * Consensus / Persisting logs
         let (callback, res) = Callback::create();
-        let request_id = session_req.request_id;
         self.cluster_actor_sender
-            .send(ClientMessage::LeaderReqConsensus(ConsensusRequest {
+            .send(ClusterClientRequest::MakeConsensus(ConsensusRequest {
                 entry,
                 callback,
-                session_req: Some(session_req),
+                conn_offset: Some(ConnectionOffset::new(conn_offset, conn_id)),
             }))
             .await?;
 
         let result = match res.recv().await {
-            ConsensusClientResponse::Result { res, log_index } => {
-                ServerResponse::WriteRes { res: res?, log_index, request_id }
+            ConsensusResponse::Result { res, log_index } => {
+                ServerResponse::WriteRes { res: res?, log_index, conn_offset }
             },
-            ConsensusClientResponse::AlreadyProcessed { key: keys, request_id } => {
+            ConsensusResponse::AlreadyProcessed { key: keys, request_id } => {
                 // * Conversion! request has already been processed so we need to convert it to get
                 let action = NonMutatingAction::MGet { keys };
                 self.handle_non_mutating(action, request_id).await?
diff --git a/duva/src/presentation/clients/request.rs b/duva/src/presentation/clients/request.rs
index 6fe4b593..6ca02df4 100644
--- a/duva/src/presentation/clients/request.rs
+++ b/duva/src/presentation/clients/request.rs
@@ -14,21 +14,10 @@ use std::str::FromStr;
 
 #[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)]
 pub struct SessionRequest {
-    pub request_id: u64,
+    pub conn_offset: u64,
     pub action: ClientAction,
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)]
-pub struct ClientReq {
-    pub(crate) request_id: u64,
-    pub(crate) client_id: String,
-}
-impl ClientReq {
-    pub(crate) fn new(request_id: u64, client_id: String) -> Self {
-        Self { request_id, client_id }
-    }
-}
-
 #[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::Decode)]
 pub enum ClientAction {
     NonMutating(NonMutatingAction),
@@ -313,23 +302,24 @@ pub fn extract_expiry(expiry: &str) -> anyhow::Result {
 
 #[derive(Clone, Debug)]
 pub struct ClientRequest {
     pub(crate) action: ClientAction,
-    pub(crate) session_req: ClientReq,
+    pub(crate) conn_offset: u64,
+    pub(crate) conn_id: String,
 }
 
 #[derive(Clone, Debug, bincode::Decode, bincode::Encode)]
 pub enum ServerResponse {
-    WriteRes { res: QueryIO, log_index: u64, request_id: u64 },
-    ReadRes { res: QueryIO, request_id: u64 },
+    WriteRes { res: QueryIO, log_index: u64, conn_offset: u64 },
+    ReadRes { res: QueryIO, conn_offset: u64 },
     TopologyChange(Topology),
-    Err { reason: String, request_id: u64 },
+    Err { reason: String, conn_offset: u64 },
 }
 
 impl ServerResponse {
     pub fn request_id(&self) -> Option<u64> {
         match self {
-            ServerResponse::WriteRes { request_id, .. }
-            | ServerResponse::ReadRes { request_id, .. }
-            | ServerResponse::Err { request_id, .. } => Some(*request_id),
+            ServerResponse::WriteRes { conn_offset, .. }
+            | ServerResponse::ReadRes { conn_offset, .. }
+            | ServerResponse::Err { conn_offset, .. } => Some(*conn_offset),
             ServerResponse::TopologyChange(..) => None,
         }
     }
diff --git a/duva/src/presentation/clients/stream.rs b/duva/src/presentation/clients/stream.rs
index 32736509..1ef46040 100644
--- a/duva/src/presentation/clients/stream.rs
+++ b/duva/src/presentation/clients/stream.rs
@@ -9,9 +9,7 @@ use crate::make_smart_pointer;
 use crate::prelude::ConnectionRequest;
 use crate::prelude::ConnectionResponse;
 use crate::prelude::ConnectionResponses;
-use crate::presentation::clients::request::{
-    ClientAction, ClientReq, ServerResponse, SessionRequest,
-};
+use crate::presentation::clients::request::{ClientAction, ServerResponse, SessionRequest};
 
 use tokio::{
     net::tcp::{OwnedReadHalf, OwnedWriteHalf},
@@ -21,11 +19,11 @@ use tracing::{error, info, instrument};
 
 pub struct ClientStreamReader {
     pub(crate) r: OwnedReadHalf,
-    pub(crate) client_id: String,
+    pub(crate) conn_id: String,
 }
 
 impl ClientStreamReader {
-    #[instrument(level = tracing::Level::DEBUG, skip(self, handler, stream_writer_sender),fields(client_id= %self.client_id))]
+    #[instrument(level = tracing::Level::DEBUG, skip(self, handler, stream_writer_sender),fields(client_id= %self.conn_id))]
     pub(crate) async fn handle_client_stream(
         mut self,
         handler: ClientController,
@@ -40,7 +38,7 @@
                     return;
                 }
                 let _ = stream_writer_sender
-                    .send(ServerResponse::Err { reason: err.to_string(), request_id: 0 })
+                    .send(ServerResponse::Err { reason: err.to_string(), conn_offset: 0 })
                     .await;
                 continue;
             }
 
@@ -49,7 +47,8 @@
             let requests = query_ios.unwrap().into_iter().map(|query_io| {
                 Ok(ClientRequest {
                     action: query_io.action,
-                    session_req: ClientReq::new(query_io.request_id, self.client_id.clone()),
+                    conn_offset: query_io.conn_offset,
+                    conn_id: self.conn_id.clone(),
                 })
             });
 
@@ -57,25 +56,24 @@
                 match req {
                     Err(err) => {
                         let _ = stream_writer_sender
-                            .send(ServerResponse::Err { reason: err, request_id: 0 })
+                            .send(ServerResponse::Err { reason: err, conn_offset: 0 })
                            .await;
                         break;
                     },
-                    Ok(ClientRequest { action, session_req }) => {
-                        let request_id = session_req.request_id;
+                    Ok(ClientRequest { action, conn_offset, conn_id }) => {
                         // * processing part
                         let result = match action {
                             ClientAction::NonMutating(non_mutating_action) => {
-                                handler.handle_non_mutating(non_mutating_action, request_id).await
+                                handler.handle_non_mutating(non_mutating_action, conn_offset).await
                             },
                             ClientAction::Mutating(log_entry) => {
-                                handler.handle_mutating(session_req, log_entry).await
+                                handler.handle_mutating(conn_offset, conn_id, log_entry).await
                             },
                         };
                         let response = result.unwrap_or_else(|e| {
                             error!("failure on state change / query {e}");
-                            ServerResponse::Err { reason: e.to_string(), request_id }
+                            ServerResponse::Err { reason: e.to_string(), conn_offset }
                         });
                         if stream_writer_sender.send(response).await.is_err() {
                             return;
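The client half of the contract is visible in the `duva-client` hunks at the top: the broker only ever moves a connection's offset forward, folding each acknowledged `conn_offset` in with `connection.request_id.max(*request_id)` so a late or reordered response cannot regress it. A small sketch of that invariant — `NodeConnection` is reduced to one field here, and `observe_ack` is a hypothetical helper for what the broker does inline:

```rust
/// Stand-in for the broker's `NodeConnection`, reduced to the offset counter.
struct NodeConnection {
    /// Highest `conn_offset` the server has acknowledged on this connection.
    request_id: u64,
}

impl NodeConnection {
    /// Hypothetical helper mirroring `connection.request_id =
    /// connection.request_id.max(*request_id)`: taking the max means an
    /// out-of-order acknowledgement can only ever move the offset forward.
    fn observe_ack(&mut self, conn_offset: u64) {
        self.request_id = self.request_id.max(conn_offset);
    }
}

fn main() {
    let mut conn = NodeConnection { request_id: 0 };
    for ack in [1, 3, 2] {
        // responses may arrive out of order
        conn.observe_ack(ack);
    }
    assert_eq!(conn.request_id, 3); // the stale ack for offset 2 did not win
}
```

Since `SessionRequest { conn_offset: self.request_id, .. }` is what gets encoded onto the wire, keeping this counter monotonic on the client is what makes the server-side `is_processed` check meaningful.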