Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# backups
*~
\#*#
# thumbnails
.DS_Store
# objects
*.o
# libraries
*.so
*.a
# dependency files
*.d
# root dictionary pcm files
*.pcm
# root dictionary sourcefiles
*Dict.cxx
*Dict.h
# python pre-compiled modules
*.pyc
# data files
*.root
# core dumps
core
# ToolFramework log files
log.e
log.o
# ToolFramework UUIDs
UUID

# the dependencies symlink
Dependencies
# executables
main
RemoteControl
NodeDaemon
# lib folder is just build products
lib/*
# the include folder is actually automatically populated
include/*

63 changes: 21 additions & 42 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
#todo sym links not copy headers, use macro to seach for .so files in Usertools and add the libraries to libs list and symlink to libs folder

cmake_minimum_required (VERSION 2.6)

project (ToolDAQApplicaiton)

set(TOOLDAQ_PATH "${PROJECT_SOURCE_DIR}/ToolDAQ")

#include(${TOOLDAQ_PATH}/ToolDAQFramework/CMakeLists.include)
set(DEPENDENCIES_PATH "${PROJECT_SOURCE_DIR}/Dependencies")

set(ZMQ_INC "${TOOLDAQ_PATH}/zeromq-4.0.7/include/")
set(ZMQ_LIB_PATH "${TOOLDAQ_PATH}/zeromq-4.0.7/lib")
set(ZMQ_INC "${DEPENDENCIES_PATH}/zeromq-4.0.7/include/")
set(ZMQ_LIB_PATH "${DEPENDENCIES_PATH}/zeromq-4.0.7/lib")
set(ZMQ_LIBS zmq)

set(BOOST_INC "${TOOLDAQ_PATH}/boost_1_66_0/install/include/")
set(BOOST_LIB_PATH "${TOOLDAQ_PATH}/boost_1_66_0/install/lib")
set(BOOST_INC "${DEPENDENCIES_PATH}/boost_1_66_0/install/include/")
set(BOOST_LIB_PATH "${DEPENDENCIES_PATH}/boost_1_66_0/install/lib")
set(BOOST_LIBS boost_date_time boost_serialization boost_iostreams)

set(DATAMODEL_INC "")
Expand All @@ -22,21 +22,19 @@ set(MYTOOLS_INC "")
set(MYTOOLS_LIB_PATH "")
set(MYTOOLS_LIBS "")

#add_subdirectory(${TOOLDAQ_PATH}/ToolDAQFramework/ ./ToolDAQ/ToolDAQFramework/)
set(TOOLFRAMEWORK_INC "${DEPENDENCIES_PATH}/ToolFrameworkCore/include")
set(TOOLFRAMEWORK_LIBS_PATH "${DEPENDENCIES_PATH}/ToolFrameworkCore/lib")
set(TOOLFRAMEWORK_LIBS DataModelBase Logging Store ToolChain)

if(NOT(${PROJECT_SOURCE_DIR} STREQUAL ${PROJECT_BINARY_DIR}))
message("Not Building in source directory: Copying files")
FILE(COPY ${PROJECT_SOURCE_DIR}/configfiles DESTINATION ${PROJECT_BINARY_DIR}/)
FILE(COPY ${PROJECT_SOURCE_DIR}/UserTools DESTINATION ${PROJECT_BINARY_DIR}/)
FILE(COPY ${PROJECT_SOURCE_DIR}/DataModel DESTINATION ${PROJECT_BINARY_DIR}/)
FILE(COPY ${PROJECT_SOURCE_DIR}/Setup.sh DESTINATION ${PROJECT_BINARY_DIR}/)
endif()
set(TOOLDAQ_INC "${DEPENDENCIES_PATH}/ToolDAQFramework/include")
set(TOOLDAQ_LIBS_PATH "${DEPENDENCIES_PATH}/ToolDAQFramework/lib")
set(TOOLDAQ_LIBS DAQDataModelBase DAQLogging DAQStore ServiceDiscovery ToolDAQChain)

include_directories(${PROJECT_BINARY_DIR}/DataModel ${BOOST_INC} ${ZMQ_INC} ${DATAMODEL_INC} ${MYTOOLS_INC} ${TOOLDAQ_PATH}/ToolDAQFramework/include ${TOOLDAQ_PATH}/ToolDAQFramework/src/Tool ${TOOLDAQ_PATH}/ToolDAQFramework/src/ToolChain ${TOOLDAQ_PATH}/ToolDAQFramework/src/Logging ${TOOLDAQ_PATH}/ToolDAQFramework/src/Store ${TOOLDAQ_PATH}/ToolDAQFramework/src/ServiceDiscovery/)
link_directories("${PROJECT_BINARY_DIR}/lib" ${BOOST_LIB_PATH} ${ZMQ_LIB_PATH} ${DATAMODEL_LIB_PATH} ${MYTOOLS_LIB_PATH} ${TOOLDAQ_PATH}/ToolDAQFramework/lib)
include_directories (${DATAMODEL_INC} ${MYTOOLS_INC} ${TOOLFRAMEWORK_INC} ${TOOLDAQ_INC} ${ZMQ_INC} ${BOOST_INC})
link_directories(${DATAMODEL_LIB_PATH} ${MYTOOLS_LIB_PATH} ${TOOLFRAMEWORK_LIBS_PATH} ${TOOLDAQ_LIBS_PATH} ${ZMQ_LIB_PATH} ${BOOST_LIB_PATH})

MACRO(HEADER_DIRECTORIES return_list)
FILE(GLOB_RECURSE new_list ${PROJECT_BINARY_DIR}/UserTools/*.h)
FILE(GLOB_RECURSE new_list ${PROJECT_SOURCE_DIR}/src/*.h ${PROJECT_SOURCE_DIR}/DataModel/*.h ${PROJECT_SOURCE_DIR}/UserTools/*.h )
FILE(COPY ${new_list} DESTINATION ${PROJECT_BINARY_DIR}/include)
SET(dir_list "")
FOREACH(file_path ${new_list})
Expand All @@ -47,37 +45,18 @@ MACRO(HEADER_DIRECTORIES return_list)
SET(${return_list} ${dir_list})
ENDMACRO()

FILE(COPY ${PROJECT_SOURCE_DIR}/configfiles DESTINATION ${PROJECT_BINARY_DIR}/)

HEADER_DIRECTORIES(header_list)
include_directories(${header_list})

set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib)

file(GLOB_RECURSE STORE_SRC RELATIVE ${CMAKE_SOURCE_DIR} "${TOOLDAQ_PATH}/ToolDAQFramework/src/Store/*.cpp")
add_library(Store SHARED ${STORE_SRC})

file(GLOB_RECURSE LOGGING_SRC RELATIVE ${CMAKE_SOURCE_DIR} "${TOOLDAQ_PATH}/ToolDAQFramework/src/Logging/*.cpp")
add_library(Logging SHARED ${LOGGING_SRC})

file(GLOB_RECURSE SERVICEDISCOVERY_SRC RELATIVE ${CMAKE_SOURCE_DIR} "${TOOLDAQ_PATH}/ToolDAQFramework/src/ServiceDiscovery/*.cpp")
add_library(ServiceDiscovery SHARED ${SERVICEDISCOVERY_SRC})

file(GLOB_RECURSE DATAMODEL_SRC RELATIVE ${CMAKE_BINARY_DIR} "DataModel/*.cpp")
file(GLOB_RECURSE DATAMODEL_SRC RELATIVE ${CMAKE_SOURCE_DIR} "DataModel/*.cpp")
add_library(DataModel SHARED ${DATAMODEL_SRC})

file(GLOB_RECURSE MYTOOLS_SRC RELATIVE ${CMAKE_BINARY_DIR} "UserTools/*.cpp")
file(GLOB_RECURSE MYTOOLS_SRC RELATIVE ${CMAKE_SOURCE_DIR} "UserTools/*.cpp")
add_library(MyTools SHARED ${MYTOOLS_SRC})

include_directories(${TOOLDAQ_PATH}/ToolDAQFramework/src/Logging)

file(GLOB_RECURSE TOOLCHAIN_SRC RELATIVE ${CMAKE_SOURCE_DIR} "${TOOLDAQ_PATH}/ToolDAQFramework/src/ToolChain/*.cpp")
add_library(ToolChain SHARED ${TOOLCHAIN_SRC})


add_executable (main ${PROJECT_SOURCE_DIR}/src/main.cpp)
target_link_libraries (main Store Logging ToolChain ServiceDiscovery MyTools DataModel ${ZMQ_LIBS} ${BOOST_LIBS} ${DATAMODEL_LIBS} ${MYTOOLS_LIBS})

add_executable ( NodeDaemon ${TOOLDAQ_PATH}/ToolDAQFramework/src/NodeDaemon/NodeDaemon.cpp)
target_link_libraries (NodeDaemon Store ServiceDiscovery ${ZMQ_LIBS} ${BOOST_LIBS})

add_executable ( RemoteControl ${TOOLDAQ_PATH}/ToolDAQFramework/src/RemoteControl/RemoteControl.cpp)
target_link_libraries (RemoteControl Store ServiceDiscovery ${ZMQ_LIBS} ${BOOST_LIBS})
target_link_libraries (main MyTools DataModel pthread ${DATAMODEL_LIBS} ${MYTOOLS_LIBS} ${TOOLFRAMEWORK_LIBS} ${TOOLDAQ_LIBS} ${ZMQ_LIBS} ${BOOST_LIBS})
4 changes: 2 additions & 2 deletions DataModel/DataModel.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "DataModel.h"

DataModel::DataModel(){}
DataModel::DataModel():DAQDataModelBase(){}

/*
TTree* DataModel::GetTTree(std::string name){
Expand All @@ -17,7 +17,7 @@ void DataModel::AddTTree(std::string name,TTree *tree){
}


void DataModel::DeleteTTree(std::string name){
void DataModel::DeleteTTree(std::string name,TTree *tree){

m_trees.erase(name);

Expand Down
172 changes: 126 additions & 46 deletions DataModel/DataModel.h
Original file line number Diff line number Diff line change
@@ -1,59 +1,139 @@
#ifndef DATAMODEL_H
#define DATAMODEL_H

#include <map>
#include <string>
#include <vector>

//#include "TTree.h"

#include "Store.h"
#include "BoostStore.h"
#include "Logging.h"
#include "Utilities.h"

#include <zmq.hpp>
#include <atomic>
#include <mutex>

#include "DAQDataModelBase.h"
#include "Pool.h"
#include "JobQueue.h"
#include "QueryBatch.h"
#include "ManagedSocket.h"
#include "query_topics.h"
#include "type_name_as_string.h" // mostly for debug
class MonitoringVariables;

/**
* \class DataModel
*
* This class Is a transient data model class for your Tools within the ToolChain. If Tools need to comunicate they pass all data objects through the data model. There fore inter tool data objects should be deffined in this class.
*
* This class is a transient data model class for your Tools within the ToolChain. If Tools need to communicate they pass all data through the data model. Therefore inter-tool data variables should be defined in this class.
*
*
* $Author: B.Richards $
* $Date: 2019/05/26 18:34:00 $
* Contact: b.richards@qmul.ac.uk
*
*/

