diff --git a/.gitignore b/.gitignore index a2cf310..b1e8c16 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,8 @@ *.pcm # python pre-compiled modules *.pyc +# core dumps +core # lib folder is populated during build lib/ @@ -23,3 +25,4 @@ tempinclude/ main NodeDaemon RemoteControl +MCDebug diff --git a/configfiles/Dummy/ToolChainConfig b/configfiles/Dummy/ToolChainConfig index 7f0a1a0..533cb97 100644 --- a/configfiles/Dummy/ToolChainConfig +++ b/configfiles/Dummy/ToolChainConfig @@ -18,8 +18,8 @@ log_interactive 1 # Interactive=cout; 0=false, 1= true log_local 1 # Local = local file log; 0=false, 1= true log_local_path ./log # file to store logs to if local is active log_remote 1 # Remote= remote logging system "serservice_name Remote_Logging"; 0=false, 1= true -log_address 239.192.1.1 # Remote multicast address to send logs -log_port 55554 # port on remote machine to connect to +log_address 239.192.1.2 # Remote multicast address to send logs +log_port 5000 # port on remote machine to connect to log_append_time 0 # append seconds since epoch to filename; 0=false, 1= true log_split_files 1 # seperate output and error log files (named x.o and x.e) @@ -46,8 +46,8 @@ clt_dlr_socket_timeout 500 # inpoll_timeout 50 # keep these short! outpoll_timeout 50 # keep these short! command_timeout 2000 # -multicast_port 55554 # -multicast_address 239.192.1.1 # +mon_port 5000 # +mon_address 239.192.1.3 # ##### Tools To Add ##### Tools_File configfiles/Dummy/ToolsConfig # list of tools to run and their config files diff --git a/configfiles/template/ToolChainConfig b/configfiles/template/ToolChainConfig index d26b67a..badb1c3 100644 --- a/configfiles/template/ToolChainConfig +++ b/configfiles/template/ToolChainConfig @@ -18,8 +18,8 @@ log_interactive 1 # Interactive=cout; 0=false, 1= true log_local 0 # Local = local file log; 0=false, 1= true log_local_path ./log # file to store logs to if local is active log_remote 0 # Remote= remote logging system "serservice_name Remote_Logging"; 0=false, 1= true -log_address 239.192.1.1 # Remote multicast address to send logs -log_port 5001 # port on remote machine to connect to +log_address 239.192.1.2 # Remote multicast address to send logs +log_port 5000 # port on remote machine to connect to log_append_time 0 # append seconds since epoch to filename; 0=false, 1= true log_split_files 0 # seperate output and error log files (named x.o and x.e) @@ -45,8 +45,8 @@ clt_dlr_socket_timeout 500 # inpoll_timeout 50 # keep these short! outpoll_timeout 50 # keep these short! command_timeout 2000 # -multicast_port 55554 # -multicast_address 239.192.1.1 # +mon_port 5000 # +mon_address 239.192.1.3 # ##### Tools To Add ##### Tools_File configfiles/ToolsConfig # list of tools to run and their config files diff --git a/src/DAQDataModelBase/DAQUtilities.cpp b/src/DAQDataModelBase/DAQUtilities.cpp index f2cc7f5..cdc7b5e 100644 --- a/src/DAQDataModelBase/DAQUtilities.cpp +++ b/src/DAQDataModelBase/DAQUtilities.cpp @@ -119,10 +119,11 @@ int DAQUtilities::UpdateConnections(std::string ServiceName, zmq::socket_t* sock if(port=="" && port_name=="") service->Get("remote_port",remote_port); else if(port_name!="") service->Get(port_name, remote_port); if(remote_port==""){ - delete service; - service=0; - continue; + delete service; + service=0; + continue; } + //printf("updateconnections checking if service '%s' is interested in client '%s' on port '%s'\n",ServiceName.c_str(), type.c_str(),remote_port.c_str()); std::string tmp=ip + ":" + remote_port; @@ -162,7 +163,7 @@ DAQThread_args* DAQUtilities::CreateThread(std::string ThreadName, void (*func) } return args; -} +} void *DAQUtilities::String_Thread(void *arg){ @@ -189,11 +190,11 @@ void *DAQUtilities::String_Thread(void *arg){ zmq::poll(&initems[0], 1, 0); if ((initems[0].revents & ZMQ_POLLIN)){ - - zmq::message_t message; - IThread.recv(&message); - command=std::string(static_cast(message.data())); - + + zmq::message_t message; + IThread.recv(&message); + command=std::string(static_cast(message.data())); + } args->func_with_string(command); diff --git a/src/DAQLogging/DAQLogging.cpp b/src/DAQLogging/DAQLogging.cpp index 4042443..35f6807 100644 --- a/src/DAQLogging/DAQLogging.cpp +++ b/src/DAQLogging/DAQLogging.cpp @@ -342,7 +342,6 @@ src/DAQLogging/DAQLogging.{h,cpp} -nw bzero((char *)&addr, sizeof(addr)); addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_port = htons(log_port); addrlen = sizeof(addr); diff --git a/src/ServiceDiscovery/ServiceDiscovery.cpp b/src/ServiceDiscovery/ServiceDiscovery.cpp index 607a537..1150d16 100644 --- a/src/ServiceDiscovery/ServiceDiscovery.cpp +++ b/src/ServiceDiscovery/ServiceDiscovery.cpp @@ -4,25 +4,25 @@ using namespace ToolFramework; ServiceDiscovery::ServiceDiscovery(bool Send, bool Receive, int remoteport, std::string address, int multicastport, zmq::context_t * incontext, boost::uuids::uuid UUID, std::string service, int pubsec, int kicksec){ - + std::vector in_address; std::vector in_multicastport; - + in_address.push_back(address); in_multicastport.push_back(multicastport); - + Init(Send, Receive, remoteport, in_address, in_multicastport, incontext, UUID, service, pubsec, kicksec); - + } ServiceDiscovery::ServiceDiscovery(bool Send, bool Receive, int remoteport, std::vector address, std::vector multicastport, zmq::context_t * incontext, boost::uuids::uuid UUID, std::string service, int pubsec, int kicksec){ Init(Send, Receive, remoteport, address, multicastport, incontext, UUID, service, pubsec, kicksec); - + } void ServiceDiscovery::Init(bool Send, bool Receive, int remoteport, std::vector address, std::vector multicastport, zmq::context_t * incontext, boost::uuids::uuid UUID, std::string service, int pubsec, int kicksec){ - - + + m_UUID=UUID; context=incontext; m_multicastport=multicastport; @@ -31,12 +31,12 @@ void ServiceDiscovery::Init(bool Send, bool Receive, int remoteport, std::vector m_remoteport=remoteport; m_send=Send; m_receive=Receive; - + args= new thread_args(m_UUID, context, m_multicastaddress, m_multicastport, m_service, m_remoteport, pubsec, kicksec); - - if (Receive) pthread_create (&thread[0], NULL, ServiceDiscovery::MulticastListenThread, args); - if (Send) pthread_create (&thread[1], NULL, ServiceDiscovery::MulticastPublishThread, args); + if (Receive) pthread_create (&thread[0], NULL, ServiceDiscovery::MulticastListenThread, args); + + if (Send) pthread_create (&thread[1], NULL, ServiceDiscovery::MulticastPublishThread, args); //sleep(2); @@ -51,15 +51,15 @@ void ServiceDiscovery::Init(bool Send, bool Receive, int remoteport, std::vector } ServiceDiscovery::ServiceDiscovery( std::string address, int multicastport, zmq::context_t * incontext, int kicksec){ - + std::vector in_address; std::vector in_multicastport; - + in_address.push_back(address); in_multicastport.push_back(multicastport); - + Init(in_address, in_multicastport, incontext, kicksec); - + } ServiceDiscovery::ServiceDiscovery( std::vector address, std::vector multicastport, zmq::context_t * incontext, int kicksec){ @@ -76,20 +76,20 @@ void ServiceDiscovery::Init( std::vector address, std::vector m_UUID=boost::uuids::random_generator()(); m_receive=true; m_send=false; - + args= new thread_args(m_UUID, context, m_multicastaddress, m_multicastport, m_service, m_remoteport, 0 , kicksec); - + pthread_create (&thread[0], NULL, ServiceDiscovery::MulticastListenThread, args); - + // sleep(2); - + } void* ServiceDiscovery::MulticastPublishThread(void* arg){ - - + + thread_args* args= static_cast(arg); zmq::context_t * context = args->context; boost::uuids::uuid m_UUID=args->UUID; @@ -97,20 +97,20 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ int m_multicastport=args->multicastport.at(0); std::string m_service=args->service; int m_remoteport=args->remoteport; - + long msg_id=0; - - zmq::socket_t Ireceive (*context, ZMQ_PULL); + + zmq::socket_t Ireceive (*context, ZMQ_PULL); int linger = 0; //Ireceive.setsockopt(ZMQ_IMMEDIATE, 1); - Ireceive.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); - Ireceive.bind("inproc://ServicePublish"); + Ireceive.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + Ireceive.bind("inproc://ServicePublish"); /// multi cast ///// struct sockaddr_in addr; int addrlen, sock, cnt; - // struct ip_mreq mreq; + // struct ip_mreq mreq; // set up socket // @@ -119,8 +119,8 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ l.l_onoff = 0; l.l_linger = 0; setsockopt(sock, SOL_SOCKET, SO_LINGER,(char *) &l, sizeof(l)); - - //fcntl(sock, F_SETFL, O_NONBLOCK); + + //fcntl(sock, F_SETFL, O_NONBLOCK); if (sock < 0) { perror("socket"); printf("Failed to connect to multicast publish socket"); @@ -128,7 +128,6 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ } bzero((char *)&addr, sizeof(addr)); addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_port = htons(m_multicastport); addrlen = sizeof(addr); @@ -137,7 +136,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ std::vector PubServices; - Store bb; + Store bb; bb.Set("msg_type", "Service Discovery"); bb.Set("msg_value",m_service); bb.Set("remote_port",m_remoteport); @@ -145,7 +144,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ else bb.Set("status_query",false); bb.Set("uuid",boost::uuids::to_string(m_UUID)); PubServices.push_back(bb); - + // Initialize poll set zmq::pollitem_t items [] = { { Ireceive, 0, ZMQ_POLLIN, 0 }, @@ -156,7 +155,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ bool running=true; while(running){ - + try{ zmq::poll(&items [0], 2, 1000); } catch(zmq::error_t& err){ @@ -164,7 +163,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ if(zmq_errno()==EINTR) continue; throw; } - + if ((items [0].revents & ZMQ_POLLIN) && running) { zmq::message_t commands; @@ -178,256 +177,259 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ bool statusquery; tmp>>command>>service>>uuid>>port>>statusquery; - - + + //printf("SD publish thread got request command %s\n",command.c_str()); if(command=="Quit"){ - //printf("publish quitting \n"); - running=false; + //printf("publish quitting \n"); + running=false; } else if(command=="Add"){ - - Store bb; - bb.Set("msg_type","Service Discovery"); - bb.Set("msg_value",service); - bb.Set("remote_port",port); - bb.Set("status_query",statusquery); - bb.Set("uuid",boost::uuids::to_string(uuid)); - PubServices.push_back(bb); + + Store bb; + bb.Set("msg_type","Service Discovery"); + bb.Set("msg_value",service); + bb.Set("remote_port",port); + bb.Set("status_query",statusquery); + bb.Set("uuid",boost::uuids::to_string(uuid)); + PubServices.push_back(bb); + //printf("SD publish thread Adding service %s\n", service.c_str()); } else if(command=="Delete"){ - std::vector::iterator it; - for (it = PubServices.begin() ; it != PubServices.end(); ++it){ - //std::cout<<"d3.5 "<<*((*it)["msg_value"])<("msg_value")==service)break; - - - } - if (it!=PubServices.end())PubServices.erase(it); - - + std::vector::iterator it; + for (it = PubServices.begin() ; it != PubServices.end(); ++it){ + //std::cout<<"d3.5 "<<*((*it)["msg_value"])<("msg_value")==service)break; + + + } + if (it!=PubServices.end())PubServices.erase(it); + + } else if(command=="PortAdd"){ - if(PubServices.size()) PubServices.at(0).Set(service, port); + if(PubServices.size()) PubServices.at(0).Set(service, port); + //printf("SD publish thread %s adding port %d for service %s (# services: %d)\n",(PubServices.empty() ? "not" : "\b"), port, service.c_str(), PubServices.size()); } else if(command =="PortDelete"){ - if(PubServices.size()) PubServices.at(0).Erase(service); + if(PubServices.size()) PubServices.at(0).Erase(service); } + continue; } if ((items [1].revents & ZMQ_POLLOUT) && running){ - + for(unsigned int i=0;i("remote_port"); - // StatusCheck.setsockopt(ZMQ_IMMEDIATE, 1); - StatusCheck.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); - - StatusCheck.connect(connection.str().c_str()); - - zmq::pollitem_t out[]={{StatusCheck,0,ZMQ_POLLOUT,0}}; - zmq::pollitem_t in[]={{StatusCheck,0,ZMQ_POLLIN,0}}; - - mm.Set("msg_type","Command"); - mm.Set("msg_value","Status"); - - std::string command; - mm>>command; - - // zmq::message_t Esend(256); - //std::string command="Status; - mm.Delete(); - - zmq::message_t Esend(command.length()+1); - snprintf ((char *) Esend.data(), command.length()+1 , "%s" ,command.c_str()) ; - - try{ - zmq::poll(out,1,1000); - } catch(zmq::error_t& err){ - // ignore poll aborting due to signals - if(zmq_errno()==EINTR) continue; - throw; - } - - if(out[0].revents & ZMQ_POLLOUT){ - StatusCheck.send(Esend); - - //std::cout<<"waiting for message "<(Ereceive.data())); - - mm.JsonParser(ss.str()); - } - } - } - /* - std::cout<<"received for publish "< subwriter(subbuffer); - params.Accept(subwriter); - - std::string tmpbufferout=subbuffer.GetString(); - - std::cout<<" substringtest "<name.GetString()<value<name.GetString()=="port") itr->value.SetInt(m_multicastport); - //if(itr->name.GetString()=="status") itr->value.SetString(ss.str().c_str(),strlen(ss.str().c_str())); - } - */ - - - - //rapidjson::Document params=d["params"]; - //params["port"].SetInt(m_multicastport,strlen(); - // rapidjson::Value& params = d["params"]; - // rapidjson::Document::AllocatorType& allocator = d.GetAllocator(); - // params.PushBack(i, allocator); - - // params.PushBack("key1","value1"); + /* + + + rapidjson::Document d; + d.Parse(json); + // d.SetObject(); + //["hello"] = "rapidjson"; + // test.SetString("My JSON Document", d.GetAllocator()); + // d.AddMember("Doc Name", test , d.GetAllocator()); + // d.AddMember("uuid", tmp.str().c_str(), d.GetAllocator()); + // (*newDoc)["Parameters"].SetObject(); + + msg_id++; + std::stringstream tmp; + tmp<("remote_port"); + // StatusCheck.setsockopt(ZMQ_IMMEDIATE, 1); + StatusCheck.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + + StatusCheck.connect(connection.str().c_str()); + + zmq::pollitem_t out[]={{StatusCheck,0,ZMQ_POLLOUT,0}}; + zmq::pollitem_t in[]={{StatusCheck,0,ZMQ_POLLIN,0}}; + + mm.Set("msg_type","Command"); + mm.Set("msg_value","Status"); + + std::string command; + mm>>command; + + // zmq::message_t Esend(256); + //std::string command="Status; + mm.Delete(); + + zmq::message_t Esend(command.length()+1); + snprintf ((char *) Esend.data(), command.length()+1 , "%s" ,command.c_str()) ; + + try{ + zmq::poll(out,1,1000); + } catch(zmq::error_t& err){ + // ignore poll aborting due to signals + if(zmq_errno()==EINTR) continue; + throw; + } + + if(out[0].revents & ZMQ_POLLOUT){ + StatusCheck.send(Esend); + + //std::cout<<"waiting for message "<(Ereceive.data())); + + mm.JsonParser(ss.str()); + } + } + } + /* + std::cout<<"received for publish "< subwriter(subbuffer); + params.Accept(subwriter); + + std::string tmpbufferout=subbuffer.GetString(); + + std::cout<<" substringtest "<name.GetString()<value<name.GetString()=="port") itr->value.SetInt(m_multicastport); + //if(itr->name.GetString()=="status") itr->value.SetString(ss.str().c_str(),strlen(ss.str().c_str())); + } + */ + + + + //rapidjson::Document params=d["params"]; + //params["port"].SetInt(m_multicastport,strlen(); + // rapidjson::Value& params = d["params"]; + // rapidjson::Document::AllocatorType& allocator = d.GetAllocator(); + // params.PushBack(i, allocator); + + // params.PushBack("key1","value1"); //params.PushBack("key2","value2"); - /* - std::cout<<" d[UUID] = "< > writer(buffer); - //rapidjson::Writer writer(buffer); - d.Accept(writer); - std::string hhh=buffer.GetString(); - std::cout<< "bufer = "<("msg_value")); - else PubServices.at(i).Set("status","N/A"); - std::string pubmessage; - PubServices.at(i)>>pubmessage; - - //std::stringstream pubmessage; - - //pubmessage<<"{\"uuid\":\""< > writer(buffer); + //rapidjson::Writer writer(buffer); + d.Accept(writer); + std::string hhh=buffer.GetString(); + std::cout<< "bufer = "<("msg_value")); + else PubServices.at(i).Set("status","N/A"); + std::string pubmessage; + PubServices.at(i)>>pubmessage; + + //std::stringstream pubmessage; + + //pubmessage<<"{\"uuid\":\""<pubsec); @@ -440,7 +442,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ close(sock); Ireceive.close(); // printf("publish out of runnin \n"); - + pthread_exit(NULL); //return (NULL); @@ -449,7 +451,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ void* ServiceDiscovery::MulticastListenThread(void* arg){ - + thread_args* args= static_cast(arg); zmq::context_t * context = args->context; // boost::uuids::uuid m_UUID=args->UUID; @@ -458,9 +460,11 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ std::string m_service=args->service; zmq::socket_t Ireceive (*context, ZMQ_ROUTER); - Ireceive.bind("inproc://ServiceDiscovery"); + int linger = 0; + Ireceive.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + Ireceive.bind("inproc://ServiceDiscovery"); + - /* zmq::message_t config; Ireceive.recv (&config); @@ -471,26 +475,26 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ Ireceive.send(config); */ - + ///// multi cast ///// std::vector sock; char message[512]; std::vector addr; std::vector addrlen; - + std::vector items; - + items.resize(m_multicastaddress.size()+1); //zmq::pollitem_t items[m_multicastaddress.size()+1]; - + items.at(0).socket = Ireceive; items.at(0).fd = 0; items.at(0).events = ZMQ_POLLIN; items.at(0).revents = 0; - + zmq::pollitem_t out[] = { {Ireceive, 0, ZMQ_POLLOUT, 0} }; @@ -505,34 +509,34 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ sock.emplace_back(socket(AF_INET, SOCK_DGRAM, 0)); addrlen.emplace_back(); addr.emplace_back(); - + int a =1; setsockopt(sock.at(i), SOL_SOCKET, SO_REUSEADDR, &a, sizeof(int)); - //fcntl(sock, F_SETFL, O_NONBLOCK); + //fcntl(sock, F_SETFL, O_NONBLOCK); if (sock.at(i) < 0) { perror("socket"); exit(1); } bzero((char *)&addr.at(i), sizeof(addr.at(i))); addr.at(i).sin_family = AF_INET; - addr.at(i).sin_addr.s_addr = htonl(INADDR_ANY); + inet_aton(m_multicastaddress.at(i).c_str(), &addr.at(i).sin_addr); addr.at(i).sin_port = htons(m_multicastport.at(i)); addrlen.at(i) = sizeof(addr.at(i)); // receive // - if (bind(sock.at(i), (struct sockaddr *) &addr.at(i), sizeof(addr.at(i))) < 0) { + if (bind(sock.at(i), (struct sockaddr *) &addr.at(i), sizeof(addr.at(i))) < 0) { perror("bind"); printf("Failed to bind to multicast listen socket"); exit(1); - } - mreq.imr_multiaddr.s_addr = inet_addr(m_multicastaddress.at(i).c_str()); - mreq.imr_interface.s_addr = htonl(INADDR_ANY); + } + mreq.imr_multiaddr.s_addr = inet_addr(m_multicastaddress.at(i).c_str()); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(sock.at(i), IPPROTO_IP, IP_ADD_MEMBERSHIP,&mreq, sizeof(mreq)) < 0) { perror("setsockopt mreq"); printf("Failed to join multicast group listen thread"); exit(1); - } - + } + items.at(i+1).socket = 0; items.at(i+1).fd = sock.at(i); @@ -561,90 +565,90 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ for(size_t i =0; i < sock.size(); i++){ if ((items.at(i+1).revents & ZMQ_POLLIN) && running) { - - - cnt = recvfrom(sock.at(i), message, sizeof(message), 0, (struct sockaddr *) &addr.at(i), (socklen_t*) &addrlen.at(i)); - if ((cnt > 0) && (message[0]=='{') ) { - //perror("recvfrom"); - // exit(1); - // break; - //} - //else if (cnt > 0){ - //printf("%s: message = \"%s\"\n", inet_ntoa(addr.at(i).sin_addr), message); - - //if(message[0]!='[') break; - - Store* newservice= new Store(); - newservice->Set("ip",inet_ntoa(addr.at(i).sin_addr)); - newservice->JsonParser(message); - - std::string uuid; - newservice->Get("uuid",uuid); - if(RemoteServices.count(uuid)) delete RemoteServices[uuid]; - //std::cout< 0){ + //printf("SD receive from %s: message = \"%s\"\n", inet_ntoa(addr.at(i).sin_addr), message); + + //if(message[0]!='[') break; + + Store* newservice= new Store(); + newservice->Set("ip",inet_ntoa(addr.at(i).sin_addr)); + newservice->JsonParser(message); + + std::string uuid; + newservice->Get("uuid",uuid); + if(RemoteServices.count(uuid)) delete RemoteServices[uuid]; + //std::cout<second->Get("msg_time",msg_time_orig); - - //std::cout<<" time orig ="<second->Get("msg_time",msg_time_orig); + + //std::cout<<" time orig ="<>arg1; - - if(arg1=="All"){ - - //printf("d2\n"); - //zmq::message_t sizem(512); - int size= RemoteServices.size(); - zmq::message_t sizem(sizeof size); - - snprintf ((char *) sizem.data(), sizeof size , "%d" ,size) ; - - // zmq::poll(out,1,1000); - - // if (out[0].revents & ZMQ_POLLOUT){ - //std::cout<<"SD sent size="<>arg1>>arg2; - - for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ - - std::string test; - it->second->Get("service",test); - - if(arg2==test){ - - std::string service; - *(it->second)>>service; - zmq::message_t send(service.length()+1); - snprintf ((char *) send.data(), service.length()+1 , "%s" ,service.c_str()) ; - - try{ - zmq::poll(out,1,1000); - } catch(zmq::error_t& err){ - // ignore poll aborting due to signals - if(zmq_errno()==EINTR) continue; - throw; - } - - - if(out[0].revents & ZMQ_POLLOUT) Ireceive.send(send); - } - } - - } - - - - if(arg1=="UUID"){ - - iss>>arg1>>arg2; - - for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ - - - std::string test; - it->second->Get("uuid",test); - - if(arg2==test){ - - std::string service; - *(it->second)>>service; - zmq::message_t send(service.length()+1); - snprintf ((char *) send.data(), service.length()+1 , "%s" ,service.c_str()) ; - - try{ - zmq::poll(out,1,1000); - } catch(zmq::error_t& err){ - // ignore poll aborting due to signals - if(zmq_errno()==EINTR) continue; - throw; - } - - if(out[0].revents & ZMQ_POLLOUT) Ireceive.send(send); - } - } - - } - - if(arg1=="Quit"){ - - running=false; - //printf("quitting listening \n"); - } - - - - - /* - std::string tmp="0"; - zmq::message_t send(tmp.length()+1); - snprintf ((char *) send.data(), tmp.length()+1 , "%s" ,tmp.c_str()) ; - Ireceive.send(send); - //printf("sent \n"); - */ + + std::istringstream iss(static_cast(comm.data())); + std::string arg1=""; + std::string arg2=""; + + iss>>arg1; + + if(arg1=="All"){ + + //printf("d2\n"); + //zmq::message_t sizem(512); + int size= RemoteServices.size(); + zmq::message_t sizem(sizeof size); + + snprintf ((char *) sizem.data(), sizeof size , "%d" ,size) ; + + // zmq::poll(out,1,1000); + + // if (out[0].revents & ZMQ_POLLOUT){ + //std::cout<<"SD sent size="<>arg1>>arg2; + + for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ + + std::string test; + it->second->Get("service",test); + + if(arg2==test){ + + std::string service; + *(it->second)>>service; + zmq::message_t send(service.length()+1); + snprintf ((char *) send.data(), service.length()+1 , "%s" ,service.c_str()) ; + + try{ + zmq::poll(out,1,1000); + } catch(zmq::error_t& err){ + // ignore poll aborting due to signals + if(zmq_errno()==EINTR) continue; + throw; + } + + + if(out[0].revents & ZMQ_POLLOUT) Ireceive.send(send); + } + } + + } + + + + if(arg1=="UUID"){ + + iss>>arg1>>arg2; + + for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ + + + std::string test; + it->second->Get("uuid",test); + + if(arg2==test){ + + std::string service; + *(it->second)>>service; + zmq::message_t send(service.length()+1); + snprintf ((char *) send.data(), service.length()+1 , "%s" ,service.c_str()) ; + + try{ + zmq::poll(out,1,1000); + } catch(zmq::error_t& err){ + // ignore poll aborting due to signals + if(zmq_errno()==EINTR) continue; + throw; + } + + if(out[0].revents & ZMQ_POLLOUT) Ireceive.send(send); + } + } + + } + + if(arg1=="Quit"){ + + running=false; + //printf("quitting listening \n"); + } + + + + + /* + std::string tmp="0"; + zmq::message_t send(tmp.length()+1); + snprintf ((char *) send.data(), tmp.length()+1 , "%s" ,tmp.c_str()) ; + Ireceive.send(send); + //printf("sent \n"); + */ } } @@ -805,16 +809,16 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ } -for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ - delete it->second; - it->second=0; + for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ + delete it->second; + it->second=0; + + } + RemoteServices.clear(); + //printf("exiting sd listen thread \n"); + pthread_exit(NULL); + //return (NULL); - } - RemoteServices.clear(); - //printf("exiting sd listen thread \n"); - pthread_exit(NULL); - //return (NULL); - } @@ -822,7 +826,7 @@ for (std::map::iterator it=RemoteServices.begin(); it!=Remot ServiceDiscovery::~ServiceDiscovery(){ //printf("in sd destructor \n"); - sleep(1); + sleep(1); //printf("finnish sleep \n"); // kill publish thread @@ -831,6 +835,8 @@ ServiceDiscovery::~ServiceDiscovery(){ if (m_send){ //printf("in sd send kill \n"); zmq::socket_t ServicePublish (*context, ZMQ_PUSH); + int linger = 0; + ServicePublish.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); //int a=120000; //ServicePublish.setsockopt(ZMQ_RCVTIMEO, a); //ServicePublish.setsockopt(ZMQ_SNDTIMEO, a); @@ -860,9 +866,11 @@ ServiceDiscovery::~ServiceDiscovery(){ if(m_receive){ //printf("in sd receive kill \n"); zmq::socket_t ServiceDiscovery (*context, ZMQ_DEALER); + int linger=0; + ServiceDiscovery.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); // int a=60000; //ServiceDiscovery.setsockopt(ZMQ_RCVTIMEO, a); - //ServiceDiscovery.setsockopt(ZMQ_SNDTIMEO, a); + //ServiceDiscovery.setsockopt(ZMQ_SNDTIMEO, a); ServiceDiscovery.connect("inproc://ServiceDiscovery"); @@ -889,7 +897,7 @@ ServiceDiscovery::~ServiceDiscovery(){ //printf("finnish Set args=0 \n"); } //printf("deleted args \n"); - + //printf("finnish sd destructor \n"); } diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 0589034..339cbde 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -2,6 +2,10 @@ using namespace ToolFramework; +namespace { + const uint32_t MAX_UDP_PACKET_SIZE = 655355; +} + Services::Services(){ m_context=0; m_dbname=""; @@ -33,7 +37,7 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC m_variables.Get("alerts_receive", alerts_receive); m_variables.Get("alert_receive_port", alert_receive_port); m_variables.Get("sc_port", sc_port); - + sc_vars->InitThreadedReceiver(m_context, sc_port, 100, new_service, alert_receive_port, alerts_receive, alert_send_port, alerts_send); m_backend_client.SetUp(m_context); @@ -42,7 +46,7 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC if(!m_backend_client.Initialise(m_variables)){ - std::clog<<"error initialising slowcontrol client"<0) level=1; - std::string cmd_string = "{\"time\":"+std::to_string(timestamp) + std::string cmd_string = "{\"time\":\""+TimeStringFromUnixMs(timestamp)+"\"" + ",\"device\":\""+name+"\"" + ",\"level\":"+std::to_string(level) - + ",\"message\":\"" + message + "\"}"; + + ",\"alarm\":\"" + message + "\"}"; std::string err=""; // send the alarm on the pub socket - bool ok = m_backend_client.SendCommand("W_ALARM", cmd_string, (std::vector*)nullptr, &timeout, &err); + bool ok = m_backend_client.SendCommand("W_ALARM", cmd_string, (std::vector*)nullptr, &timeout, &err); if(!ok){ - std::clog<<"SendAlarm error: "< { trace }, + layout, + version, + timestamp, + lifetime, + timeout + ); +} + +// ««-------------- ≪ °◇◆◇° ≫ --------------»» + +// multiple traces version +bool Services::SendPlotlyPlot( + const std::string& name, + const std::vector& traces, + const std::string& layout, + int* version, + const uint64_t timestamp, + const unsigned int lifetime, + const unsigned int timeout +) { + std::stringstream ss; + ss << "{ \"name\":\"" << name + << "\", \"time\":\"" << TimeStringFromUnixMs(timestamp) + << "\", \"lifetime\":" << lifetime + << ", \"layout\":" << layout + << ", \"data\":["; + bool first = true; + for (auto& trace : traces) { + if (!first) ss << ','; + first = false; + ss << trace; + }; + ss << "]}"; + + std::string response=""; + + std::string err; + if (!m_backend_client.SendCommand("W_PLOTLYPLOT", ss.str(), &response, &timeout, &err)){ + std::cerr << "SendPlotlyPlot error: " << err << std::endl; + return false; + }; + + if(response.empty()){ + std::cerr<<"SendPlotlyPlot error: empty response"<& responses, const unsigned int timeout){ + + responses.clear(); + + //const std::string& db = (database=="") ? m_dbname : database; + + std::string err=""; + + if(!m_backend_client.SendCommand("W_QUERY", query, &responses, &timeout, &err)){ + std::cerr<<"SQLQuery error: "< responses; + + bool ok = SQLQuery(/*db,*/ query, responses, timeout); + + if(responses.size()!=0){ + response = responses.front(); + if(responses.size()>1){ + std::cout<<"Warning: SQLQuery returned multiple rows, only first returned"<"}' - strip out contents - if(json_data.length()==0){ + if(json_data.empty()){ // if we got an empty response but the command succeeded, // the query worked but matched no records - run config not found err = "GetDeviceConfig error: config for device "+name+" version "+std::to_string(version)+" not found"; - std::clog<"}' - strip out contents Store tmp; tmp.JsonParser(json_data); - int tmp_version; - bool ok = tmp.Get("version",tmp_version); - if(ok){ - //version = tmp_version; // cannot pass back... without a more complex signature - ok = tmp.Get("data", json_data); - } + bool ok = tmp.Get("data", json_data); if(!ok){ - std::clog<<"GetDeviceConfig error: invalid response: '"<"}' - strip out contents - if(json_data.length()==0){ + if(json_data.empty()){ // if we got an empty response but the command succeeded, // the query worked but matched no records - run config not found - std::clog<<"GetRunConfig error: config "<"}' - strip out contents Store tmp; tmp.JsonParser(json_data); - int tmp_version; - bool ok = tmp.Get("version",tmp_version); - if(ok){ - //version = tmp_version; // cannot pass back - ok = tmp.Get("data", json_data); - } + bool ok = tmp.Get("data", json_data); if(!ok){ err="GetRunConfig error: invalid response: '"+json_data+"'"; - std::clog<>'"+name+"' FROM run_config WHERE config_id="+std::to_string(runconfig_id)+")::integer"; + + std::string err=""; - if(!get_ok){ - // redundant as GetRunConfig prints the error - but is this still useful as calling context? - // TODO we should be using exceptions, of course - //std::clog<<"GetRunDeviceConfig error getting run config id "<"}' - strip out contents + Store tmp; + tmp.JsonParser(json_data); + int tmp_version; + bool ok = tmp.Get("version",tmp_version); + if(ok && version){ + *version = tmp_version; + } + ok = ok && tmp.Get("data", json_data); + if(!ok){ + err="GetRunDeviceConfig error: invalid response: '"+json_data+"'"; + std::cerr<>'"+name+"' FROM run_config WHERE name='"+runconfig_name+"' AND version="+std::to_string(runconfig_version)+")::integer"; - if(!get_ok){ - // redundant as GetRunConfig prints the error - but is this still useful as calling context? - // TODO we should be using exceptions, of course - //std::clog<<"GetRunDeviceConfig error getting run config '"<"}' - strip out contents + Store tmp; + tmp.JsonParser(json_data); + int tmp_version; + bool ok = tmp.Get("version",tmp_version); + if(ok && version){ + *version = tmp_version; + } + ok = ok && tmp.Get("data", json_data); + if(!ok){ + err="GetRunDeviceConfig error: invalid response: '"+json_data+"'"; + std::cerr<", "timestamp":, "version": , "data":""}' - size_t pos1=0, pos2=0; - std::string key; - std::map vals; - while(true){ - pos1=response.find('"',pos2); - if(pos1==std::string::npos) break; - pos2=response.find('"',++pos1); - if(pos2==std::string::npos) break; - std::string str = response.substr(pos1,(pos2-pos1)); - ++pos2; - if(key.empty()){ key = str; } - else if(key=="data"){ break; } - else { vals[key] = str; key=""; } - } - vals[key] = response.substr(pos1,response.find_last_of('"')-pos1); + Store plot; + plot.JsonParser(response); + + bool ok = plot.Get("data", json_data); + ok = ok && plot.Get("draw_options", draw_options); + ok = ok && plot.Get("version", version); - try{ - draw_options = vals["draw_options"]; - if(timestamp) *timestamp = vals["timestamp"]; - version = std::stoi(vals["version"]); - json_data = vals["data"]; - } catch(...){ - std::clog<<"GetROOTplot error: failed to parse response '"<& responses, const unsigned int timeout){ - - responses.clear(); - - const std::string& db = (database=="") ? m_dbname : database; - - const std::string command = "{ \"database\":\""+db+"\"" - + ", \"query\":\""+ query+"\" }"; - - std::string err=""; - - if(!m_backend_client.SendCommand("R_QUERY", command, &responses, &timeout, &err)){ - std::clog<<"SQLQuery error: "< responses; + Store plot; + plot.JsonParser(response); - bool ok = SQLQuery(db, query, responses, timeout); + bool ok = plot.Get("data", trace); + ok = ok && plot.Get("layout", layout); + ok = ok && plot.Get("version", version); - if(responses.size()!=0){ - response = responses.front(); - if(responses.size()>1){ - std::clog<<"Warning: SQLQuery returned multiple rows, only first returned"<655355){ - std::clog<<"Logging message is too long! Maximum length may be 655355 bytes"<MAX_UDP_PACKET_SIZE){ + std::cerr<<"Logging message is too long! Maximum length may be MAX_UDP_PACKET_SIZE bytes"<655355){ - std::clog<<"Monitoring message is too long! Maximum length may be 655355 bytes"<MAX_UDP_PACKET_SIZE){ + std::cerr<<"Monitoring message is too long! Maximum length may be MAX_UDP_PACKET_SIZE bytes"<12){ - // FIXME change to Store parsing so we can check this is the right key - response.replace(0,response.find_first_of(':')+1,""); - response.replace(response.find_last_of('}'),std::string::npos,""); - try { - if(version) *version = std::stoi(response); - } catch (...){ - std::clog<<"SendROOTplot error: invalid response '"<655355){ - std::clog<<"ROOT plot json is too long! Maximum length may be 655355 bytes"<MAX_UDP_PACKET_SIZE){ + std::cerr<<"ROOT plot json is too long! Maximum length may be MAX_UDP_PACKET_SIZE bytes"<= 0) cmd.Set("version", version); - - std::string cmd_string; - cmd >> cmd_string; - - std::string err; - std::string response; - if (!m_backend_client.SendCommand( - "R_PLOTLYPLOT", cmd_string, &response, &timeout, &err - )) - { - std::clog << "GetPlotlyPlot error: " << err << std::endl; - return false; - }; - - Store plot; - plot.JsonParser(response); - - trace = plot.Get("trace"); - layout = plot.Get("layout"); - version = plot.Get("version"); - if (timestamp) *timestamp = plot.Get("time"); - - return true; -} - -bool Services::SendPlotlyPlot( - const std::string& name, - const std::string& trace, - const std::string& layout, - int* version, - unsigned int timestamp, - unsigned int timeout -) { - return SendPlotlyPlot( - name, - std::vector { trace }, - layout, - version, - timestamp, - timeout - ); -} - -bool Services::SendPlotlyPlot( - const std::string& name, - const std::vector& traces, - const std::string& layout, - int* version, - unsigned int timestamp, - unsigned int timeout -) { - std::stringstream ss; - ss - << "{\"name\":\"" << name - << "\",\"layout\":" << layout; - if (version) ss << ",\"version\":" << *version; - if (timestamp) ss << ",\"timestamp\":" << timestamp; - ss << ",\"traces\":["; - bool first = true; - for (auto& trace : traces) { - if (first) - first = false; - else - ss << ','; - ss << trace; - }; - ss << "]}"; - - std::string err; - if (!m_backend_client.SendCommand( - "W_PLOTLYPLOT", ss.str(), static_cast(nullptr), &timeout, &err - )) - { - std::clog << "SendPlotlyPlot error: " << err << std::endl; - return false; - }; - - return true; -} - // =========================================================================== // Other functions // --------------- @@ -814,3 +923,39 @@ std::string Services::GetDeviceName(){ return m_name; } + +// ««-------------- ≪ °◇◆◇° ≫ --------------»» + +std::string Services::TimeStringFromUnixMs(const uint64_t timestamp){ + + if(timestamp==1) return "now()"; // remotely interpret 'now' + + time_t timestamp_sec; // time_t is equivalent to uint64_t + uint16_t timestamp_ms = 0; + if(timestamp==0){ + timestamp_sec = time(nullptr)*1000; // locally interpret 'now' + } else { + timestamp_ms = timestamp%1000; + timestamp_sec = timestamp/1000; + } + struct tm* timeptr = gmtime(×tamp_sec); + if(timeptr==0){ + //Log("gmtime error converting unix time '"+std::to_string(timestamp)+"' to time struct",v_error); + return "now()"; + } + char timestring[24]; + int nchars = strftime(×tring[0], 20, "%F %T", timeptr); + if(nchars==0){ + //Log("strftime error converting time struct '"+std::to_string(timestamp)+"' to string",v_error); + return "now()"; + } + // add the milliseconds + nchars = snprintf(×tring[19], 5, ".%03d", timestamp_ms); + if(nchars!=4){ + //Log("snprintf error converting '"+std::to_string(timestamp_ms)+"' to timestamp milliseconds",v_error); + //return "now()"; // just omit the milliseconds? or fall back to 'now'? + } + + return std::string{timestring}; + +} diff --git a/src/ServiceDiscovery/Services.h b/src/ServiceDiscovery/Services.h index 5554f83..c853e4d 100644 --- a/src/ServiceDiscovery/Services.h +++ b/src/ServiceDiscovery/Services.h @@ -30,29 +30,36 @@ namespace ToolFramework { bool Init(Store &m_variables, zmq::context_t* context_in, SlowControlCollection* sc_vars_in, bool new_service=false); bool Ready(const unsigned int timeout=10000); // default service discovery broadcast period is 5s, middleman also checks intermittently, compound total time should be <10s... - bool SQLQuery(const std::string& database, const std::string& query, std::vector& responses, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SQLQuery(const std::string& database, const std::string& query, std::string& response, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SQLQuery(const std::string& database, const std::string& query, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(/*const std::string& database,*/ const std::string& query, std::vector& responses, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(/*const std::string& database,*/ const std::string& query, std::string& response, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(/*const std::string& database,*/ const std::string& query, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendLog(const std::string& message, unsigned int severity=2, const std::string& device="", const unsigned int timestamp=0); - bool SendAlarm(const std::string& message, unsigned int level=0, const std::string& device="", const unsigned int timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendMonitoringData(const std::string& json_data, const std::string& subject, const std::string& device="", unsigned int timestamp=0); - bool SendCalibrationData(const std::string& json_data, const std::string& description, const std::string& device="", unsigned int timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetCalibrationData(std::string& json_data, int version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendDeviceConfig(const std::string& json_data, const std::string& author, const std::string& description, const std::string& device="", unsigned int timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendRunConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, unsigned int timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendLog(const std::string& message, unsigned int severity=2, const std::string& device="", const uint64_t timestamp=0); + bool SendAlarm(const std::string& message, unsigned int level=0, const std::string& device="", const uint64_t timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendMonitoringData(const std::string& json_data, const std::string& subject, const std::string& device="", uint64_t timestamp=0); + bool SendCalibrationData(const std::string& json_data, const std::string& description, const std::string& device="", uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetCalibrationData(std::string& json_data, int& version, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetCalibrationData(std::string& json_data, int&& version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendDeviceConfig(const std::string& json_data, const std::string& author, const std::string& description, const std::string& device="", uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendRunConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetDeviceConfig(std::string& json_data, const int version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetRunConfig(std::string& json_data, const int config_id, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetRunConfig(std::string& json_data, const std::string& name, const int version=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetRunDeviceConfig(std::string& json_data, const int runconfig_id, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetRunDeviceConfig(std::string& json_data, const std::string& runconfig_name, const int runconfig_version=-1, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendROOTplot(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, bool persistent=false, int* version=nullptr, const unsigned int timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendTemporaryROOTplot(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const unsigned int timestamp=0); - bool SendPersistentROOTplot(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const unsigned int timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetROOTplot(const std::string& plot_name, int& version, std::string& draw_option, std::string& json_data, std::string* timestamp=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendPlotlyPlot(const std::string& name, const std::string& json_trace, const std::string& json_layout="{}", int* version=nullptr, unsigned int timestamp=0, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendPlotlyPlot(const std::string& name, const std::vector& json_traces, const std::string& json_layout="{}", int* version=nullptr, unsigned int timestamp=0, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetPlotlyPlot(const std::string& name, int& version, std::string& json_trace, std::string& json_layout, unsigned int* timestamp=nullptr, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + // FIXME is default lifetime 5 ok? + bool SendROOTplotZmq(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const uint64_t timestamp=0, const unsigned int lifetime=5, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + // FIXME is default lifetime 5 ok? + bool SendROOTplotMulticast(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, const unsigned int lifetime=5, const uint64_t timestamp=SERVICES_DEFAULT_TIMEOUT); + bool GetROOTplot(const std::string& plot_name, std::string& draw_option, std::string& json_data, int& version, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetROOTplot(const std::string& plot_name, std::string& draw_option, std::string& json_data, int&& version=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + // FIXME is default lifetime 5 ok? + bool SendPlotlyPlot(const std::string& name, const std::string& json_trace, const std::string& json_layout="{}", int* version=nullptr, uint64_t timestamp=0, const unsigned int lifetime=5, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + // FIXME is default lifetime 5 ok? + bool SendPlotlyPlot(const std::string& name, const std::vector& json_traces, const std::string& json_layout="{}", int* version=nullptr, uint64_t timestamp=0, const unsigned int lifetime=5, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetPlotlyPlot(const std::string& name, std::string& json_trace, std::string& json_layout, int& version, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetPlotlyPlot(const std::string& name, std::string& json_trace, std::string& json_layout, int&& version=-1, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + std::string TimeStringFromUnixMs(const uint64_t time); SlowControlCollection* GetSlowControlCollection(); SlowControlElement* GetSlowControlVariable(std::string key); diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index d07c2b3..fa5b935 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -2,9 +2,9 @@ using namespace ToolFramework; -Command::Command(std::string command_in, char type_in, std::string topic_in, const unsigned int timeout_ms_in){ +Command::Command(std::string command_in, char type_in, std::string topic_in, const uint32_t timeout_ms_in){ command = command_in; - type = type_in; + type = type_in; // TODO type is unnecessary, could just use topic[0] topic=topic_in; success=0; response=std::vector{}; @@ -96,22 +96,22 @@ void ServicesBackend::SetUp(zmq::context_t* in_context, std::function>tmp; - m_variables.JsonParser(tmp); +bool ServicesBackend::Initialise(Store &variables_in){ + + std::string tmp=""; + variables_in>>tmp; + m_variables.JsonParser(tmp); /* General Variables */ /* ----------------------------------------- */ @@ -280,13 +280,15 @@ bool ServicesBackend::InitMulticast(){ /* Multicast Setup */ /* ----------------------------------------- */ - int log_port = 55554; - int mon_port = 55553; - std::string multicast_address = "239.192.1.1"; + int log_port = 5000; + int mon_port = 5000; + std::string log_address = "239.192.1.2"; + std::string mon_address = "239.192.1.3"; m_variables.Get("log_port",log_port); m_variables.Get("mon_port",mon_port); - m_variables.Get("multicast_address",multicast_address); + m_variables.Get("log_address",log_address); + m_variables.Get("mon_address",mon_address); // set up multicast socket for sending logging & monitoring data log_socket = socket(AF_INET, SOCK_DGRAM, 0); @@ -323,10 +325,10 @@ bool ServicesBackend::InitMulticast(){ mon_addr = log_addr; mon_addr.sin_port = htons(mon_port); // convert destination address string to binary - get_ok = inet_aton(multicast_address.c_str(), &log_addr.sin_addr); - get_ok = get_ok && inet_aton(multicast_address.c_str(), &mon_addr.sin_addr); + get_ok = inet_aton(log_address.c_str(), &log_addr.sin_addr); + get_ok = get_ok && inet_aton(mon_address.c_str(), &mon_addr.sin_addr); if(get_ok==0){ // returns 0 on failure, not success - Log("Bad multicast address '"+multicast_address+"'",v_error,verbosity); + Log("Bad multicast address '"+log_address+"' or '"+mon_address+"'",v_error,verbosity); return false; } multicast_addrlen = sizeof(log_addr); @@ -346,9 +348,13 @@ bool ServicesBackend::RegisterServices(){ // we can make our lives a little easier by using a Utilities class utilities = new DAQUtilities(context); - // we can now register the client sockets with the following: - utilities->AddService("slowcontrol_write", clt_pub_port); - utilities->AddService("slowcontrol_read", clt_dlr_port); + // register our ports for advertisement + get_ok = utilities->AddPort("db_write", clt_pub_port); + get_ok = get_ok && utilities->AddPort("db_read", clt_dlr_port); + if(!get_ok){ + Log("Error advertising ports!",v_error,verbosity); + return false; + } return true; } @@ -386,8 +392,18 @@ bool ServicesBackend::SendMulticast(MulticastType type, std::string command, std // only immediately evident errors are reported. receipt is not confirmed. if(verbosity>10) std::cout<<"ServicesBackend::SendMulticast invoked with command '"<lock(); int cnt = sendto(multicast_socket, command.c_str(), command.length()+1, 0, (struct sockaddr*)multicast_addr, multicast_addrlen); + socket_mtx->unlock(); if(cnt < 0){ std::string errmsg = "Error sending multicast message: "+std::string{strerror(errno)}; Log(errmsg,v_error,verbosity); @@ -409,12 +427,12 @@ bool ServicesBackend::SendMulticast(MulticastType type, std::string command, std return true; } -bool ServicesBackend::SendCommand(const std::string& topic, const std::string& command, std::vector* results, const unsigned int* timeout_ms, std::string* err){ +bool ServicesBackend::SendCommand(const std::string& topic, const std::string& command, std::vector* results, const uint32_t* timeout_ms, std::string* err){ // send a command and receive response. // This is a wrapper that ensures we always return within the requested timeout. if(verbosity>10) std::cout<<"ServicesBackend::SendCommand invoked with command '"< resultsvec; @@ -480,7 +498,7 @@ bool ServicesBackend::SendCommand(const std::string& topic, const std::string& c return ret; } -bool ServicesBackend::DoCommand(Command& cmd, int timeout_ms){ +bool ServicesBackend::DoCommand(Command& cmd, uint32_t timeout_ms){ if(verbosity>10) std::cout<<"ServicesBackend::DoCommand received command"< send_ticket; std::future send_receipt = send_ticket.get_future(); - send_queue_mutex.lock(); if(verbosity>10) std::cout<<"ServicesBackend::DoCommand putting command into waiting-to-send list"<(send_end - send_start).count(); - timeout_ms -= send_time_ms; - // juuuust in case, ensure our remaining time is not negative. :) - if(timeout_ms<0) timedout=true; + auto send_time_ms = std::chrono::duration_cast(send_end - send_start).count(); + // juuuust in case, make sure we actually still have time left to wait for the response + if(send_time_ms > timeout_ms) timedout=true; // shouldn't be possible really + else timeout_ms -= send_time_ms; } // did we get a response in time? @@ -549,7 +567,7 @@ bool ServicesBackend::DoCommand(Command& cmd, int timeout_ms){ // sending timed out if(cmd.type=='w') ++write_commands_failed; else if(cmd.type=='r') ++read_commands_failed; - Log("Timed out sending command "+std::to_string(thismsgid),v_warning,verbosity); + Log("Timed out sending command "+std::to_string(thismsgid),v_error,verbosity); cmd.success = false; cmd.err = "Timed out sending command"; @@ -600,7 +618,7 @@ bool ServicesBackend::DoCommand(Command& cmd, int timeout_ms){ // timed out if(cmd.type=='w') ++write_commands_failed; else if(cmd.type=='r') ++read_commands_failed; - Log("Timed out waiting for response for command "+std::to_string(thismsgid),v_warning,verbosity); + Log("Timed out waiting for response for command "+std::to_string(thismsgid),v_error,verbosity); cmd.success = false; cmd.err = "Timed out waiting for response"; return false; @@ -773,8 +791,9 @@ bool ServicesBackend::Finalise(){ background_thread.join(); Log("ServicesBackend Removing services",v_debug,verbosity); - if(utilities) utilities->RemoveService("slowcontrol_write"); - if(utilities) utilities->RemoveService("slowcontrol_read"); + //if(utilities) utilities->RemoveService("slowcontrol_write"); + if(utilities) utilities->RemovePort("db_read"); + if(utilities) utilities->RemovePort("db_write"); Log("ServicesBackend Closing multicast socket",v_debug,verbosity); close(log_socket); @@ -886,7 +905,7 @@ bool ServicesBackend::Send(zmq::socket_t* sock, bool more, std::vector& outputs){ +int ServicesBackend::PollAndReceive(zmq::socket_t* sock, zmq::pollitem_t poll, uint32_t timeout, std::vector& outputs){ // poll the input socket for messages try { @@ -950,6 +969,9 @@ bool ServicesBackend::Ready(int timeout){ // only poll dealer socket, pub sockets always return true immediately so ignore the timeout // polling the input socket checks for a message, so don't do that. int ret; +// printf("ServicesBackend waiting for up to %d ms for connection on read/rep socket\n",timeout); +// auto timeout_ms = std::chrono::milliseconds(timeout); +// std::chrono::time_point start = std::chrono::steady_clock::now(); try { dlr_socket_mutex.lock(); ret = zmq::poll(&out_polls.at(1), 1, timeout); @@ -964,9 +986,12 @@ bool ServicesBackend::Ready(int timeout){ return false; } else if(ret==0){ // 'resource temoprarily unavailable' - no-one connected. +// printf("ServicesBackend::Ready - no one connected (%s)\n", zmq_strerror(errno)); } else if(out_polls.at(1).revents & ZMQ_POLLOUT){ +// printf("Connected!\n"); return true; } +// printf("returning after %ld/%d ms\n", std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count(), timeout); return false; diff --git a/src/ServiceDiscovery/ServicesBackend.h b/src/ServiceDiscovery/ServicesBackend.h index 814df73..f0136da 100644 --- a/src/ServiceDiscovery/ServicesBackend.h +++ b/src/ServiceDiscovery/ServicesBackend.h @@ -14,6 +14,9 @@ #include #include #include +#include +#include // sleep_for / sleep_until +#include // this_thread #include // gethostname #include // toupper/tolower #include // std::function, std::negate @@ -27,7 +30,7 @@ namespace ToolFramework { struct Command { - Command(std::string command_in, char cmd_type_in, std::string topic_in, const unsigned int timeout_ms_in); + Command(std::string command_in, char cmd_type_in, std::string topic_in, const uint32_t timeout_ms_in); Command(const Command& cmd_in); // copy constructor Command(Command&& cmd_in); // move constructor @@ -60,8 +63,8 @@ class ServicesBackend { bool Finalise(); // interfaces called by clients. These return within timeout. - bool SendCommand(const std::string& topic, const std::string& command, std::vector* results=nullptr, const unsigned int* timeout_ms=nullptr, std::string* err=nullptr); - bool SendCommand(const std::string& topic, const std::string& command, std::string* results=nullptr, const unsigned int* timeout_ms=nullptr, std::string* err=nullptr); + bool SendCommand(const std::string& topic, const std::string& command, std::vector* results=nullptr, const uint32_t* timeout_ms=nullptr, std::string* err=nullptr); + bool SendCommand(const std::string& topic, const std::string& command, std::string* results=nullptr, const uint32_t* timeout_ms=nullptr, std::string* err=nullptr); // multicasts bool SendMulticast(MulticastType type, std::string command, std::string* err=nullptr); @@ -92,6 +95,9 @@ class ServicesBackend { // multicast socket file descriptors int log_socket=-1; int mon_socket=-1; + // mutexes to lock them + std::mutex log_socket_mtx; + std::mutex mon_socket_mtx; // multicast destination address structure struct sockaddr_in log_addr; struct sockaddr_in mon_addr; @@ -103,11 +109,11 @@ class ServicesBackend { std::map> waiting_recipients; void Log(std::string msg, int msg_verb, int verbosity); //?? generalise private - bool InitZMQ(); //private + bool InitZMQ(); //private bool InitMulticast(); // private bool RegisterServices(); //private // wrapper funtion; add command to outgoing queue, receive response. ~30s timeout. - bool DoCommand(Command& cmd, int timeout_ms); //private + bool DoCommand(Command& cmd, uint32_t timeout_ms); //private // actual send/receive functions bool SendNextCommand(); //private bool GetNextResponse(); //priavte @@ -118,9 +124,9 @@ class ServicesBackend { // TODO add retrying int max_retries; - int inpoll_timeout; - int outpoll_timeout; - int command_timeout; + uint32_t inpoll_timeout; + uint32_t outpoll_timeout; + uint32_t command_timeout; // TODO add stats reporting boost::posix_time::time_duration resend_period; // time between resends if not acknowledged @@ -129,8 +135,8 @@ class ServicesBackend { boost::posix_time::ptime last_read; // when we last sent a read command boost::posix_time::ptime last_printout; // when we last printed out stats about what we're doing - int read_commands_failed; - int write_commands_failed; + std::atomic read_commands_failed{0}; + std::atomic write_commands_failed{0}; // general int verbosity; @@ -148,14 +154,14 @@ class ServicesBackend { // since that's the one the middleman needs to know to send replies back std::string clt_ID; - uint32_t msg_id = 0; + std::atomic msg_id{0}; // ======================================================= // zmq helper functions // TODO move to separate class as these are shared by middleman - int PollAndReceive(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, std::vector& outputs); + int PollAndReceive(zmq::socket_t* sock, zmq::pollitem_t poll, uint32_t timeout, std::vector& outputs); bool Receive(zmq::socket_t* sock, std::vector& outputs); // base cases; send single (final) message part @@ -198,7 +204,7 @@ class ServicesBackend { // wrapper to do polling if required // version if one part template - int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, T&& message){ + int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, uint32_t timeout, T&& message){ if(verbosity>10) std::cout<<__PRETTY_FUNCTION__<<" called"< - int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, T&& message, Rest&&... rest){ + int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, uint32_t timeout, T&& message, Rest&&... rest){ if(verbosity>10) std::cout<<__PRETTY_FUNCTION__<<" called"<