From 4158a64539efccdad4d0c965801e034d8d8c8ecd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sarek=20H=C3=B8verstad=20Skot=C3=A5m?= Date: Fri, 18 Apr 2025 02:20:36 -0700 Subject: [PATCH] Add BatchSemaphore::{held_permits, is_queued} --- Cargo.toml | 2 +- shuttle/Cargo.toml | 2 + shuttle/src/future/batch_semaphore.rs | 85 ++++++++++++++++++++------- 3 files changed, 67 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fa9f0037..d5fc472a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,4 +5,4 @@ members = [ "wrappers/shuttle_sync", ] -resolver = "2" \ No newline at end of file +resolver = "2" diff --git a/shuttle/Cargo.toml b/shuttle/Cargo.toml index ebb85e8e..c098be60 100644 --- a/shuttle/Cargo.toml +++ b/shuttle/Cargo.toml @@ -21,6 +21,8 @@ scoped-tls = "1.0.0" smallvec = { version = "1.11.2", features = ["const_new"] } tracing = { version = "0.1.36", default-features = false, features = ["std"] } corosensei = "0.3.1" +indexmap = { version = "2.9", features = ["std"] } +ahash = "0.8" # for annotation only regex = { version = "1.10.6", optional = true } diff --git a/shuttle/src/future/batch_semaphore.rs b/shuttle/src/future/batch_semaphore.rs index 5be975e2..9611d541 100644 --- a/shuttle/src/future/batch_semaphore.rs +++ b/shuttle/src/future/batch_semaphore.rs @@ -1,9 +1,11 @@ //! A counting semaphore supporting both async and sync operations. -use crate::current; +use crate::current::{self, get_current_task}; use crate::runtime::execution::ExecutionState; use crate::runtime::task::{clock::VectorClock, TaskId}; use crate::runtime::thread; use crate::sync::{ResourceSignature, ResourceType}; +use ahash::random_state::RandomState; +use indexmap::IndexMap; use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; @@ -13,7 +15,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::sync::Mutex; use std::task::{Context, Poll, Waker}; -use tracing::trace; +use tracing::{error, trace}; struct Waiter { task_id: TaskId, @@ -210,10 +212,17 @@ struct BatchSemaphoreState { permits_available: PermitsAvailable, // TODO: should there be a clock for the close event? closed: bool, + // tracks which tasks hold how many permits. + holders: IndexMap, } impl BatchSemaphoreState { - fn acquire_permits(&mut self, num_permits: usize, fairness: Fairness) -> Result<(), TryAcquireError> { + fn acquire_permits( + &mut self, + num_permits: usize, + fairness: Fairness, + task_id: TaskId, + ) -> Result<(), TryAcquireError> { assert!(num_permits > 0); if self.closed { Err(TryAcquireError::Closed) @@ -234,6 +243,11 @@ impl BatchSemaphoreState { s.update_clock(&clock); }); + self.holders + .entry(task_id) + .and_modify(|permits| *permits += num_permits) + .or_insert(num_permits); + Ok(()) } else { Err(TryAcquireError::NoPermits) @@ -341,6 +355,7 @@ impl BatchSemaphore { waiters: VecDeque::new(), permits_available: PermitsAvailable::new(num_permits), closed: false, + holders: IndexMap::with_hasher(RandomState::with_seeds(0, 0, 0, 0)), }); Self { state, @@ -369,6 +384,7 @@ impl BatchSemaphore { waiters: VecDeque::new(), permits_available: PermitsAvailable::const_new(num_permits), closed: false, + holders: IndexMap::with_hasher(RandomState::with_seeds(0, 0, 0, 0)), }); Self { state, @@ -446,24 +462,30 @@ impl BatchSemaphore { self.init_object_id(); let mut state = self.state.borrow_mut(); let id = state.id.unwrap(); - let res = state.acquire_permits(num_permits, self.fairness).inspect_err(|_err| { - // Conservatively, the requester causally depends on the - // last successful acquire. - // TODO: This is not precise, but `try_acquire` causal dependency - // TODO: is both hard to define, and is most likely not worth the - // TODO: effort. The cases where causality would be tracked - // TODO: "imprecisely" do not correspond to commonly used sync. - // TODO: primitives, such as mutexes, mutexes, or condvars. - // TODO: An example would be a counting semaphore used to guard - // TODO: access to N homogenous resources (as opposed to FIFO, - // TODO: heterogenous resources). - // TODO: More precision could be gained by tracking clocks for all - // TODO: current permit holders, with a data structure similar to - // TODO: `permits_available`. - ExecutionState::with(|s| { - s.update_clock(&state.permits_available.last_acquire); - }); + let task_id = get_current_task().unwrap_or_else(|| { + error!("Tried to acquire a semaphore while there is no current task. Panicking"); + panic!("Tried to acquire a semaphore while there is no current task."); }); + let res = state + .acquire_permits(num_permits, self.fairness, task_id) + .inspect_err(|_err| { + // Conservatively, the requester causally depends on the + // last successful acquire. + // TODO: This is not precise, but `try_acquire` causal dependency + // TODO: is both hard to define, and is most likely not worth the + // TODO: effort. The cases where causality would be tracked + // TODO: "imprecisely" do not correspond to commonly used sync. + // TODO: primitives, such as mutexes, mutexes, or condvars. + // TODO: An example would be a counting semaphore used to guard + // TODO: access to N homogenous resources (as opposed to FIFO, + // TODO: heterogenous resources). + // TODO: More precision could be gained by tracking clocks for all + // TODO: current permit holders, with a data structure similar to + // TODO: `permits_available`. + ExecutionState::with(|s| { + s.update_clock(&state.permits_available.last_acquire); + }); + }); drop(state); // If we won the race for permits of an unfair semaphore, re-block @@ -622,6 +644,23 @@ impl BatchSemaphore { } drop(state); } + + /// Returns the number of permits currently held by the given `task_id` + pub fn held_permits(&self, task_id: &TaskId) -> usize { + let state = self.state.borrow_mut(); + *state.holders.get(task_id).unwrap_or(&0) + } + + /// Returns `true` iff the given `task_id` is currently waiting to acquire permits from the `BatchSemaphore` + pub fn is_queued(&self, task_id: &TaskId) -> bool { + let state = self.state.borrow_mut(); + for waiter in &state.waiters { + if waiter.task_id == *task_id { + return true; + } + } + false + } } // Safety: Semaphore is never actually passed across true threads, only across continuations. The @@ -748,7 +787,11 @@ impl Future for Acquire<'_> { // clock, as this thread will be blocked below. let mut state = self.semaphore.state.borrow_mut(); let id = state.id.unwrap(); - let acquire_result = state.acquire_permits(self.waiter.num_permits, self.semaphore.fairness); + let task_id = get_current_task().unwrap_or_else(|| { + error!("Tried to acquire a semaphore while there is no current task. Panicking"); + panic!("Tried to acquire a semaphore while there is no current task."); + }); + let acquire_result = state.acquire_permits(self.waiter.num_permits, self.semaphore.fairness, task_id); drop(state); match acquire_result {