Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions duva-client/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl Broker {
} else {
context.callback(ServerResponse::Err {
reason: "Failed to route command. Try again after ttl time".to_string(),
request_id: 0,
conn_offset: 0,
})
};
},
Expand Down Expand Up @@ -292,7 +292,10 @@ impl Broker {
// ! otherwise, server will not be able to process the next command

match res {
ServerResponse::ReadRes { res: QueryIO::BulkString(..), request_id } => {
ServerResponse::ReadRes {
res: QueryIO::BulkString(..),
conn_offset: request_id,
} => {
connection.request_id = connection.request_id.max(*request_id);
},
ServerResponse::WriteRes { res: QueryIO::BulkString(..), log_index, .. } => {
Expand Down
3 changes: 2 additions & 1 deletion duva-client/src/broker/node_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ impl NodeConnection {
}

pub(crate) async fn send(&self, client_action: ClientAction) -> anyhow::Result<()> {
let session_request = SessionRequest { request_id: self.request_id, action: client_action };
let session_request =
SessionRequest { conn_offset: self.request_id, action: client_action };
self.writer
.send(MsgToServer::Command(bincode::encode_to_vec(session_request, SERDE_CONFIG)?))
.await
Expand Down
16 changes: 9 additions & 7 deletions duva-client/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl CommandQueue {

let result = context
.get_result()
.unwrap_or_else(|err| ServerResponse::Err { reason: err.to_string(), request_id: 0 });
.unwrap_or_else(|err| ServerResponse::Err { reason: err.to_string(), conn_offset: 0 });
context.callback(result);
}
}
Expand Down Expand Up @@ -84,19 +84,21 @@ impl InputContext {
ClientAction::NonMutating(Keys { pattern: _ } | MGet { keys: _ }) => {
let mut init = QueryIO::Array(Vec::with_capacity(iterator.len()));

while let Some(ServerResponse::ReadRes { res, request_id }) = iterator.next() {
while let Some(ServerResponse::ReadRes { res, conn_offset: request_id }) =
iterator.next()
{
init = init.merge(res)?;
highest_req_id = highest_req_id.max(request_id);
}
Ok(ServerResponse::ReadRes { res: init, request_id: highest_req_id })
Ok(ServerResponse::ReadRes { res: init, conn_offset: highest_req_id })
},

ClientAction::NonMutating(Exists { keys: _ }) => {
let mut count = 0;

while let Some(ServerResponse::ReadRes {
res: QueryIO::BulkString(byte),
request_id,
conn_offset: request_id,
}) = iterator.next()
{
let num = String::from_utf8(byte.to_vec())
Expand All @@ -109,15 +111,15 @@ impl InputContext {

Ok(ServerResponse::ReadRes {
res: QueryIO::BulkString(BinBytes::new(count.to_string())),
request_id: highest_req_id,
conn_offset: highest_req_id,
})
},
ClientAction::Mutating(LogEntry::Delete { keys: _ }) => {
let mut count = 0;

while let Some(ServerResponse::WriteRes {
res: QueryIO::BulkString(value),
request_id,
conn_offset: request_id,
..
}) = iterator.next()
{
Expand All @@ -128,7 +130,7 @@ impl InputContext {
Ok(ServerResponse::WriteRes {
res: QueryIO::BulkString(BinBytes::new(count.to_string())),
log_index: 0, // TODO
request_id: highest_req_id,
conn_offset: highest_req_id,
})
},
_ => iterator.next().ok_or(anyhow::anyhow!("Expected exactly one result")),
Expand Down
16 changes: 8 additions & 8 deletions duva/src/adapters/op_logs/disk_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ mod tests {
entry: LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") },
log_index: index,
term,
session_req: None,
conn_offset: None,
}
}

Expand Down Expand Up @@ -576,7 +576,7 @@ mod tests {
let request = LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") };

let write_op =
WriteOperation { entry: request.clone(), log_index: 0, term: 0, session_req: None };
WriteOperation { entry: request.clone(), log_index: 0, term: 0, conn_offset: None };

// WHEN
op_logs.write_many(vec![write_op]).unwrap();
Expand Down Expand Up @@ -743,7 +743,7 @@ mod tests {
},
log_index: i as u64,
term: 1,
session_req: None,
conn_offset: None,
}])?;
}
// Force rotation
Expand All @@ -753,7 +753,7 @@ mod tests {
entry: LogEntry::Set { entry: CacheEntry::new("new".to_string(), "value") },
log_index: 100,
term: 1,
session_req: None,
conn_offset: None,
}])?;

// WHEN
Expand Down Expand Up @@ -826,7 +826,7 @@ mod tests {
},
log_index: start_index + i as u64,
term,
session_req: None,
conn_offset: None,
})
.collect()
}
Expand Down Expand Up @@ -1065,7 +1065,7 @@ mod tests {
},
log_index: i as u64,
term: 1,
session_req: None,
conn_offset: None,
})
.collect(),
)?;
Expand All @@ -1084,7 +1084,7 @@ mod tests {
entry: LogEntry::Set { entry: CacheEntry::new("new".to_string(), "value") },
log_index: 100,
term: 1,
session_req: None,
conn_offset: None,
}])?;

