Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ members = [
"wrappers/shuttle_sync",
]

resolver = "2"
resolver = "2"
2 changes: 2 additions & 0 deletions shuttle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ scoped-tls = "1.0.0"
smallvec = { version = "1.11.2", features = ["const_new"] }
tracing = { version = "0.1.36", default-features = false, features = ["std"] }
corosensei = "0.3.1"
indexmap = { version = "2.9", features = ["std"] }
ahash = "0.8"

# for annotation only
regex = { version = "1.10.6", optional = true }
Expand Down
85 changes: 64 additions & 21 deletions shuttle/src/future/batch_semaphore.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
//! A counting semaphore supporting both async and sync operations.
use crate::current;
use crate::current::{self, get_current_task};
use crate::runtime::execution::ExecutionState;
use crate::runtime::task::{clock::VectorClock, TaskId};
use crate::runtime::thread;
use crate::sync::{ResourceSignature, ResourceType};
use ahash::random_state::RandomState;
use indexmap::IndexMap;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
Expand All @@ -13,7 +15,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::task::{Context, Poll, Waker};
use tracing::trace;
use tracing::{error, trace};

struct Waiter {
task_id: TaskId,
Expand Down Expand Up @@ -210,10 +212,17 @@ struct BatchSemaphoreState {
permits_available: PermitsAvailable,
// TODO: should there be a clock for the close event?
closed: bool,
// tracks which tasks hold how many permits.
holders: IndexMap<TaskId, usize, RandomState>,
}

impl BatchSemaphoreState {
fn acquire_permits(&mut self, num_permits: usize, fairness: Fairness) -> Result<(), TryAcquireError> {
fn acquire_permits(
&mut self,
num_permits: usize,
fairness: Fairness,
task_id: TaskId,
) -> Result<(), TryAcquireError> {
assert!(num_permits > 0);
if self.closed {
Err(TryAcquireError::Closed)
Expand All @@ -234,6 +243,11 @@ impl BatchSemaphoreState {
s.update_clock(&clock);
});

self.holders
.entry(task_id)
.and_modify(|permits| *permits += num_permits)
.or_insert(num_permits);

Ok(())
} else {
Err(TryAcquireError::NoPermits)
Expand Down Expand Up @@ -341,6 +355,7 @@ impl BatchSemaphore {
waiters: VecDeque::new(),
permits_available: PermitsAvailable::new(num_permits),
closed: false,
holders: IndexMap::with_hasher(RandomState::with_seeds(0, 0, 0, 0)),
});
Self {
state,
Expand Down Expand Up @@ -369,6 +384,7 @@ impl BatchSemaphore {
waiters: VecDeque::new(),
permits_available: PermitsAvailable::const_new(num_permits),
closed: false,
holders: IndexMap::with_hasher(RandomState::with_seeds(0, 0, 0, 0)),
});
Self {
state,
Expand Down Expand Up @@ -446,24 +462,30 @@ impl BatchSemaphore {
self.init_object_id();
let mut state = self.state.borrow_mut();
let id = state.id.unwrap();
let res = state.acquire_permits(num_permits, self.fairness).inspect_err(|_err| {
// Conservatively, the requester causally depends on the
// last successful acquire.
// TODO: This is not precise, but `try_acquire` causal dependency
// TODO: is both hard to define, and is most likely not worth the
// TODO: effort. The cases where causality would be tracked
// TODO: "imprecisely" do not correspond to commonly used sync.
// TODO: primitives, such as mutexes, mutexes, or condvars.
// TODO: An example would be a counting semaphore used to guard
// TODO: access to N homogenous resources (as opposed to FIFO,
// TODO: heterogenous resources).
// TODO: More precision could be gained by tracking clocks for all
// TODO: current permit holders, with a data structure similar to
// TODO: `permits_available`.
ExecutionState::with(|s| {
s.update_clock(&state.permits_available.last_acquire);
});
let task_id = get_current_task().unwrap_or_else(|| {
error!("Tried to acquire a semaphore while there is no current task. Panicking");
panic!("Tried to acquire a semaphore while there is no current task.");
});
let res = state
.acquire_permits(num_permits, self.fairness, task_id)
.inspect_err(|_err| {
// Conservatively, the requester causally depends on the
// last successful acquire.
// TODO: This is not precise, but `try_acquire` causal dependency
// TODO: is both hard to define, and is most likely not worth the
// TODO: effort. The cases where causality would be tracked
// TODO: "imprecisely" do not correspond to commonly used sync.
// TODO: primitives, such as mutexes, mutexes, or condvars.
// TODO: An example would be a counting semaphore used to guard
// TODO: access to N homogenous resources (as opposed to FIFO,
// TODO: heterogenous resources).
// TODO: More precision could be gained by tracking clocks for all
// TODO: current permit holders, with a data structure similar to
// TODO: `permits_available`.
ExecutionState::with(|s| {
s.update_clock(&state.permits_available.last_acquire);
});
});
drop(state);

// If we won the race for permits of an unfair semaphore, re-block
Expand Down Expand Up @@ -622,6 +644,23 @@ impl BatchSemaphore {
}
drop(state);
}

/// Returns the number of permits currently held by the given `task_id`
pub fn held_permits(&self, task_id: &TaskId) -> usize {
let state = self.state.borrow_mut();
*state.holders.get(task_id).unwrap_or(&0)
}

/// Returns `true` iff the given `task_id` is currently waiting to acquire permits from the `BatchSemaphore`
pub fn is_queued(&self, task_id: &TaskId) -> bool {
let state = self.state.borrow_mut();
for waiter in &state.waiters {
if waiter.task_id == *task_id {
return true;
}
}
false
}
}

// Safety: Semaphore is never actually passed across true threads, only across continuations. The
Expand Down Expand Up @@ -748,7 +787,11 @@ impl Future for Acquire<'_> {
// clock, as this thread will be blocked below.
let mut state = self.semaphore.state.borrow_mut();
let id = state.id.unwrap();
let acquire_result = state.acquire_permits(self.waiter.num_permits, self.semaphore.fairness);
let task_id = get_current_task().unwrap_or_else(|| {
error!("Tried to acquire a semaphore while there is no current task. Panicking");
panic!("Tried to acquire a semaphore while there is no current task.");
});
let acquire_result = state.acquire_permits(self.waiter.num_permits, self.semaphore.fairness, task_id);
drop(state);

match acquire_result {
Expand Down
Loading