From 0927bd88f040a3868042c521b00f0d973f17f7de Mon Sep 17 00:00:00 2001 From: Marcus O'Flaherty Date: Thu, 14 Nov 2024 11:29:55 +0000 Subject: [PATCH 1/7] Add Ready function to Services(Backend) to check for connections to backend sockets --- src/ServiceDiscovery/Services.cpp | 4 ++++ src/ServiceDiscovery/Services.h | 1 + src/ServiceDiscovery/ServicesBackend.cpp | 24 ++++++++++++++++++++++++ src/ServiceDiscovery/ServicesBackend.h | 1 + 4 files changed, 30 insertions(+) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index a4be2fc..7e8cca1 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -42,6 +42,10 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC return true; } +bool Services::Ready(const unsigned int timeout){ + return m_backend_client.Ready(timeout); +} + // =========================================================================== // Write Functions // --------------- diff --git a/src/ServiceDiscovery/Services.h b/src/ServiceDiscovery/Services.h index 86c6b5f..ffef3d4 100644 --- a/src/ServiceDiscovery/Services.h +++ b/src/ServiceDiscovery/Services.h @@ -38,6 +38,7 @@ namespace ToolFramework { Services(); ~Services(); bool Init(Store &m_variables, zmq::context_t* context_in, SlowControlCollection* sc_vars_in, bool new_service=false); + bool Ready(const unsigned int timeout=10000); // default service discovery broadcast period is 5s, middleman also checks intermittently, compound total time should be <10s... bool SQLQuery(const std::string& database, const std::string& query, std::vector& responses, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool SQLQuery(const std::string& database, const std::string& query, std::string& response, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index aff94e2..47e0e67 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -936,3 +936,27 @@ bool ServicesBackend::Receive(zmq::socket_t* sock, std::vector& return true; } +bool ServicesBackend::Ready(int timeout){ + + // poll the output sockets for listeners + int ret; + try { + ret = zmq::poll(out_polls.data(), out_polls.size(), timeout); + } catch (zmq::error_t& err){ + std::cerr<<"ServicesBackend::Ready caught "< log=nullptr); // possibly move to constructor + bool Ready(int timeout); // check if zmq sockets have connections bool Initialise(std::string configfile); bool Initialise(Store &varaibles_in); bool Finalise(); From 0cfbe771adbcb809c2eec3b1333fea431de462cb Mon Sep 17 00:00:00 2001 From: root Date: Thu, 28 Nov 2024 12:47:21 +0000 Subject: [PATCH 2/7] don't poll PUB socket as it always returns true immediately, making the timeout ineffective --- src/ServiceDiscovery/ServicesBackend.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index 47e0e67..d560f12 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -939,9 +939,11 @@ bool ServicesBackend::Receive(zmq::socket_t* sock, std::vector& bool ServicesBackend::Ready(int timeout){ // poll the output sockets for listeners + // only poll dealer socket, pub sockets always return true immediately so ignore the timeout + // polling the input socket checks for a message, so don't do that. int ret; try { - ret = zmq::poll(out_polls.data(), out_polls.size(), timeout); + ret = zmq::poll(&out_polls.at(1), 1, timeout); } catch (zmq::error_t& err){ std::cerr<<"ServicesBackend::Ready caught "< Date: Thu, 28 Nov 2024 18:45:37 +0000 Subject: [PATCH 3/7] need mutex lock as socket is not thread safe --- src/ServiceDiscovery/ServicesBackend.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index d560f12..3fbe98f 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -943,7 +943,9 @@ bool ServicesBackend::Ready(int timeout){ // polling the input socket checks for a message, so don't do that. int ret; try { + dlr_socket_mutex.lock(); ret = zmq::poll(&out_polls.at(1), 1, timeout); + dlr_socket_mutex.unlock(); } catch (zmq::error_t& err){ std::cerr<<"ServicesBackend::Ready caught "< Date: Thu, 14 Nov 2024 19:47:32 +0000 Subject: [PATCH 4/7] replace sleep on construction with poll on services backend socket --- src/ServiceDiscovery/Services.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 7e8cca1..e952e78 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -27,17 +27,16 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC if(!m_variables.Get("service_name",m_name)) m_name="test_service"; if(!m_variables.Get("db_name",m_dbname)) m_dbname="daq"; - - if(!m_backend_client.Initialise(m_variables)){ std::clog<<"error initialising slowcontrol client"< Date: Fri, 15 Nov 2024 01:01:40 +0000 Subject: [PATCH 5/7] simplify ServicesBackend by not running call to DoCommand in a separate thread --- src/ServiceDiscovery/ServicesBackend.cpp | 114 ++++++++++------------- src/ServiceDiscovery/ServicesBackend.h | 2 +- 2 files changed, 51 insertions(+), 65 deletions(-) diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index 3fbe98f..16e48f6 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -384,15 +384,6 @@ bool ServicesBackend::SendMulticast(std::string command, std::string* err){ return true; } -////////////////////////////////////////////////////////////////////////////////////////////////////////// -// FIXME ok, i think the combination of SendCommand and DoCommand are an over-complication. // -// SendCommand is basically a pointless wrapper that spawns a thread to do what it could do itself. // -// This would remove the surplus promise/future layer. The only change to DoCommand required // -// would be that the timeouts from each stage (wait for send, wait for reply) // -// would need to be appropriately modified -.e.g if total timeout is 300ms and it takes 100ms to send, // -// then we would only wait for 200ms for the reply. // -////////////////////////////////////////////////////////////////////////////////////////////////////////// - bool ServicesBackend::SendCommand(const std::string& topic, const std::string& command, std::vector* results, const unsigned int* timeout_ms, std::string* err){ // send a command and receive response. // This is a wrapper that ensures we always return within the requested timeout. @@ -428,50 +419,18 @@ bool ServicesBackend::SendCommand(const std::string& topic, const std::string& c // (for now) go over a dealer/router combination that cannot filter on the topic. Command cmd{command, type, topic.substr(2,std::string::npos)}; - // submit the command asynchrously. - // This way we have control over how long we wait for the response - // The response will be a Command object with remaining members populated. - //std::future response = std::async(std::launch::async, &ServicesBackend::DoCommand, this, cmd); - // std::async returns a std::future that will block on destruction until the promise returns. - // if we don't want that to happen, i.e. we want to abandon it if it times out, - // we instead need to obtain a future from a promise (which is somehow not blocking?), - // and run our code in a detached thread, using the promise to pass back the result. - // tbh i don't quite get why this is different but there we go. - // see https://stackoverflow.com/a/23454840/3544936 and https://stackoverflow.com/a/23460094/3544936 - std::promise returnval; - std::future response = returnval.get_future(); - if(verbosity>10) std::cout<<"ServicesBackend::SendCommand spinning up new thread"<10) std::cout<<"ServicesBackend::SendCommand waiting for response"<10) std::cout<<"ServicesBackend::SendCommand fetching response"<10) std::cout<<"ServicesBackend::SendCommand response is "<3) std::cerr<10) std::cout<<"ServicesBackend::SendCommand calling DoCommand"<10) std::cout<<"ServicesBackend::SendCommand response is "< result){ +bool ServicesBackend::DoCommand(Command& cmd, int timeout_ms){ if(verbosity>10) std::cout<<"ServicesBackend::DoCommand received command"< result){ uint32_t thismsgid = ++msg_id; cmd.msg_id = thismsgid; + // we submit the command asynchrously. + // This way we control how long we wait for the response, ensuring we don't block the caller indefinitely. + // The response will be a Command object with remaining members populated. + // initial guess would have been to use: + //std::future response = std::async(std::launch::async, &ServicesBackend::DoCommand, this, cmd); + // but std::async returns a std::future that will block on destruction until the function returns. + // if we don't want that to happen, i.e. we want to abandon it if it times out, + // we instead need to obtain a future from a promise (which is somehow not blocking?), + // and run our code in a detached thread, using the promise to pass back the result. + // tbh i don't quite get why this is different but there we go. + // see https://stackoverflow.com/a/23454840/3544936 and https://stackoverflow.com/a/23460094/3544936 + // zmq sockets aren't thread-safe, so we have one central sender. // we submit our command and keep a ticket to retrieve the return status on completion. // similarly, the next response received may not be for us, so a central dealer receives @@ -527,9 +498,27 @@ bool ServicesBackend::DoCommand(Command cmd, std::promise result){ waiting_senders.emplace(cmd, std::move(send_ticket)); send_queue_mutex.unlock(); - // wait for our number to come up. loooong timeout, but don't hang forever. if(verbosity>10) std::cout<<"ServicesBackend::DoCommand waiting for send confirmation"<(send_end - send_start).count(); + timeout_ms -= send_time_ms; + // juuuust in case, ensure our remaining time is not negative. :) + if(timeout_ms<0) timedout=true; + } + + // did we get a response in time? + if(timedout){ if(verbosity>10) std::cerr<<"ServicesBackend::DoCommand timeout"< result){ Log("Timed out sending command "+std::to_string(thismsgid),v_warning,verbosity); cmd.success = false; cmd.err = "Timed out sending command"; - result.set_value(cmd); - // since we are giving up waiting for the response, remove ourselves from - // the list of waiting recipients + // since we are giving up waiting for the response, remove ourselves + // from the list of recipients awaiting response if(verbosity>10) std::cout<<"ServicesBackend::DoCommand de-registering for response on id "<10) std::cout<<"ServicesBackend::DoCommand got send confirmation"< result){ Log(errmsg,v_debug,verbosity); cmd.success = false; cmd.err = errmsg; - result.set_value(cmd); // since the send failed we don't expect a response, so remove ourselves // from the list of recipients awaiting response @@ -581,7 +568,7 @@ bool ServicesBackend::DoCommand(Command cmd, std::promise result){ // if we succeeded in sending the message, we now need to wait for a repsonse. if(verbosity>10) std::cout<<"ServicesBackend::DoCommand waiting for response"<10) std::cout<<"ServicesBackend::DoCommand response timeout"< result){ Log("Timed out waiting for response for command "+std::to_string(thismsgid),v_warning,verbosity); cmd.success = false; cmd.err = "Timed out waiting for response"; - result.set_value(cmd); return false; } else { // got a response! if(verbosity>10) std::cout<<"ServicesBackend::DoCommand got a response for command "<10) std::cout<<"ServicesBackend::GetNextResponse had response in socket"<); //private + bool DoCommand(Command& cmd, int timeout_ms); //private // actual send/receive functions bool SendNextCommand(); //private bool GetNextResponse(); //priavte From 5ef89828b9c5790b8f46cda0d2290bfc61129b83 Mon Sep 17 00:00:00 2001 From: Marcus O'Flaherty Date: Tue, 19 Nov 2024 16:38:46 +0000 Subject: [PATCH 6/7] Propagate timeout of ServicesBackend::SendCommand to underlying PollAndSend so that poll time is timeout (rather than class member). Could be problematic as it means a user calling SendCommand with a long timeout could cause backend to get stuck, but only if the poll fails (in which case it can't do anything anyway)... --- src/ServiceDiscovery/Services.cpp | 4 ++- src/ServiceDiscovery/Services.h | 1 + src/ServiceDiscovery/ServicesBackend.cpp | 31 ++++++++++++++++-------- src/ServiceDiscovery/ServicesBackend.h | 13 +++++++++- 4 files changed, 37 insertions(+), 12 deletions(-) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index e952e78..05cc7e1 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -265,7 +265,8 @@ bool Services::GetDeviceConfig(std::string& json_data, const int version, const // get a run configuration via configuration ID bool Services::GetRunConfig(std::string& json_data, const int config_id, const unsigned int timeout){ - + + printf("GetRunConfig with timeout %d\n",timeout); json_data=""; std::string cmd_string = "{ \"config_id\":"+std::to_string(config_id) + "}"; @@ -343,6 +344,7 @@ bool Services::GetRunConfig(std::string& json_data, const std::string& name, con bool Services::GetRunDeviceConfig(std::string& json_data, const int runconfig_id, const std::string& device, int* version, unsigned int timeout){ + printf("GetRunDeviceConfig with timeout %d\n",timeout); json_data=""; const std::string& name = (device=="") ? m_name : device; diff --git a/src/ServiceDiscovery/Services.h b/src/ServiceDiscovery/Services.h index ffef3d4..c764dbb 100644 --- a/src/ServiceDiscovery/Services.h +++ b/src/ServiceDiscovery/Services.h @@ -30,6 +30,7 @@ namespace ToolFramework { Store info; }; + class Services{ diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index 16e48f6..a0e6b9e 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -2,7 +2,7 @@ using namespace ToolFramework; -Command::Command(std::string command_in, char type_in, std::string topic_in){ +Command::Command(std::string command_in, char type_in, std::string topic_in, const unsigned int timeout_ms_in){ command = command_in; type = type_in; topic=topic_in; @@ -10,6 +10,7 @@ Command::Command(std::string command_in, char type_in, std::string topic_in){ response=std::vector{}; err=""; msg_id=0; + timeout_ms=timeout_ms_in; } Command::Command(){ @@ -20,12 +21,14 @@ Command::Command(){ response=std::vector{}; err=""; msg_id=0; + timeout_ms=0; } void Command::Print() const { std::cout<<"command="<{}; + cmd_in.timeout_ms=0; } Command& Command::operator=(Command&& cmd_in){ @@ -70,12 +76,14 @@ Command& Command::operator=(Command&& cmd_in){ response = std::move(cmd_in.response); err = cmd_in.err; msg_id = cmd_in.msg_id; + timeout_ms = cmd_in.timeout_ms; cmd_in.command=""; cmd_in.type='\0'; cmd_in.topic=""; cmd_in.success=0; cmd_in.msg_id=0; cmd_in.response=std::vector{}; + cmd_in.timeout_ms=0; } return *this; } @@ -389,6 +397,9 @@ bool ServicesBackend::SendCommand(const std::string& topic, const std::string& c // This is a wrapper that ensures we always return within the requested timeout. if(verbosity>10) std::cout<<"ServicesBackend::SendCommand invoked with command '"<10) std::cout<<"ServicesBackend::DoCommand received command"<10) std::cout<<"ServicesBackend::DoCommand got send confirmation"<10) std::cout<<"ServicesBackend::SendNextCommand calling PollAndSend" <<", message type: "<10) std::cout<<"ServicesBackend::SendNextCommand send returned "<(message)); send_ok = success ? 0 : -1; @@ -226,15 +232,20 @@ class ServicesBackend { int send_ok = 0; // check for listener int ret = 0; + printf("polling out socket for %d ms\n",timeout); try { ret = zmq::poll(&poll, 1, timeout); } catch (zmq::error_t& err){ std::cerr<<"ServicesBackend::PollAndSend poller caught "<(message), std::forward(rest)...); send_ok = success ? 0 : -1; From 32702671560a76312738956fb8b1af62232b3657 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 28 Nov 2024 13:03:45 +0000 Subject: [PATCH 7/7] remove printfs --- src/ServiceDiscovery/ServiceDiscovery.cpp | 2 +- src/ServiceDiscovery/Services.cpp | 7 +------ src/ServiceDiscovery/Services.h | 1 - src/ServiceDiscovery/ServicesBackend.cpp | 7 ------- src/ServiceDiscovery/ServicesBackend.h | 4 ---- 5 files changed, 2 insertions(+), 19 deletions(-) diff --git a/src/ServiceDiscovery/ServiceDiscovery.cpp b/src/ServiceDiscovery/ServiceDiscovery.cpp index 5ea9601..86cfe1d 100644 --- a/src/ServiceDiscovery/ServiceDiscovery.cpp +++ b/src/ServiceDiscovery/ServiceDiscovery.cpp @@ -446,7 +446,7 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,&mreq, sizeof(mreq)) < 0) { perror("setsockopt mreq"); - printf("Failed to goin multicast group listen thread"); + printf("Failed to join multicast group listen thread"); exit(1); } diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 05cc7e1..70e3eca 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -265,8 +265,7 @@ bool Services::GetDeviceConfig(std::string& json_data, const int version, const // get a run configuration via configuration ID bool Services::GetRunConfig(std::string& json_data, const int config_id, const unsigned int timeout){ - - printf("GetRunConfig with timeout %d\n",timeout); + json_data=""; std::string cmd_string = "{ \"config_id\":"+std::to_string(config_id) + "}"; @@ -344,7 +343,6 @@ bool Services::GetRunConfig(std::string& json_data, const std::string& name, con bool Services::GetRunDeviceConfig(std::string& json_data, const int runconfig_id, const std::string& device, int* version, unsigned int timeout){ - printf("GetRunDeviceConfig with timeout %d\n",timeout); json_data=""; const std::string& name = (device=="") ? m_name : device; @@ -361,14 +359,11 @@ bool Services::GetRunDeviceConfig(std::string& json_data, const int runconfig_id return false; } -printf("GetRunConfig from GetRunDev returned: '%s'\n",run_config.c_str()); // 2. extract the device's configuration id Store tmp; tmp.JsonParser(run_config); int device_config_id; get_ok = tmp.Get(name, device_config_id); -printf("tmp store contains:\n"); -tmp.Print(); if(!get_ok){ std::string err= "GetRunDeviceConfig error getting device config; device '"+name diff --git a/src/ServiceDiscovery/Services.h b/src/ServiceDiscovery/Services.h index c764dbb..ffef3d4 100644 --- a/src/ServiceDiscovery/Services.h +++ b/src/ServiceDiscovery/Services.h @@ -30,7 +30,6 @@ namespace ToolFramework { Store info; }; - class Services{ diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index a0e6b9e..2b609a0 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -430,7 +430,6 @@ bool ServicesBackend::SendCommand(const std::string& topic, const std::string& c // (for now) go over a dealer/router combination that cannot filter on the topic. // forward the timeout to the Command (and thus zmq::poll in PollAndSend...) ... is this sensible? HMMMMM FIXME Command cmd{command, type, topic.substr(2,std::string::npos),timeout}; - printf("formed command with timeout %d ms from timeout at %p of value %d\n",cmd.timeout_ms,timeout_ms,(timeout_ms ? *timeout_ms : 0)); // wrap our attempt to get the response in try/catch, just in case? try { @@ -465,7 +464,6 @@ bool ServicesBackend::SendCommand(const std::string& topic, const std::string& c } bool ServicesBackend::DoCommand(Command& cmd, int timeout_ms){ - printf("DoCommand with timeout %d ms\n",timeout_ms); if(verbosity>10) std::cout<<"ServicesBackend::DoCommand received command"<10) std::cout<<"ServicesBackend::GetNextResponse had response in socket"<10) std::cout<<"ServicesBackend::SendNextCommand calling PollAndSend" <<", message type: "<10) std::cout<<__PRETTY_FUNCTION__<<" called"<send(message, ZMQ_SNDMORE); @@ -859,7 +853,6 @@ bool ServicesBackend::Send(zmq::socket_t* sock, bool more, std::vector