From 226625dbca51c6a95a0d5daf617f7169185d0ef6 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 7 May 2025 16:16:26 +0800 Subject: [PATCH] feat: support retain for unbounded pool Signed-off-by: tison --- fastpool/src/bounded.rs | 75 ++++++++++----------------------- fastpool/src/lib.rs | 2 + fastpool/src/retain_spec.rs | 83 +++++++++++++++++++++++++++++++++++++ fastpool/src/unbounded.rs | 52 +++++++++++++++++++++++ 4 files changed, 159 insertions(+), 53 deletions(-) create mode 100644 fastpool/src/retain_spec.rs diff --git a/fastpool/src/bounded.rs b/fastpool/src/bounded.rs index 46d85d9..4435d8c 100644 --- a/fastpool/src/bounded.rs +++ b/fastpool/src/bounded.rs @@ -86,7 +86,9 @@ use mea::semaphore::Semaphore; use crate::ManageObject; use crate::ObjectStatus; use crate::QueueStrategy; +use crate::RetainResult; use crate::mutex::Mutex; +use crate::retain_spec; /// The configuration of [`Pool`]. #[derive(Clone, Copy, Debug)] @@ -117,16 +119,6 @@ impl PoolConfig { } } -/// The result returned by [`Pool::retain`]. -#[derive(Debug)] -#[non_exhaustive] -pub struct RetainResult { - /// The number of retained objects. - pub retained: usize, - /// The objects removed from the pool. - pub removed: Vec, -} - /// The current pool status. /// /// See [`Pool::status`]. @@ -288,51 +280,12 @@ impl Pool { /// ``` pub fn retain( &self, - mut f: impl FnMut(&mut M::Object, ObjectStatus) -> bool, + f: impl FnMut(&mut M::Object, ObjectStatus) -> bool, ) -> RetainResult { let mut slots = self.slots.lock(); - - let len = slots.deque.len(); - let mut idx = 0; - let mut cur = 0; - - // Stage 1: All values are retained. - while cur < len { - let state = &mut slots.deque[cur]; - if !f(&mut state.o, state.status) { - cur += 1; - break; - } - cur += 1; - idx += 1; - } - - // Stage 2: Swap retained value into current idx. - while cur < len { - let state = &mut slots.deque[cur]; - if !f(&mut state.o, state.status) { - cur += 1; - continue; - } - - slots.deque.swap(idx, cur); - cur += 1; - idx += 1; - } - - // Stage 3: Truncate all values after idx. - let removed = if cur != idx { - let removed = slots.deque.split_off(idx); - slots.current_size -= removed.len(); - removed.into_iter().map(|state| state.o).collect() - } else { - Vec::new() - }; - - RetainResult { - retained: idx, - removed, - } + let result = retain_spec::do_vec_deque_retain(&mut slots.deque, f); + slots.current_size -= result.removed.len(); + result } /// Returns the current status of the pool. @@ -525,3 +478,19 @@ struct ObjectState { o: T, status: ObjectStatus, } + +impl retain_spec::SealedState for ObjectState { + type Object = T; + + fn status(&self) -> ObjectStatus { + self.status + } + + fn mut_object(&mut self) -> &mut Self::Object { + &mut self.o + } + + fn take_object(self) -> Self::Object { + self.o + } +} diff --git a/fastpool/src/lib.rs b/fastpool/src/lib.rs index bb5bea8..57e74b3 100644 --- a/fastpool/src/lib.rs +++ b/fastpool/src/lib.rs @@ -198,9 +198,11 @@ pub use common::ManageObject; pub use common::ObjectStatus; pub use common::QueueStrategy; +pub use retain_spec::RetainResult; mod common; mod mutex; +mod retain_spec; pub mod bounded; pub mod unbounded; diff --git a/fastpool/src/retain_spec.rs b/fastpool/src/retain_spec.rs new file mode 100644 index 0000000..367eaab --- /dev/null +++ b/fastpool/src/retain_spec.rs @@ -0,0 +1,83 @@ +// Copyright 2025 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; + +use crate::ObjectStatus; + +/// The result returned by `Pool::retain`. +#[derive(Debug)] +#[non_exhaustive] +pub struct RetainResult { + /// The number of retained objects. + pub retained: usize, + /// The objects removed from the pool. + pub removed: Vec, +} + +pub(crate) trait SealedState { + type Object; + + fn status(&self) -> ObjectStatus; + fn mut_object(&mut self) -> &mut Self::Object; + fn take_object(self) -> Self::Object; +} + +pub(crate) fn do_vec_deque_retain>( + deque: &mut VecDeque, + mut f: impl FnMut(&mut T, ObjectStatus) -> bool, +) -> RetainResult { + let len = deque.len(); + let mut idx = 0; + let mut cur = 0; + + // Stage 1: All values are retained. + while cur < len { + let state = &mut deque[cur]; + let status = state.status(); + if !f(state.mut_object(), status) { + cur += 1; + break; + } + cur += 1; + idx += 1; + } + + // Stage 2: Swap retained value into current idx. + while cur < len { + let state = &mut deque[cur]; + let status = state.status(); + if !f(state.mut_object(), status) { + cur += 1; + continue; + } + + deque.swap(idx, cur); + cur += 1; + idx += 1; + } + + // Stage 3: Truncate all values after idx. + let removed = if cur != idx { + let removed = deque.split_off(idx); + removed.into_iter().map(State::take_object).collect() + } else { + Vec::new() + }; + + RetainResult { + retained: idx, + removed, + } +} diff --git a/fastpool/src/unbounded.rs b/fastpool/src/unbounded.rs index eb9f807..a9d03c9 100644 --- a/fastpool/src/unbounded.rs +++ b/fastpool/src/unbounded.rs @@ -102,7 +102,9 @@ use std::sync::Weak; use crate::ManageObject; use crate::ObjectStatus; use crate::QueueStrategy; +use crate::RetainResult; use crate::mutex::Mutex; +use crate::retain_spec; /// The configuration of [`Pool`]. #[derive(Clone, Copy, Debug)] @@ -388,6 +390,40 @@ impl> Pool { } } + /// Retains only the objects that pass the given predicate. + /// + /// This function blocks the entire pool. Therefore, the given function should not block. + /// + /// The following example starts a background task that runs every 30 seconds and removes + /// objects from the pool that have not been used for more than one minute. The task will + /// terminate if the pool is dropped. + /// + /// ```rust,ignore + /// let interval = Duration::from_secs(30); + /// let max_age = Duration::from_secs(60); + /// + /// let weak_pool = Arc::downgrade(&pool); + /// tokio::spawn(async move { + /// loop { + /// tokio::time::sleep(interval).await; + /// if let Some(pool) = weak_pool.upgrade() { + /// pool.retain(|_, status| status.last_used().elapsed() < max_age); + /// } else { + /// break; + /// } + /// } + /// }); + /// ``` + pub fn retain( + &self, + f: impl FnMut(&mut M::Object, ObjectStatus) -> bool, + ) -> RetainResult { + let mut slots = self.slots.lock(); + let result = retain_spec::do_vec_deque_retain(&mut slots.deque, f); + slots.current_size -= result.removed.len(); + result + } + /// Returns the current status of the pool. /// /// The status returned by the pool is not guaranteed to be consistent. @@ -537,3 +573,19 @@ struct ObjectState { o: T, status: ObjectStatus, } + +impl retain_spec::SealedState for ObjectState { + type Object = T; + + fn status(&self) -> ObjectStatus { + self.status + } + + fn mut_object(&mut self) -> &mut Self::Object { + &mut self.o + } + + fn take_object(self) -> Self::Object { + self.o + } +}