diff --git a/livegrid/tes_livegrid_worker.conf b/livegrid/tes_livegrid_worker.conf new file mode 100644 index 0000000..eff6e37 --- /dev/null +++ b/livegrid/tes_livegrid_worker.conf @@ -0,0 +1,4 @@ +# this will not work without an X Window connection +[program: tes_livegrid_worker] +command=/home/xf08bm/conda_envs/bluesky_workers/bin/python /home/xf08bm/tes-workers/livegrid/tes_livegrid_worker.py +user=xf08bm \ No newline at end of file diff --git a/livegrid/tes_livegrid_worker.py b/livegrid/tes_livegrid_worker.py new file mode 100644 index 0000000..cefe3b9 --- /dev/null +++ b/livegrid/tes_livegrid_worker.py @@ -0,0 +1,222 @@ +import argparse +import logging +import pprint + +import matplotlib +matplotlib.use('qt5agg') +import matplotlib.backends.backend_qt5 +import matplotlib.pyplot as plt + +import numpy as np + +from bluesky_kafka import RemoteDispatcher +from caproto.threading.client import Context +from event_model import DocumentRouter, RunRouter + + +matplotlib.backends.backend_qt5._create_qApp() + +log = logging.getLogger("tes.worker.livegrid") +log.addHandler(logging.StreamHandler()) +log.setLevel("DEBUG") + +""" +Typical usages: + +python tes_livegrid_server.py \ + --topics srx.bluesky.documents \ + --bootstrap-servers 10.0.137.8:9092 \ + --group-id srx.livegrid + +python tes_livegrid_server.py \ + --topics tes.bluesky.documents \ + --bootstrap-servers 10.0.137.8:9092 \ + --group-id tes.livegrid +""" + + +class LiveGridDocumentRouter(DocumentRouter): + def __init__(self, array_counter_data_key, *args, **kwargs): + super().__init__(*args, **kwargs) + self.array_counter_name = array_counter_data_key + + self.fig = None + self.ax = None + self.axes_image = None + self.image_array = None + + self.run_uid = None + self.array_counter_descriptor_uid = None + self.roi_pv_name = None + + self.epics_context = Context() + self.roi_pv = None + + def start(self, doc): + log.debug("start") + log.debug(pprint.pformat(doc)) + log.info(f"starting a LiveGrid for run {doc['uid']}") + self.run_uid = doc["uid"] + + # TODO: is this information in the document? + self.image_array = np.zeros((10, 10)) + self.fig, self.ax = plt.subplots(nrows=1, ncols=1) + self.axes_image = self.ax.imshow(self.image_array) + plt.show(block=False) + + super().start(doc) + + def descriptor(self, doc): + """ + In [15]: list_scans[0].descriptors[0]["data_keys"]["xs_channel1_rois_roi01_value"] + Out[15]: + {'source': 'PV:XF:08BM-ES{Xsp:1}:C1_ROI1:Value_RBV', + 'dtype': 'number', + 'shape': [], + 'precision': 4, + 'units': '', + 'lower_ctrl_limit': 0.0, + 'upper_ctrl_limit': 0.0, + 'object_name': 'xs'} + """ + if self.run_uid == doc["run_start"]: + log.debug("descriptor:") + log.debug(pprint.pformat(doc)) + if self.array_counter_name in doc["data_keys"]: + log.info("found the Array_Counter") + self.array_counter_descriptor_uid = doc["uid"] + else: + log.info("no ArrayCounter") + + if "xs_channel1_rois_roi1_value" in doc["data_keys"]: + self.roi_pv_name = doc["data_keys"]["xs_channel1_rois_roi1_value"]["source"] + self.roi_pv, = self.epics_context.get_pvs(self.roi_pv_name) + log.info(f"found the ROI PV name: {self.roi_pv_name}") + else: + pass + + super().descriptor(doc) + + def event(self, doc): + super().event(doc) + raise Exception("wasn't expecting this") + + def event_page(self, doc): + """ + Respond to array counter monitor events. + + Read ROI PV. + + In [41]: db[-1].descriptors[0]["data_keys"]["xs_channel1_rois_roi01_value"] + Out[41]: + {'source': 'PV:XF:08BM-ES{Xsp:1}:C1_ROI1:Value_RBV', + 'dtype': 'number', + 'shape': [], + 'precision': 4, + 'units': '', + 'lower_ctrl_limit': 0.0, + 'upper_ctrl_limit': 0.0, + 'object_name': 'xs'} + + In [42]: db[-1].descriptors[0]["data_keys"]["xs_channel1_rois_roi01_value_sum"] + Out[42]: + {'source': 'PV:XF:08BM-ES{Xsp:1}:C1_ROI1:ValueSum_RBV', + 'dtype': 'number', + 'shape': [], + 'precision': 4, + 'units': '', + 'lower_ctrl_limit': 0.0, + 'upper_ctrl_limit': 0.0, + 'object_name': 'xs'} + + In [16]: list_scans = list(db(plan_name="list_scan") + In [17]: len(list_scans) + Out[17]: 230 + """ + log.debug("event page:") + log.debug(pprint.pformat(doc)) + if doc["descriptor"] == self.array_counter_descriptor_uid: + log.debug("plot a point!") + array_counter = doc["data"]["ArrayCounter"][0] + row = (array_counter - 1) // 10 + col = (array_counter - 1) % 10 + log.debug("array_counter: %s, row: %s, column: %s", array_counter, row, col) + roi_value = self.roi_pv.read() + self.image_array[row, col] = roi_value.data[0] + log.debug(self.image_array) + self.ax.imshow(self.image_array) + self.fig.canvas.draw_idle() + #self.axes_image.set_data(self.image_array) + else: + log.debug("not an array counter event") + + super().event_page(doc) + + def stop(self, doc): + log.debug("stop:") + log.debug(pprint.pformat(doc)) + + super().stop(doc) + + +def livegrid_dispatcher(manager, busy_function, topics, bootstrap_servers, group_id, **kwargs): + dispatcher = RemoteDispatcher( + topics=topics, + bootstrap_servers=bootstrap_servers, + group_id=group_id + ) + + dispatcher.subscribe(func=manager) + + dispatcher.start(work_during_wait=busy_function) + + +if __name__ == "__main__": + argparser = argparse.ArgumentParser() + argparser.add_argument( + "--topics", + type=str, + help="comma-delimited list", + nargs="+", + default=["tes.bluesky.documents"], + ) + # CMB01 broker: 10.0.137.8:9092 + argparser.add_argument( + "--bootstrap-servers", + type=str, + help="comma-delimited list", + default="10.0.137.8:9092", + ) + argparser.add_argument( + "--group-id", type=str, help="a string", default="tes-livegrid-worker" + ) + argparser.add_argument( + "--array-counter-data-key", + type=str, + help="Xspress3 array counter data key", + default="ArrayCounter" + ) + + args_ = argparser.parse_args() + print(args_) + + # factory('start', start_doc) -> List[Callbacks], List[SubFactories] + def livegrid_document_router_factory(start_doc_name, start_doc): + # create a DocumentRouter only for list_scans + if start_doc["plan_name"] == "list_scan": + log.info("we have a list_scan") + livegrid_document_router = LiveGridDocumentRouter( + array_counter_data_key=args_.array_counter_data_key, + #array_counter_name="ArrayCounter" + ) + livegrid_document_router(start_doc_name, start_doc) + return [livegrid_document_router], [] + else: + log.info("not a list_scan!") + return [], [] + + livegrid_dispatcher( + manager=RunRouter(factories=[livegrid_document_router_factory]), + busy_function=lambda: plt.pause(1), + **vars(args_) + ) diff --git a/livegrid/testing/bitnami-kafka-docker-compose.yml b/livegrid/testing/bitnami-kafka-docker-compose.yml new file mode 100644 index 0000000..3e34540 --- /dev/null +++ b/livegrid/testing/bitnami-kafka-docker-compose.yml @@ -0,0 +1,33 @@ +version: '2' + +services: + zookeeper: + image: 'bitnami/zookeeper:3' + ports: + - '2181:2181' + volumes: + - 'zookeeper_data:/bitnami' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + kafka: + image: 'bitnami/kafka:2' + ports: + - '9092:9092' + - '29092:29092' + volumes: + - 'kafka_data:/bitnami' + environment: + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + - KAFKA_CFG_LISTENERS=PLAINTEXT://:29092,PLAINTEXT_HOST://:9092 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true + depends_on: + - zookeeper + +volumes: + zookeeper_data: + driver: local + kafka_data: + driver: local \ No newline at end of file diff --git a/livegrid/testing/send_documents_to_livegrid_worker.py b/livegrid/testing/send_documents_to_livegrid_worker.py new file mode 100644 index 0000000..15c5c88 --- /dev/null +++ b/livegrid/testing/send_documents_to_livegrid_worker.py @@ -0,0 +1,144 @@ +import argparse +import datetime +import logging +import time + +from bluesky_kafka import Publisher +from event_model import compose_run + + +logging.basicConfig(level=logging.DEBUG) + + +""" +Local development: +Start docker if necessary. + sudo systemctl start docker +Start a Kafka broker. + docker-compose -f testing/bitnami-kafka-docker-compose.yml up +Add the necessary topic if automatic topic creation is not enabled: + ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic srx.bluesky.documents +or check that the necessary topic exists: + ./bin/kafka-topics.sh --list --zookeeper localhost:2181 +Start simulated Xspress3. + python testing/sim_xspress3.py --list-pvs +Start tes_livegrid_worker.py. + python tes_livegrid_worker.py +Start send_documents_to_livegrid_worker.py + python testing/send_documents_to_livegrid_worker.py --topic tes.bluesky.documents --bootstrap-servers 10.0.137.8:9092 +""" + + +def send_documents(topic, bootstrap_servers): + print("send documents to kafka broker") + kafka_publisher = Publisher( + topic=topic, + bootstrap_servers=bootstrap_servers, + key="testing", + producer_config={"enable.idempotence": False} + ) + + run_start_doc, compose_desc, compose_resource, compose_stop = compose_run() + run_start_doc["scan_id"] = 1 + run_start_doc["plan_name"] = "list_scan" + kafka_publisher("start", run_start_doc) + + # copied from run 588bed89-b8e9-4882-86b2-c9471612914e at SRX + roi_event_descriptor_doc, compose_roi_event, compose_roi_event_page = compose_desc( + data_keys={ + "xs_channel1_rois_roi1_value": { + "source": "PV:XF:08BM-ES{Xsp:1}:C1_ROI1:Value_RBV", + "dtype": "number", + "shape": [], + "precision": 4, + "units": "", + "lower_ctrl_limit": 0.0, + "upper_ctrl_limit": 0.0, + } + }, + configuration={ + "xs_channel1_rois_roi1_value": { + "data": {"xs_channel1_rois_roi1_value": 6201.48337647908}, + "timestamps": {"xs_channel1_rois_roi1_value": 1572730676.801648}, + "data_keys": { + "xs_channel1_rois_roi1_value": { + "source": "PV:XF:08BM-ES{Xsp:1}:C1_ROI1:Value_RBV", + "dtype": "number", + "shape": [], + "precision": 4, + "units": "", + "lower_ctrl_limit": 0.0, + "upper_ctrl_limit": 0.0, + } + }, + } + }, + name="ROI_01_monitor", + object_keys={"ROI_01": ["ROI_01"]}, + ) + kafka_publisher("descriptor", roi_event_descriptor_doc) + + # copied from run 588bed89-b8e9-4882-86b2-c9471612914e at SRX + array_counter_event_descriptor_doc, compose_array_counter_event, compose_array_counter_event_page = compose_desc( + data_keys={ + "ArrayCounter": { + "source": "PV:XF:08BM-ES{Xsp:1}:C1_ROI1:ArrayCounter_RBV", + "dtype": "number", + "shape": [], + "precision": 4, + "units": "", + "lower_ctrl_limit": 0.0, + "upper_ctrl_limit": 0.0, + } + }, + configuration={ + "ArrayCounter": { + "data": {"ArrayCounter": 0}, + "timestamps": {"ArrayCounter": datetime.datetime.now().timestamp()}, + "data_keys": { + "ArrayCounter": { + "source": "PV:XF:08BM-ES{Xsp:1}:C1_ROI1:ArrayCounter_RBV", + "dtype": "number", + "shape": [], + "precision": 4, + "units": "", + "lower_ctrl_limit": 0.0, + "upper_ctrl_limit": 0.0, + } + }, + } + }, + name="array_counter_monitor", + object_keys={"ArrayCounter": ["ArrayCounter"]}, + ) + kafka_publisher("descriptor", array_counter_event_descriptor_doc) + + for array_counter in range(1, 20): + time.sleep(1) + array_counter_event_doc = compose_array_counter_event( + data={"ArrayCounter": array_counter}, + timestamps={"ArrayCounter": datetime.datetime.now().timestamp()} + ) + kafka_publisher("event", array_counter_event_doc) + # make this random? + roi_event_doc = compose_roi_event( + data={'xs_channel1_rois_roi1_value': 5.0}, + timestamps={"xs_channel1_rois_roi1_value": datetime.datetime.now().timestamp()}, + ) + kafka_publisher("event", roi_event_doc) + + run_stop_doc = compose_stop() + kafka_publisher("stop", run_stop_doc) + + kafka_publisher.flush() + + +if __name__ == "__main__": + argparser = argparse.ArgumentParser() + argparser.add_argument("--topic", type=str, default="srx.bluesky.documents") + argparser.add_argument("--bootstrap-servers", type=str, help="comma-delimited list", default="localhost:9092") + + args = argparser.parse_args() + print(args) + + send_documents(**vars(args)) diff --git a/livegrid/testing/sim_xspress3.py b/livegrid/testing/sim_xspress3.py new file mode 100644 index 0000000..1000bc2 --- /dev/null +++ b/livegrid/testing/sim_xspress3.py @@ -0,0 +1,15 @@ +from caproto.server import pvproperty, PVGroup, ioc_arg_parser, run +from textwrap import dedent + + +class SimulatedXspress3IOC(PVGroup): + roi1_rbv = pvproperty(value=1, name="{{Xsp:1}}:C1_ROI1:Value_RBV") + + +if __name__ == '__main__': + ioc_options, run_options = ioc_arg_parser( + default_prefix='PV:XF:08BM-ES', + desc="simulated Xspress3" + ) + ioc = SimulatedXspress3IOC(**ioc_options) + run(ioc.pvdb, **run_options)