From b8732c3507ee1d963670f5f52dc2a05d31f3ab06 Mon Sep 17 00:00:00 2001 From: Migo Date: Thu, 13 Nov 2025 20:31:14 +0400 Subject: [PATCH 1/3] connection offset --- duva/src/adapters/op_logs/disk_based.rs | 16 ++++----- duva/src/domains/cluster_actors/actor.rs | 28 ++++++++-------- .../cluster_actors/actor/client_sessions.rs | 16 ++++----- .../cluster_actors/actor/tests/elections.rs | 10 +++--- .../domains/cluster_actors/actor/tests/mod.rs | 19 +++++------ .../actor/tests/replications.rs | 33 ++++++++----------- duva/src/domains/cluster_actors/command.rs | 27 ++++++++++----- duva/src/domains/cluster_actors/queue.rs | 26 +++++++-------- duva/src/domains/cluster_actors/service.rs | 8 ++--- duva/src/domains/peers/command.rs | 8 ++--- .../src/domains/replications/consensus/log.rs | 12 ++++--- duva/src/domains/replications/operation.rs | 7 ++-- duva/src/domains/replications/replication.rs | 13 ++++---- duva/src/presentation/clients/controller.rs | 14 ++++---- duva/src/presentation/clients/request.rs | 15 ++------- duva/src/presentation/clients/stream.rs | 9 +++-- 16 files changed, 128 insertions(+), 133 deletions(-) diff --git a/duva/src/adapters/op_logs/disk_based.rs b/duva/src/adapters/op_logs/disk_based.rs index 6d398dc9..157b62c1 100644 --- a/duva/src/adapters/op_logs/disk_based.rs +++ b/duva/src/adapters/op_logs/disk_based.rs @@ -547,7 +547,7 @@ mod tests { entry: LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") }, log_index: index, term, - session_req: None, + conn_offset: None, } } @@ -576,7 +576,7 @@ mod tests { let request = LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") }; let write_op = - WriteOperation { entry: request.clone(), log_index: 0, term: 0, session_req: None }; + WriteOperation { entry: request.clone(), log_index: 0, term: 0, conn_offset: None }; // WHEN op_logs.write_many(vec![write_op]).unwrap(); @@ -743,7 +743,7 @@ mod tests { }, log_index: i as u64, term: 1, - session_req: None, + conn_offset: None, }])?; } // Force rotation @@ -753,7 +753,7 @@ mod tests { entry: LogEntry::Set { entry: CacheEntry::new("new".to_string(), "value") }, log_index: 100, term: 1, - session_req: None, + conn_offset: None, }])?; // WHEN @@ -826,7 +826,7 @@ mod tests { }, log_index: start_index + i as u64, term, - session_req: None, + conn_offset: None, }) .collect() } @@ -1065,7 +1065,7 @@ mod tests { }, log_index: i as u64, term: 1, - session_req: None, + conn_offset: None, }) .collect(), )?; @@ -1084,7 +1084,7 @@ mod tests { entry: LogEntry::Set { entry: CacheEntry::new("new".to_string(), "value") }, log_index: 100, term: 1, - session_req: None, + conn_offset: None, }])?; // Verify index data in active segment @@ -1110,7 +1110,7 @@ mod tests { }, log_index: i as u64, term: 1, - session_req: None, + conn_offset: None, }) .collect(), )?; diff --git a/duva/src/domains/cluster_actors/actor.rs b/duva/src/domains/cluster_actors/actor.rs index 3ae867b4..4b15800c 100644 --- a/duva/src/domains/cluster_actors/actor.rs +++ b/duva/src/domains/cluster_actors/actor.rs @@ -1,6 +1,6 @@ use super::ClusterCommand; use super::ConsensusClientResponse; -use super::ConsensusRequest; +use super::ConsensusReq; use super::LazyOption; use super::hash_ring::HashRing; pub mod client_sessions; @@ -325,7 +325,7 @@ impl ClusterActor { } } - pub(crate) async fn leader_req_consensus(&mut self, req: ConsensusRequest) { + pub(crate) async fn leader_req_consensus(&mut self, req: ConsensusReq) { if !self.replication.is_leader() { req.callback.send(ConsensusClientResponse::Result { res: Err(anyhow::anyhow!("Write given to follower")), @@ -334,13 +334,13 @@ impl ClusterActor { return; } - if self.client_sessions.is_processed(&req.session_req) { + if self.client_sessions.is_processed(&req.conn_offset) { // mapping between early returned values to client result let key = req.entry.all_keys().into_iter().map(String::from).collect(); req.callback.send(ConsensusClientResponse::AlreadyProcessed { key, // TODO : remove unwrap - request_id: req.session_req.unwrap().request_id, + request_id: req.conn_offset.unwrap().offset, }); return; }; @@ -373,11 +373,11 @@ impl ClusterActor { } } - async fn req_consensus(&mut self, req: ConsensusRequest, send_in_mills: Option) { + async fn req_consensus(&mut self, req: ConsensusReq, send_in_mills: Option) { let log_index = self.replication.write_single_entry( req.entry, self.log_state().term, - req.session_req.clone(), + req.conn_offset.clone(), ); let repl_cnt = self.replicas().count(); @@ -391,7 +391,7 @@ impl ClusterActor { return; } self.consensus_tracker - .insert(log_index, LogConsensusVoting::new(req.callback, repl_cnt, req.session_req)); + .insert(log_index, LogConsensusVoting::new(req.callback, repl_cnt, req.conn_offset)); if let Some(send_in_mills) = send_in_mills { tokio::spawn({ @@ -954,7 +954,7 @@ impl ClusterActor { } self.replication.increase_con_idx_by(1); - self.client_sessions.set_response(voting.session_req.take()); + self.client_sessions.set_response(voting.conn_offset.take()); let log_entry = self.replication.read_at(log_index).unwrap(); let res = self.commit_entry(log_entry.entry, log_index).await; let _ = self.replication.flush(); @@ -1340,10 +1340,10 @@ impl ClusterActor { } let (callback, rx) = Callback::create(); - let req = ConsensusRequest { + let req = ConsensusReq { entry: LogEntry::MSet { entries: migrate_batch.entries.clone() }, callback, - session_req: None, + conn_offset: None, }; self.make_consensus_in_batch(req).await; @@ -1365,7 +1365,7 @@ impl ClusterActor { }); } - async fn make_consensus_in_batch(&mut self, req: ConsensusRequest) { + async fn make_consensus_in_batch(&mut self, req: ConsensusReq) { self.req_consensus(req, None).await; let _ = self.replication.flush(); self.send_rpc().await; @@ -1384,10 +1384,10 @@ impl ClusterActor { // make consensus request for delete let (callback, rx) = Callback::create(); - let req = ConsensusRequest { + let req = ConsensusReq { entry: LogEntry::Delete { keys: pending_migration_batch.keys.clone() }, callback, - session_req: None, + conn_offset: None, }; self.make_consensus_in_batch(req).await; @@ -1428,7 +1428,7 @@ impl ClusterActor { while let Some(req) = pending_reqs.pop_front() { if let Err(err) = handler - .send(ClusterCommand::Client(ClientMessage::LeaderReqConsensus(req))) + .send(ClusterCommand::Client(ClusterClientRequest::MakeConsensus(req))) .await { error!("{}", err) diff --git a/duva/src/domains/cluster_actors/actor/client_sessions.rs b/duva/src/domains/cluster_actors/actor/client_sessions.rs index d8ff7625..065048bc 100644 --- a/duva/src/domains/cluster_actors/actor/client_sessions.rs +++ b/duva/src/domains/cluster_actors/actor/client_sessions.rs @@ -1,4 +1,4 @@ -use crate::{make_smart_pointer, presentation::clients::request::ClientReq}; +use crate::{domains::cluster_actors::ConnectionOffset, make_smart_pointer}; use chrono::{DateTime, Utc}; use std::collections::HashMap; @@ -13,22 +13,22 @@ pub(crate) struct Session { } impl ClientSessions { - pub(crate) fn is_processed(&self, req: &Option) -> bool { + pub(crate) fn is_processed(&self, req: &Option) -> bool { if let Some(session_req) = req - && let Some(session) = self.get(&session_req.client_id) + && let Some(session) = self.get(&session_req.conn_id) && let Some(res) = session.processed_req_id.as_ref() { - return *res == session_req.request_id; + return *res == session_req.offset; } false } - pub(crate) fn set_response(&mut self, session_req: Option) { - let Some(session_req) = session_req else { return }; + pub(crate) fn set_response(&mut self, conn_offset: Option) { + let Some(conn_offset) = conn_offset else { return }; let entry = self - .entry(session_req.client_id) + .entry(conn_offset.conn_id) .or_insert(Session { last_accessed: Default::default(), processed_req_id: None }); entry.last_accessed = Utc::now(); - entry.processed_req_id = Some(session_req.request_id); + entry.processed_req_id = Some(conn_offset.offset); } } diff --git a/duva/src/domains/cluster_actors/actor/tests/elections.rs b/duva/src/domains/cluster_actors/actor/tests/elections.rs index 7192c564..089ee443 100644 --- a/duva/src/domains/cluster_actors/actor/tests/elections.rs +++ b/duva/src/domains/cluster_actors/actor/tests/elections.rs @@ -99,7 +99,7 @@ async fn test_vote_election_deny_vote_older_log() { log_index: initial_term + 2, term: initial_term, entry: LogEntry::Set { entry: CacheEntry::new("k".to_string(), "v") }, - session_req: None, + conn_offset: None, }]) .unwrap(); // Follower log: idx 2, term 2 @@ -189,7 +189,7 @@ async fn test_receive_election_vote_candidate_wins_election() { entry: LogEntry::NoOp, log_index: candidate_actor.replication.last_log_index(), term: candidate_actor.replication.last_log_term(), - session_req: None, + conn_offset: None, }], ..Default::default() }; @@ -231,12 +231,12 @@ async fn test_become_candidate_not_allow_write_request_processing() { // GIVEN: A candidate actor let mut candidate_actor = Helper::cluster_actor(ReplicationRole::Follower).await; let (tx, rx) = Callback::create(); - let session_req = ClientReq::new(1, "client1".to_string()); + let session_req = ConnectionOffset::new(1, "client1".to_string()); - let consensus_request = ConsensusRequest { + let consensus_request = ConsensusReq { entry: LogEntry::Set { entry: CacheEntry::new("key".to_string(), "value") }, callback: tx, - session_req: Some(session_req), + conn_offset: Some(session_req), }; // WHEN: A write request is received candidate_actor.become_candidate(); diff --git a/duva/src/domains/cluster_actors/actor/tests/mod.rs b/duva/src/domains/cluster_actors/actor/tests/mod.rs index 6d6f5be7..20506643 100644 --- a/duva/src/domains/cluster_actors/actor/tests/mod.rs +++ b/duva/src/domains/cluster_actors/actor/tests/mod.rs @@ -15,7 +15,6 @@ use crate::domains::TSerdeDynamicWrite; use crate::domains::caches::actor::CacheCommandSender; use crate::domains::caches::cache_objects::CacheEntry; use crate::domains::caches::command::CacheCommand; -use crate::presentation::clients::request::ClientReq; use crate::domains::peers::command::HeartBeat; @@ -50,7 +49,7 @@ impl FakeReadWrite { } #[async_trait::async_trait] impl TWrite for FakeReadWrite { - async fn write(&mut self, io: QueryIO) -> Result<(), IoError> { + async fn write(&mut self, _io: QueryIO) -> Result<(), IoError> { panic!() } } @@ -76,7 +75,7 @@ impl TSerdeDynamicWrite for FakeReadWrite { Ok(()) } - async fn send_connection_msg(&mut self, arg: &str) -> Result<(), IoError> { + async fn send_connection_msg(&mut self, _arg: &str) -> Result<(), IoError> { Ok(()) } } @@ -139,7 +138,7 @@ impl Helper { log_index: index_num, entry: LogEntry::Set { entry: CacheEntry::new(key, value) }, term, - session_req: None, + conn_offset: None, } } pub(crate) fn session_write( @@ -147,13 +146,13 @@ impl Helper { term: u64, key: &str, value: &str, - session_req: ClientReq, + session_req: ConnectionOffset, ) -> WriteOperation { WriteOperation { log_index: index_num, entry: LogEntry::Set { entry: CacheEntry::new(key, value) }, term, - session_req: Some(session_req), + conn_offset: Some(session_req), } } @@ -239,12 +238,12 @@ impl Helper { fn consensus_request( callback: Callback, - session_req: Option, - ) -> ConsensusRequest { - ConsensusRequest { + session_req: Option, + ) -> ConsensusReq { + ConsensusReq { entry: LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") }, callback, - session_req, + conn_offset: session_req, } } } diff --git a/duva/src/domains/cluster_actors/actor/tests/replications.rs b/duva/src/domains/cluster_actors/actor/tests/replications.rs index 996b3671..0137c7dc 100644 --- a/duva/src/domains/cluster_actors/actor/tests/replications.rs +++ b/duva/src/domains/cluster_actors/actor/tests/replications.rs @@ -1,5 +1,3 @@ -use crate::presentation::clients::request::ClientReq; - use super::*; #[test] @@ -156,10 +154,10 @@ async fn replicate_stores_only_latest_session_per_client() { let mut cluster_actor = Helper::cluster_actor(ReplicationRole::Follower).await; let target_client = uuid::Uuid::now_v7().to_string(); - let session1 = ClientReq::new(1, uuid::Uuid::now_v7().to_string()); - let session2 = ClientReq::new(1, target_client.clone()); + let session1 = ConnectionOffset::new(1, uuid::Uuid::now_v7().to_string()); + let session2 = ConnectionOffset::new(1, target_client.clone()); // ! For the same client, hold only one request - let session3 = ClientReq::new(2, target_client); + let session3 = ConnectionOffset::new(2, target_client); let heartbeat = Helper::heartbeat( 0, @@ -432,13 +430,10 @@ async fn req_consensus_inserts_consensus_voting() { let (callback, _) = Callback::create(); let client_id = Uuid::now_v7().to_string(); - let session_request = ClientReq::new(1, client_id); + let session_request = ConnectionOffset::new(1, client_id); let w_req = LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") }; - let consensus_request = ConsensusRequest { - entry: w_req.clone(), - callback, - session_req: Some(session_request.clone()), - }; + let consensus_request = + ConsensusReq { entry: w_req.clone(), callback, conn_offset: Some(session_request.clone()) }; // WHEN leader_c_actor.req_consensus(consensus_request, None).await; @@ -448,7 +443,7 @@ async fn req_consensus_inserts_consensus_voting() { assert_eq!(leader_c_actor.log_state().last_log_index, 0); // * buffer assert_eq!( - leader_c_actor.consensus_tracker.get(&1).unwrap().session_req.as_ref().unwrap().clone(), //* session_request_is_saved_on_tracker + leader_c_actor.consensus_tracker.get(&1).unwrap().conn_offset.as_ref().unwrap().clone(), //* session_request_is_saved_on_tracker session_request ); } @@ -459,7 +454,7 @@ async fn test_leader_req_consensus_early_return_when_already_processed_session_r let mut cluster_actor = Helper::cluster_actor(ReplicationRole::Leader).await; let client_id = Uuid::now_v7().to_string(); - let client_req = ClientReq::new(1, client_id); + let client_req = ConnectionOffset::new(1, client_id); // WHEN - session request is already processed cluster_actor.client_sessions.set_response(Some(client_req.clone())); @@ -467,10 +462,10 @@ async fn test_leader_req_consensus_early_return_when_already_processed_session_r tokio::spawn(cluster_actor.handle()); let (callback, rx) = Callback::create(); handler - .send(ClusterCommand::Client(ClientMessage::LeaderReqConsensus(ConsensusRequest { + .send(ClusterCommand::Client(ClusterClientRequest::MakeConsensus(ConsensusReq { entry: LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") }, callback, - session_req: Some(client_req), + conn_offset: Some(client_req), }))) .await .unwrap(); @@ -502,7 +497,7 @@ async fn test_consensus_voting_deleted_when_consensus_reached() { let (client_request_sender, client_wait) = Callback::create(); let client_id = Uuid::now_v7().to_string(); - let client_request = ClientReq::new(3, client_id); + let client_request = ConnectionOffset::new(3, client_id); let consensus_request = Helper::consensus_request(client_request_sender, Some(client_request.clone())); @@ -617,17 +612,17 @@ async fn test_leader_req_consensus_with_processed_session() { let mut cluster_actor = Helper::cluster_actor(ReplicationRole::Leader).await; let client_id = Uuid::now_v7().to_string(); - let session_req = ClientReq::new(1, client_id); + let session_req = ConnectionOffset::new(1, client_id); // Mark the session as already processed cluster_actor.client_sessions.set_response(Some(session_req.clone())); // WHEN - send request with already processed session let (tx, rx) = Callback::create(); - let consensus_request = ConsensusRequest { + let consensus_request = ConsensusReq { entry: LogEntry::Set { entry: CacheEntry::new("test_key".to_string(), "test_value") }, callback: tx, - session_req: Some(session_req), + conn_offset: Some(session_req), }; cluster_actor.leader_req_consensus(consensus_request).await; diff --git a/duva/src/domains/cluster_actors/command.rs b/duva/src/domains/cluster_actors/command.rs index 4bd2a7f5..de42985a 100644 --- a/duva/src/domains/cluster_actors/command.rs +++ b/duva/src/domains/cluster_actors/command.rs @@ -7,7 +7,6 @@ use crate::domains::peers::peer::Peer; use crate::domains::replications::*; use crate::prelude::PeerIdentifier; -use crate::presentation::clients::request::ClientReq; use crate::types::{Callback, CallbackAwaiter}; use std::str::FromStr; @@ -15,7 +14,7 @@ use std::str::FromStr; pub enum ClusterCommand { ConnectionReq(ConnectionMessage), Scheduler(SchedulerMessage), - Client(ClientMessage), + Client(ClusterClientRequest), Peer(PeerCommand), ShutdownGracefully(Callback<()>), } @@ -53,12 +52,12 @@ impl From for ClusterCommand { } #[derive(Debug, PartialEq, Eq)] -pub enum ClientMessage { +pub enum ClusterClientRequest { GetPeers(Callback>), ReplicationState(Callback), Forget(PeerIdentifier, Callback>), ReplicaOf(PeerIdentifier, Callback>), - LeaderReqConsensus(ConsensusRequest), + MakeConsensus(ConsensusReq), ClusterNodes(Callback>), GetRoles(Callback>), SubscribeToTopologyChange(Callback>), @@ -69,23 +68,33 @@ pub enum ClientMessage { GetLeaderId(Callback>), } -impl From for ClusterCommand { - fn from(msg: ClientMessage) -> Self { +impl From for ClusterCommand { + fn from(msg: ClusterClientRequest) -> Self { ClusterCommand::Client(msg) } } #[derive(Debug, PartialEq, Eq)] -pub(crate) struct ConsensusRequest { +pub(crate) struct ConsensusReq { pub(crate) entry: LogEntry, pub(crate) callback: Callback, - pub(crate) session_req: Option, + pub(crate) conn_offset: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)] +pub struct ConnectionOffset { + pub(crate) offset: u64, + pub(crate) conn_id: String, +} +impl ConnectionOffset { + pub(crate) fn new(offset: u64, conn_id: String) -> Self { + Self { offset, conn_id } + } } #[derive(Debug)] pub(crate) enum ConsensusClientResponse { AlreadyProcessed { key: Vec, request_id: u64 }, - Result { res: anyhow::Result, log_index: u64 }, } diff --git a/duva/src/domains/cluster_actors/queue.rs b/duva/src/domains/cluster_actors/queue.rs index 3aa0579f..4b79a7fc 100644 --- a/duva/src/domains/cluster_actors/queue.rs +++ b/duva/src/domains/cluster_actors/queue.rs @@ -1,6 +1,6 @@ use crate::{ domains::{ - cluster_actors::{ClientMessage, ClusterCommand, ConnectionMessage, LazyOption}, + cluster_actors::{ClusterClientRequest, ClusterCommand, ConnectionMessage, LazyOption}, replications::state::ReplicationState, }, prelude::{PeerIdentifier, Topology}, @@ -52,19 +52,19 @@ impl ClusterActorSender { pub(crate) async fn wait_for_acceptance(&self) { let (tx, rx) = Callback::create(); - let _ = self.send(ClientMessage::CanEnter(tx)).await; + let _ = self.send(ClusterClientRequest::CanEnter(tx)).await; rx.wait().await; } pub(crate) async fn route_get_peers(&self) -> anyhow::Result> { let (tx, rx) = Callback::create(); - self.send(ClientMessage::GetPeers(tx)).await?; + self.send(ClusterClientRequest::GetPeers(tx)).await?; let peers = rx.recv().await; Ok(peers) } pub(crate) async fn route_get_topology(&self) -> anyhow::Result { let (tx, rx) = Callback::create(); - self.send(ClientMessage::GetTopology(tx)).await?; + self.send(ClusterClientRequest::GetTopology(tx)).await?; let peers = rx.recv().await; Ok(peers) } @@ -81,13 +81,13 @@ impl ClusterActorSender { pub(crate) async fn route_get_node_state(&self) -> anyhow::Result { let (tx, rx) = Callback::create(); - self.send(ClientMessage::ReplicationState(tx)).await?; + self.send(ClusterClientRequest::ReplicationState(tx)).await?; Ok(rx.recv().await) } pub(crate) async fn route_get_leader_id(&self) -> anyhow::Result> { let (tx, rx) = Callback::create(); - self.send(ClientMessage::GetLeaderId(tx)).await?; + self.send(ClusterClientRequest::GetLeaderId(tx)).await?; Ok(rx.recv().await) } @@ -113,7 +113,7 @@ impl ClusterActorSender { peer_identifier: PeerIdentifier, ) -> anyhow::Result { let (tx, rx) = Callback::create(); - self.send(ClientMessage::Forget(peer_identifier, tx)).await?; + self.send(ClusterClientRequest::Forget(peer_identifier, tx)).await?; let Some(_) = rx.recv().await else { return Ok(false) }; Ok(true) } @@ -123,7 +123,7 @@ impl ClusterActorSender { peer_identifier: PeerIdentifier, ) -> anyhow::Result<()> { let (tx, rx) = Callback::create(); - self.send(ClientMessage::ReplicaOf(peer_identifier, tx)).await?; + self.send(ClusterClientRequest::ReplicaOf(peer_identifier, tx)).await?; rx.recv().await } @@ -133,24 +133,24 @@ impl ClusterActorSender { lazy_option: LazyOption, ) -> anyhow::Result<()> { let (tx, rx) = Callback::create(); - self.send(ClientMessage::ClusterMeet(peer_identifier, lazy_option, tx)).await?; + self.send(ClusterClientRequest::ClusterMeet(peer_identifier, lazy_option, tx)).await?; rx.recv().await } pub(crate) async fn route_cluster_reshard(&self) -> anyhow::Result<()> { let (tx, rx) = Callback::create(); - self.send(ClientMessage::ClusterReshard(tx)).await?; + self.send(ClusterClientRequest::ClusterReshard(tx)).await?; rx.recv().await } pub(crate) async fn route_cluster_nodes(&self) -> anyhow::Result> { let (tx, rx) = Callback::create(); - self.send(ClientMessage::ClusterNodes(tx)).await?; + self.send(ClusterClientRequest::ClusterNodes(tx)).await?; Ok(rx.recv().await) } pub(crate) async fn route_get_roles(&self) -> anyhow::Result> { let (tx, rx) = Callback::create(); - self.send(ClientMessage::GetRoles(tx)).await?; + self.send(ClusterClientRequest::GetRoles(tx)).await?; Ok(rx.recv().await.into_iter().map(|(id, role)| format!("{}:{}", id.0, role)).collect()) } @@ -158,7 +158,7 @@ impl ClusterActorSender { &self, ) -> tokio::sync::broadcast::Receiver { let (tx, rx) = Callback::create(); - let _ = self.send(ClientMessage::SubscribeToTopologyChange(tx)).await; + let _ = self.send(ClusterClientRequest::SubscribeToTopologyChange(tx)).await; rx.recv().await } } diff --git a/duva/src/domains/cluster_actors/service.rs b/duva/src/domains/cluster_actors/service.rs index 0535bcef..5b961a39 100644 --- a/duva/src/domains/cluster_actors/service.rs +++ b/duva/src/domains/cluster_actors/service.rs @@ -1,5 +1,5 @@ -use crate::domains::cluster_actors::ClientMessage; use crate::domains::cluster_actors::ClusterActor; +use crate::domains::cluster_actors::ClusterClientRequest; use crate::domains::cluster_actors::ClusterCommand; use crate::domains::cluster_actors::ConnectionMessage; use crate::domains::cluster_actors::SchedulerMessage; @@ -73,8 +73,8 @@ impl ClusterActor { } #[instrument(level = tracing::Level::DEBUG, skip(self, client_message))] - async fn process_client_message(&mut self, client_message: ClientMessage) { - use ClientMessage::*; + async fn process_client_message(&mut self, client_message: ClusterClientRequest) { + use ClusterClientRequest::*; match client_message { CanEnter(callback) => { @@ -96,7 +96,7 @@ impl ClusterActor { callback.send(None); } }, - LeaderReqConsensus(req) => { + MakeConsensus(req) => { self.leader_req_consensus(req).await; }, ReplicaOf(peer_addr, callback) => { diff --git a/duva/src/domains/peers/command.rs b/duva/src/domains/peers/command.rs index bcf2694d..2e450538 100644 --- a/duva/src/domains/peers/command.rs +++ b/duva/src/domains/peers/command.rs @@ -1,7 +1,7 @@ use crate::{ domains::{ caches::cache_objects::CacheEntry, - cluster_actors::{ClusterCommand, ConsensusRequest, hash_ring::HashRing}, + cluster_actors::{ClusterCommand, ConsensusReq, hash_ring::HashRing}, replications::{ReplicationId, ReplicationState, WriteOperation}, }, prelude::PeerIdentifier, @@ -188,12 +188,12 @@ pub(crate) struct QueuedKeysToMigrate { #[derive(Debug, Default)] pub(crate) struct PendingRequests { - requests: VecDeque, + requests: VecDeque, batches: HashMap, pub(crate) callbacks: Vec>, } impl PendingRequests { - pub(crate) fn add_req(&mut self, req: ConsensusRequest) { + pub(crate) fn add_req(&mut self, req: ConsensusReq) { self.requests.push_back(req); } pub(crate) fn store_batch(&mut self, id: BatchId, batch: QueuedKeysToMigrate) { @@ -202,7 +202,7 @@ impl PendingRequests { pub(crate) fn pop_batch(&mut self, id: &BatchId) -> Option { self.batches.remove(id) } - pub(crate) fn extract_requests(&mut self) -> VecDeque { + pub(crate) fn extract_requests(&mut self) -> VecDeque { std::mem::take(&mut self.requests) } diff --git a/duva/src/domains/replications/consensus/log.rs b/duva/src/domains/replications/consensus/log.rs index bb08e90b..487ebc86 100644 --- a/duva/src/domains/replications/consensus/log.rs +++ b/duva/src/domains/replications/consensus/log.rs @@ -1,7 +1,9 @@ use crate::{ - domains::{cluster_actors::ConsensusClientResponse, peers::identifier::PeerIdentifier}, + domains::{ + cluster_actors::{ConnectionOffset, ConsensusClientResponse}, + peers::identifier::PeerIdentifier, + }, make_smart_pointer, - presentation::clients::request::ClientReq, types::Callback, }; use std::collections::HashMap; @@ -16,15 +18,15 @@ pub struct LogConsensusVoting { pub(crate) voters: Vec, pub(crate) callback: Callback, pub(crate) cnt: u8, - pub(crate) session_req: Option, + pub(crate) conn_offset: Option, } impl LogConsensusVoting { pub(crate) fn new( callback: Callback, replica_count: usize, - session_req: Option, + conn_offset: Option, ) -> Self { - Self { callback, cnt: 1, voters: Vec::with_capacity(replica_count), session_req } + Self { callback, cnt: 1, voters: Vec::with_capacity(replica_count), conn_offset } } pub(crate) fn increase_vote(&mut self, voter: PeerIdentifier) { diff --git a/duva/src/domains/replications/operation.rs b/duva/src/domains/replications/operation.rs index 64df4a10..3e799fce 100644 --- a/duva/src/domains/replications/operation.rs +++ b/duva/src/domains/replications/operation.rs @@ -1,6 +1,5 @@ -use crate::{ - domains::{caches::cache_objects::CacheEntry, query_io::SERDE_CONFIG}, - presentation::clients::request::ClientReq, +use crate::domains::{ + caches::cache_objects::CacheEntry, cluster_actors::ConnectionOffset, query_io::SERDE_CONFIG, }; use bytes::Bytes; @@ -9,7 +8,7 @@ pub struct WriteOperation { pub(crate) entry: LogEntry, pub(crate) log_index: u64, pub(crate) term: u64, - pub(crate) session_req: Option, + pub(crate) conn_offset: Option, } /// Operations that appear in the Append-Only File (WAL). diff --git a/duva/src/domains/replications/replication.rs b/duva/src/domains/replications/replication.rs index 26e741eb..aeef6b22 100644 --- a/duva/src/domains/replications/replication.rs +++ b/duva/src/domains/replications/replication.rs @@ -1,5 +1,6 @@ use super::*; +use crate::domains::cluster_actors::ConnectionOffset; use crate::domains::peers::command::HeartBeat; use crate::domains::peers::command::RejectionReason; use crate::domains::peers::command::ReplicationAck; @@ -7,7 +8,7 @@ use crate::domains::peers::command::ReplicationAck; use crate::domains::peers::command::RequestVote; use crate::domains::peers::identifier::PeerIdentifier; use crate::err; -use crate::presentation::clients::request::ClientReq; + use std::fmt::Display; use std::sync::Arc; use std::sync::atomic::AtomicU64; @@ -175,13 +176,13 @@ impl Replication { &mut self, entry: LogEntry, current_term: u64, - session_req: Option, + conn_offset: Option, ) -> u64 { let op = WriteOperation { entry, log_index: self.last_log_index() + 1, term: current_term, - session_req, + conn_offset, }; self.in_mem_buffer.push(op); @@ -294,15 +295,15 @@ impl Replication { operations: Vec, prev_log_index: u64, prev_log_term: u64, - session_reqs: &mut Vec, + conn_offsets: &mut Vec, ) -> Result { let mut entries = Vec::with_capacity(operations.len()); let last_log_index = self.state.last_log_index; for mut log in operations { if log.log_index > last_log_index { - if let Some(session_req) = log.session_req.take() { - session_reqs.push(session_req); + if let Some(session_req) = log.conn_offset.take() { + conn_offsets.push(session_req); } entries.push(log); } diff --git a/duva/src/presentation/clients/controller.rs b/duva/src/presentation/clients/controller.rs index 58cb2f9e..b1a99204 100644 --- a/duva/src/presentation/clients/controller.rs +++ b/duva/src/presentation/clients/controller.rs @@ -3,11 +3,13 @@ use crate::domains::QueryIO; use crate::domains::caches::cache_manager::CacheManager; use crate::domains::caches::cache_objects::{CacheEntry, CacheValue, TypedValue}; use crate::domains::cluster_actors::queue::ClusterActorSender; -use crate::domains::cluster_actors::{ClientMessage, ConsensusClientResponse, ConsensusRequest}; +use crate::domains::cluster_actors::{ + ClusterClientRequest, ConnectionOffset, ConsensusClientResponse, ConsensusReq, +}; use crate::domains::replications::LogEntry; use crate::domains::saves::actor::SaveTarget; use crate::prelude::PeerIdentifier; -use crate::presentation::clients::request::{ClientReq, NonMutatingAction, ServerResponse}; +use crate::presentation::clients::request::{NonMutatingAction, ServerResponse}; use crate::types::{BinBytes, Callback}; use tracing::info; @@ -138,18 +140,18 @@ impl ClientController { pub(crate) async fn handle_mutating( &self, - session_req: ClientReq, + session_req: ConnectionOffset, entry: LogEntry, ) -> anyhow::Result { // * Consensus / Persisting logs let (callback, res) = Callback::create(); - let request_id = session_req.request_id; + let request_id = session_req.offset; self.cluster_actor_sender - .send(ClientMessage::LeaderReqConsensus(ConsensusRequest { + .send(ClusterClientRequest::MakeConsensus(ConsensusReq { entry, callback, - session_req: Some(session_req), + conn_offset: Some(session_req), })) .await?; diff --git a/duva/src/presentation/clients/request.rs b/duva/src/presentation/clients/request.rs index 6fe4b593..feff999d 100644 --- a/duva/src/presentation/clients/request.rs +++ b/duva/src/presentation/clients/request.rs @@ -2,7 +2,7 @@ use crate::{ domains::{ QueryIO, caches::cache_objects::CacheEntry, - cluster_actors::LazyOption, + cluster_actors::{ConnectionOffset, LazyOption}, peers::identifier::{PeerIdentifier, TPeerAddress}, replications::LogEntry, }, @@ -18,17 +18,6 @@ pub struct SessionRequest { pub action: ClientAction, } -#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)] -pub struct ClientReq { - pub(crate) request_id: u64, - pub(crate) client_id: String, -} -impl ClientReq { - pub(crate) fn new(request_id: u64, client_id: String) -> Self { - Self { request_id, client_id } - } -} - #[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::Decode)] pub enum ClientAction { NonMutating(NonMutatingAction), @@ -313,7 +302,7 @@ pub fn extract_expiry(expiry: &str) -> anyhow::Result { #[derive(Clone, Debug)] pub struct ClientRequest { pub(crate) action: ClientAction, - pub(crate) session_req: ClientReq, + pub(crate) session_req: ConnectionOffset, } #[derive(Clone, Debug, bincode::Decode, bincode::Encode)] diff --git a/duva/src/presentation/clients/stream.rs b/duva/src/presentation/clients/stream.rs index 32736509..76c3321c 100644 --- a/duva/src/presentation/clients/stream.rs +++ b/duva/src/presentation/clients/stream.rs @@ -1,5 +1,6 @@ use super::{ClientController, request::ClientRequest}; use crate::domains::TSerdeRead; +use crate::domains::cluster_actors::ConnectionOffset; use crate::domains::cluster_actors::queue::ClusterActorSender; use crate::domains::cluster_actors::topology::Topology; use crate::domains::interface::TSerdeWrite; @@ -9,9 +10,7 @@ use crate::make_smart_pointer; use crate::prelude::ConnectionRequest; use crate::prelude::ConnectionResponse; use crate::prelude::ConnectionResponses; -use crate::presentation::clients::request::{ - ClientAction, ClientReq, ServerResponse, SessionRequest, -}; +use crate::presentation::clients::request::{ClientAction, ServerResponse, SessionRequest}; use tokio::{ net::tcp::{OwnedReadHalf, OwnedWriteHalf}, @@ -49,7 +48,7 @@ impl ClientStreamReader { let requests = query_ios.unwrap().into_iter().map(|query_io| { Ok(ClientRequest { action: query_io.action, - session_req: ClientReq::new(query_io.request_id, self.client_id.clone()), + session_req: ConnectionOffset::new(query_io.request_id, self.client_id.clone()), }) }); @@ -62,7 +61,7 @@ impl ClientStreamReader { break; }, Ok(ClientRequest { action, session_req }) => { - let request_id = session_req.request_id; + let request_id = session_req.offset; // * processing part let result = match action { ClientAction::NonMutating(non_mutating_action) => { From 4e60ff09ae159f1947e0fa50d9e2e2edcd48e730 Mon Sep 17 00:00:00 2001 From: Migo Date: Thu, 13 Nov 2025 20:43:50 +0400 Subject: [PATCH 2/3] connection offset --- duva-client/src/broker/mod.rs | 7 +++-- duva-client/src/broker/node_connections.rs | 3 ++- duva-client/src/command.rs | 16 +++++++----- duva/src/domains/cluster_actors/actor.rs | 26 +++++++++---------- .../cluster_actors/actor/tests/elections.rs | 4 +-- .../domains/cluster_actors/actor/tests/mod.rs | 6 ++--- .../actor/tests/replications.rs | 8 +++--- duva/src/domains/cluster_actors/command.rs | 18 ++++++------- duva/src/domains/peers/command.rs | 8 +++--- .../src/domains/replications/consensus/log.rs | 6 ++--- duva/src/lib.rs | 2 +- duva/src/presentation/clients/controller.rs | 18 ++++++------- duva/src/presentation/clients/request.rs | 19 +++++++------- duva/src/presentation/clients/stream.rs | 21 +++++++-------- 14 files changed, 84 insertions(+), 78 deletions(-) diff --git a/duva-client/src/broker/mod.rs b/duva-client/src/broker/mod.rs index baf4ffbf..648b7414 100644 --- a/duva-client/src/broker/mod.rs +++ b/duva-client/src/broker/mod.rs @@ -102,7 +102,7 @@ impl Broker { } else { context.callback(ServerResponse::Err { reason: "Failed to route command. Try again after ttl time".to_string(), - request_id: 0, + conn_offset: 0, }) }; }, @@ -292,7 +292,10 @@ impl Broker { // ! otherwise, server will not be able to process the next command match res { - ServerResponse::ReadRes { res: QueryIO::BulkString(..), request_id } => { + ServerResponse::ReadRes { + res: QueryIO::BulkString(..), + conn_offset: request_id, + } => { connection.request_id = connection.request_id.max(*request_id); }, ServerResponse::WriteRes { res: QueryIO::BulkString(..), log_index, .. } => { diff --git a/duva-client/src/broker/node_connections.rs b/duva-client/src/broker/node_connections.rs index 5ad142a3..e938e88a 100644 --- a/duva-client/src/broker/node_connections.rs +++ b/duva-client/src/broker/node_connections.rs @@ -97,7 +97,8 @@ impl NodeConnection { } pub(crate) async fn send(&self, client_action: ClientAction) -> anyhow::Result<()> { - let session_request = SessionRequest { request_id: self.request_id, action: client_action }; + let session_request = + SessionRequest { conn_offset: self.request_id, action: client_action }; self.writer .send(MsgToServer::Command(bincode::encode_to_vec(session_request, SERDE_CONFIG)?)) .await diff --git a/duva-client/src/command.rs b/duva-client/src/command.rs index 98827ee0..26a7926d 100644 --- a/duva-client/src/command.rs +++ b/duva-client/src/command.rs @@ -35,7 +35,7 @@ impl CommandQueue { let result = context .get_result() - .unwrap_or_else(|err| ServerResponse::Err { reason: err.to_string(), request_id: 0 }); + .unwrap_or_else(|err| ServerResponse::Err { reason: err.to_string(), conn_offset: 0 }); context.callback(result); } } @@ -84,11 +84,13 @@ impl InputContext { ClientAction::NonMutating(Keys { pattern: _ } | MGet { keys: _ }) => { let mut init = QueryIO::Array(Vec::with_capacity(iterator.len())); - while let Some(ServerResponse::ReadRes { res, request_id }) = iterator.next() { + while let Some(ServerResponse::ReadRes { res, conn_offset: request_id }) = + iterator.next() + { init = init.merge(res)?; highest_req_id = highest_req_id.max(request_id); } - Ok(ServerResponse::ReadRes { res: init, request_id: highest_req_id }) + Ok(ServerResponse::ReadRes { res: init, conn_offset: highest_req_id }) }, ClientAction::NonMutating(Exists { keys: _ }) => { @@ -96,7 +98,7 @@ impl InputContext { while let Some(ServerResponse::ReadRes { res: QueryIO::BulkString(byte), - request_id, + conn_offset: request_id, }) = iterator.next() { let num = String::from_utf8(byte.to_vec()) @@ -109,7 +111,7 @@ impl InputContext { Ok(ServerResponse::ReadRes { res: QueryIO::BulkString(BinBytes::new(count.to_string())), - request_id: highest_req_id, + conn_offset: highest_req_id, }) }, ClientAction::Mutating(LogEntry::Delete { keys: _ }) => { @@ -117,7 +119,7 @@ impl InputContext { while let Some(ServerResponse::WriteRes { res: QueryIO::BulkString(value), - request_id, + conn_offset: request_id, .. }) = iterator.next() { @@ -128,7 +130,7 @@ impl InputContext { Ok(ServerResponse::WriteRes { res: QueryIO::BulkString(BinBytes::new(count.to_string())), log_index: 0, // TODO - request_id: highest_req_id, + conn_offset: highest_req_id, }) }, _ => iterator.next().ok_or(anyhow::anyhow!("Expected exactly one result")), diff --git a/duva/src/domains/cluster_actors/actor.rs b/duva/src/domains/cluster_actors/actor.rs index 4b15800c..6ac872ef 100644 --- a/duva/src/domains/cluster_actors/actor.rs +++ b/duva/src/domains/cluster_actors/actor.rs @@ -1,6 +1,6 @@ use super::ClusterCommand; -use super::ConsensusClientResponse; -use super::ConsensusReq; +use super::ConsensusRequest; +use super::ConsensusResponse; use super::LazyOption; use super::hash_ring::HashRing; pub mod client_sessions; @@ -325,9 +325,9 @@ impl ClusterActor { } } - pub(crate) async fn leader_req_consensus(&mut self, req: ConsensusReq) { + pub(crate) async fn leader_req_consensus(&mut self, req: ConsensusRequest) { if !self.replication.is_leader() { - req.callback.send(ConsensusClientResponse::Result { + req.callback.send(ConsensusResponse::Result { res: Err(anyhow::anyhow!("Write given to follower")), log_index: self.replication.last_log_index(), }); @@ -337,7 +337,7 @@ impl ClusterActor { if self.client_sessions.is_processed(&req.conn_offset) { // mapping between early returned values to client result let key = req.entry.all_keys().into_iter().map(String::from).collect(); - req.callback.send(ConsensusClientResponse::AlreadyProcessed { + req.callback.send(ConsensusResponse::AlreadyProcessed { key, // TODO : remove unwrap request_id: req.conn_offset.unwrap().offset, @@ -358,14 +358,14 @@ impl ClusterActor { // To notify client's of what keys have been moved. // ! Still, client won't know where the key has been moved. The assumption here is client SHOULD have correct hashring information. let moved_keys = replids.except(&self.log_state().replid).join(" "); - req.callback.send(ConsensusClientResponse::Result { + req.callback.send(ConsensusResponse::Result { res: Err(anyhow::anyhow!("Moved! {moved_keys}")), log_index: self.replication.last_log_index(), }) }, Err(err) => { err!("{}", err); - req.callback.send(ConsensusClientResponse::Result { + req.callback.send(ConsensusResponse::Result { res: Err(anyhow::anyhow!(err)), log_index: self.replication.last_log_index(), }); @@ -373,7 +373,7 @@ impl ClusterActor { } } - async fn req_consensus(&mut self, req: ConsensusReq, send_in_mills: Option) { + async fn req_consensus(&mut self, req: ConsensusRequest, send_in_mills: Option) { let log_index = self.replication.write_single_entry( req.entry, self.log_state().term, @@ -387,7 +387,7 @@ impl ClusterActor { self.replication.increase_con_idx_by(1); let _ = self.replication.flush(); let res = self.commit_entry(entry.entry, log_index).await; - req.callback.send(ConsensusClientResponse::Result { res, log_index }); + req.callback.send(ConsensusResponse::Result { res, log_index }); return; } self.consensus_tracker @@ -959,7 +959,7 @@ impl ClusterActor { let res = self.commit_entry(log_entry.entry, log_index).await; let _ = self.replication.flush(); - voting.callback.send(ConsensusClientResponse::Result { res, log_index }); + voting.callback.send(ConsensusResponse::Result { res, log_index }); } async fn commit_entry(&mut self, entry: LogEntry, index: u64) -> anyhow::Result { @@ -1340,7 +1340,7 @@ impl ClusterActor { } let (callback, rx) = Callback::create(); - let req = ConsensusReq { + let req = ConsensusRequest { entry: LogEntry::MSet { entries: migrate_batch.entries.clone() }, callback, conn_offset: None, @@ -1365,7 +1365,7 @@ impl ClusterActor { }); } - async fn make_consensus_in_batch(&mut self, req: ConsensusReq) { + async fn make_consensus_in_batch(&mut self, req: ConsensusRequest) { self.req_consensus(req, None).await; let _ = self.replication.flush(); self.send_rpc().await; @@ -1384,7 +1384,7 @@ impl ClusterActor { // make consensus request for delete let (callback, rx) = Callback::create(); - let req = ConsensusReq { + let req = ConsensusRequest { entry: LogEntry::Delete { keys: pending_migration_batch.keys.clone() }, callback, conn_offset: None, diff --git a/duva/src/domains/cluster_actors/actor/tests/elections.rs b/duva/src/domains/cluster_actors/actor/tests/elections.rs index 089ee443..e4f76f8f 100644 --- a/duva/src/domains/cluster_actors/actor/tests/elections.rs +++ b/duva/src/domains/cluster_actors/actor/tests/elections.rs @@ -233,7 +233,7 @@ async fn test_become_candidate_not_allow_write_request_processing() { let (tx, rx) = Callback::create(); let session_req = ConnectionOffset::new(1, "client1".to_string()); - let consensus_request = ConsensusReq { + let consensus_request = ConsensusRequest { entry: LogEntry::Set { entry: CacheEntry::new("key".to_string(), "value") }, callback: tx, conn_offset: Some(session_req), @@ -248,5 +248,5 @@ async fn test_become_candidate_not_allow_write_request_processing() { assert!(value.is_null()); let res = rx.0.await.unwrap(); - assert!(matches!(res, ConsensusClientResponse::Result { res: Err(_), log_index: _ })) + assert!(matches!(res, ConsensusResponse::Result { res: Err(_), log_index: _ })) } diff --git a/duva/src/domains/cluster_actors/actor/tests/mod.rs b/duva/src/domains/cluster_actors/actor/tests/mod.rs index 20506643..3e1fb60e 100644 --- a/duva/src/domains/cluster_actors/actor/tests/mod.rs +++ b/duva/src/domains/cluster_actors/actor/tests/mod.rs @@ -237,10 +237,10 @@ impl Helper { } fn consensus_request( - callback: Callback, + callback: Callback, session_req: Option, - ) -> ConsensusReq { - ConsensusReq { + ) -> ConsensusRequest { + ConsensusRequest { entry: LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") }, callback, conn_offset: session_req, diff --git a/duva/src/domains/cluster_actors/actor/tests/replications.rs b/duva/src/domains/cluster_actors/actor/tests/replications.rs index 0137c7dc..31dfdc40 100644 --- a/duva/src/domains/cluster_actors/actor/tests/replications.rs +++ b/duva/src/domains/cluster_actors/actor/tests/replications.rs @@ -433,7 +433,7 @@ async fn req_consensus_inserts_consensus_voting() { let session_request = ConnectionOffset::new(1, client_id); let w_req = LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") }; let consensus_request = - ConsensusReq { entry: w_req.clone(), callback, conn_offset: Some(session_request.clone()) }; + ConsensusRequest { entry: w_req.clone(), callback, conn_offset: Some(session_request.clone()) }; // WHEN leader_c_actor.req_consensus(consensus_request, None).await; @@ -462,7 +462,7 @@ async fn test_leader_req_consensus_early_return_when_already_processed_session_r tokio::spawn(cluster_actor.handle()); let (callback, rx) = Callback::create(); handler - .send(ClusterCommand::Client(ClusterClientRequest::MakeConsensus(ConsensusReq { + .send(ClusterCommand::Client(ClusterClientRequest::MakeConsensus(ConsensusRequest { entry: LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") }, callback, conn_offset: Some(client_req), @@ -619,7 +619,7 @@ async fn test_leader_req_consensus_with_processed_session() { // WHEN - send request with already processed session let (tx, rx) = Callback::create(); - let consensus_request = ConsensusReq { + let consensus_request = ConsensusRequest { entry: LogEntry::Set { entry: CacheEntry::new("test_key".to_string(), "test_value") }, callback: tx, conn_offset: Some(session_req), @@ -632,7 +632,7 @@ async fn test_leader_req_consensus_with_processed_session() { assert_eq!(cluster_actor.log_state().last_log_index, 0); // Verify the response indicates already processed - let ConsensusClientResponse::AlreadyProcessed { key, request_id: 1 } = rx.recv().await else { + let ConsensusResponse::AlreadyProcessed { key, request_id: 1 } = rx.recv().await else { panic!("Expected AlreadyProcessed response"); }; assert_eq!(key, vec!["test_key".to_string()]); diff --git a/duva/src/domains/cluster_actors/command.rs b/duva/src/domains/cluster_actors/command.rs index de42985a..69bbc60b 100644 --- a/duva/src/domains/cluster_actors/command.rs +++ b/duva/src/domains/cluster_actors/command.rs @@ -57,7 +57,7 @@ pub enum ClusterClientRequest { ReplicationState(Callback), Forget(PeerIdentifier, Callback>), ReplicaOf(PeerIdentifier, Callback>), - MakeConsensus(ConsensusReq), + MakeConsensus(ConsensusRequest), ClusterNodes(Callback>), GetRoles(Callback>), SubscribeToTopologyChange(Callback>), @@ -74,13 +74,6 @@ impl From for ClusterCommand { } } -#[derive(Debug, PartialEq, Eq)] -pub(crate) struct ConsensusReq { - pub(crate) entry: LogEntry, - pub(crate) callback: Callback, - pub(crate) conn_offset: Option, -} - #[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)] pub struct ConnectionOffset { pub(crate) offset: u64, @@ -92,8 +85,15 @@ impl ConnectionOffset { } } +#[derive(Debug, PartialEq, Eq)] +pub(crate) struct ConsensusRequest { + pub(crate) entry: LogEntry, + pub(crate) callback: Callback, + pub(crate) conn_offset: Option, +} + #[derive(Debug)] -pub(crate) enum ConsensusClientResponse { +pub(crate) enum ConsensusResponse { AlreadyProcessed { key: Vec, request_id: u64 }, Result { res: anyhow::Result, log_index: u64 }, } diff --git a/duva/src/domains/peers/command.rs b/duva/src/domains/peers/command.rs index 2e450538..bcf2694d 100644 --- a/duva/src/domains/peers/command.rs +++ b/duva/src/domains/peers/command.rs @@ -1,7 +1,7 @@ use crate::{ domains::{ caches::cache_objects::CacheEntry, - cluster_actors::{ClusterCommand, ConsensusReq, hash_ring::HashRing}, + cluster_actors::{ClusterCommand, ConsensusRequest, hash_ring::HashRing}, replications::{ReplicationId, ReplicationState, WriteOperation}, }, prelude::PeerIdentifier, @@ -188,12 +188,12 @@ pub(crate) struct QueuedKeysToMigrate { #[derive(Debug, Default)] pub(crate) struct PendingRequests { - requests: VecDeque, + requests: VecDeque, batches: HashMap, pub(crate) callbacks: Vec>, } impl PendingRequests { - pub(crate) fn add_req(&mut self, req: ConsensusReq) { + pub(crate) fn add_req(&mut self, req: ConsensusRequest) { self.requests.push_back(req); } pub(crate) fn store_batch(&mut self, id: BatchId, batch: QueuedKeysToMigrate) { @@ -202,7 +202,7 @@ impl PendingRequests { pub(crate) fn pop_batch(&mut self, id: &BatchId) -> Option { self.batches.remove(id) } - pub(crate) fn extract_requests(&mut self) -> VecDeque { + pub(crate) fn extract_requests(&mut self) -> VecDeque { std::mem::take(&mut self.requests) } diff --git a/duva/src/domains/replications/consensus/log.rs b/duva/src/domains/replications/consensus/log.rs index 487ebc86..7bfe086f 100644 --- a/duva/src/domains/replications/consensus/log.rs +++ b/duva/src/domains/replications/consensus/log.rs @@ -1,6 +1,6 @@ use crate::{ domains::{ - cluster_actors::{ConnectionOffset, ConsensusClientResponse}, + cluster_actors::{ConnectionOffset, ConsensusResponse}, peers::identifier::PeerIdentifier, }, make_smart_pointer, @@ -16,13 +16,13 @@ make_smart_pointer!(LogConsensusTracker, HashMap); #[derive(Debug)] pub struct LogConsensusVoting { pub(crate) voters: Vec, - pub(crate) callback: Callback, + pub(crate) callback: Callback, pub(crate) cnt: u8, pub(crate) conn_offset: Option, } impl LogConsensusVoting { pub(crate) fn new( - callback: Callback, + callback: Callback, replica_count: usize, conn_offset: Option, ) -> Self { diff --git a/duva/src/lib.rs b/duva/src/lib.rs index bed8d815..fd72780c 100644 --- a/duva/src/lib.rs +++ b/duva/src/lib.rs @@ -255,7 +255,7 @@ impl StartUpFacade { cache_manager: self.cache_manager.clone(), }; tokio::spawn( - ClientStreamReader { client_id, r: read_half } + ClientStreamReader { conn_id: client_id, r: read_half } .handle_client_stream(client_controller, writer.run(observer)), ); }, diff --git a/duva/src/presentation/clients/controller.rs b/duva/src/presentation/clients/controller.rs index b1a99204..0767406d 100644 --- a/duva/src/presentation/clients/controller.rs +++ b/duva/src/presentation/clients/controller.rs @@ -4,7 +4,7 @@ use crate::domains::caches::cache_manager::CacheManager; use crate::domains::caches::cache_objects::{CacheEntry, CacheValue, TypedValue}; use crate::domains::cluster_actors::queue::ClusterActorSender; use crate::domains::cluster_actors::{ - ClusterClientRequest, ConnectionOffset, ConsensusClientResponse, ConsensusReq, + ClusterClientRequest, ConnectionOffset, ConsensusRequest, ConsensusResponse, }; use crate::domains::replications::LogEntry; use crate::domains::saves::actor::SaveTarget; @@ -135,31 +135,31 @@ impl ClientController { }; info!("{response:?}"); - Ok(ServerResponse::ReadRes { res: response, request_id }) + Ok(ServerResponse::ReadRes { res: response, conn_offset: request_id }) } pub(crate) async fn handle_mutating( &self, - session_req: ConnectionOffset, + conn_offset: u64, + conn_id: String, entry: LogEntry, ) -> anyhow::Result { // * Consensus / Persisting logs let (callback, res) = Callback::create(); - let request_id = session_req.offset; self.cluster_actor_sender - .send(ClusterClientRequest::MakeConsensus(ConsensusReq { + .send(ClusterClientRequest::MakeConsensus(ConsensusRequest { entry, callback, - conn_offset: Some(session_req), + conn_offset: Some(ConnectionOffset::new(conn_offset, conn_id)), })) .await?; let result = match res.recv().await { - ConsensusClientResponse::Result { res, log_index } => { - ServerResponse::WriteRes { res: res?, log_index, request_id } + ConsensusResponse::Result { res, log_index } => { + ServerResponse::WriteRes { res: res?, log_index, conn_offset } }, - ConsensusClientResponse::AlreadyProcessed { key: keys, request_id } => { + ConsensusResponse::AlreadyProcessed { key: keys, request_id } => { // * Conversion! request has already been processed so we need to convert it to get let action = NonMutatingAction::MGet { keys }; self.handle_non_mutating(action, request_id).await? diff --git a/duva/src/presentation/clients/request.rs b/duva/src/presentation/clients/request.rs index feff999d..6ca02df4 100644 --- a/duva/src/presentation/clients/request.rs +++ b/duva/src/presentation/clients/request.rs @@ -2,7 +2,7 @@ use crate::{ domains::{ QueryIO, caches::cache_objects::CacheEntry, - cluster_actors::{ConnectionOffset, LazyOption}, + cluster_actors::LazyOption, peers::identifier::{PeerIdentifier, TPeerAddress}, replications::LogEntry, }, @@ -14,7 +14,7 @@ use std::str::FromStr; #[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)] pub struct SessionRequest { - pub request_id: u64, + pub conn_offset: u64, pub action: ClientAction, } @@ -302,23 +302,24 @@ pub fn extract_expiry(expiry: &str) -> anyhow::Result { #[derive(Clone, Debug)] pub struct ClientRequest { pub(crate) action: ClientAction, - pub(crate) session_req: ConnectionOffset, + pub(crate) conn_offset: u64, + pub(crate) conn_id: String, } #[derive(Clone, Debug, bincode::Decode, bincode::Encode)] pub enum ServerResponse { - WriteRes { res: QueryIO, log_index: u64, request_id: u64 }, - ReadRes { res: QueryIO, request_id: u64 }, + WriteRes { res: QueryIO, log_index: u64, conn_offset: u64 }, + ReadRes { res: QueryIO, conn_offset: u64 }, TopologyChange(Topology), - Err { reason: String, request_id: u64 }, + Err { reason: String, conn_offset: u64 }, } impl ServerResponse { pub fn request_id(&self) -> Option { match self { - ServerResponse::WriteRes { request_id, .. } - | ServerResponse::ReadRes { request_id, .. } - | ServerResponse::Err { request_id, .. } => Some(*request_id), + ServerResponse::WriteRes { conn_offset, .. } + | ServerResponse::ReadRes { conn_offset, .. } + | ServerResponse::Err { conn_offset, .. } => Some(*conn_offset), ServerResponse::TopologyChange(..) => None, } diff --git a/duva/src/presentation/clients/stream.rs b/duva/src/presentation/clients/stream.rs index 76c3321c..1ef46040 100644 --- a/duva/src/presentation/clients/stream.rs +++ b/duva/src/presentation/clients/stream.rs @@ -1,6 +1,5 @@ use super::{ClientController, request::ClientRequest}; use crate::domains::TSerdeRead; -use crate::domains::cluster_actors::ConnectionOffset; use crate::domains::cluster_actors::queue::ClusterActorSender; use crate::domains::cluster_actors::topology::Topology; use crate::domains::interface::TSerdeWrite; @@ -20,11 +19,11 @@ use tracing::{error, info, instrument}; pub struct ClientStreamReader { pub(crate) r: OwnedReadHalf, - pub(crate) client_id: String, + pub(crate) conn_id: String, } impl ClientStreamReader { - #[instrument(level = tracing::Level::DEBUG, skip(self, handler, stream_writer_sender),fields(client_id= %self.client_id))] + #[instrument(level = tracing::Level::DEBUG, skip(self, handler, stream_writer_sender),fields(client_id= %self.conn_id))] pub(crate) async fn handle_client_stream( mut self, handler: ClientController, @@ -39,7 +38,7 @@ impl ClientStreamReader { return; } let _ = stream_writer_sender - .send(ServerResponse::Err { reason: err.to_string(), request_id: 0 }) + .send(ServerResponse::Err { reason: err.to_string(), conn_offset: 0 }) .await; continue; } @@ -48,7 +47,8 @@ impl ClientStreamReader { let requests = query_ios.unwrap().into_iter().map(|query_io| { Ok(ClientRequest { action: query_io.action, - session_req: ConnectionOffset::new(query_io.request_id, self.client_id.clone()), + conn_offset: query_io.conn_offset, + conn_id: self.conn_id.clone(), }) }); @@ -56,25 +56,24 @@ impl ClientStreamReader { match req { Err(err) => { let _ = stream_writer_sender - .send(ServerResponse::Err { reason: err, request_id: 0 }) + .send(ServerResponse::Err { reason: err, conn_offset: 0 }) .await; break; }, - Ok(ClientRequest { action, session_req }) => { - let request_id = session_req.offset; + Ok(ClientRequest { action, conn_offset, conn_id }) => { // * processing part let result = match action { ClientAction::NonMutating(non_mutating_action) => { - handler.handle_non_mutating(non_mutating_action, request_id).await + handler.handle_non_mutating(non_mutating_action, conn_offset).await }, ClientAction::Mutating(log_entry) => { - handler.handle_mutating(session_req, log_entry).await + handler.handle_mutating(conn_offset, conn_id, log_entry).await }, }; let response = result.unwrap_or_else(|e| { error!("failure on state change / query {e}"); - ServerResponse::Err { reason: e.to_string(), request_id } + ServerResponse::Err { reason: e.to_string(), conn_offset } }); if stream_writer_sender.send(response).await.is_err() { return; From 05cffb45847fd4cfcfa52c833a1b3ece57b2224e Mon Sep 17 00:00:00 2001 From: Migo Date: Thu, 13 Nov 2025 20:45:18 +0400 Subject: [PATCH 3/3] linter --- .../src/domains/cluster_actors/actor/tests/replications.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/duva/src/domains/cluster_actors/actor/tests/replications.rs b/duva/src/domains/cluster_actors/actor/tests/replications.rs index 31dfdc40..7b0761d6 100644 --- a/duva/src/domains/cluster_actors/actor/tests/replications.rs +++ b/duva/src/domains/cluster_actors/actor/tests/replications.rs @@ -432,8 +432,11 @@ async fn req_consensus_inserts_consensus_voting() { let client_id = Uuid::now_v7().to_string(); let session_request = ConnectionOffset::new(1, client_id); let w_req = LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") }; - let consensus_request = - ConsensusRequest { entry: w_req.clone(), callback, conn_offset: Some(session_request.clone()) }; + let consensus_request = ConsensusRequest { + entry: w_req.clone(), + callback, + conn_offset: Some(session_request.clone()), + }; // WHEN leader_c_actor.req_consensus(consensus_request, None).await;