diff --git a/Cargo.lock b/Cargo.lock index 0916f44..6912db9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,9 +276,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" +checksum = "0e050f626429857a27ddccb31e0aca21356bfa709c04041aefddac081a8f068a" [[package]] name = "bindgen" @@ -394,9 +394,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.47" +version = "1.2.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd405d82c84ff7f35739f175f67d8b9fb7687a0e84ccdc78bd3568839827cf07" +checksum = "90583009037521a116abf44494efecd645ba48b6622457080f080b85544e2215" dependencies = [ "find-msvc-tools", "jobserver", @@ -506,12 +506,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-utils" -version = "0.8.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" - [[package]] name = "crypto-bigint" version = "0.5.5" @@ -585,20 +579,6 @@ dependencies = [ "syn", ] -[[package]] -name = "dashmap" -version = "6.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" -dependencies = [ - "cfg-if", - "crossbeam-utils", - "hashbrown 0.14.5", - "lock_api", - "once_cell", - "parking_lot_core", -] - [[package]] name = "data-encoding" version = "2.9.0" @@ -1030,12 +1010,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "hashbrown" -version = "0.14.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" - [[package]] name = "hashbrown" version = "0.15.5" @@ -1103,12 +1077,11 @@ dependencies = [ [[package]] name = "http" -version = "1.3.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" dependencies = [ "bytes", - "fnv", "itoa", ] @@ -1172,9 +1145,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56" +checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ "bytes", "futures-core", @@ -1233,9 +1206,9 @@ checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" [[package]] name = "icu_properties" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" dependencies = [ "icu_collections", "icu_locale_core", @@ -1247,9 +1220,9 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" [[package]] name = "icu_provider" @@ -1350,9 +1323,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.82" +version = "0.3.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b011eec8cc36da2aab2d5cff675ec18454fad408585853910a202391cf9f8e65" +checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8" dependencies = [ "once_cell", "wasm-bindgen", @@ -1410,9 +1383,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.177" +version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" +checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "libloading" @@ -1453,15 +1426,15 @@ dependencies = [ [[package]] name = "log" -version = "0.4.28" +version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "matchit" -version = "0.8.6" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f926ade0c4e170215ae43342bf13b9310a437609c81f29f86c5df6657582ef9" +checksum = "b3eede3bdf92f3b4f9dc04072a9ce5ab557d5ec9038773bf9ffcd5588b3cc05b" [[package]] name = "memchr" @@ -1503,9 +1476,9 @@ dependencies = [ [[package]] name = "mio" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" +checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", "wasi", @@ -2164,9 +2137,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a" +checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c" dependencies = [ "zeroize", ] @@ -2195,12 +2168,34 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "saa" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3192d99ee2f69effeacef86c9fa80e75b6604e2a5d3ade6856d206b519521e53" + +[[package]] +name = "scc" +version = "3.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41314cecf05a9988a3717479e80f132c00f64298489f177c268bd675aef03fcc" +dependencies = [ + "saa", + "sdd", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sdd" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7168ecf885fdd3920ade15d50189593b076e1d060b60406a745766380195d65a" + [[package]] name = "sec1" version = "0.7.3" @@ -2338,9 +2333,9 @@ dependencies = [ [[package]] name = "simd-adler32" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" [[package]] name = "simd-json" @@ -2494,7 +2489,6 @@ dependencies = [ "brotli", "bytes", "cookie", - "dashmap", "flate2", "futures-util", "http", @@ -2516,6 +2510,7 @@ dependencies = [ "prost", "rustls", "rustls-pemfile", + "scc", "serde", "serde_bytes", "serde_json", @@ -2745,9 +2740,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.23.7" +version = "0.23.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" +checksum = "5d7cbc3b4b49633d57a0509303158ca50de80ae32c265093b24c414705807832" dependencies = [ "indexmap", "toml_datetime", @@ -2766,9 +2761,9 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.41" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -2777,9 +2772,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.30" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", @@ -2788,9 +2783,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.34" +version = "0.1.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" dependencies = [ "once_cell", "valuable", @@ -2809,9 +2804,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.20" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" dependencies = [ "nu-ansi-term", "sharded-slab", @@ -2916,9 +2911,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.18.1" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" +checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ "getrandom 0.3.4", "js-sys", @@ -2984,9 +2979,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da95793dfc411fbbd93f5be7715b0578ec61fe87cb1a42b12eb625caa5c5ea60" +checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd" dependencies = [ "cfg-if", "once_cell", @@ -2997,9 +2992,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04264334509e04a7bf8690f2384ef5265f05143a4bff3889ab7a3269adab59c2" +checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3007,9 +3002,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "420bc339d9f322e562942d52e115d57e950d12d88983a14c79b86859ee6c7ebc" +checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40" dependencies = [ "bumpalo", "proc-macro2", @@ -3020,9 +3015,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76f218a38c84bcb33c25ec7059b07847d465ce0e0a76b995e134a45adcb6af76" +checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4" dependencies = [ "unicode-ident", ] @@ -3222,9 +3217,9 @@ checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" [[package]] name = "winnow" -version = "0.7.13" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21a0236b59786fed61e2a80582dd500fe61f18b5dca67a4a067d0bc9039339cf" +checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829" dependencies = [ "memchr", ] @@ -3266,18 +3261,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.28" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43fa6694ed34d6e57407afbccdeecfa268c470a7d2a5b0cf49ce9fcc345afb90" +checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.28" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c640b22cd9817fae95be82f0d2f90b11f7605f6c319d16705c459b27ac2cbc26" +checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index a0ef025..353fdd9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,6 @@ base64 = "0.22.1" brotli = { version = "8.0.1", optional = true } bytes = "1.10.1" cookie = { version = "0.18.1", features = ["private", "signed"] } -dashmap = "6.1.0" flate2 = { version = "1.1.2", optional = true } futures-util = "0.3.31" http = "1.3.1" @@ -28,7 +27,7 @@ http-body = "1.0.1" http-body-util = "0.1.3" hyper = { version = "1.6.0", features = ["full"] } hyper-util = { version = "0.1.14", features = ["tokio"] } -matchit = "0.8.6" +matchit = "0.9.1" mime = "0.3.17" mime_guess = "2.0.5" multer = { version = "3.1.0", optional = true } @@ -46,6 +45,7 @@ simd-json = { version = "0.15.1", optional = true } prometheus = { version = "0.13.4", optional = true } opentelemetry = { version = "0.26.0", optional = true } opentelemetry-prometheus = { version = "0.16.0", optional = true } +scc = "3.4.8" tikv-jemallocator = { version = "0.6.0", optional = true } tokio = { version = "1.48.0", features = ["full"] } tokio-rustls = { version = "0.26.2", optional = true } diff --git a/src/plugins/idempotency.rs b/src/plugins/idempotency.rs index 4172d28..30568af 100644 --- a/src/plugins/idempotency.rs +++ b/src/plugins/idempotency.rs @@ -27,12 +27,12 @@ use std::{ use anyhow::Result; use bytes::Bytes; -use dashmap::DashMap; use http::{ HeaderName, HeaderValue, Method, StatusCode, header::{CONTENT_LENGTH, CONTENT_TYPE, LOCATION, RETRY_AFTER}, }; use http_body_util::BodyExt; +use scc::HashMap as SccHashMap; use sha1::{Digest, Sha1}; use tokio::{sync::Notify, time::timeout}; @@ -173,15 +173,15 @@ enum Entry { } #[derive(Clone)] -struct Store(Arc>); +struct Store(Arc>); impl Store { fn new() -> Self { - Self(Arc::new(DashMap::new())) + Self(Arc::new(SccHashMap::new())) } fn get(&self, k: &str) -> Option { - self.0.get(k).map(|e| match &*e { + self.0.get_sync(k).map(|e| match &*e { Entry::InFlight { payload_sig, notify, @@ -197,28 +197,28 @@ impl Store { fn insert_inflight(&self, k: String, payload_sig: [u8; 20]) -> Arc { let notify = Arc::new(Notify::new()); - self.0.insert( + std::mem::drop(self.0.insert_sync( k, Entry::InFlight { payload_sig, notify: notify.clone(), started: Instant::now(), }, - ); + )); notify } fn complete(&self, k: String, completed: Completed) { - self.0.insert(k, Entry::Completed(completed)); + std::mem::drop(self.0.insert_sync(k, Entry::Completed(completed))); } fn remove(&self, k: &str) { - let _ = self.0.remove(k); + let _ = self.0.remove_sync(k); } fn retain_expired(&self) { let now = Instant::now(); - self.0.retain(|_, v| match v { + self.0.retain_sync(|_, v| match v { Entry::Completed(c) => c.expires_at > now, Entry::InFlight { .. } => true, }); diff --git a/src/plugins/rate_limiter.rs b/src/plugins/rate_limiter.rs index 335869b..d0eb048 100644 --- a/src/plugins/rate_limiter.rs +++ b/src/plugins/rate_limiter.rs @@ -58,8 +58,8 @@ use std::{ }; use anyhow::Result; -use dashmap::DashMap; use http::StatusCode; +use scc::HashMap as SccHashMap; use tokio::time; use crate::{ @@ -192,7 +192,7 @@ impl RateLimiterBuilder { pub fn build(self) -> RateLimiterPlugin { RateLimiterPlugin { cfg: self.0, - store: Arc::new(DashMap::new()), + store: Arc::new(SccHashMap::new()), task_started: Arc::new(AtomicBool::new(false)), } } @@ -299,7 +299,7 @@ pub struct RateLimiterPlugin { /// Rate limiting configuration parameters. cfg: Config, /// Concurrent map storing token buckets for each IP address. - store: Arc>, + store: Arc>, /// Flag to ensure background task is spawned only once. task_started: Arc, } @@ -333,7 +333,7 @@ impl TakoPlugin for RateLimiterPlugin { loop { tick.tick().await; let now = Instant::now(); - store.retain(|_, b| { + store.retain_sync(|_, b| { b.available = (b.available + requests_to_add).min(cfg.max_requests as f64); now.duration_since(b.last_seen) < purge_after }); @@ -359,13 +359,13 @@ impl TakoPlugin for RateLimiterPlugin { /// use tako::middleware::Next; /// use tako::types::Request; /// use std::sync::Arc; -/// use dashmap::DashMap; +/// use scc::HashMap as SccHashMap; /// /// # async fn example() { /// # let req = Request::builder().body(tako::body::TakoBody::empty()).unwrap(); /// # let next = Next { middlewares: Arc::new(vec![]), endpoint: Arc::new(|_| Box::pin(async { tako::types::Response::new(tako::body::TakoBody::empty()) })) }; /// let config = Config::default(); -/// let store = Arc::new(DashMap::new()); +/// let store = Arc::new(SccHashMap::new()); /// let response = retain(req, next, config, store).await; /// # } /// ``` @@ -373,7 +373,7 @@ async fn retain( req: Request, next: Next, cfg: Config, - store: Arc>, + store: Arc>, ) -> impl Responder { let ip = req .extensions() @@ -381,7 +381,7 @@ async fn retain( .map(|sa| sa.ip()) .unwrap_or(IpAddr::from([0, 0, 0, 0])); - let mut entry = store.entry(ip).or_insert_with(|| Bucket { + let mut entry = store.entry_sync(ip).or_insert_with(|| Bucket { available: cfg.max_requests as f64, last_seen: Instant::now(), }); diff --git a/src/router.rs b/src/router.rs index 1ca8fcf..7364bcf 100644 --- a/src/router.rs +++ b/src/router.rs @@ -35,10 +35,10 @@ use std::{ sync::{Arc, Weak}, }; -use dashmap::DashMap; use http::Method; use http::StatusCode; use parking_lot::RwLock; +use scc::HashMap as SccHashMap; use crate::{ body::TakoBody, @@ -88,9 +88,9 @@ use std::sync::atomic::AtomicBool; #[doc(alias = "router")] pub struct Router { /// Map of registered routes keyed by method. - inner: DashMap>>, + inner: SccHashMap>>, /// An easy-to-iterate index of the same routes so we can access the `Arc` values - routes: DashMap>>, + routes: SccHashMap>>, /// Global middleware chain applied to all routes. pub(crate) middlewares: RwLock>, /// Optional fallback handler executed when no route matches. @@ -110,8 +110,8 @@ impl Router { /// Creates a new, empty router. pub fn new() -> Self { let router = Self { - inner: DashMap::default(), - routes: DashMap::default(), + inner: SccHashMap::default(), + routes: SccHashMap::default(), middlewares: RwLock::new(Vec::new()), fallback: None, #[cfg(feature = "plugins")] @@ -168,15 +168,18 @@ impl Router { None, )); - let mut method_router = self.inner.entry(method.clone()).or_default(); + let mut method_router = self.inner.entry_sync(method.clone()).or_default(); - if let Err(err) = method_router.insert(path.to_string(), route.clone()) { + if let Err(err) = method_router + .get_mut() + .insert(path.to_string(), route.clone()) + { panic!("Failed to register route: {err}"); } self .routes - .entry(method) + .entry_sync(method) .or_default() .push(Arc::downgrade(&route)); @@ -221,15 +224,18 @@ impl Router { Some(true), )); - let mut method_router = self.inner.entry(method.clone()).or_default(); + let mut method_router = self.inner.entry_sync(method.clone()).or_default(); - if let Err(err) = method_router.insert(path.to_string(), route.clone()) { + if let Err(err) = method_router + .get_mut() + .insert(path.to_string(), route.clone()) + { panic!("Failed to register route: {err}"); } self .routes - .entry(method) + .entry_sync(method) .or_default() .push(Arc::downgrade(&route)); @@ -259,7 +265,7 @@ impl Router { let method = req.method().clone(); let path = req.uri().path().to_string(); - if let Some(method_router) = self.inner.get(&method) + if let Some(method_router) = self.inner.get_sync(&method) && let Ok(matched) = method_router.at(&path) { let route = matched.value; @@ -340,7 +346,7 @@ impl Router { format!("{path}/") }; - if let Some(method_router) = self.inner.get(&method) + if let Some(method_router) = self.inner.get_sync(&method) && let Ok(matched) = method_router.at(&tsr_path) && matched.value.tsr { @@ -632,8 +638,8 @@ impl Router { pub fn merge(&mut self, other: Router) { let upstream_globals = other.middlewares.read().clone(); - for (method, weak_vec) in other.routes.into_iter() { - let mut target_router = self.inner.entry(method.clone()).or_default(); + other.routes.iter_sync(|method, weak_vec| { + let mut target_router = self.inner.entry_sync(method.clone()).or_default(); for weak in weak_vec { if let Some(route) = weak.upgrade() { @@ -642,16 +648,20 @@ impl Router { rmw.push_front(mw.clone()); } - let _ = target_router.insert(route.path.clone(), route.clone()); + let _ = target_router + .get_mut() + .insert(route.path.clone(), route.clone()); self .routes - .entry(method.clone()) + .entry_sync(method.clone()) .or_default() .push(Arc::downgrade(&route)); } } - } + + true + }); #[cfg(feature = "signals")] self.signals.merge_from(&other.signals); diff --git a/src/signals.rs b/src/signals.rs index d44c158..91a1871 100644 --- a/src/signals.rs +++ b/src/signals.rs @@ -8,9 +8,9 @@ use crate::types::BuildHasher; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{any::Any, collections::HashMap, sync::Arc}; -use dashmap::DashMap; use futures_util::future::{BoxFuture, join_all}; use once_cell::sync::Lazy; +use scc::HashMap as SccHashMap; use tokio::sync::{broadcast, mpsc}; use tokio::time::{Duration, timeout}; @@ -100,10 +100,10 @@ pub type SignalStream = mpsc::UnboundedReceiver; #[derive(Default)] struct Inner { - handlers: DashMap>, - topics: DashMap>, - rpc: DashMap, - exporters: DashMap, + handlers: SccHashMap>, + topics: SccHashMap>, + rpc: SccHashMap, + exporters: SccHashMap, } /// Shared arbiter used to register and dispatch named signals. @@ -163,12 +163,12 @@ impl SignalArbiter { /// Returns (and lazily initializes) the broadcast sender for a signal id. pub(crate) fn topic_sender(&self, id: &str) -> broadcast::Sender { - if let Some(existing) = self.inner.topics.get(id) { + if let Some(existing) = self.inner.topics.get_sync(id) { existing.clone() } else { let cap = GLOBAL_BROADCAST_CAPACITY.load(Ordering::SeqCst); let (tx, _rx) = broadcast::channel(cap); - let entry = self.inner.topics.entry(id.to_string()).or_insert(tx); + let entry = self.inner.topics.entry_sync(id.to_string()).or_insert(tx); entry.clone() } } @@ -190,7 +190,7 @@ impl SignalArbiter { self .inner .handlers - .entry(id) + .entry_sync(id) .or_insert_with(Vec::new) .push(handler); } @@ -229,19 +229,20 @@ impl SignalArbiter { /// Broadcasts a signal to all subscribers without awaiting handler completion. pub(crate) fn broadcast(&self, signal: Signal) { // Exact id subscribers - if let Some(sender) = self.inner.topics.get(&signal.id) { + if let Some(sender) = self.inner.topics.get_sync(&signal.id) { let _ = sender.send(signal.clone()); } // Prefix subscribers: keys ending with '*' - for entry in self.inner.topics.iter() { - let key = entry.key(); + self.inner.topics.iter_sync(|key, v| { if let Some(prefix) = key.strip_suffix('*') { if signal.id.starts_with(prefix) { - let _ = entry.value().send(signal.clone()); + let _ = v.send(signal.clone()); } } - } + + true + }); } /// Subscribes using a filter function on top of an id-based subscription. @@ -311,7 +312,7 @@ impl SignalArbiter { }) }); - self.inner.rpc.insert(id_str, handler); + std::mem::drop(self.inner.rpc.insert_sync(id_str, handler)); } /// Calls a typed RPC handler and returns a shared pointer to the response. @@ -321,7 +322,7 @@ impl SignalArbiter { Res: Send + Sync + 'static, { let id_str = id.as_ref(); - let entry = self.inner.rpc.get(id_str)?; + let entry = self.inner.rpc.get_sync(id_str)?; let handler = entry.clone(); drop(entry); @@ -341,7 +342,7 @@ impl SignalArbiter { Res: Send + Sync + Clone + 'static, { let id_str = id.as_ref(); - let entry = self.inner.rpc.get(id_str); + let entry = self.inner.rpc.get_sync(id_str); let entry = match entry { Some(e) => e, None => return Err(RpcError::NoHandler), @@ -393,11 +394,12 @@ impl SignalArbiter { self.broadcast(signal.clone()); // Call exporters (non-blocking from the perspective of handlers). - for entry in self.inner.exporters.iter() { - (entry.value())(&signal); - } + self.inner.exporters.iter_sync(|_, v| { + v(&signal); + true + }); - if let Some(entry) = self.inner.handlers.get(&signal.id) { + if let Some(entry) = self.inner.handlers.get_sync(&signal.id) { let handlers = entry.clone(); drop(entry); @@ -425,7 +427,7 @@ impl SignalArbiter { // Use the pointer address as a simple, best-effort key. let key = Arc::into_raw(Arc::new(())) as u64; let exporter: SignalExporter = Arc::new(exporter); - self.inner.exporters.insert(key, exporter); + std::mem::drop(self.inner.exporters.insert_sync(key, exporter)); } /// Merges all handlers from `other` into `self`. @@ -433,73 +435,64 @@ impl SignalArbiter { /// This is used by router merging so that signal handlers attached to /// a merged router continue to be active. pub(crate) fn merge_from(&self, other: &SignalArbiter) { - for entry in other.inner.handlers.iter() { - let id = entry.key().clone(); - let handlers = entry.value().clone(); - + other.inner.handlers.iter_sync(|k, v| { self .inner .handlers - .entry(id) + .entry_sync(k.clone()) .or_insert_with(Vec::new) - .extend(handlers); - } + .extend(v.clone()); - for entry in other.inner.topics.iter() { - let id = entry.key().clone(); - let sender = entry.value().clone(); - self.inner.topics.entry(id).or_insert(sender); - } + true + }); - for entry in other.inner.rpc.iter() { - let id = entry.key().clone(); - let handler = entry.value().clone(); - self.inner.rpc.insert(id, handler); - } + other.inner.topics.iter_sync(|k, v| { + self.inner.topics.entry_sync(k.clone()).or_insert(v.clone()); + true + }); - for entry in other.inner.exporters.iter() { - let key = entry.key().clone(); - let exporter = entry.value().clone(); - self.inner.exporters.insert(key, exporter); - } + other.inner.rpc.iter_sync(|k, v| { + let _ = self.inner.rpc.insert_sync(k.clone(), v.clone()); + true + }); + + other.inner.exporters.iter_sync(|k, v| { + let _ = self.inner.exporters.insert_sync(k.clone(), v.clone()); + true + }); } /// Returns a list of known signal ids (exact topics) currently registered. pub fn signal_ids(&self) -> Vec { - self - .inner - .topics - .iter() - .filter_map(|entry| { - let id = entry.key(); - if id.ends_with('*') { - None - } else { - Some(id.clone()) - } - }) - .collect() + let mut ids = Vec::new(); + self.inner.topics.iter_sync(|k, _| { + if !k.ends_with('*') { + ids.push(k.clone()); + } + true + }); + ids } /// Returns a list of known signal prefixes (topics ending with '*'). pub fn signal_prefixes(&self) -> Vec { - self - .inner - .topics - .iter() - .filter_map(|entry| { - let id = entry.key(); - if id.ends_with('*') { - Some(id.clone()) - } else { - None - } - }) - .collect() + let mut prefixes = Vec::new(); + self.inner.topics.iter_sync(|k, _| { + if k.ends_with('*') { + prefixes.push(k.clone()); + } + true + }); + prefixes } /// Returns a list of registered RPC ids. pub fn rpc_ids(&self) -> Vec { - self.inner.rpc.iter().map(|e| e.key().clone()).collect() + let mut ids = Vec::new(); + self.inner.rpc.iter_sync(|k, _| { + ids.push(k.clone()); + true + }); + ids } } diff --git a/src/state.rs b/src/state.rs index 0719147..85bf806 100644 --- a/src/state.rs +++ b/src/state.rs @@ -33,8 +33,8 @@ use std::{ sync::Arc, }; -use dashmap::DashMap; use once_cell::sync::Lazy; +use scc::HashMap as SccHashMap; /// Global state storage using thread-safe concurrent hash map. /// @@ -42,8 +42,8 @@ use once_cell::sync::Lazy; /// shared across different parts of the application. Values are stored as type-erased /// `Arc` to enable storage of arbitrary types while maintaining /// thread safety. -pub(crate) static GLOBAL_STATE: Lazy>> = - Lazy::new(DashMap::new); +pub(crate) static GLOBAL_STATE: Lazy>> = + Lazy::new(SccHashMap::new); /// Stores a value in the global state, keyed by its concrete type `T`. /// @@ -69,7 +69,7 @@ pub(crate) static GLOBAL_STATE: Lazy> /// set_state(config); /// ``` pub fn set_state(value: T) { - GLOBAL_STATE.insert(TypeId::of::(), Arc::new(value)); + std::mem::drop(GLOBAL_STATE.insert_sync(TypeId::of::(), Arc::new(value))); } /// Retrieves a value from the global state by its concrete type `T`. @@ -97,7 +97,7 @@ pub fn set_state(value: T) { /// ``` pub fn get_state() -> Option> { GLOBAL_STATE - .get(&TypeId::of::()) + .get_sync(&TypeId::of::()) .map(|v| v.clone()) .and_then(|v| v.downcast::().ok()) }