From 58a03bd3c146eaa7390bdcc822abf7ce91393fad Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 7 May 2025 17:58:04 +0800 Subject: [PATCH 1/6] feat: implmenet bounded pool replenishment Signed-off-by: tison --- examples/postgres/src/main.rs | 11 +++++++-- fastpool/src/bounded.rs | 45 +++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/examples/postgres/src/main.rs b/examples/postgres/src/main.rs index e4c225e..bc09b93 100644 --- a/examples/postgres/src/main.rs +++ b/examples/postgres/src/main.rs @@ -16,17 +16,17 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use fastpool::ObjectStatus; use fastpool::bounded::Object; use fastpool::bounded::Pool; use fastpool::bounded::PoolConfig; -use fastpool::ObjectStatus; use futures::future::BoxFuture; -use sqlx::postgres::PgConnectOptions; use sqlx::Acquire; use sqlx::ConnectOptions; use sqlx::Connection; use sqlx::PgConnection; use sqlx::TransactionManager; +use sqlx::postgres::PgConnectOptions; #[derive(Debug, Clone)] pub struct ConnectionPool { @@ -46,6 +46,13 @@ impl ConnectionPool { tokio::time::sleep(REAP_IDLE_INTERVAL).await; if let Some(pool) = weak_pool.upgrade() { pool.retain(|_, m| m.last_used().elapsed() < IDLE_TIMEOUT); + + let status = pool.status(); + let gap = status.max_size - status.current_size; + match pool.replenish(gap).await { + Ok(n) => println!("Replenished {n} connections"), + Err(err) => eprintln!("Failed to replenish connections: {err}"), + } } else { break; } diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index 13c119c..18e2efc 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -195,6 +195,51 @@ impl Pool { }) } + /// Replenishes the pool with at most `most` number of new objects. + /// + /// * Returns `Ok(n)` if successfully replenished `n` objects. + /// * Returns `Ok(0)` if no replenishment is needed. + /// * Returns `Err(e)` if an error occurred while replenishing. + pub async fn replenish(&self, mut most: usize) -> Result { + let permit = loop { + if most == 0 { + return Ok(0); + } + + match self.permits.try_acquire(most) { + Some(permit) => break permit, + None => { + most = most.min(self.permits.available_permits()); + continue; + } + } + }; + + most = most.saturating_sub(self.slots.lock().deque.len()); + if most == 0 { + return Ok(0); + } + + let mut idles = vec![]; + for _ in 0..most { + let object = self.manager.create().await?; + let state = ObjectState { + o: object, + status: ObjectStatus::default(), + }; + idles.push(state); + } + + let mut slots = self.slots.lock(); + slots.deque.extend(idles); + slots.current_size += most; + drop(slots); + + // ensure the permit is held at this point + drop(permit); + Ok(most) + } + /// Retrieves an [`Object`] from this [`Pool`]. /// /// This method should be called with a pool wrapped in an [`Arc`]. If the pool reaches the From e272ce337f01305f8a98cb079d985434c8257103 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 7 May 2025 18:24:00 +0800 Subject: [PATCH 2/6] fixup Signed-off-by: tison --- examples/postgres/src/main.rs | 5 +--- fastpool/src/bounded.rs | 53 ++++++++++++++++++----------------- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/examples/postgres/src/main.rs b/examples/postgres/src/main.rs index bc09b93..83fd8c1 100644 --- a/examples/postgres/src/main.rs +++ b/examples/postgres/src/main.rs @@ -49,10 +49,7 @@ impl ConnectionPool { let status = pool.status(); let gap = status.max_size - status.current_size; - match pool.replenish(gap).await { - Ok(n) => println!("Replenished {n} connections"), - Err(err) => eprintln!("Failed to replenish connections: {err}"), - } + pool.replenish(gap).await; } else { break; } diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index 18e2efc..5cf7137 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -197,47 +197,48 @@ impl Pool { /// Replenishes the pool with at most `most` number of new objects. /// - /// * Returns `Ok(n)` if successfully replenished `n` objects. - /// * Returns `Ok(0)` if no replenishment is needed. - /// * Returns `Err(e)` if an error occurred while replenishing. - pub async fn replenish(&self, mut most: usize) -> Result { - let permit = loop { - if most == 0 { - return Ok(0); - } - - match self.permits.try_acquire(most) { - Some(permit) => break permit, - None => { - most = most.min(self.permits.available_permits()); - continue; + /// Returns `n` where `n` is the number of objects added to the pool. + pub async fn replenish(&self, most: usize) -> usize { + let permit = { + let mut n = most; + loop { + match self.permits.try_acquire(most) { + Some(permit) => break permit, + None => { + n = n.min(self.permits.available_permits()); + continue; + } } } }; + if permit.permits() == 0 { + return 0; + } - most = most.saturating_sub(self.slots.lock().deque.len()); - if most == 0 { - return Ok(0); + let idle_count = self.slots.lock().deque.len(); + let gap = permit.permits().saturating_sub(idle_count); + if gap == 0 { + return 0; } let mut idles = vec![]; - for _ in 0..most { - let object = self.manager.create().await?; - let state = ObjectState { - o: object, - status: ObjectStatus::default(), - }; - idles.push(state); + for _ in 0..gap { + if let Ok(o) = self.manager.create().await { + let status = ObjectStatus::default(); + let state = ObjectState { o, status }; + idles.push(state); + } } + let replenished = idles.len(); let mut slots = self.slots.lock(); + slots.current_size += replenished; slots.deque.extend(idles); - slots.current_size += most; drop(slots); // ensure the permit is held at this point drop(permit); - Ok(most) + replenished } /// Retrieves an [`Object`] from this [`Pool`]. From b32c300b6fc8861f956806fddb82ed9a86a57c87 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 7 May 2025 19:40:31 +0800 Subject: [PATCH 3/6] more Signed-off-by: tison --- Cargo.lock | 4 ++-- fastpool/Cargo.toml | 2 +- fastpool/src/bounded.rs | 16 +++++++++++----- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c701e53..dbafbab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,9 +225,9 @@ dependencies = [ [[package]] name = "mea" -version = "0.3.2" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb74bfea0c87e63e0c96aa35c2c5251c552f1c935f0e47ddedded0db3f5f373e" +checksum = "8f95fb3fd830a995cad9668051189c4dcb53ca812e80d58e0781f617e91b4ee2" dependencies = [ "slab", ] diff --git a/fastpool/Cargo.toml b/fastpool/Cargo.toml index 70a5814..6bb599b 100644 --- a/fastpool/Cargo.toml +++ b/fastpool/Cargo.toml @@ -26,7 +26,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -mea = { version = "0.3.2" } +mea = { version = "0.3.6" } scopeguard = { version = "1.2.0" } [dev-dependencies] diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index 5cf7137..1dfdce1 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -211,15 +211,21 @@ impl Pool { } } }; + if permit.permits() == 0 { return 0; } - let idle_count = self.slots.lock().deque.len(); - let gap = permit.permits().saturating_sub(idle_count); - if gap == 0 { - return 0; - } + let (gap, permit) = { + let mut permit = permit; + let idle_count = self.slots.lock().deque.len(); + if idle_count >= permit.permits() { + return 0; + } + + permit.release(idle_count); + (permit.permits(), permit) + }; let mut idles = vec![]; for _ in 0..gap { From d89c4ff55d3af10709a5d306c4fb875bd9966fd1 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 7 May 2025 21:06:20 +0800 Subject: [PATCH 4/6] tidy Signed-off-by: tison --- fastpool/src/bounded.rs | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index 1dfdce1..920f332 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -197,9 +197,9 @@ impl Pool { /// Replenishes the pool with at most `most` number of new objects. /// - /// Returns `n` where `n` is the number of objects added to the pool. + /// Returns the number of objects that are actually replenished to the pool. pub async fn replenish(&self, most: usize) -> usize { - let permit = { + let mut permit = { let mut n = most; loop { match self.permits.try_acquire(most) { @@ -216,34 +216,36 @@ impl Pool { return 0; } - let (gap, permit) = { - let mut permit = permit; - let idle_count = self.slots.lock().deque.len(); - if idle_count >= permit.permits() { + let gap = { + let idles = self.slots.lock().deque.len(); + if idles >= permit.permits() { return 0; } - permit.release(idle_count); - (permit.permits(), permit) + // exclude idle objects that already exists + permit.release(idles); + permit.permits() }; - let mut idles = vec![]; + let mut replenished = 0; for _ in 0..gap { if let Ok(o) = self.manager.create().await { let status = ObjectStatus::default(); let state = ObjectState { o, status }; - idles.push(state); + + let mut slots = self.slots.lock(); + slots.current_size += 1; + slots.deque.push_back(state); + drop(slots); + + replenished += 1; + permit.release(1); + } else { + // always release one permit to unblock other waiters + permit.release(1); } } - let replenished = idles.len(); - - let mut slots = self.slots.lock(); - slots.current_size += replenished; - slots.deque.extend(idles); - drop(slots); - // ensure the permit is held at this point - drop(permit); replenished } From 4859eef0b74e4c805f31e454b5886bfdff0c6eef Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 7 May 2025 21:24:47 +0800 Subject: [PATCH 5/6] test and fix Signed-off-by: tison --- fastpool/src/bounded.rs | 9 ++-- fastpool/tests/replenish_tests.rs | 75 +++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 5 deletions(-) create mode 100644 fastpool/tests/replenish_tests.rs diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index 920f332..115cbac 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -202,7 +202,7 @@ impl Pool { let mut permit = { let mut n = most; loop { - match self.permits.try_acquire(most) { + match self.permits.try_acquire(n) { Some(permit) => break permit, None => { n = n.min(self.permits.available_permits()); @@ -239,11 +239,10 @@ impl Pool { drop(slots); replenished += 1; - permit.release(1); - } else { - // always release one permit to unblock other waiters - permit.release(1); } + + // always release one permit to unblock other waiters + permit.release(1); } replenished diff --git a/fastpool/tests/replenish_tests.rs b/fastpool/tests/replenish_tests.rs new file mode 100644 index 0000000..b35cd9a --- /dev/null +++ b/fastpool/tests/replenish_tests.rs @@ -0,0 +1,75 @@ +// Copyright 2025 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::Infallible; +use std::sync::Arc; + +use fastpool::ManageObject; +use fastpool::ObjectStatus; +use fastpool::bounded::Pool; +use fastpool::bounded::PoolConfig; + +#[tokio::test] +async fn test_simple_replenish() { + #[derive(Default)] + struct Manager; + + impl ManageObject for Manager { + type Object = (); + type Error = Infallible; + + async fn create(&self) -> Result { + Ok(()) + } + + async fn is_recyclable( + &self, + _o: &mut Self::Object, + _status: &ObjectStatus, + ) -> Result<(), Self::Error> { + Ok(()) + } + } + + const MAX_SIZE: usize = 2; + + fn make_default() -> Arc> { + Pool::new(PoolConfig::new(MAX_SIZE), Manager) + } + + for i in 0..5 { + let pool = make_default(); + let n = pool.replenish(i).await; + assert_eq!(n, i.min(MAX_SIZE)); + } + + // stage one idle object + { + let pool = make_default(); + pool.get().await.unwrap(); + let n = pool.replenish(2).await; + assert_eq!(n, 1); + } + + // stage two idle objects + { + let pool = make_default(); + let o1 = pool.get().await.unwrap(); + let o2 = pool.get().await.unwrap(); + drop((o1, o2)); + + let n = pool.replenish(2).await; + assert_eq!(n, 0); + } +} From 41a7caf5239f768ece2180cf36630684e133f7f5 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 7 May 2025 21:51:17 +0800 Subject: [PATCH 6/6] catch up Signed-off-by: tison --- Cargo.lock | 80 +++++++++++++++---------------- fastpool/Cargo.toml | 2 +- fastpool/src/bounded.rs | 23 +++++++-- fastpool/tests/replenish_tests.rs | 2 +- 4 files changed, 61 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbafbab..a6e9e17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,9 +75,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "backtrace" -version = "0.3.74" +version = "0.3.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" dependencies = [ "addr2line", "cfg-if", @@ -108,9 +108,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "4.5.31" +version = "4.5.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767" +checksum = "eccb054f56cbd38340b380d4a8e69ef1f02f1af43db2f0cc817a4774d80ae071" dependencies = [ "clap_builder", "clap_derive", @@ -118,9 +118,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.31" +version = "4.5.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863" +checksum = "efd9466fac8543255d3b1fcad4762c5e116ffe808c8a3043d4263cd4fd4862a2" dependencies = [ "anstream", "anstyle", @@ -130,9 +130,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.28" +version = "4.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed" +checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7" dependencies = [ "heck", "proc-macro2", @@ -166,9 +166,9 @@ checksum = "c7f84e12ccf0a7ddc17a6c41c93326024c42920d7ee630d04950e6926645c0fe" [[package]] name = "errno" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" +checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" dependencies = [ "libc", "windows-sys 0.59.0", @@ -203,15 +203,15 @@ checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" [[package]] name = "libc" -version = "0.2.170" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "linux-raw-sys" -version = "0.4.15" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" [[package]] name = "lock_api" @@ -225,9 +225,9 @@ dependencies = [ [[package]] name = "mea" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f95fb3fd830a995cad9668051189c4dcb53ca812e80d58e0781f617e91b4ee2" +checksum = "3dd4c034214a0a3656fafe3a600d6ea4f9329ec9a35168126ff548af33375fc0" dependencies = [ "slab", ] @@ -240,9 +240,9 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "miniz_oxide" -version = "0.8.5" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" +checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a" dependencies = [ "adler2", ] @@ -269,9 +269,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.20.3" +version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" [[package]] name = "parking_lot" @@ -304,27 +304,27 @@ checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] name = "proc-macro2" -version = "1.0.94" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.39" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1f1914ce909e1658d9907913b4b91947430c7d9be598b15a1912935b8c04801" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" dependencies = [ "proc-macro2", ] [[package]] name = "redox_syscall" -version = "0.5.10" +version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1" +checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" dependencies = [ "bitflags", ] @@ -337,9 +337,9 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustix" -version = "0.38.44" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" dependencies = [ "bitflags", "errno", @@ -356,9 +356,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "signal-hook-registry" -version = "1.4.2" +version = "1.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" dependencies = [ "libc", ] @@ -374,15 +374,15 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.14.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" +checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" [[package]] name = "socket2" -version = "0.5.8" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef" dependencies = [ "libc", "windows-sys 0.52.0", @@ -396,9 +396,9 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "syn" -version = "2.0.99" +version = "2.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e02e925281e18ffd9d640e234264753c43edc62d64b2d4cf898f1bc5e75f3fc2" +checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf" dependencies = [ "proc-macro2", "quote", @@ -407,9 +407,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.43.0" +version = "1.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" dependencies = [ "backtrace", "bytes", @@ -454,9 +454,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "which" -version = "7.0.2" +version = "7.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2774c861e1f072b3aadc02f8ba886c26ad6321567ecc294c935434cad06f1283" +checksum = "24d643ce3fd3e5b54854602a080f34fb10ab75e0b813ee32d00ca2b44fa74762" dependencies = [ "either", "env_home", diff --git a/fastpool/Cargo.toml b/fastpool/Cargo.toml index 6bb599b..fc16b53 100644 --- a/fastpool/Cargo.toml +++ b/fastpool/Cargo.toml @@ -26,7 +26,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -mea = { version = "0.3.6" } +mea = { version = "0.3.7" } scopeguard = { version = "1.2.0" } [dev-dependencies] diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index 115cbac..3e3e521 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -222,8 +222,18 @@ impl Pool { return 0; } - // exclude idle objects that already exists - permit.release(idles); + match permit.split(idles) { + None => unreachable!( + "idles ({}) should be less than permits ({})", + idles, + permit.permits() + ), + Some(p) => { + // reduced by existing idle objects and release the corresponding permits + drop(p); + } + } + permit.permits() }; @@ -241,8 +251,13 @@ impl Pool { replenished += 1; } - // always release one permit to unblock other waiters - permit.release(1); + match permit.split(1) { + None => unreachable!("permit must be greater than 0 at this point"), + Some(p) => { + // always release one permit to unblock other waiters + drop(p); + } + } } replenished diff --git a/fastpool/tests/replenish_tests.rs b/fastpool/tests/replenish_tests.rs index b35cd9a..0b2ddc6 100644 --- a/fastpool/tests/replenish_tests.rs +++ b/fastpool/tests/replenish_tests.rs @@ -21,7 +21,7 @@ use fastpool::bounded::Pool; use fastpool::bounded::PoolConfig; #[tokio::test] -async fn test_simple_replenish() { +async fn test_replenish() { #[derive(Default)] struct Manager;