// Verify index data in active segment
Expand All @@ -1110,7 +1110,7 @@ mod tests {
},
log_index: i as u64,
term: 1,
session_req: None,
conn_offset: None,
})
.collect(),
)?;
Expand Down
30 changes: 15 additions & 15 deletions duva/src/domains/cluster_actors/actor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::ClusterCommand;
use super::ConsensusClientResponse;
use super::ConsensusRequest;
use super::ConsensusResponse;
use super::LazyOption;
use super::hash_ring::HashRing;
pub mod client_sessions;
Expand Down Expand Up @@ -327,20 +327,20 @@ impl<T: TWriteAheadLog> ClusterActor<T> {

pub(crate) async fn leader_req_consensus(&mut self, req: ConsensusRequest) {
if !self.replication.is_leader() {
req.callback.send(ConsensusClientResponse::Result {
req.callback.send(ConsensusResponse::Result {
res: Err(anyhow::anyhow!("Write given to follower")),
log_index: self.replication.last_log_index(),
});
return;
}

if self.client_sessions.is_processed(&req.session_req) {
if self.client_sessions.is_processed(&req.conn_offset) {
// mapping between early returned values to client result
let key = req.entry.all_keys().into_iter().map(String::from).collect();
req.callback.send(ConsensusClientResponse::AlreadyProcessed {
req.callback.send(ConsensusResponse::AlreadyProcessed {
key,
// TODO : remove unwrap
request_id: req.session_req.unwrap().request_id,
request_id: req.conn_offset.unwrap().offset,
});
return;
};
Expand All @@ -358,14 +358,14 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
// To notify client's of what keys have been moved.
// ! Still, client won't know where the key has been moved. The assumption here is client SHOULD have correct hashring information.
let moved_keys = replids.except(&self.log_state().replid).join(" ");
req.callback.send(ConsensusClientResponse::Result {
req.callback.send(ConsensusResponse::Result {
res: Err(anyhow::anyhow!("Moved! {moved_keys}")),
log_index: self.replication.last_log_index(),
})
},
Err(err) => {
err!("{}", err);
req.callback.send(ConsensusClientResponse::Result {
req.callback.send(ConsensusResponse::Result {
res: Err(anyhow::anyhow!(err)),
log_index: self.replication.last_log_index(),
});
Expand All @@ -377,7 +377,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
let log_index = self.replication.write_single_entry(
req.entry,
self.log_state().term,
req.session_req.clone(),
req.conn_offset.clone(),
);

let repl_cnt = self.replicas().count();
Expand All @@ -387,11 +387,11 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
self.replication.increase_con_idx_by(1);
let _ = self.replication.flush();
let res = self.commit_entry(entry.entry, log_index).await;
req.callback.send(ConsensusClientResponse::Result { res, log_index });
req.callback.send(ConsensusResponse::Result { res, log_index });
return;
}
self.consensus_tracker
.insert(log_index, LogConsensusVoting::new(req.callback, repl_cnt, req.session_req));
.insert(log_index, LogConsensusVoting::new(req.callback, repl_cnt, req.conn_offset));

if let Some(send_in_mills) = send_in_mills {
tokio::spawn({
Expand Down Expand Up @@ -954,12 +954,12 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
}

self.replication.increase_con_idx_by(1);
self.client_sessions.set_response(voting.session_req.take());
self.client_sessions.set_response(voting.conn_offset.take());
let log_entry = self.replication.read_at(log_index).unwrap();
let res = self.commit_entry(log_entry.entry, log_index).await;
let _ = self.replication.flush();

voting.callback.send(ConsensusClientResponse::Result { res, log_index });
voting.callback.send(ConsensusResponse::Result { res, log_index });
}

async fn commit_entry(&mut self, entry: LogEntry, index: u64) -> anyhow::Result<QueryIO> {
Expand Down Expand Up @@ -1343,7 +1343,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
let req = ConsensusRequest {
entry: LogEntry::MSet { entries: migrate_batch.entries.clone() },
callback,
session_req: None,
conn_offset: None,
};
self.make_consensus_in_batch(req).await;

Expand Down Expand Up @@ -1387,7 +1387,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
let req = ConsensusRequest {
entry: LogEntry::Delete { keys: pending_migration_batch.keys.clone() },
callback,
session_req: None,
conn_offset: None,
};
self.make_consensus_in_batch(req).await;

Expand Down Expand Up @@ -1428,7 +1428,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {

while let Some(req) = pending_reqs.pop_front() {
if let Err(err) = handler
.send(ClusterCommand::Client(ClientMessage::LeaderReqConsensus(req)))
.send(ClusterCommand::Client(ClusterClientRequest::MakeConsensus(req)))
.await
{
error!("{}", err)
Expand Down
16 changes: 8 additions & 8 deletions duva/src/domains/cluster_actors/actor/client_sessions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{make_smart_pointer, presentation::clients::request::ClientReq};
use crate::{domains::cluster_actors::ConnectionOffset, make_smart_pointer};
use chrono::{DateTime, Utc};
use std::collections::HashMap;

Expand All @@ -13,22 +13,22 @@ pub(crate) struct Session {
}

impl ClientSessions {
pub(crate) fn is_processed(&self, req: &Option<ClientReq>) -> bool {
pub(crate) fn is_processed(&self, req: &Option<ConnectionOffset>) -> bool {
if let Some(session_req) = req
&& let Some(session) = self.get(&session_req.client_id)
&& let Some(session) = self.get(&session_req.conn_id)
&& let Some(res) = session.processed_req_id.as_ref()
{
return *res == session_req.request_id;
return *res == session_req.offset;
}

false
}
pub(crate) fn set_response(&mut self, session_req: Option<ClientReq>) {
let Some(session_req) = session_req else { return };
pub(crate) fn set_response(&mut self, conn_offset: Option<ConnectionOffset>) {
let Some(conn_offset) = conn_offset else { return };
let entry = self
.entry(session_req.client_id)
.entry(conn_offset.conn_id)
.or_insert(Session { last_accessed: Default::default(), processed_req_id: None });
entry.last_accessed = Utc::now();
entry.processed_req_id = Some(session_req.request_id);
entry.processed_req_id = Some(conn_offset.offset);
}
}
10 changes: 5 additions & 5 deletions duva/src/domains/cluster_actors/actor/tests/elections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async fn test_vote_election_deny_vote_older_log() {
log_index: initial_term + 2,
term: initial_term,
entry: LogEntry::Set { entry: CacheEntry::new("k".to_string(), "v") },
session_req: None,
conn_offset: None,
}])
.unwrap(); // Follower log: idx 2, term 2

Expand Down Expand Up @@ -189,7 +189,7 @@ async fn test_receive_election_vote_candidate_wins_election() {
entry: LogEntry::NoOp,
log_index: candidate_actor.replication.last_log_index(),
term: candidate_actor.replication.last_log_term(),
session_req: None,
conn_offset: None,
}],
..Default::default()
};
Expand Down Expand Up @@ -231,12 +231,12 @@ async fn test_become_candidate_not_allow_write_request_processing() {
// GIVEN: A candidate actor
let mut candidate_actor = Helper::cluster_actor(ReplicationRole::Follower).await;
let (tx, rx) = Callback::create();
let session_req = ClientReq::new(1, "client1".to_string());
let session_req = ConnectionOffset::new(1, "client1".to_string());

let consensus_request = ConsensusRequest {
entry: LogEntry::Set { entry: CacheEntry::new("key".to_string(), "value") },
callback: tx,
session_req: Some(session_req),
conn_offset: Some(session_req),
};
// WHEN: A write request is received
candidate_actor.become_candidate();
Expand All @@ -248,5 +248,5 @@ async fn test_become_candidate_not_allow_write_request_processing() {
assert!(value.is_null());

let res = rx.0.await.unwrap();
assert!(matches!(res, ConsensusClientResponse::Result { res: Err(_), log_index: _ }))
assert!(matches!(res, ConsensusResponse::Result { res: Err(_), log_index: _ }))
}
Loading
Loading