Conversation
…; map negative value to single worker thread
…#103) * Initial plan * Add error handling to doJob() to prevent silent work dropping Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> * Complete implementation of doJob() error handling Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com>
* Initial plan * Remove unused _threadMapMutex and _threads members Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
* Initial plan * Remove unused EndThread() function from Threading interface Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> * Remove build artifacts that were accidentally committed Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com>
* Initial plan * Add progressive backoff to wait_for_available_thread to reduce CPU usage Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> * Cap attempts counter to prevent overflow in wait_for_available_thread Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> * Simplify wait_for_available_thread by removing unnecessary overflow check Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> * Add counter cap to prevent overflow in wait_for_available_thread Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> * Clarify comment for counter cap in wait_for_available_thread Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> * Use std::min for cleaner counter cap in wait_for_available_thread Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> * Simplify counter logic by removing unnecessary cap Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> * Make backoff constants static constexpr for clarity Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> * Final progress update Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> * Remove build artifacts that shouldn't be committed Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: jke000 <9449681+jke000@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Migrates Comet’s internal threading implementation to use BS::thread_pool and C++ standard library threading primitives, reducing OS-specific code paths and standardizing builds on C++20.
Changes:
- Replaced custom thread pool implementation with
BS::thread_poolwrapper while preserving existing call sites. - Reworked mutex/semaphore/thread abstractions to use
std::mutex,std::condition_variable, andstd::thread. - Updated build tooling (Makefiles + VS projects + CI) to target C++20 and newer MSVC toolset.
Reviewed changes
Copilot reviewed 24 out of 25 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| _codeql_detected_source_root | Adds CodeQL source-root marker file. |
| NOTICE | Removes standalone third-party notice file content. |
| Makefile | Bumps C++ standard to C++20; adds BS_thread_pool.hpp dependency. |
| MSToolkit/VisualStudio/extern/libexpat.vcxproj | Sets MSVC language standard to C++20. |
| MSToolkit/VisualStudio/MSToolkit.vcxproj | Sets MSVC language standard to C++20. |
| LICENSE | Consolidates third-party component notices/licenses; adds RawFileReader + BS::thread_pool attribution. |
| CometWrapper/CometWrapper.vcxproj | Sets MSVC language standard to C++20. |
| CometSearch/Threading.h | Renames/init APIs and documents migration to std threading. |
| CometSearch/Threading.cpp | Replaces POSIX/Win32 branching with std threading primitives. |
| CometSearch/ThreadPool.h | Replaces custom thread pool with a compatibility wrapper around BS::thread_pool. |
| CometSearch/OSSpecificThreading.h | Replaces OS-specific typedefs with std equivalents and a C++ semaphore struct. |
| CometSearch/Makefile | Bumps to C++20 and adds BS_thread_pool.hpp as a build dependency. |
| CometSearch/CometWritePepXML.cpp | Formatting tweaks; adds explicit flushing during XML output. |
| CometSearch/CometStatus.h | Updates mutex init callsite to new API. |
| CometSearch/CometSearchManager.cpp | Updates mutex init callsites; removes debug mass-check block. |
| CometSearch/CometSearch.vcxproj.filters | Adds BS_thread_pool.hpp to VS filters. |
| CometSearch/CometSearch.vcxproj | Adds BS_thread_pool.hpp; sets C++20; adds MSVC options/defines. |
| CometSearch/CometSearch.cpp | Adapts throttling + memory-pool waiting to new thread pool behavior. |
| CometSearch/CometPreprocess.cpp | Updates mutex init callsite to new API. |
| CometSearch/CometFragmentIndex.cpp | Updates mutex init callsite to new API. |
| CometSearch/CometDataInternal.h | Updates mutex init callsites to new API. |
| CometSearch/BS_thread_pool.hpp | Vendors the BS::thread_pool single-header implementation. |
| Comet.vcxproj | Sets C++20; adds _HAS_STD_BYTE=0. |
| AScorePro/AScorePro.vcxproj | Sets MSVC language standard to C++20. |
| .github/workflows/windows-build.yml | Updates CI to build with MSVC toolset v143. |
Comments suppressed due to low confidence (1)
_codeql_detected_source_root:1
_codeql_detected_source_rootis typically a tool-generated artifact used by CodeQL. If it is not intentionally required for your repo’s CodeQL configuration, it’s better not to commit it (add to.gitignore) to avoid churn and accidental source-root changes.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
|
|
||
| ThreadPool(int threads) : shutdown_(false) | ||
| ThreadPool(int threads) : pool_(nullptr), thread_count_(threads) |
There was a problem hiding this comment.
thread_count_ is a size_t, but threads is an int that (per the new API contract) may be negative. Initializing thread_count_(threads) will wrap negative values into a very large size_t. Initialize thread_count_ to 0 (or another safe value) in the initializer list and let fillPool() set the final count after normalization, or change the stored type to a signed integer if negatives are meaningful pre-normalization.
| ThreadPool(int threads) : pool_(nullptr), thread_count_(threads) | |
| ThreadPool(int threads) : pool_(nullptr), thread_count_(0) |
| // Expose jobs_ member to maintain compatibility with code that checks queue size | ||
| struct jobs_proxy | ||
| { | ||
| tp->LOCK(&tp->lock_); | ||
| jobs_proxy(ThreadPool* tp) : tp_(tp) {} | ||
|
|
||
| while (! tp->shutdown_ && tp->jobs_.empty()) | ||
| size_t size() const | ||
| { | ||
| tp->UNLOCK(&tp->lock_); | ||
|
|
||
| #ifdef _WIN32 | ||
| SwitchToThread(); | ||
| #else | ||
| sched_yield(); | ||
| #endif | ||
| tp->LOCK(&tp->lock_); | ||
| if (tp->threads_.size() == 0) | ||
| { | ||
| tp->UNLOCK(&tp->lock_); | ||
|
|
||
| #ifdef _WIN32 | ||
| return 1; | ||
| #else | ||
| return NULL; | ||
| #endif | ||
| } | ||
|
|
||
| //sleep some before reacquiring lock | ||
| //Threading::ThreadSleep(100); | ||
| if (did_job) | ||
| { | ||
| tp->decrementRunningCount(); | ||
| did_job = false; | ||
| } | ||
| if (!tp_ || !tp_->pool_) | ||
| return 0; | ||
| return tp_->pool_->get_tasks_queued(); | ||
| } | ||
|
|
||
| if (tp->jobs_.empty ()) | ||
| { | ||
| // No jobs to do and we are shutting down | ||
| if (VERBOSE) | ||
| std::cerr << "Thread " << i << " terminates" << std::endl; | ||
|
|
||
| tp->UNLOCK(&tp->lock_); | ||
| #ifdef _WIN32 | ||
| return 1; | ||
| #else | ||
| return NULL; | ||
| #endif | ||
| break; | ||
| } | ||
| else | ||
| bool empty() const | ||
| { | ||
| //std::cerr << "Thread " << i << " does a job" << std::endl; | ||
| job = std::move (tp->jobs_.front ()); | ||
| tp->jobs_.pop_front(); | ||
| return size() == 0; | ||
| } | ||
|
|
||
| if (!did_job) | ||
| tp->incrementRunningCount(); | ||
| private: | ||
| ThreadPool* tp_; | ||
| }; | ||
|
|
||
| tp->UNLOCK(&tp->lock_); | ||
| // Do the job without holding any locks | ||
| try | ||
| { | ||
| job(); | ||
| did_job = true; | ||
| } | ||
| catch (std::exception& e) | ||
| { | ||
| cerr << "WARNING: running job exception ... " << e.what() << " ... exiting ... " << endl; | ||
| #ifdef _WIN32 | ||
| return 1; | ||
| #else | ||
| return NULL; | ||
| #endif | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| jobs_proxy jobs_{ this }; |
There was a problem hiding this comment.
jobs_proxy stores a raw ThreadPool* pointing at this. Since ThreadPool currently has implicitly-generated move operations (it contains a movable std::unique_ptr), moving a ThreadPool will leave jobs_.tp_ pointing at the old object address (dangling), causing undefined behavior when jobs_.size() is called. Fix by explicitly deleting copy/move constructors and assignments, or implement a custom move constructor/assignment that resets jobs_ to point at the new this.
| void Threading::BeginThread(ThreadProc pFunction, void* arg, ThreadId* pThreadId) | ||
| { | ||
| _threadId = *pThreadId; | ||
| pthread_create(pThreadId, NULL, pFunction, arg); | ||
| } | ||
|
|
||
| void Threading::EndThread() | ||
| { | ||
| pthread_exit((void*)&_threadId); | ||
| } | ||
|
|
||
| void Threading::ThreadSleep(unsigned long dwMilliseconds) | ||
| { | ||
| // usleep(dwMilliseconds); // usleep deprecated | ||
| struct timespec ts; | ||
| ts.tv_sec = dwMilliseconds / 1000; | ||
| ts.tv_nsec = (dwMilliseconds % 1000) * 1000000; | ||
| nanosleep(&ts, NULL); | ||
| } | ||
|
|
||
| void Threading::CreateSemaphore(Semaphore* pSem) | ||
| { | ||
| pthread_cond_init(&(pSem->condition), NULL); | ||
| pthread_mutex_init(&(pSem->mutex), NULL); | ||
| pSem->conditionSet = false; | ||
| } | ||
|
|
||
| void Threading::WaitSemaphore(Semaphore& sem) | ||
| { | ||
| pthread_mutex_lock(&sem.mutex); | ||
| while (!(sem.conditionSet)) | ||
| { | ||
| pthread_cond_wait(&sem.condition, &sem.mutex); | ||
| } | ||
| sem.conditionSet = false; | ||
| pthread_mutex_unlock(&sem.mutex); | ||
| } | ||
|
|
||
| void Threading::SignalSemaphore(Semaphore& sem) | ||
| { | ||
| pthread_mutex_lock(&sem.mutex); | ||
| sem.conditionSet = true; | ||
| pthread_cond_signal(&sem.condition); | ||
| pthread_mutex_unlock(&sem.mutex); | ||
| } | ||
|
|
||
| void Threading::DestroySemaphore(Semaphore& sem) | ||
| { | ||
| pthread_cond_destroy(&sem.condition); | ||
| pthread_mutex_destroy(&sem.mutex); | ||
| } | ||
|
|
||
| #else // _WIN32 | ||
| #include <process.h> | ||
|
|
||
| ////////////////////////////////////////////////////////////////////// | ||
| // Implementations for Threading base class specific to the WIN32 OS | ||
| ////////////////////////////////////////////////////////////////////// | ||
|
|
||
| Threading::Threading() | ||
| { | ||
| } | ||
| // Create a new thread | ||
| auto threadPtr = std::make_unique<std::thread>([pFunction, arg]() { | ||
| // Execute the thread procedure | ||
| pFunction(arg); | ||
| }); | ||
|
|
||
| Threading::~Threading() | ||
| { | ||
| } | ||
| // Get the thread ID before moving the thread | ||
| ThreadId newThreadId = threadPtr->get_id(); | ||
|
|
||
| bool Threading::CreateMutex(Mutex* pMutex) | ||
| { | ||
| InitializeCriticalSection(pMutex); | ||
| return (pMutex!=NULL); | ||
| } | ||
| // Store the thread ID | ||
| if (pThreadId != nullptr) | ||
| { | ||
| *pThreadId = newThreadId; | ||
| } | ||
| _threadId = newThreadId; | ||
|
|
||
| void Threading::LockMutex(Mutex& mutex) | ||
| { | ||
| EnterCriticalSection(&mutex); | ||
| } | ||
|
|
||
| void Threading::UnlockMutex(Mutex& mutex) | ||
| { | ||
| LeaveCriticalSection(&mutex); | ||
| } | ||
|
|
||
| void Threading::DestroyMutex(Mutex& mutex) | ||
| { | ||
| DeleteCriticalSection(&mutex); | ||
| } | ||
|
|
||
| void Threading::BeginThread(ThreadProc pFunction, void* arg, ThreadId* pThreadId) | ||
| { | ||
| _threadId = *pThreadId; | ||
| _beginthreadex (NULL, | ||
| 0, | ||
| (unsigned int(__stdcall*) ( void*)) pFunction, | ||
| (void*) arg, | ||
| 0, | ||
| pThreadId); | ||
| } | ||
|
|
||
| void Threading::EndThread() | ||
| { | ||
| _endthreadex(0); | ||
| // Detach the thread to allow independent execution | ||
| threadPtr->detach(); | ||
| } |
There was a problem hiding this comment.
The heap allocation (std::make_unique<std::thread>) is unnecessary here since the thread is immediately detached. This can be simplified to a stack-allocated std::thread followed by detach(), which reduces indirection and makes lifetime clearer.
Use BS:thread_pool in lieu of custom thread pool code. Replace POSIX threads (linux) with C++ threads.