diff --git a/Cargo.lock b/Cargo.lock index c701e53..a6e9e17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,9 +75,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "backtrace" -version = "0.3.74" +version = "0.3.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" dependencies = [ "addr2line", "cfg-if", @@ -108,9 +108,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "4.5.31" +version = "4.5.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767" +checksum = "eccb054f56cbd38340b380d4a8e69ef1f02f1af43db2f0cc817a4774d80ae071" dependencies = [ "clap_builder", "clap_derive", @@ -118,9 +118,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.31" +version = "4.5.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863" +checksum = "efd9466fac8543255d3b1fcad4762c5e116ffe808c8a3043d4263cd4fd4862a2" dependencies = [ "anstream", "anstyle", @@ -130,9 +130,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.28" +version = "4.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed" +checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7" dependencies = [ "heck", "proc-macro2", @@ -166,9 +166,9 @@ checksum = "c7f84e12ccf0a7ddc17a6c41c93326024c42920d7ee630d04950e6926645c0fe" [[package]] name = "errno" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" +checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" dependencies = [ "libc", "windows-sys 0.59.0", @@ -203,15 +203,15 @@ checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" [[package]] name = "libc" -version = "0.2.170" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "linux-raw-sys" -version = "0.4.15" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" [[package]] name = "lock_api" @@ -225,9 +225,9 @@ dependencies = [ [[package]] name = "mea" -version = "0.3.2" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb74bfea0c87e63e0c96aa35c2c5251c552f1c935f0e47ddedded0db3f5f373e" +checksum = "3dd4c034214a0a3656fafe3a600d6ea4f9329ec9a35168126ff548af33375fc0" dependencies = [ "slab", ] @@ -240,9 +240,9 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "miniz_oxide" -version = "0.8.5" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" +checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a" dependencies = [ "adler2", ] @@ -269,9 +269,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.20.3" +version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" [[package]] name = "parking_lot" @@ -304,27 +304,27 @@ checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] name = "proc-macro2" -version = "1.0.94" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.39" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1f1914ce909e1658d9907913b4b91947430c7d9be598b15a1912935b8c04801" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" dependencies = [ "proc-macro2", ] [[package]] name = "redox_syscall" -version = "0.5.10" +version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1" +checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" dependencies = [ "bitflags", ] @@ -337,9 +337,9 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustix" -version = "0.38.44" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" dependencies = [ "bitflags", "errno", @@ -356,9 +356,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "signal-hook-registry" -version = "1.4.2" +version = "1.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" dependencies = [ "libc", ] @@ -374,15 +374,15 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.14.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" +checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" [[package]] name = "socket2" -version = "0.5.8" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef" dependencies = [ "libc", "windows-sys 0.52.0", @@ -396,9 +396,9 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "syn" -version = "2.0.99" +version = "2.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e02e925281e18ffd9d640e234264753c43edc62d64b2d4cf898f1bc5e75f3fc2" +checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf" dependencies = [ "proc-macro2", "quote", @@ -407,9 +407,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.43.0" +version = "1.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" dependencies = [ "backtrace", "bytes", @@ -454,9 +454,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "which" -version = "7.0.2" +version = "7.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2774c861e1f072b3aadc02f8ba886c26ad6321567ecc294c935434cad06f1283" +checksum = "24d643ce3fd3e5b54854602a080f34fb10ab75e0b813ee32d00ca2b44fa74762" dependencies = [ "either", "env_home", diff --git a/examples/postgres/src/main.rs b/examples/postgres/src/main.rs index e4c225e..83fd8c1 100644 --- a/examples/postgres/src/main.rs +++ b/examples/postgres/src/main.rs @@ -16,17 +16,17 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use fastpool::ObjectStatus; use fastpool::bounded::Object; use fastpool::bounded::Pool; use fastpool::bounded::PoolConfig; -use fastpool::ObjectStatus; use futures::future::BoxFuture; -use sqlx::postgres::PgConnectOptions; use sqlx::Acquire; use sqlx::ConnectOptions; use sqlx::Connection; use sqlx::PgConnection; use sqlx::TransactionManager; +use sqlx::postgres::PgConnectOptions; #[derive(Debug, Clone)] pub struct ConnectionPool { @@ -46,6 +46,10 @@ impl ConnectionPool { tokio::time::sleep(REAP_IDLE_INTERVAL).await; if let Some(pool) = weak_pool.upgrade() { pool.retain(|_, m| m.last_used().elapsed() < IDLE_TIMEOUT); + + let status = pool.status(); + let gap = status.max_size - status.current_size; + pool.replenish(gap).await; } else { break; } diff --git a/fastpool/Cargo.toml b/fastpool/Cargo.toml index 70a5814..fc16b53 100644 --- a/fastpool/Cargo.toml +++ b/fastpool/Cargo.toml @@ -26,7 +26,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -mea = { version = "0.3.2" } +mea = { version = "0.3.7" } scopeguard = { version = "1.2.0" } [dev-dependencies] diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index 13c119c..3e3e521 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -195,6 +195,74 @@ impl Pool { }) } + /// Replenishes the pool with at most `most` number of new objects. + /// + /// Returns the number of objects that are actually replenished to the pool. + pub async fn replenish(&self, most: usize) -> usize { + let mut permit = { + let mut n = most; + loop { + match self.permits.try_acquire(n) { + Some(permit) => break permit, + None => { + n = n.min(self.permits.available_permits()); + continue; + } + } + } + }; + + if permit.permits() == 0 { + return 0; + } + + let gap = { + let idles = self.slots.lock().deque.len(); + if idles >= permit.permits() { + return 0; + } + + match permit.split(idles) { + None => unreachable!( + "idles ({}) should be less than permits ({})", + idles, + permit.permits() + ), + Some(p) => { + // reduced by existing idle objects and release the corresponding permits + drop(p); + } + } + + permit.permits() + }; + + let mut replenished = 0; + for _ in 0..gap { + if let Ok(o) = self.manager.create().await { + let status = ObjectStatus::default(); + let state = ObjectState { o, status }; + + let mut slots = self.slots.lock(); + slots.current_size += 1; + slots.deque.push_back(state); + drop(slots); + + replenished += 1; + } + + match permit.split(1) { + None => unreachable!("permit must be greater than 0 at this point"), + Some(p) => { + // always release one permit to unblock other waiters + drop(p); + } + } + } + + replenished + } + /// Retrieves an [`Object`] from this [`Pool`]. /// /// This method should be called with a pool wrapped in an [`Arc`]. If the pool reaches the diff --git a/fastpool/tests/replenish_tests.rs b/fastpool/tests/replenish_tests.rs new file mode 100644 index 0000000..0b2ddc6 --- /dev/null +++ b/fastpool/tests/replenish_tests.rs @@ -0,0 +1,75 @@ +// Copyright 2025 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::Infallible; +use std::sync::Arc; + +use fastpool::ManageObject; +use fastpool::ObjectStatus; +use fastpool::bounded::Pool; +use fastpool::bounded::PoolConfig; + +#[tokio::test] +async fn test_replenish() { + #[derive(Default)] + struct Manager; + + impl ManageObject for Manager { + type Object = (); + type Error = Infallible; + + async fn create(&self) -> Result { + Ok(()) + } + + async fn is_recyclable( + &self, + _o: &mut Self::Object, + _status: &ObjectStatus, + ) -> Result<(), Self::Error> { + Ok(()) + } + } + + const MAX_SIZE: usize = 2; + + fn make_default() -> Arc> { + Pool::new(PoolConfig::new(MAX_SIZE), Manager) + } + + for i in 0..5 { + let pool = make_default(); + let n = pool.replenish(i).await; + assert_eq!(n, i.min(MAX_SIZE)); + } + + // stage one idle object + { + let pool = make_default(); + pool.get().await.unwrap(); + let n = pool.replenish(2).await; + assert_eq!(n, 1); + } + + // stage two idle objects + { + let pool = make_default(); + let o1 = pool.get().await.unwrap(); + let o2 = pool.get().await.unwrap(); + drop((o1, o2)); + + let n = pool.replenish(2).await; + assert_eq!(n, 0); + } +}