Skip to content

Comments

Fix getAny race condition#1644

Open
asahtik wants to merge 23 commits intodevelopfrom
bugfix/get_any_race_condition
Open

Fix getAny race condition#1644
asahtik wants to merge 23 commits intodevelopfrom
bugfix/get_any_race_condition

Conversation

@asahtik
Copy link
Contributor

@asahtik asahtik commented Jan 21, 2026

Fixes race condition in getAny by implementing BinarySemaphore (should be replaced with std::binary_semaphore when we switch to C++20) and ManyToOneNotifier. Also adds waitAny.

Might need some extra eyes on this 👀

Here's why I believe this works:

func waitAny():
  1. add notifiers
  // If we get a message here the next line will evaluate to true + notifier semaphore will have available set
  2. check if any messages are already present
  // If we get a message here it will be caught in the notifier predicate + notifier semaphore will have available set
  3. check predicate
  // If we get a message here the notifier semaphore will have available set
  4. acquire semaphore
  // From here on the semaphore should ensure that the thread continues when available is set
  5. go to 3. which should now return true
// This is implemented according to the reference so it should be correct
func Sem::acquire:
  6. lock the available variable
  // If there were any new messages from point 3 to 6, the next line will evaluate to true
  7. check predicate (is available true - in cv wait)
  8. unlock
  9. sleep & wait for notify

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request fixes a race condition in the getAny method by replacing condition variables with a new synchronization mechanism consisting of BinarySemaphore and ManyToOneNotifier. The race condition occurred when a message arrived between checking for messages and waiting on the condition variable, causing the notification to be missed. The new implementation uses a binary semaphore that "remembers" notifications, ensuring no messages are lost.

Changes:

  • Introduces BinarySemaphore class as a temporary C++17-compatible alternative to std::binary_semaphore
  • Implements ManyToOneNotifier for many-to-one thread synchronization with predicate support
  • Refactors MessageQueue to use notifiers instead of condition variables, with a new waitAny static method
  • Adds thread-safe locking to LockingQueue::isDestroyed() method

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
include/depthai/utility/BinarySemaphore.hpp New binary semaphore implementation with acquire/release semantics and timeout support
include/depthai/utility/ManyToOneNotifier.hpp New notifier class that wraps BinarySemaphore with predicate-based waiting
include/depthai/pipeline/MessageQueue.hpp Updated to use notifiers instead of condition variables, added waitAny declaration
src/pipeline/MessageQueue.cpp Refactored getAny to use new waitAny method, renamed methods from condVar to notifier terminology
include/depthai/utility/LockingQueue.hpp Added mutex lock to isDestroyed() for thread safety
tests/src/onhost_tests/calibration_handler_test.cpp Reordered includes to follow standard library first convention

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 86 to 92
queue = std::move(m.queue);
name = std::move(m.name);
callbacks = std::move(m.callbacks);
notifiers = std::move(m.notifiers);
uniqueCallbackId = m.uniqueCallbackId;
pipelineEventDispatcher = m.pipelineEventDispatcher;
return *this;
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing uniqueNotifierId in assignment operators. The move assignment operator copies uniqueCallbackId (line 90) but doesn't copy/move uniqueNotifierId. This is inconsistent and could lead to ID collisions if notifiers are added after assignment, since the ID counter starts at 0 but the notifiers map may contain entries with higher IDs from the source object.

Copilot uses AI. Check for mistakes.
Comment on lines 264 to 265
removeNotifiers();
return false;
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unreachable code detected. After the try-catch block completes successfully (line 259 returns true), execution will never reach lines 264-265. The removeNotifiers() call on line 264 will never execute, which means notifiers added to the queues will not be removed in the success case. This will cause a resource leak as notifiers accumulate in the MessageQueue instances.

Copilot uses AI. Check for mistakes.
CallbackId uniqueCondVarId{0};
std::mutex notifierMtx;
std::unordered_map<CallbackId, std::shared_ptr<ManyToOneNotifier>> notifiers;
;
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extraneous semicolon after member declaration. This line contains only a semicolon which creates an unnecessary empty declaration.

Suggested change
;

Copilot uses AI. Check for mistakes.
Comment on lines 76 to 82
queue = c.queue;
name = c.name;
callbacks = c.callbacks;
notifiers = c.notifiers;
uniqueCallbackId = c.uniqueCallbackId;
pipelineEventDispatcher = c.pipelineEventDispatcher;
return *this;
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing uniqueNotifierId in assignment operators. The copy and move assignment operators copy uniqueCallbackId (lines 80 and 90) but don't copy uniqueNotifierId. This is inconsistent and could lead to ID collisions if notifiers are added after assignment, since the ID counter starts at 0 but the notifiers map may contain entries with higher IDs from the source object.

