Conversation
There was a problem hiding this comment.
Pull request overview
This pull request fixes a race condition in the getAny method by replacing condition variables with a new synchronization mechanism consisting of BinarySemaphore and ManyToOneNotifier. The race condition occurred when a message arrived between checking for messages and waiting on the condition variable, causing the notification to be missed. The new implementation uses a binary semaphore that "remembers" notifications, ensuring no messages are lost.
Changes:
- Introduces
BinarySemaphoreclass as a temporary C++17-compatible alternative tostd::binary_semaphore - Implements
ManyToOneNotifierfor many-to-one thread synchronization with predicate support - Refactors
MessageQueueto use notifiers instead of condition variables, with a newwaitAnystatic method - Adds thread-safe locking to
LockingQueue::isDestroyed()method
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| include/depthai/utility/BinarySemaphore.hpp | New binary semaphore implementation with acquire/release semantics and timeout support |
| include/depthai/utility/ManyToOneNotifier.hpp | New notifier class that wraps BinarySemaphore with predicate-based waiting |
| include/depthai/pipeline/MessageQueue.hpp | Updated to use notifiers instead of condition variables, added waitAny declaration |
| src/pipeline/MessageQueue.cpp | Refactored getAny to use new waitAny method, renamed methods from condVar to notifier terminology |
| include/depthai/utility/LockingQueue.hpp | Added mutex lock to isDestroyed() for thread safety |
| tests/src/onhost_tests/calibration_handler_test.cpp | Reordered includes to follow standard library first convention |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| queue = std::move(m.queue); | ||
| name = std::move(m.name); | ||
| callbacks = std::move(m.callbacks); | ||
| notifiers = std::move(m.notifiers); | ||
| uniqueCallbackId = m.uniqueCallbackId; | ||
| pipelineEventDispatcher = m.pipelineEventDispatcher; | ||
| return *this; |
There was a problem hiding this comment.
Missing uniqueNotifierId in assignment operators. The move assignment operator copies uniqueCallbackId (line 90) but doesn't copy/move uniqueNotifierId. This is inconsistent and could lead to ID collisions if notifiers are added after assignment, since the ID counter starts at 0 but the notifiers map may contain entries with higher IDs from the source object.
src/pipeline/MessageQueue.cpp
Outdated
| removeNotifiers(); | ||
| return false; |
There was a problem hiding this comment.
Unreachable code detected. After the try-catch block completes successfully (line 259 returns true), execution will never reach lines 264-265. The removeNotifiers() call on line 264 will never execute, which means notifiers added to the queues will not be removed in the success case. This will cause a resource leak as notifiers accumulate in the MessageQueue instances.
| CallbackId uniqueCondVarId{0}; | ||
| std::mutex notifierMtx; | ||
| std::unordered_map<CallbackId, std::shared_ptr<ManyToOneNotifier>> notifiers; | ||
| ; |
There was a problem hiding this comment.
Extraneous semicolon after member declaration. This line contains only a semicolon which creates an unnecessary empty declaration.
| ; |
| queue = c.queue; | ||
| name = c.name; | ||
| callbacks = c.callbacks; | ||
| notifiers = c.notifiers; | ||
| uniqueCallbackId = c.uniqueCallbackId; | ||
| pipelineEventDispatcher = c.pipelineEventDispatcher; | ||
| return *this; |
There was a problem hiding this comment.
Missing uniqueNotifierId in assignment operators. The copy and move assignment operators copy uniqueCallbackId (lines 80 and 90) but don't copy uniqueNotifierId. This is inconsistent and could lead to ID collisions if notifiers are added after assignment, since the ID counter starts at 0 but the notifiers map may contain entries with higher IDs from the source object.
src/pipeline/MessageQueue.cpp
Outdated
| std::lock_guard<std::mutex> lock(cvNotifyMtx); | ||
| std::lock_guard<std::mutex> lock(notifierMtx); | ||
|
|
||
| // Call all callbacks |
There was a problem hiding this comment.
Misleading comment. The comment says "Call all callbacks" but the code actually notifies all notifiers, not callbacks. The comment should be updated to say "Notify all notifiers" or "Notify all listeners" to match the function name.
| // Call all callbacks | |
| // Notify all listeners |
src/pipeline/MessageQueue.cpp
Outdated
| std::unique_lock<std::mutex> lock(cvNotifyMtx); | ||
| std::unique_lock<std::mutex> lock(notifierMtx); | ||
|
|
||
| // If callback with id 'callbackId' doesn't exists, return false |
There was a problem hiding this comment.
Incorrect variable reference in comment. The comment refers to 'callbackId' but the parameter is named 'notifierId'. The comment should be updated to: "If notifier with id 'notifierId' doesn't exist, return false".
| // If callback with id 'callbackId' doesn't exists, return false | |
| // If notifier with id 'notifierId' doesn't exist, return false |
| #pragma once | ||
|
|
||
| #include <chrono> | ||
| #include <condition_variable> | ||
| #include <mutex> | ||
|
|
||
| /** | ||
| * @brief A binary semaphore implementation | ||
| * Should be removed and replaced with std::binary_semaphore if and when C++20 is supported | ||
| */ | ||
| class BinarySemaphore { | ||
| std::mutex mtx; | ||
| std::condition_variable cv; | ||
| bool available = false; // zero-initialized | ||
|
|
||
| public: | ||
| BinarySemaphore() = default; | ||
| explicit BinarySemaphore(bool initiallyAvailable) : available(initiallyAvailable) {} | ||
|
|
||
| // signal / post / V | ||
| void release() { | ||
| { | ||
| std::lock_guard<std::mutex> lk(mtx); | ||
| available = true; | ||
| } | ||
| cv.notify_one(); | ||
| } | ||
|
|
||
| // wait / P | ||
| void acquire() { | ||
| std::unique_lock<std::mutex> lk(mtx); | ||
| cv.wait(lk, [&] { return available; }); | ||
| available = false; // consume | ||
| } | ||
|
|
||
| // try-wait | ||
| bool tryAcquire() { | ||
| std::lock_guard<std::mutex> lk(mtx); | ||
| if(!available) return false; | ||
| available = false; | ||
| return true; | ||
| } | ||
|
|
||
| template <class Rep, class Period> | ||
| bool tryAcquireFor(const std::chrono::duration<Rep, Period>& timeout) { | ||
| std::unique_lock<std::mutex> lk(mtx); | ||
| if(!cv.wait_for(lk, timeout, [&] { return available; })) return false; | ||
| available = false; | ||
| return true; | ||
| } | ||
|
|
||
| template <class Clock, class Duration> | ||
| bool tryAcquireUntil(const std::chrono::time_point<Clock, Duration>& deadline) { | ||
| std::unique_lock<std::mutex> lk(mtx); | ||
| if(!cv.wait_until(lk, deadline, [&] { return available; })) return false; | ||
| available = false; // consume | ||
| return true; | ||
| } | ||
| }; |
There was a problem hiding this comment.
Missing namespace declaration. The BinarySemaphore class is defined in the global namespace, which could lead to naming conflicts. Other utility classes in the depthai project (like LockingQueue) are defined within the 'dai' namespace. Consider wrapping this class in the 'dai' namespace for consistency and to avoid potential naming conflicts.
| #pragma once | ||
|
|
||
| #include <atomic> | ||
| #include <stdexcept> | ||
|
|
||
| #include "BinarySemaphore.hpp" | ||
|
|
||
| class ManyToOneNotifier { | ||
| BinarySemaphore semaphore{false}; | ||
| std::atomic<bool> waiting{false}; | ||
|
|
||
| public: | ||
| ManyToOneNotifier() = default; | ||
|
|
||
| /** | ||
| * @brief Notify the waiting thread | ||
| * Should always be called after the state change that the waiting thread is waiting for | ||
| */ | ||
| void notifyOne() { | ||
| semaphore.release(); | ||
| } | ||
|
|
||
| /** | ||
| * @brief Wait until notified | ||
| * @arg pred Predicate to check after each notification - IMPORTANT: must be thread-safe | ||
| */ | ||
| template <typename Pred> | ||
| void wait(Pred pred) { | ||
| if(waiting.exchange(true)) { | ||
| throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported"); | ||
| } | ||
| try { | ||
| while(!pred()) semaphore.acquire(); | ||
| } catch(...) { | ||
| waiting = false; | ||
| throw; | ||
| } | ||
| waiting = false; | ||
| } | ||
|
|
||
| /** | ||
| * @brief Wait until notified or timeout occurs | ||
| * @arg pred Predicate to check after each notification - IMPORTANT: must be thread-safe | ||
| */ | ||
| template <typename Pred, typename Rep, typename Period> | ||
| bool waitFor(Pred pred, std::chrono::duration<Rep, Period> timeout) { | ||
| if(waiting.exchange(true)) { | ||
| throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported"); | ||
| } | ||
|
|
||
| try { | ||
| auto deadline = std::chrono::steady_clock::now() + timeout; | ||
|
|
||
| while(!pred()) { | ||
| if(!semaphore.tryAcquireUntil(deadline)) { | ||
| return pred(); | ||
| } | ||
| } | ||
| return true; | ||
| } catch(...) { | ||
| waiting = false; | ||
| throw; | ||
| } | ||
| waiting = false; | ||
| } | ||
| }; |
There was a problem hiding this comment.
Missing namespace declaration. The ManyToOneNotifier class is defined in the global namespace, which could lead to naming conflicts. Other utility classes in the depthai project (like LockingQueue) are defined within the 'dai' namespace. Consider wrapping this class in the 'dai' namespace for consistency and to avoid potential naming conflicts.
| #pragma once | ||
|
|
||
| #include <atomic> | ||
| #include <stdexcept> |
There was a problem hiding this comment.
Missing chrono header include. The waitFor method uses std::chrono::duration and std::chrono::steady_clock but the header is not included. This works only because BinarySemaphore.hpp includes , creating a hidden dependency. Add #include for proper self-contained header design.
| #include <stdexcept> | |
| #include <stdexcept> | |
| #include <chrono> |
…/get_any_race_condition
| template <typename Pred> | ||
| void wait(Pred pred) { | ||
| if(waiting.exchange(true)) { | ||
| throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported"); |
There was a problem hiding this comment.
What is the purpose of the condition that only one thread can wait for a notifier; what is that beneficial for?
In the message queues, you typically want that many threads wait in a notification queue and only one thread gets notified.
There was a problem hiding this comment.
The purpose of this class is for one main thread to wait on any one of multiple worker threads. This check ensures that someone using this class doesn't get unexpected results if they try to wait from multiple threads.
There was a problem hiding this comment.
Is that a beneficial behavior?
If you did not use a BinarySemaphore but used a class cv + mutex, then there could be more waiting threads waiting while Worker threads could be notifying them.
There was a problem hiding this comment.
If I used a mutex I'd have to lock the queue.push method and no output would be able to send while another watched queue is being sent to or while checking the predicate. I'd also have to add a lock for each waiting thread which would increase code complexity, which is not necessary since this accomplishes the goal of this PR.
There was a problem hiding this comment.
I assume we have the following scenario - and correct me, if I am wrong.
- Multiple writers. One writer doesn't wait for a signal, it tries to push to a particular queue. But there may be multiple writers trying to push data to the same queue.
- Multiple readers on the particular queue. A reader is waiting to get a signal that the queue has any data available - and it reads the data and processes further.
What about this solution then:
-
A worker thread (writer) that pushes data to a queue
1.1. The thread locks the queue with queue's related queueMutex
1.2. Data is pushed into queue
1.3. The queue is unlocked with queue's related queueMutex
1.4. A conditional variable dataReady is called to notify max. one thread -
A worker thread (reader) that waits for a data from a queue:
2.1 The queue locks the queue with queue's related queueMutex
2.2 Conditional variable dataReady->wait(queueMutex...) is called:
2.2.1 Checks the predicate and if true, return
2.2.2 Unlocks the queueMutex
2.2.3 Waits for a signal from dataReady (with unlocked queueMutex)
2.2.4 Locks the queueMutex and goes to 2.2.1
2.2 The worker thread reads the data from the queue
2.3 The worker thread unlocks the queueMutex
There was a problem hiding this comment.
If I understand correctly, you're describing the way queue.get() works. This is a separate thing. Each message queue already has its own conditional variable and mutex that handles pushes and pops from the queue.
The issue I had for getAny is that there isn't a mechanism built in to only wait until one conditional variable notifies. To do this you need to pass a single condition variable instance to multiple queues and wait until one notifies.
So the way it works now is that each time you want to wait on any of a list of queues, a semaphore is created and added to every queue in the list. When a new message arrives to any of the queues (or when a queue is destroyed) the semaphore is notified, and the waiting thread gets woken up.
| std::lock_guard<std::mutex> lk(mtx); | ||
| available = true; | ||
| } | ||
| cv.notify_one(); |
There was a problem hiding this comment.
If we talk about BinarySemaphore (only one waiting), then you can notify conditionally.
If a semaphore was already released and anybody would call "release", do not notify. Take it as an erroneous situation with no action.
There was a problem hiding this comment.
If there's no waiting thread then there's practically no overhead to notifying, which would be the case if available was already true. If I conditionally notified, I'd have to do it under the lock which would mean that the waiting thread would wake up and then immediately block and wait for the mutex to be released which would cause two extra context switches.
There was a problem hiding this comment.
Not sure you understood me. Let's make it clear.
This is one scenario I was thinking about:
void release() {
ASSERT(available == true, "There is a bug in the SW, a thread calls release while the semaphore is not acquired. We will not call notify_one");
{
std::lock_guard<std::mutex> lk(mtx);
available = true;
}
cv.notify_one();
}
Another one, without ASSERT but with
void release() {
bool was_available = false;
{
std::lock_guard<std::mutex> lk(mtx);
was_available = available;
available = true;
}
if (!was_available)
cv.notify_one(); // call notify one only in an expected scenario.
}
There was a problem hiding this comment.
Ah I see. notify_one has little overhead when there are no waiters but it certainly doesn't hurt to add this. Thanks
…/get_any_race_condition
|
@majvan Do you have any more concerns or can this be merged? |
…/get_any_race_condition
…/get_any_race_condition
| throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported"); | ||
| } | ||
| try { | ||
| while(!pred()) semaphore.acquire(); |
There was a problem hiding this comment.
Two issues.
[1]
We check pred() and then (maybe) acquire a semaphore
This function has then two expected outcomes then:
- the pred was fulfilled BUT NO semaphore acquired (the case when the pred() returns true the first time)
- the pred was fulfilled AND semaphore acquired (the case when the pred() returns true the next time)
The semaphore acquisition is not causally linked to the predicate becoming true.
[2]
Typically the Pred in the wait (like cv::wait) are used just because the whole pred() is running protected under the mutex. The reason is that the mutex ensures the atomicity of the operation together with cv's waiting cycle.
It's not a case here. If pred() is an unprotected function, it can be preempted by other thread which can access the state of the pred() - and this creates a potential RACE CONDITION.
Example:
// We need to ensure that the queue has data and that camera is on (an abstract example)
bool pred() {
if (!camera_on)
return false;
if (queue.empty)
return false;
return true;
}
Step 0 - initial state:
camera_on = true;
queue.empty = true;
Step 1 - ThreadA acquires a semaphore and continues:
gets into pred() and computes camera_on = true
-- preemptive context switch --
Step 2 - Thread C runs:
{
lock(camera_and_queue);
camera_on = false; // just set that the camera is off
}
-- preemptive context switch --
Step 3 - ThreadA continues:
continues the pred() and computes queue.empty = false and returns true
ManyToWaitNotifier::wait() returns but the state is:
camera_on = false; <-- RACE CONDITION: we wanted to wait only till camera is on
The predicate reads shared state (camera_on, queue.empty) without holding the mutex that protects it. This allows concurrent modification by other threads, resulting in undefined behavior and inconsistent observations of the system state.
In contrast, std::condition_variable::wait(lock, pred) guarantees that the predicate is evaluated while holding the mutex, making the check and sleep cycle atomic.
Another outcome of [2] is a possible deadlock:
Step 1: ThreadA - runs:
if (!pred())
semaphore.acquire(); // waits in the semaphore for the cv
Step 2: ThreadB - is about to release the semaphore:
{
std::lock(camera_and_queue)
queue.empty = false;
notifyOne(); -> semaphore.release(); // notify there is next some data, release the semaphore
}
Step 3: ThreadA - continues:
semaphore.acquire(); // finishes the semaphore acquiring
-- preemptive context switch --
Step 4: ThreadC - runs:
{
lock(camera_and_queue);
camera_on = false; // just set that the camera is off
}
-- preemptive context switch --
Step 5 - ThreadA continues:
if (!pred())
semaphore.acquire(); // DEADLOCK <-- semaphore count is zero and no further release will occur; ThreadA will get not awaken anymore
The semaphore release is not tied to the predicate remaining true.
A thread may wake, consume the semaphore, and then observe the predicate as false due to a concurrent state change. Since the semaphore signal has already been consumed and no further releases are guaranteed, the waiting thread may block forever.
There was a problem hiding this comment.
The second issue is documented:
Should always be called after the state change that the waiting thread is waiting for
(maybe the wording is not clear, should it be 'after any state change'?)
ThreadC should notify after setting camera_on to false.
The first issue is a good point. I'll document this limitation.
There was a problem hiding this comment.
This constraint works for the deadlock, but these rules have to be followed.
And it brings additional unnecessary wakeups even in the case your state is going "against" the wakeup (why would you wakeup threads when you switch off the camera when you want them to work when the camera is switched on?)
Using std::condition_variable is much easier and exactly what you need here without reinventing it. :)
…/get_any_race_condition
…/get_any_race_condition
src/pipeline/MessageQueue.cpp
Outdated
| std::mutex inputsWaitMutex; | ||
| auto inputsWaitCv = std::make_shared<std::condition_variable>(); | ||
| }; | ||
| auto checkAllClosed = [&]() { return boost::algorithm::all_of(queues, [](const std::reference_wrapper<MessageQueue> q) { return q.get().isClosed(); }); }; |
src/pipeline/MessageQueue.cpp
Outdated
| if(gotAny) { | ||
| for(const auto& kv : queues) { | ||
| auto& input = kv.second; | ||
| if(!input.isClosed() && input.has()) inputs[kv.first] = input.get<ADatatype>(); |
There was a problem hiding this comment.
Shouldn't be this atomically checked and retrieved?
Get an input only when it is not closed and has data. Any modification and readout of the queue state should be mutex-ed.
Fixes race condition in
getAnyby implementingBinarySemaphore(should be replaced withstd::binary_semaphorewhen we switch to C++20) andManyToOneNotifier. Also addswaitAny.Might need some extra eyes on this 👀
Here's why I believe this works: