From eb01f2213854c251c3b99910a660cf8cf0478fa4 Mon Sep 17 00:00:00 2001 From: Gyubong Date: Mon, 26 Feb 2024 19:05:38 +0900 Subject: [PATCH 1/2] Implement `set_on_member_changed` in rust --- Cargo.lock | 1 + examples/memstore/dynamic-members/src/main.rs | 8 ++++++-- examples/memstore/static-members/src/main.rs | 8 ++++++-- raftify/Cargo.toml | 1 + raftify/src/custom_callbacks.rs | 16 ++++++++++++++++ raftify/src/lib.rs | 1 + raftify/src/raft_node/mod.rs | 6 ++++++ 7 files changed, 37 insertions(+), 4 deletions(-) create mode 100644 raftify/src/custom_callbacks.rs diff --git a/Cargo.lock b/Cargo.lock index 102b8da9..c0d57169 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1765,6 +1765,7 @@ dependencies = [ "heed", "heed-traits", "jopemachine-raft", + "lazy_static", "log", "parking_lot", "prost", diff --git a/examples/memstore/dynamic-members/src/main.rs b/examples/memstore/dynamic-members/src/main.rs index d473d624..0df094f4 100644 --- a/examples/memstore/dynamic-members/src/main.rs +++ b/examples/memstore/dynamic-members/src/main.rs @@ -5,8 +5,7 @@ extern crate slog_term; use actix_web::{web, App, HttpServer}; use raftify::{ - raft::{formatter::set_custom_formatter, logger::Slogger}, - CustomFormatter, Raft as Raft_, + custom_callbacks::set_on_member_changed, raft::{formatter::set_custom_formatter, logger::Slogger}, CustomFormatter, Raft as Raft_ }; use slog::Drain; use slog_envlogger::LogBuilder; @@ -54,6 +53,11 @@ async fn main() -> std::result::Result<(), Box> { slog: slog::Logger::root(drain, o!()), }); + set_on_member_changed(Box::new(|peers_| Box::pin(async move { + let peers_ = peers_.lock().await; + println!("Peers changed: {:?}", peers_.inner); + }))).await; + set_custom_formatter(CustomFormatter::::new()); let options = Options::from_args(); diff --git a/examples/memstore/static-members/src/main.rs b/examples/memstore/static-members/src/main.rs index adab9867..904a72c2 100644 --- a/examples/memstore/static-members/src/main.rs +++ b/examples/memstore/static-members/src/main.rs @@ -5,8 +5,7 @@ extern crate slog_term; use actix_web::{web, App, HttpServer}; use raftify::{ - raft::{formatter::set_custom_formatter, logger::Slogger}, - ClusterJoinTicket, CustomFormatter, Raft as Raft_, + custom_callbacks::set_on_member_changed, raft::{formatter::set_custom_formatter, logger::{Logger, Slogger}}, CustomFormatter, Raft as Raft_ }; use slog::Drain; use slog_envlogger::LogBuilder; @@ -57,6 +56,11 @@ async fn main() -> std::result::Result<(), Box> { slog: slog::Logger::root(drain, o!()), }); + set_on_member_changed(Box::new(|peers_| Box::pin(async move { + let peers_ = peers_.lock().await; + println!("Peers changed: {:?}", peers_.inner); + }))).await; + set_custom_formatter(CustomFormatter::::new()); let options = Options::from_args(); diff --git a/raftify/Cargo.toml b/raftify/Cargo.toml index f5e66add..e4e05a66 100644 --- a/raftify/Cargo.toml +++ b/raftify/Cargo.toml @@ -29,6 +29,7 @@ tonic = "0.9.2" built = "0.5" clap = "3.0" chrono = "0.4.31" +lazy_static = "1.4.0" [build-dependencies] tonic-build = "0.9.2" diff --git a/raftify/src/custom_callbacks.rs b/raftify/src/custom_callbacks.rs new file mode 100644 index 00000000..0aed6088 --- /dev/null +++ b/raftify/src/custom_callbacks.rs @@ -0,0 +1,16 @@ +use lazy_static::lazy_static; +use std::{future::Future, pin::Pin, sync::Arc}; +use tokio::sync::Mutex; + +use crate::Peers; + +type ON_MEMBER_CHANGED_CALLBACK = dyn FnMut(Arc>) -> Pin + Send>> + Send + Sync; + +lazy_static! { + pub static ref ON_MEMBER_CHANGED: Arc>>> = Arc::new(Mutex::new(None)); +} + +pub async fn set_on_member_changed(callback: Box) { + let mut global_callback = ON_MEMBER_CHANGED.lock().await; + *global_callback = Some(callback); +} diff --git a/raftify/src/lib.rs b/raftify/src/lib.rs index 7589db2a..25ced01a 100644 --- a/raftify/src/lib.rs +++ b/raftify/src/lib.rs @@ -18,6 +18,7 @@ mod storage; mod utils; pub mod cli; +pub mod custom_callbacks; pub mod raft_service; pub use { diff --git a/raftify/src/raft_node/mod.rs b/raftify/src/raft_node/mod.rs index 0e05f6bb..2bcddb7d 100644 --- a/raftify/src/raft_node/mod.rs +++ b/raftify/src/raft_node/mod.rs @@ -28,6 +28,7 @@ use utils::inspect_raftnode; use crate::{ create_client, + custom_callbacks::{set_on_member_changed, ON_MEMBER_CHANGED}, error::{Result, SendMessageError}, raft::{ eraftpb::{ @@ -907,6 +908,11 @@ impl< } } + let mut on_member_changed = ON_MEMBER_CHANGED.lock().await; + if let Some(cb) = on_member_changed.as_mut() { + let _ = cb(self.peers.clone()).await; + } + Ok(()) } From 427dd6c912d3c15c6e9571bfb32ee3cedcbb0ca3 Mon Sep 17 00:00:00 2001 From: Gyubong Date: Tue, 27 Feb 2024 10:57:59 +0900 Subject: [PATCH 2/2] Delete obsolete codes --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 82b2e7f0..934db088 100644 --- a/README.md +++ b/README.md @@ -137,7 +137,7 @@ let raft_handle = tokio::spawn(raft.clone().run()); raft.join(vec![join_ticket]).await; // ... -tokio::try_join!(join_ticket)?; +tokio::try_join!(raft_handle)?; ``` ### Manipulate FSM by RaftServiceClient