diff --git a/src/ServiceDiscovery/ServiceDiscovery.cpp b/src/ServiceDiscovery/ServiceDiscovery.cpp index 23c8b8c..607a537 100644 --- a/src/ServiceDiscovery/ServiceDiscovery.cpp +++ b/src/ServiceDiscovery/ServiceDiscovery.cpp @@ -529,7 +529,7 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(sock.at(i), IPPROTO_IP, IP_ADD_MEMBERSHIP,&mreq, sizeof(mreq)) < 0) { perror("setsockopt mreq"); - printf("Failed to goin multicast group listen thread"); + printf("Failed to join multicast group listen thread"); exit(1); } diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 47a96f9..c4df97d 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -27,21 +27,24 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC if(!m_variables.Get("service_name",m_name)) m_name="test_service"; if(!m_variables.Get("db_name",m_dbname)) m_dbname="daq"; - - if(!m_backend_client.Initialise(m_variables)){ std::clog<<"error initialising slowcontrol client"<& responses, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool SQLQuery(const std::string& database, const std::string& query, std::string& response, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index ca64947..b8557df 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -2,7 +2,7 @@ using namespace ToolFramework; -Command::Command(std::string command_in, char type_in, std::string topic_in){ +Command::Command(std::string command_in, char type_in, std::string topic_in, const unsigned int timeout_ms_in){ command = command_in; type = type_in; topic=topic_in; @@ -10,6 +10,7 @@ Command::Command(std::string command_in, char type_in, std::string topic_in){ response=std::vector{}; err=""; msg_id=0; + timeout_ms=timeout_ms_in; } Command::Command(){ @@ -20,12 +21,14 @@ Command::Command(){ response=std::vector{}; err=""; msg_id=0; + timeout_ms=0; } void Command::Print() const { std::cout<<"command="<{}; + cmd_in.timeout_ms=0; } Command& Command::operator=(Command&& cmd_in){ @@ -70,12 +76,14 @@ Command& Command::operator=(Command&& cmd_in){ response = std::move(cmd_in.response); err = cmd_in.err; msg_id = cmd_in.msg_id; + timeout_ms = cmd_in.timeout_ms; cmd_in.command=""; cmd_in.type='\0'; cmd_in.topic=""; cmd_in.success=0; cmd_in.msg_id=0; cmd_in.response=std::vector{}; + cmd_in.timeout_ms=0; } return *this; } @@ -387,20 +395,14 @@ bool ServicesBackend::SendMulticast(std::string command, std::string* err){ return true; } -////////////////////////////////////////////////////////////////////////////////////////////////////////// -// FIXME ok, i think the combination of SendCommand and DoCommand are an over-complication. // -// SendCommand is basically a pointless wrapper that spawns a thread to do what it could do itself. // -// This would remove the surplus promise/future layer. The only change to DoCommand required // -// would be that the timeouts from each stage (wait for send, wait for reply) // -// would need to be appropriately modified -.e.g if total timeout is 300ms and it takes 100ms to send, // -// then we would only wait for 200ms for the reply. // -////////////////////////////////////////////////////////////////////////////////////////////////////////// - bool ServicesBackend::SendCommand(const std::string& topic, const std::string& command, std::vector* results, const unsigned int* timeout_ms, std::string* err){ // send a command and receive response. // This is a wrapper that ensures we always return within the requested timeout. if(verbosity>10) std::cout<<"ServicesBackend::SendCommand invoked with command '"< response = std::async(std::launch::async, &ServicesBackend::DoCommand, this, cmd); - // std::async returns a std::future that will block on destruction until the promise returns. - // if we don't want that to happen, i.e. we want to abandon it if it times out, - // we instead need to obtain a future from a promise (which is somehow not blocking?), - // and run our code in a detached thread, using the promise to pass back the result. - // tbh i don't quite get why this is different but there we go. - // see https://stackoverflow.com/a/23454840/3544936 and https://stackoverflow.com/a/23460094/3544936 - std::promise returnval; - std::future response = returnval.get_future(); - if(verbosity>10) std::cout<<"ServicesBackend::SendCommand spinning up new thread"<10) std::cout<<"ServicesBackend::SendCommand waiting for response"<10) std::cout<<"ServicesBackend::SendCommand fetching response"<10) std::cout<<"ServicesBackend::SendCommand response is "<3) std::cerr<10) std::cout<<"ServicesBackend::SendCommand calling DoCommand"<10) std::cout<<"ServicesBackend::SendCommand response is "< result){ +bool ServicesBackend::DoCommand(Command& cmd, int timeout_ms){ if(verbosity>10) std::cout<<"ServicesBackend::DoCommand received command"< result){ uint32_t thismsgid = ++msg_id; cmd.msg_id = thismsgid; + // we submit the command asynchrously. + // This way we control how long we wait for the response, ensuring we don't block the caller indefinitely. + // The response will be a Command object with remaining members populated. + // initial guess would have been to use: + //std::future response = std::async(std::launch::async, &ServicesBackend::DoCommand, this, cmd); + // but std::async returns a std::future that will block on destruction until the function returns. + // if we don't want that to happen, i.e. we want to abandon it if it times out, + // we instead need to obtain a future from a promise (which is somehow not blocking?), + // and run our code in a detached thread, using the promise to pass back the result. + // tbh i don't quite get why this is different but there we go. + // see https://stackoverflow.com/a/23454840/3544936 and https://stackoverflow.com/a/23460094/3544936 + // zmq sockets aren't thread-safe, so we have one central sender. // we submit our command and keep a ticket to retrieve the return status on completion. // similarly, the next response received may not be for us, so a central dealer receives @@ -530,9 +510,27 @@ bool ServicesBackend::DoCommand(Command cmd, std::promise result){ waiting_senders.emplace(cmd, std::move(send_ticket)); send_queue_mutex.unlock(); - // wait for our number to come up. loooong timeout, but don't hang forever. if(verbosity>10) std::cout<<"ServicesBackend::DoCommand waiting for send confirmation"<(send_end - send_start).count(); + timeout_ms -= send_time_ms; + // juuuust in case, ensure our remaining time is not negative. :) + if(timeout_ms<0) timedout=true; + } + + // did we get a response in time? + if(timedout){ if(verbosity>10) std::cerr<<"ServicesBackend::DoCommand timeout"< result){ Log("Timed out sending command "+std::to_string(thismsgid),v_warning,verbosity); cmd.success = false; cmd.err = "Timed out sending command"; - result.set_value(cmd); - // since we are giving up waiting for the response, remove ourselves from - // the list of waiting recipients + // since we are giving up waiting for the response, remove ourselves + // from the list of recipients awaiting response if(verbosity>10) std::cout<<"ServicesBackend::DoCommand de-registering for response on id "<10) std::cout<<"ServicesBackend::DoCommand got send confirmation"< result){ Log(errmsg,v_debug,verbosity); cmd.success = false; cmd.err = errmsg; - result.set_value(cmd); // since the send failed we don't expect a response, so remove ourselves // from the list of recipients awaiting response @@ -584,7 +581,7 @@ bool ServicesBackend::DoCommand(Command cmd, std::promise result){ // if we succeeded in sending the message, we now need to wait for a repsonse. if(verbosity>10) std::cout<<"ServicesBackend::DoCommand waiting for response"<10) std::cout<<"ServicesBackend::DoCommand response timeout"< result){ Log("Timed out waiting for response for command "+std::to_string(thismsgid),v_warning,verbosity); cmd.success = false; cmd.err = "Timed out waiting for response"; - result.set_value(cmd); return false; } else { // got a response! if(verbosity>10) std::cout<<"ServicesBackend::DoCommand got a response for command "<10) std::cout<<"ServicesBackend::SendNextCommand calling PollAndSend" <<", message type: "<10) std::cout<<"ServicesBackend::SendNextCommand send returned "<10) std::cout<<__PRETTY_FUNCTION__<<" called"<send(message, ZMQ_SNDMORE); @@ -865,7 +856,6 @@ bool ServicesBackend::Send(zmq::socket_t* sock, bool more, std::vector& return true; } +bool ServicesBackend::Ready(int timeout){ + + // poll the output sockets for listeners + // only poll dealer socket, pub sockets always return true immediately so ignore the timeout + // polling the input socket checks for a message, so don't do that. + int ret; + try { + dlr_socket_mutex.lock(); + ret = zmq::poll(&out_polls.at(1), 1, timeout); + dlr_socket_mutex.unlock(); + } catch (zmq::error_t& err){ + std::cerr<<"ServicesBackend::Ready caught "< log=nullptr); // possibly move to constructor + bool Ready(int timeout); // check if zmq sockets have connections bool Initialise(std::string configfile); bool Initialise(Store &varaibles_in); bool Finalise(); @@ -101,7 +103,7 @@ class ServicesBackend { bool InitMulticast(); // private bool RegisterServices(); //private // wrapper funtion; add command to outgoing queue, receive response. ~30s timeout. - bool DoCommand(Command cmd, std::promise); //private + bool DoCommand(Command& cmd, int timeout_ms); //private // actual send/receive functions bool SendNextCommand(); //private bool GetNextResponse(); //priavte @@ -206,6 +208,9 @@ class ServicesBackend { if(ret<0){ // error polling - is the socket closed? send_ok = -3; + } else if(ret==0){ + // 'resource temporarily unavailable'...? no connections on this socket? + send_ok = -4; } else if(poll.revents & ZMQ_POLLOUT){ bool success = Send(sock, false, std::forward(message)); send_ok = success ? 0 : -1; @@ -234,6 +239,9 @@ class ServicesBackend { if(ret<0){ // error polling - is the socket closed? send_ok = -3; + } else if(ret==0){ + // 'resource temporarily unavailable'...? no connections on this socket? + send_ok = -4; } else if(poll.revents & ZMQ_POLLOUT){ bool success = Send(sock, false, std::forward(message), std::forward(rest)...); send_ok = success ? 0 : -1;