class DataModel {


public:

DataModel(); ///< Simple constructor

//TTree* GetTTree(std::string name);
//void AddTTree(std::string name,TTree *tree);
//void DeleteTTree(std::string name);

Store vars; ///< This Store can be used for any variables. It is an inefficent ascii based storage
BoostStore CStore; ///< This is a more efficent binary BoostStore that can be used to store a dynamic set of inter Tool variables.
std::map<std::string,BoostStore*> Stores; ///< This is a map of named BooStore pointers which can be deffined to hold a nammed collection of any tipe of BoostStore. It is usefull to store data that needs subdividing into differnt stores.

Logging *Log; ///< Log class pointer for use in Tools, it can be used to send messages which can have multiple error levels and destination end points

zmq::context_t* context; ///< ZMQ contex used for producing zmq sockets for inter thread, process, or computer communication


private:



//std::map<std::string,TTree*> m_trees;



* $Date: 2019/05/26 $
* Contact: benjamin.richards@warwick.ac.uk
*
*/

using namespace ToolFramework;

class DataModel : public DAQDataModelBase {

public:
DataModel(); ///< Simple constructor

Utilities utils; ///< for thread management

bool change_config; ///< signaller for Tools to reload their configuration variables

// Tools can add connections to this and the SocketManager
// will periodically invoke UpdateConnections to connect clients
std::map<std::string, ManagedSocket*> managed_sockets;
std::mutex managed_sockets_mtx;

Pool<Job> job_pool; ///< pool of job structures to encapsulate jobs
JobQueue job_queue; ///< job queue to submit jobs to job manager
uint32_t thread_cap; ///< total number of thread cap to use in the program
std::atomic<uint32_t> num_threads; ///< current number of threads
unsigned int worker_threads;
unsigned int max_worker_threads;

std::map<std::string, MonitoringVariables*> monitoring_variables;
std::mutex monitoring_variables_mtx;

/* ----------------------------------------- */
/* MulticastReceiveSender */
/* ----------------------------------------- */

// pool of string buffers:
// the receiver thread grabs a vector from the pool, fills it,
// the pushes the filled vector into the in_multicast_msg_queue
// and grabs a new vector from the pool
// FIXME base pool size on available RAM and struct size / make configurable
// Pool::Pool(bool in_manage=false, uint16_t period_ms=1000, size_t in_object_cap=1)
Pool<std::vector<std::string>> multicast_buffer_pool{true, 5000, 100};

// batches of received messages, both logging and monitoring
// FIXME make these pairs or structs, container+mtx
// FIXME if instead of just a vector<string> we used MulticastBatch, we could accumulate the length
// and then reserve in advance the length of the string needed for the combined message....?
// XXX actually only if we tracked by topic, as one vector<string> gets turned into 5 topical concat'd strings...
std::vector<std::vector<std::string>*> in_multicast_msg_queue;
std::mutex in_multicast_msg_queue_mtx;

// outgoing logging messages
std::vector<std::string> out_log_msg_queue;
std::mutex out_log_msg_queue_mtx;

// outgoing monitoring messages
std::vector<std::string> out_mon_msg_queue;
std::mutex out_mon_msg_queue_mtx;

// pool is shared between read and write query receivers
Pool<QueryBatch> querybatch_pool{true, 5000, 100};

/* ----------------------------------------- */
/* PubReceiver */
/* ----------------------------------------- */
std::vector<QueryBatch*> write_msg_queue;
std::mutex write_msg_queue_mtx;

/* ----------------------------------------- */
/* ReadReply */
/* ----------------------------------------- */
// TODO Tool monitoring struct?
std::vector<QueryBatch*> read_msg_queue;
std::mutex read_msg_queue_mtx;
std::deque<QueryBatch*> query_replies;
std::mutex query_replies_mtx;

/* ----------------------------------------- */
/* MulticastWorkers */
/* ----------------------------------------- */
// each element is a batch of JSON that can be inserted by the DatabaseWorkers
// FIXME these strings represent batches of multicast messages, so could be very large.
// each push_back could require reallocation, which could involve moving a lot of very large message buffers
// FIXME make these pointers, put the strings (maybe make a struct? maybe just a typedef/alias?) in a pool?
Pool<std::string> multicast_batch_pool{true, 5000, 100};

std::vector<std::string*> log_query_queue;
std::mutex log_query_queue_mtx;

std::vector<std::string*> mon_query_queue;
std::mutex mon_query_queue_mtx;

std::vector<std::string*> rootplot_query_queue;
std::mutex rootplot_query_queue_mtx;

std::vector<std::string*> plotlyplot_query_queue;
std::mutex plotlyplot_query_queue_mtx;

/* ----------------------------------------- */
/* WriteWorkers */
/* ----------------------------------------- */
std::vector<QueryBatch*> write_query_queue;
std::mutex write_query_queue_mtx;

/* ----------------------------------------- */
/* DatabaseWorkers */
/* ----------------------------------------- */

std::vector<QueryBatch*> query_results; // output, awaiting for result conversion
std::mutex query_results_mtx;

private:

};


Expand Down
18 changes: 18 additions & 0 deletions DataModel/ManagedSocket.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#ifndef ManagedSocket_H
#define ManagedSocket_H

#include <map>
#include <mutex>
#include <zmq.hpp>

struct ManagedSocket {
std::mutex socket_mtx;
bool socket_manager_request=false;
zmq::socket_t* socket=nullptr;
std::string service_name;
/* std::string remote_port;*/
std::string remote_port_name;
std::map<std::string,ToolFramework::Store*> connections;
};

#endif
Loading