Copilot uses AI. Check for mistakes.
std::lock_guard<std::mutex> lock(cvNotifyMtx);
std::lock_guard<std::mutex> lock(notifierMtx);

// Call all callbacks
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misleading comment. The comment says "Call all callbacks" but the code actually notifies all notifiers, not callbacks. The comment should be updated to say "Notify all notifiers" or "Notify all listeners" to match the function name.

Suggested change
// Call all callbacks
// Notify all listeners

Copilot uses AI. Check for mistakes.
std::unique_lock<std::mutex> lock(cvNotifyMtx);
std::unique_lock<std::mutex> lock(notifierMtx);

// If callback with id 'callbackId' doesn't exists, return false
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect variable reference in comment. The comment refers to 'callbackId' but the parameter is named 'notifierId'. The comment should be updated to: "If notifier with id 'notifierId' doesn't exist, return false".

Suggested change
// If callback with id 'callbackId' doesn't exists, return false
// If notifier with id 'notifierId' doesn't exist, return false

Copilot uses AI. Check for mistakes.
Comment on lines 1 to 59
#pragma once

#include <chrono>
#include <condition_variable>
#include <mutex>

/**
* @brief A binary semaphore implementation
* Should be removed and replaced with std::binary_semaphore if and when C++20 is supported
*/
class BinarySemaphore {
std::mutex mtx;
std::condition_variable cv;
bool available = false; // zero-initialized

public:
BinarySemaphore() = default;
explicit BinarySemaphore(bool initiallyAvailable) : available(initiallyAvailable) {}

// signal / post / V
void release() {
{
std::lock_guard<std::mutex> lk(mtx);
available = true;
}
cv.notify_one();
}

// wait / P
void acquire() {
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [&] { return available; });
available = false; // consume
}

// try-wait
bool tryAcquire() {
std::lock_guard<std::mutex> lk(mtx);
if(!available) return false;
available = false;
return true;
}

template <class Rep, class Period>
bool tryAcquireFor(const std::chrono::duration<Rep, Period>& timeout) {
std::unique_lock<std::mutex> lk(mtx);
if(!cv.wait_for(lk, timeout, [&] { return available; })) return false;
available = false;
return true;
}

template <class Clock, class Duration>
bool tryAcquireUntil(const std::chrono::time_point<Clock, Duration>& deadline) {
std::unique_lock<std::mutex> lk(mtx);
if(!cv.wait_until(lk, deadline, [&] { return available; })) return false;
available = false; // consume
return true;
}
};
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing namespace declaration. The BinarySemaphore class is defined in the global namespace, which could lead to naming conflicts. Other utility classes in the depthai project (like LockingQueue) are defined within the 'dai' namespace. Consider wrapping this class in the 'dai' namespace for consistency and to avoid potential naming conflicts.

Copilot uses AI. Check for mistakes.
Comment on lines 1 to 66
#pragma once

#include <atomic>
#include <stdexcept>

#include "BinarySemaphore.hpp"

class ManyToOneNotifier {
BinarySemaphore semaphore{false};
std::atomic<bool> waiting{false};

public:
ManyToOneNotifier() = default;

/**
* @brief Notify the waiting thread
* Should always be called after the state change that the waiting thread is waiting for
*/
void notifyOne() {
semaphore.release();
}

/**
* @brief Wait until notified
* @arg pred Predicate to check after each notification - IMPORTANT: must be thread-safe
*/
template <typename Pred>
void wait(Pred pred) {
if(waiting.exchange(true)) {
throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported");
}
try {
while(!pred()) semaphore.acquire();
} catch(...) {
waiting = false;
throw;
}
waiting = false;
}

/**
* @brief Wait until notified or timeout occurs
* @arg pred Predicate to check after each notification - IMPORTANT: must be thread-safe
*/
template <typename Pred, typename Rep, typename Period>
bool waitFor(Pred pred, std::chrono::duration<Rep, Period> timeout) {
if(waiting.exchange(true)) {
throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported");
}

try {
auto deadline = std::chrono::steady_clock::now() + timeout;

while(!pred()) {
if(!semaphore.tryAcquireUntil(deadline)) {
return pred();
}
}
return true;
} catch(...) {
waiting = false;
throw;
}
waiting = false;
}
};
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing namespace declaration. The ManyToOneNotifier class is defined in the global namespace, which could lead to naming conflicts. Other utility classes in the depthai project (like LockingQueue) are defined within the 'dai' namespace. Consider wrapping this class in the 'dai' namespace for consistency and to avoid potential naming conflicts.

