From 0f093b4c655c2dd06ad62862af2fb9231719f3f5 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Sun, 15 Feb 2026 15:55:33 +0000 Subject: [PATCH] Improved termination phase for thread teardown --- capiocl/monitor.h | 18 ++++++++---- src/monitors/Multicast.cpp | 60 ++++++++++++++++++++++++++++++++------ src/webapi.cpp | 2 ++ 3 files changed, 65 insertions(+), 15 deletions(-) diff --git a/capiocl/monitor.h b/capiocl/monitor.h index 2ab6b3d..04d6f4d 100644 --- a/capiocl/monitor.h +++ b/capiocl/monitor.h @@ -146,6 +146,12 @@ class MulticastMonitor final : public MonitorInterface { std::string MULTICAST_HOME_NODE_ADDR; + /// @brief variable to terminate execution + bool terminate = false; + + /// @brief Multicast poll timeout interval + static constexpr int MULTICAST_THREAD_POLL_INTERVAL = 250; + /** * @brief Multicast port number. */ @@ -183,10 +189,10 @@ class MulticastMonitor final : public MonitorInterface { * @param lock Mutex protecting shared access to committed_files. * @param ip_addr Multicast commit listen address. * @param ip_port Multicast commit listen port. + * @param terminate Boolean flag to terminate thread */ - [[noreturn]] static void commit_listener(std::vector &committed_files, - std::mutex &lock, const std::string &ip_addr, - int ip_port); + static void commit_listener(std::vector &committed_files, std::mutex &lock, + const std::string &ip_addr, int ip_port, const bool *terminate); /** * @brief Background thread function to listen for commit messages. @@ -200,9 +206,9 @@ class MulticastMonitor final : public MonitorInterface { * @param ip_addr Multicast home node listen address. * @param ip_port Multicast home node listen port. */ - [[noreturn]] static void - home_node_listener(std::unordered_map &home_nodes, std::mutex &lock, - const std::string &ip_addr, int ip_port); + static void home_node_listener(std::unordered_map &home_nodes, + std::mutex &lock, const std::string &ip_addr, int ip_port, + const bool *terminate); public: /** diff --git a/src/monitors/Multicast.cpp b/src/monitors/Multicast.cpp index 23705ff..29c05b3 100644 --- a/src/monitors/Multicast.cpp +++ b/src/monitors/Multicast.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include "capiocl.hpp" @@ -74,19 +75,37 @@ static int incoming_socket_multicast(const std::string &address_ip, const int po return _socket; } -[[noreturn]] void -capiocl::monitor::MulticastMonitor::commit_listener(std::vector &committed_files, - std::mutex &lock, const std::string &ip_addr, - const int ip_port) { +void capiocl::monitor::MulticastMonitor::commit_listener(std::vector &committed_files, + std::mutex &lock, + const std::string &ip_addr, + const int ip_port, const bool *terminate) { + pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); sockaddr_in addr_in = {}; socklen_t addr_len = {}; const auto socket = incoming_socket_multicast(ip_addr, ip_port, addr_in, addr_len); const auto addr = reinterpret_cast(&addr_in); char incoming_message[MESSAGE_SIZE] = {0}; + // Polling for non blocking + pollfd pfd = {}; + pfd.fd = socket; + pfd.events = POLLIN | POLLPRI; + do { bzero(incoming_message, sizeof(incoming_message)); + // TODO: migrate to epoll for linux and kqueue on MacOS + if (poll(&pfd, 1, MULTICAST_THREAD_POLL_INTERVAL) == 0) { + // No data from incoming socket. Continue, awaking thread ensuring pthread_cancel points + // can be reached + if (*terminate) { + close(socket); + return; + } + + continue; + } + // LCOV_EXCL_START if (recvfrom(socket, incoming_message, MESSAGE_SIZE, 0, addr, &addr_len) < 0) { continue; @@ -113,9 +132,11 @@ capiocl::monitor::MulticastMonitor::commit_listener(std::vector &co } while (true); } -[[noreturn]] void capiocl::monitor::MulticastMonitor::home_node_listener( +void capiocl::monitor::MulticastMonitor::home_node_listener( std::unordered_map &home_nodes, std::mutex &lock, - const std::string &ip_addr, int ip_port) { + const std::string &ip_addr, int ip_port, const bool *terminate) { + pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + char this_hostname[HOST_NAME_MAX] = {}; gethostname(this_hostname, HOST_NAME_MAX); @@ -129,6 +150,23 @@ capiocl::monitor::MulticastMonitor::commit_listener(std::vector &co do { bzero(incoming_message, sizeof(incoming_message)); + // Polling for non blocking + pollfd pfd = {}; + pfd.fd = socket; + pfd.events = POLLIN | POLLPRI; + + // TODO: migrate to epoll for linux and kqueue on MacOS + if (poll(&pfd, 1, MULTICAST_THREAD_POLL_INTERVAL) == 0) { + // No data from incoming socket. Continue, awaking thread ensuring pthread_cancel points + // can be reached + if (*terminate) { + close(socket); + return; + } + + continue; + } + // LCOV_EXCL_START if (recvfrom(socket, incoming_message, MESSAGE_SIZE, 0, addr, &addr_len) < 0) { continue; @@ -193,19 +231,23 @@ capiocl::monitor::MulticastMonitor::MulticastMonitor( commit_thread = std::thread(&commit_listener, std::ref(_committed_files), std::ref(committed_lock), - MULTICAST_COMMIT_ADDR, MULTICAST_COMMIT_PORT); + MULTICAST_COMMIT_ADDR, MULTICAST_COMMIT_PORT, &this->terminate); home_node_thread = std::thread(&home_node_listener, std::ref(_home_nodes), std::ref(home_node_lock), - MULTICAST_HOME_NODE_ADDR, MULTICAST_HOME_NODE_PORT); + MULTICAST_HOME_NODE_ADDR, MULTICAST_HOME_NODE_PORT, &this->terminate); gethostname(_hostname, HOST_NAME_MAX); } capiocl::monitor::MulticastMonitor::~MulticastMonitor() { + + terminate = true; + pthread_cancel(commit_thread.native_handle()); - pthread_cancel(home_node_thread.native_handle()); commit_thread.join(); + + pthread_cancel(home_node_thread.native_handle()); home_node_thread.join(); } diff --git a/src/webapi.cpp b/src/webapi.cpp index 5542911..a401da2 100644 --- a/src/webapi.cpp +++ b/src/webapi.cpp @@ -50,6 +50,8 @@ void process_get_request(const Req &req, Res &res, Fn &&handler) { /// @brief Main WebServer thread function void server(const std::string &address, const int port, capiocl::engine::Engine *engine) { + pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + capiocl::printer::print(capiocl::printer::CLI_LEVEL_INFO, "Starting API server @ " + address + ":" + std::to_string(port));