Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions capiocl/monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ class MulticastMonitor final : public MonitorInterface {

std::string MULTICAST_HOME_NODE_ADDR;

/// @brief variable to terminate execution
bool terminate = false;

/// @brief Multicast poll timeout interval
static constexpr int MULTICAST_THREAD_POLL_INTERVAL = 250;

/**
* @brief Multicast port number.
*/
Expand Down Expand Up @@ -183,10 +189,10 @@ class MulticastMonitor final : public MonitorInterface {
* @param lock Mutex protecting shared access to committed_files.
* @param ip_addr Multicast commit listen address.
* @param ip_port Multicast commit listen port.
* @param terminate Boolean flag to terminate thread
*/
[[noreturn]] static void commit_listener(std::vector<std::string> &committed_files,
std::mutex &lock, const std::string &ip_addr,
int ip_port);
static void commit_listener(std::vector<std::string> &committed_files, std::mutex &lock,
const std::string &ip_addr, int ip_port, const bool *terminate);

/**
* @brief Background thread function to listen for commit messages.
Expand All @@ -200,9 +206,9 @@ class MulticastMonitor final : public MonitorInterface {
* @param ip_addr Multicast home node listen address.
* @param ip_port Multicast home node listen port.
*/
[[noreturn]] static void
home_node_listener(std::unordered_map<std::string, std::string> &home_nodes, std::mutex &lock,
const std::string &ip_addr, int ip_port);
static void home_node_listener(std::unordered_map<std::string, std::string> &home_nodes,
std::mutex &lock, const std::string &ip_addr, int ip_port,
const bool *terminate);

public:
/**
Expand Down
60 changes: 51 additions & 9 deletions src/monitors/Multicast.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>

#include "capiocl.hpp"
Expand Down Expand Up @@ -74,19 +75,37 @@ static int incoming_socket_multicast(const std::string &address_ip, const int po
return _socket;
}

[[noreturn]] void
capiocl::monitor::MulticastMonitor::commit_listener(std::vector<std::string> &committed_files,
std::mutex &lock, const std::string &ip_addr,
const int ip_port) {
void capiocl::monitor::MulticastMonitor::commit_listener(std::vector<std::string> &committed_files,
std::mutex &lock,
const std::string &ip_addr,
const int ip_port, const bool *terminate) {
pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
sockaddr_in addr_in = {};
socklen_t addr_len = {};
const auto socket = incoming_socket_multicast(ip_addr, ip_port, addr_in, addr_len);
const auto addr = reinterpret_cast<sockaddr *>(&addr_in);
char incoming_message[MESSAGE_SIZE] = {0};

// Polling for non blocking
pollfd pfd = {};
pfd.fd = socket;
pfd.events = POLLIN | POLLPRI;

do {
bzero(incoming_message, sizeof(incoming_message));

// TODO: migrate to epoll for linux and kqueue on MacOS
if (poll(&pfd, 1, MULTICAST_THREAD_POLL_INTERVAL) == 0) {
// No data from incoming socket. Continue, awaking thread ensuring pthread_cancel points
// can be reached
if (*terminate) {
close(socket);
return;
}

continue;
}

// LCOV_EXCL_START
if (recvfrom(socket, incoming_message, MESSAGE_SIZE, 0, addr, &addr_len) < 0) {
continue;
Expand All @@ -113,9 +132,11 @@ capiocl::monitor::MulticastMonitor::commit_listener(std::vector<std::string> &co
} while (true);
}

[[noreturn]] void capiocl::monitor::MulticastMonitor::home_node_listener(
void capiocl::monitor::MulticastMonitor::home_node_listener(
std::unordered_map<std::string, std::string> &home_nodes, std::mutex &lock,
const std::string &ip_addr, int ip_port) {
const std::string &ip_addr, int ip_port, const bool *terminate) {
pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);

char this_hostname[HOST_NAME_MAX] = {};
gethostname(this_hostname, HOST_NAME_MAX);

Expand All @@ -129,6 +150,23 @@ capiocl::monitor::MulticastMonitor::commit_listener(std::vector<std::string> &co
do {
bzero(incoming_message, sizeof(incoming_message));

// Polling for non blocking
pollfd pfd = {};
pfd.fd = socket;
pfd.events = POLLIN | POLLPRI;

// TODO: migrate to epoll for linux and kqueue on MacOS
if (poll(&pfd, 1, MULTICAST_THREAD_POLL_INTERVAL) == 0) {
// No data from incoming socket. Continue, awaking thread ensuring pthread_cancel points
// can be reached
if (*terminate) {
close(socket);
return;
}

continue;
}

// LCOV_EXCL_START
if (recvfrom(socket, incoming_message, MESSAGE_SIZE, 0, addr, &addr_len) < 0) {
continue;
Expand Down Expand Up @@ -193,19 +231,23 @@ capiocl::monitor::MulticastMonitor::MulticastMonitor(

commit_thread =
std::thread(&commit_listener, std::ref(_committed_files), std::ref(committed_lock),
MULTICAST_COMMIT_ADDR, MULTICAST_COMMIT_PORT);
MULTICAST_COMMIT_ADDR, MULTICAST_COMMIT_PORT, &this->terminate);

home_node_thread =
std::thread(&home_node_listener, std::ref(_home_nodes), std::ref(home_node_lock),
MULTICAST_HOME_NODE_ADDR, MULTICAST_HOME_NODE_PORT);
MULTICAST_HOME_NODE_ADDR, MULTICAST_HOME_NODE_PORT, &this->terminate);

gethostname(_hostname, HOST_NAME_MAX);
}

capiocl::monitor::MulticastMonitor::~MulticastMonitor() {

terminate = true;

pthread_cancel(commit_thread.native_handle());
pthread_cancel(home_node_thread.native_handle());
commit_thread.join();

pthread_cancel(home_node_thread.native_handle());
home_node_thread.join();
}

Expand Down
2 changes: 2 additions & 0 deletions src/webapi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ void process_get_request(const Req &req, Res &res, Fn &&handler) {
/// @brief Main WebServer thread function
void server(const std::string &address, const int port, capiocl::engine::Engine *engine) {

pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);

capiocl::printer::print(capiocl::printer::CLI_LEVEL_INFO,
"Starting API server @ " + address + ":" + std::to_string(port));

Expand Down
Loading