Copilot uses AI. Check for mistakes.
#pragma once

#include <atomic>
#include <stdexcept>
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing chrono header include. The waitFor method uses std::chrono::duration and std::chrono::steady_clock but the header is not included. This works only because BinarySemaphore.hpp includes , creating a hidden dependency. Add #include for proper self-contained header design.

Suggested change
#include <stdexcept>
#include <stdexcept>
#include <chrono>

Copilot uses AI. Check for mistakes.
template <typename Pred>
void wait(Pred pred) {
if(waiting.exchange(true)) {
throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of the condition that only one thread can wait for a notifier; what is that beneficial for?
In the message queues, you typically want that many threads wait in a notification queue and only one thread gets notified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this class is for one main thread to wait on any one of multiple worker threads. This check ensures that someone using this class doesn't get unexpected results if they try to wait from multiple threads.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that a beneficial behavior?
If you did not use a BinarySemaphore but used a class cv + mutex, then there could be more waiting threads waiting while Worker threads could be notifying them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I used a mutex I'd have to lock the queue.push method and no output would be able to send while another watched queue is being sent to or while checking the predicate. I'd also have to add a lock for each waiting thread which would increase code complexity, which is not necessary since this accomplishes the goal of this PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we have the following scenario - and correct me, if I am wrong.

  1. Multiple writers. One writer doesn't wait for a signal, it tries to push to a particular queue. But there may be multiple writers trying to push data to the same queue.
  2. Multiple readers on the particular queue. A reader is waiting to get a signal that the queue has any data available - and it reads the data and processes further.

What about this solution then:

  1. A worker thread (writer) that pushes data to a queue
    1.1. The thread locks the queue with queue's related queueMutex
    1.2. Data is pushed into queue
    1.3. The queue is unlocked with queue's related queueMutex
    1.4. A conditional variable dataReady is called to notify max. one thread

  2. A worker thread (reader) that waits for a data from a queue:
    2.1 The queue locks the queue with queue's related queueMutex
    2.2 Conditional variable dataReady->wait(queueMutex...) is called:
    2.2.1 Checks the predicate and if true, return
    2.2.2 Unlocks the queueMutex
    2.2.3 Waits for a signal from dataReady (with unlocked queueMutex)
    2.2.4 Locks the queueMutex and goes to 2.2.1
    2.2 The worker thread reads the data from the queue
    2.3 The worker thread unlocks the queueMutex

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, you're describing the way queue.get() works. This is a separate thing. Each message queue already has its own conditional variable and mutex that handles pushes and pops from the queue.

The issue I had for getAny is that there isn't a mechanism built in to only wait until one conditional variable notifies. To do this you need to pass a single condition variable instance to multiple queues and wait until one notifies.

So the way it works now is that each time you want to wait on any of a list of queues, a semaphore is created and added to every queue in the list. When a new message arrives to any of the queues (or when a queue is destroyed) the semaphore is notified, and the waiting thread gets woken up.

std::lock_guard<std::mutex> lk(mtx);
available = true;
}
cv.notify_one();
Copy link
Collaborator

@majvan majvan Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we talk about BinarySemaphore (only one waiting), then you can notify conditionally.
If a semaphore was already released and anybody would call "release", do not notify. Take it as an erroneous situation with no action.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's no waiting thread then there's practically no overhead to notifying, which would be the case if available was already true. If I conditionally notified, I'd have to do it under the lock which would mean that the waiting thread would wake up and then immediately block and wait for the mutex to be released which would cause two extra context switches.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure you understood me. Let's make it clear.

This is one scenario I was thinking about:

    void release() {
        ASSERT(available == true, "There is a bug in the SW, a thread calls release while the semaphore is not acquired. We will not call notify_one");
        {
            std::lock_guard<std::mutex> lk(mtx);
            available = true;
        }
        cv.notify_one();
    }

Another one, without ASSERT but with

   void release() {
       bool was_available = false;
       {
           std::lock_guard<std::mutex> lk(mtx);
           was_available = available;
           available = true;
       }
       if (!was_available)
           cv.notify_one();  // call notify one only in an expected scenario.
   }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. notify_one has little overhead when there are no waiters but it certainly doesn't hurt to add this. Thanks

@asahtik asahtik added the testable PR is ready to be tested label Jan 26, 2026
@asahtik asahtik requested review from majvan and removed request for moratom January 26, 2026 15:18
@asahtik
Copy link
Contributor Author

asahtik commented Jan 27, 2026

@majvan Do you have any more concerns or can this be merged?

throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported");
}
try {
while(!pred()) semaphore.acquire();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two issues.

[1]
We check pred() and then (maybe) acquire a semaphore
This function has then two expected outcomes then:

  • the pred was fulfilled BUT NO semaphore acquired (the case when the pred() returns true the first time)
  • the pred was fulfilled AND semaphore acquired (the case when the pred() returns true the next time)
    The semaphore acquisition is not causally linked to the predicate becoming true.

[2]
Typically the Pred in the wait (like cv::wait) are used just because the whole pred() is running protected under the mutex. The reason is that the mutex ensures the atomicity of the operation together with cv's waiting cycle.

It's not a case here. If pred() is an unprotected function, it can be preempted by other thread which can access the state of the pred() - and this creates a potential RACE CONDITION.

Example:
// We need to ensure that the queue has data and that camera is on (an abstract example)
bool pred() {
if (!camera_on)
return false;
if (queue.empty)
return false;
return true;
}

Step 0 - initial state:
camera_on = true;
queue.empty = true;

Step 1 - ThreadA acquires a semaphore and continues:
gets into pred() and computes camera_on = true
-- preemptive context switch --

Step 2 - Thread C runs:
{
lock(camera_and_queue);
camera_on = false; // just set that the camera is off
}
-- preemptive context switch --

Step 3 - ThreadA continues:
continues the pred() and computes queue.empty = false and returns true
ManyToWaitNotifier::wait() returns but the state is:
camera_on = false; <-- RACE CONDITION: we wanted to wait only till camera is on

The predicate reads shared state (camera_on, queue.empty) without holding the mutex that protects it. This allows concurrent modification by other threads, resulting in undefined behavior and inconsistent observations of the system state.
In contrast, std::condition_variable::wait(lock, pred) guarantees that the predicate is evaluated while holding the mutex, making the check and sleep cycle atomic.

Another outcome of [2] is a possible deadlock:

Step 1: ThreadA - runs:
if (!pred())
semaphore.acquire(); // waits in the semaphore for the cv

Step 2: ThreadB - is about to release the semaphore:
{
std::lock(camera_and_queue)
queue.empty = false;
notifyOne(); -> semaphore.release(); // notify there is next some data, release the semaphore
}

Step 3: ThreadA - continues:
semaphore.acquire(); // finishes the semaphore acquiring
-- preemptive context switch --

Step 4: ThreadC - runs:
{
lock(camera_and_queue);
camera_on = false; // just set that the camera is off
}
-- preemptive context switch --

Step 5 - ThreadA continues:
if (!pred())
semaphore.acquire(); // DEADLOCK <-- semaphore count is zero and no further release will occur; ThreadA will get not awaken anymore

The semaphore release is not tied to the predicate remaining true.
A thread may wake, consume the semaphore, and then observe the predicate as false due to a concurrent state change. Since the semaphore signal has already been consumed and no further releases are guaranteed, the waiting thread may block forever.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second issue is documented:

Should always be called after the state change that the waiting thread is waiting for

(maybe the wording is not clear, should it be 'after any state change'?)

ThreadC should notify after setting camera_on to false.

The first issue is a good point. I'll document this limitation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constraint works for the deadlock, but these rules have to be followed.
And it brings additional unnecessary wakeups even in the case your state is going "against" the wakeup (why would you wakeup threads when you switch off the camera when you want them to work when the camera is switched on?)

Using std::condition_variable is much easier and exactly what you need here without reinventing it. :)

std::mutex inputsWaitMutex;
auto inputsWaitCv = std::make_shared<std::condition_variable>();
};
auto checkAllClosed = [&]() { return boost::algorithm::all_of(queues, [](const std::reference_wrapper<MessageQueue> q) { return q.get().isClosed(); }); };
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no boost please!

if(gotAny) {
for(const auto& kv : queues) {
auto& input = kv.second;
if(!input.isClosed() && input.has()) inputs[kv.first] = input.get<ADatatype>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be this atomically checked and retrieved?
Get an input only when it is not closed and has data. Any modification and readout of the queue state should be mutex-ed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

testable PR is ready to be tested

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants