From 34800924a1f5060025376a50f6c1690a9d81f318 Mon Sep 17 00:00:00 2001
From: Lance-Drane
Date: Thu, 13 Nov 2025 21:58:43 -0500
Subject: [PATCH 1/4] #230 - add fallback bridge component to log events
 without portal

Signed-off-by: Lance-Drane
---
 ipsframework/bridges/__init__.py           |   6 +
 ipsframework/bridges/basic_bridge.py       | 119 ++++++
 ipsframework/bridges/local_event_logger.py | 344 ++++++++++++++++
 .../portal_bridge.py}                      | 366 ++----------------
 ipsframework/configurationManager.py       |  71 ++--
 ipsframework/services.py                   |  26 +-
 6 files changed, 552 insertions(+), 380 deletions(-)
 create mode 100644 ipsframework/bridges/__init__.py
 create mode 100644 ipsframework/bridges/basic_bridge.py
 create mode 100644 ipsframework/bridges/local_event_logger.py
 rename ipsframework/{portalBridge.py => bridges/portal_bridge.py} (51%)

diff --git a/ipsframework/bridges/__init__.py b/ipsframework/bridges/__init__.py
new file mode 100644
index 00000000..9df9d819
--- /dev/null
+++ b/ipsframework/bridges/__init__.py
@@ -0,0 +1,6 @@
+"""Bridges are components which handle supplementary tasks in the IPS Framework. Applications should not need to import these classes directly.
+
+There are two bridges available:
+- `BasicBridge`, which provides simple system logging
+- `PortalBridge`, which provides all functionality of `BasicBridge` and additionally allows interfacing with a remote IPS Portal
+"""
diff --git a/ipsframework/bridges/basic_bridge.py b/ipsframework/bridges/basic_bridge.py
new file mode 100644
index 00000000..1b5f5756
--- /dev/null
+++ b/ipsframework/bridges/basic_bridge.py
@@ -0,0 +1,119 @@
+# -------------------------------------------------------------------------------
+# Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information.
+# -------------------------------------------------------------------------------
+import hashlib
+import time
+from typing import Literal
+
+from ipsframework import Component
+from ipsframework.bridges.local_event_logger import LocalEventLogger, SimulationData
+from ipsframework.cca_es_spec import Event
+
+
+class BasicBridge(Component):
+    """
+    Framework component meant to handle simple event logging.
+
+    This component should not exist in the event that the simulation is interacting with the web framework - see the PortalBridge component instead.
+    """
+
+    def __init__(self, services, config):
+        """
+        Declaration of private variables and initialization of
+        :py:class:`component.Component` object.
+        """
+        super().__init__(services, config)
+        self.sim_map: dict[str, SimulationData] = {}
+        self.done = False
+        self.local_event_logger = LocalEventLogger()
+
+    def init(self, timestamp=0.0, **keywords):
+        """
+        Subscribe to *_IPS_MONITOR* events and register callback :py:meth:`.process_event`.
+        """
+        self.services.subscribe('_IPS_MONITOR', 'process_event')
+        self.local_event_logger.init(self.services)
+
+    def step(self, timestamp=0.0, **keywords):
+        """
+        Poll for events.
+        """
+        while not self.done:
+            self.services.process_events()
+            time.sleep(0.5)
+
+    def finalize(self, timestamp=0.0, **keywords):
+        self.local_event_logger.finalize(self.sim_map)
+
+    def process_event(self, topicName: str, theEvent: Event):
+        """
+        Process a single event *theEvent* on topic *topicName*.
+ """ + event_body = theEvent.getBody() + sim_name = event_body['sim_name'] + portal_data = event_body['portal_data'] + try: + portal_data['sim_name'] = event_body['real_sim_name'] + except KeyError: + portal_data['sim_name'] = sim_name + + if portal_data['eventtype'] == 'IPS_START': + sim_root = event_body['sim_root'] + self.init_simulation(sim_name, sim_root) + + sim_data = self.sim_map[sim_name] + if portal_data['eventtype'] == 'PORTALBRIDGE_UPDATE_TIMESTAMP': + sim_data.phys_time_stamp = portal_data['phystimestamp'] + return + else: + portal_data['phystimestamp'] = sim_data.phys_time_stamp + + if portal_data['eventtype'] == 'PORTAL_REGISTER_NOTEBOOK': + return + + if portal_data['eventtype'] == 'PORTAL_ADD_JUPYTER_DATA': + return + + if portal_data['eventtype'] == 'PORTAL_UPLOAD_ENSEMBLE_PARAMS': + return + + portal_data['portal_runid'] = sim_data.portal_runid + + if portal_data['eventtype'] == 'IPS_SET_MONITOR_URL': + sim_data.monitor_url = portal_data['vizurl'] + elif sim_data.monitor_url: + portal_data['vizurl'] = sim_data.monitor_url + + if portal_data['eventtype'] == 'IPS_START' and 'parent_portal_runid' not in portal_data: + portal_data['parent_portal_runid'] = sim_data.parent_portal_runid + portal_data['seqnum'] = sim_data.counter + + if 'trace' in portal_data: + portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest() + + self.local_event_logger.send_event(self.services, sim_data, portal_data) + + if portal_data['eventtype'] == 'IPS_END': + del self.sim_map[sim_name] + + if len(self.sim_map) == 0: + self.done = True + self.services.debug('No more simulation to monitor - exiting') + time.sleep(1) + + def init_simulation(self, sim_name: str, sim_root: str): + """ + Create and send information about simulation *sim_name* living in + *sim_root* so the portal can set up corresponding structures to manage + data from the sim. + """ + self.services.debug('Initializing simulation using BasicBridge: %s -- %s ', sim_name, sim_root) + sim_data = self.local_event_logger.init_simulation(self.services, sim_name, sim_root, self.HOST, self.USER) + self.sim_map[sim_data.sim_name] = sim_data + + def terminate(self, status: Literal[0, 1]): + """ + Clean up services and call :py:obj:`sys_exit`. + """ + + Component.terminate(self, status) diff --git a/ipsframework/bridges/local_event_logger.py b/ipsframework/bridges/local_event_logger.py new file mode 100644 index 00000000..ff35de45 --- /dev/null +++ b/ipsframework/bridges/local_event_logger.py @@ -0,0 +1,344 @@ +import datetime +import glob +import hashlib +import itertools +import json +import os +import re +import time +from collections import defaultdict +from typing import TYPE_CHECKING, Any + +from ipsframework import ipsutil +from ipsframework.convert_log_function import convert_logdata_to_html +from ipsframework.services import ServicesProxy + +if TYPE_CHECKING: + from io import FileIO + + +def hash_file(file_name): # pragma: no cover + """ + Return the MD5 hash of a file + :rtype: str + :param file_name: Full path to file + :return: MD5 of file_name + """ + BLOCKSIZE = 65536 + hasher = hashlib.md5() + with open(file_name, 'rb') as afile: + buf = afile.read(BLOCKSIZE) + while len(buf) > 0: + hasher.update(buf) + buf = afile.read(BLOCKSIZE) + return hasher.hexdigest() + + +class SimulationData: + """ + Container for simulation data. 
+ """ + + def __init__(self): + self.counter = 0 + self.monitor_file_name = '' + self.portal_runid = '' + self.parent_portal_runid = '' + self.sim_name = '' + self.sim_root = '' + self.monitor_file: FileIO = None # type: ignore + self.json_monitor_file: FileIO = None # type: ignore + self.phys_time_stamp = -1 + self.monitor_url = '' + self.mpo_steps = [None] + self.mpo_wid = None + self.bigbuf = '' + + +class LocalEventLogger: + """The LocalEventLogger class manages event logging for the supplemental bridge components.""" + + def __init__(self) -> None: + # self.curTime = time.localtime() + # self.startTime = self.curTime + self.mpo = None + self.mpo_name_counter: defaultdict[str, int] = defaultdict(lambda: 0) + self.counter = 0 + self.dump_freq = 10 + self.min_dump_interval = 300 # Minimum time interval in Sec for HTML dump operation + self.last_dump_time = time.time() + self.write_to_htmldir = True + self.html_dir = '' + self.first_portal_runid = None + + def init(self, services: ServicesProxy) -> None: + """Called from the bridge component's `init` function.""" + try: + freq = int(services.get_config_param('HTML_DUMP_FREQ', silent=True)) + except Exception: + pass + else: + self.dump_freq = freq + + try: + self.html_dir = services.get_config_param('USER_W3_DIR', silent=True) or '' + except Exception: + services.warning('Missing USER_W3_DIR configuration - disabling web-visible logging') + self.write_to_htmldir = False + else: + if self.html_dir.strip() == '': + services.warning('Empty USER_W3_DIR configuration - disabling web-visible logging') + self.write_to_htmldir = False + else: + try: + os.mkdir(self.html_dir) + except FileExistsError: + pass + except Exception: + services.warning('Unable to create HTML directory - disabling web-visible logging') + self.write_to_htmldir = False + + def finalize(self, sim_map: dict[str, SimulationData]) -> None: + for sim_data in sim_map.values(): + try: + sim_data.monitor_file.close() + sim_data.json_monitor_file.close() + except Exception: + pass + + def init_simulation(self, services: ServicesProxy, sim_name: str, sim_root: str, hostname: str, username: str) -> SimulationData: + """ + Create and send information about simulation *sim_name* living in + *sim_root* so the portal can set up corresponding structures to manage + data from the sim. 
+
+        :returns:
+        """
+        sim_data = SimulationData()
+        sim_data.sim_name = sim_name
+        sim_data.sim_root = sim_root
+
+        d = datetime.datetime.now()
+        date_str = '%s.%03d' % (d.strftime('%Y-%m-%dT%H:%M:%S'), int(d.microsecond / 1000))
+        sim_data.portal_runid = f'{sim_name}_{hostname}_{username}_{date_str}'
+        try:
+            services.set_config_param('PORTAL_RUNID', sim_data.portal_runid, target_sim_name=sim_name)
+        except Exception:
+            services.error('Simulation %s is not accessible', sim_name)
+            return
+
+        if self.first_portal_runid:
+            sim_data.parent_portal_runid = self.first_portal_runid
+        else:
+            self.first_portal_runid = sim_data.portal_runid
+
+        if sim_data.sim_root.strip() == '.':
+            sim_data.sim_root = os.environ['IPS_INITIAL_CWD']
+        sim_log_dir = os.path.join(sim_data.sim_root, 'simulation_log')
+        try:
+            os.makedirs(sim_log_dir, exist_ok=True)
+        except OSError as oserr:
+            services.exception('Error creating Simulation Log directory %s : %d %s' % (sim_log_dir, oserr.errno, oserr.strerror))
+            raise
+
+        sim_data.monitor_file_name = os.path.join(sim_log_dir, sim_data.portal_runid + '.eventlog')
+        try:
+            sim_data.monitor_file = open(sim_data.monitor_file_name, 'wb', 0)
+        except IOError as oserr:
+            services.error('Error opening file %s: error(%s): %s' % (sim_data.monitor_file_name, oserr.errno, oserr.strerror))
+            services.error('Using /dev/null instead')
+            sim_data.monitor_file = open('/dev/null', 'w')
+        json_fname = sim_data.monitor_file_name.replace('eventlog', 'jsonl')
+        sim_data.json_monitor_file = open(json_fname, 'w')
+
+        if self.mpo:  # pragma: no cover
+            try:
+                sim_data.mpo_wid = self.mpo.init(name='SWIM Workflow ' + os.environ['USER'], desc=sim_data.sim_name, wtype='SWIM')
+                print('sim_data.mpo_wid = ', sim_data.mpo_wid)
+            except Exception as e:
+                print(e)
+                print('sim_data.mpo_wid = ', sim_data.mpo_wid)
+                sim_data.mpo_wid = None
+            else:
+                sim_data.mpo_steps = [sim_data.mpo_wid['uid']]
+
+        return sim_data
+
+    def send_event(self, services: ServicesProxy, sim_data: SimulationData, event_data: dict[str, Any]):
+        """
+        Send contents of *event_data* and *sim_data* to portal.
+ """ + timestamp = ipsutil.getTimeString() + buf = '%8d %s ' % (sim_data.counter, timestamp) + for k, v in event_data.items(): + if len(str(v).strip()) == 0: + continue + if ' ' in str(v): + buf += "%s='%s' " % (k, str(v)) + else: + buf += '%s=%s ' % (k, str(v)) + buf += '\n' + sim_data.monitor_file.write(bytes(buf, encoding='UTF-8')) + sim_data.bigbuf += buf + + buf = json.dumps(event_data) + sim_data.json_monitor_file.write('%s\n' % buf) + + freq = self.dump_freq + if ((self.counter % freq == 0) and (time.time() - self.last_dump_time > self.min_dump_interval)) or (event_data['eventtype'] == 'IPS_END'): + self.last_dump_time = time.time() + html_filename = sim_data.monitor_file_name.replace('eventlog', 'html') + html_page = convert_logdata_to_html(sim_data.bigbuf) + open(html_filename, 'w').writelines(html_page) + if self.write_to_htmldir: + html_file = os.path.join(self.html_dir, os.path.basename(html_filename)) + try: + open(html_file, 'w').writelines(html_page) + except Exception: + services.exception('Error writing html file into USER_W3_DIR directory') + self.write_to_htmldir = False + + if sim_data.mpo_wid: + self.send_mpo_data(event_data, sim_data) + + def send_mpo_data(self, event_data, sim_data: SimulationData): # pragma: no cover + def md5(fname): + "Courtesy of stackoverflow 3431825" + hash_md5 = hashlib.md5() + with open(fname, 'rb') as f: + for chunk in iter(lambda: f.read(4096), b''): + hash_md5.update(chunk) + return hash_md5.hexdigest() + + def mpo_add_file(workflow, parent, file, shortname='Need a name', longdesc='did not add a description.'): + """Add a local file to the workflow attaching to parent. Calculate + checksum and if the file is already in the mpo database, use the + already the UID of the already existing file when adding the data + object - this creates a linkage to the original. The checksum and + local file path and name are added as metadata. + + This function relies on the user space metadata, ips_checksum + and ips_filename. The checksum is the md5 sum and the filename + is expected should have at least a relative qualifying path. + + workflow : workflow_id + parent : parent_id + """ + # if file exist, look for its checksum in the database + try: + checksum = md5(file) + except Exception: + print(('checksum could not find file:', file)) + raise + + is_checksum = self.mpo.search('metadata', params={'key': 'ips_checksum', 'value': checksum}) + # search always returns a list of dictionaries + # if checksum exists, use first dataobject that has it + # api search results are sorted by time + # Note, check this with eqdsk dataobject in test-api + print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') + print(len(is_checksum), file) + print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') + + if len(is_checksum) > 0: + # uid is chosen to be first occurrence + # parent_uid is uid of object metadata is attached to. 
+                file_uid = is_checksum[0]['parent_uid']
+
+                # Create dataobject reference by uid in the workflow
+                dataobject = self.mpo.add(workflow, parent, uid=file_uid, name=shortname, desc=longdesc)
+                self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum)
+                # add filename metadata the dataobject reference
+                self.mpo.meta(dataobject['uid'], 'ips_filename', file)
+            else:
+                print(('file', file))
+                file_uri = file
+                # Create new dataobject by uri and insert reference in to workflow
+                dataobject = self.mpo.add(workflow, parent, uri=file_uri, name=shortname, desc=longdesc)
+                # add checksum metadata to original data object
+                # add function currently only returns uri field, so fetch full record
+                full_dataobject = self.mpo.search('dataobject/' + dataobject['uid'])[0]
+                # add checksum so dataobject and also
+                self.mpo.meta(full_dataobject['do_uid'], 'ips_checksum', checksum)
+                self.mpo.meta(dataobject['uid'], 'ips_filename', file)
+                self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum)
+                dataobject = full_dataobject
+            return dataobject
+
+        recordable_events = ['IPS_CALL_BEGIN', 'IPS_STAGE_INPUTS', 'IPS_STAGE_OUTPUTS', 'IPS_CALL_END']
+        recordable_mpo_activities = ['IPS_CALL_BEGIN']
+        comment = event_data['comment']
+        event_type = event_data['eventtype']
+
+        if event_type not in recordable_events:
+            return
+        inp_objs = []
+        if event_type == 'IPS_CALL_END':
+            del sim_data.mpo_steps[-1]
+            return
+        try:
+            if event_type == 'IPS_STAGE_INPUTS':
+                r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)')
+                o = r.match(comment)
+                (_, path, files) = o.groups()
+                glist = [glob.glob(os.path.join(path, f)) for f in files.split()]
+                for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]:
+                    mpo_data_obj = mpo_add_file(
+                        sim_data.mpo_wid, sim_data.mpo_wid['uid'], os.path.join(path, file_name), shortname=file_name, longdesc='An input file'
+                    )
+                    inp_objs.append(mpo_data_obj['uid'])
+
+            if event_type == 'IPS_STAGE_INPUTS' and not inp_objs:
+                return
+
+            count = self.mpo_name_counter[sim_data.sim_name + event_data['code']]
+            if event_type == 'IPS_CALL_BEGIN':
+                target = event_data['comment'].split()[-1]
+                step_name = '%s %d' % (target, count)
+            else:
+                step_name = '{0:s} {1:s} {2:d}'.format(event_data['code'].split('_')[-1], event_type, count)
+
+            if event_type == 'IPS_STAGE_OUTPUTS':
+                r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)')
+                o = r.match(comment)
+                (_, path, files) = o.groups()
+                if not files:
+                    return
+            activity = self.mpo.step(
+                workflow_ID=sim_data.mpo_wid, parentobj_ID=sim_data.mpo_steps[-1], input_objs=inp_objs, name=step_name, desc='%s' % event_data['comment']
+            )
+            self.mpo_name_counter[sim_data.sim_name + event_data['code']] += 1
+            if event_type == 'IPS_STAGE_OUTPUTS':
+                glist = [glob.glob(os.path.join(path, f)) for f in files.split()]
+                for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]:
+                    """
+                    (f_uid, f_hash) = get_file_uid(path, file_name)
+                    if f_uid:
+                        mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid,
+                                                    parentobj_ID=activity['uid'],
+                                                    name=file_name,
+                                                    desc="An output file",
+                                                    uri='file:' + os.path.join(path, file_name),
+                                                    uid=f_uid,
+                                                    source = f_uid)
+                    else:
+                        mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid,
+                                                    parentobj_ID=activity['uid'],
+                                                    name=file_name,
+                                                    desc="An output file",
+                                                    uri='file:' + os.path.join(path, file_name))
+                    """
+                    mpo_data_obj = mpo_add_file(
+                        sim_data.mpo_wid,
+                        activity['uid'],
+                        # sim_data.mpo_wid['uid'],
+                        os.path.join(path, file_name),
+                        shortname=file_name,
+                        longdesc='An output file',
+                    )
+
+        except Exception as e:
+            print('*************', e)
+        else:
+            if event_type in recordable_mpo_activities:
+                sim_data.mpo_steps.append(activity['uid'])
diff --git a/ipsframework/portalBridge.py b/ipsframework/bridges/portal_bridge.py
similarity index 51%
rename from ipsframework/portalBridge.py
rename to ipsframework/bridges/portal_bridge.py
index 528924c5..425442c2 100644
--- a/ipsframework/portalBridge.py
+++ b/ipsframework/bridges/portal_bridge.py
@@ -1,16 +1,11 @@
 # -------------------------------------------------------------------------------
 # Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information.
 # -------------------------------------------------------------------------------
-import datetime
-import glob
 import hashlib
-import itertools
 import json
 import os
-import re
 import tarfile
 import time
-from collections import defaultdict
 from multiprocessing import Event, Pipe, Process
 from multiprocessing.connection import Connection
 from multiprocessing.synchronize import Event as EventType
@@ -18,25 +13,8 @@
 
 import urllib3
 
-from ipsframework import Component, ipsutil
-from ipsframework.convert_log_function import convert_logdata_to_html
-
-
-def hash_file(file_name):  # pragma: no cover
-    """
-    Return the MD5 hash of a file
-    :rtype: str
-    :param file_name: Full path to file
-    :return: MD5 of file_name
-    """
-    BLOCKSIZE = 65536
-    hasher = hashlib.md5()
-    with open(file_name, 'rb') as afile:
-        buf = afile.read(BLOCKSIZE)
-        while len(buf) > 0:
-            hasher.update(buf)
-            buf = afile.read(BLOCKSIZE)
-    return hasher.hexdigest()
+from ipsframework import Component
+from ipsframework.bridges.local_event_logger import LocalEventLogger, SimulationData
 
 
 def send_post(conn: Connection, stop: EventType, url: str):
@@ -203,40 +181,19 @@ def __init__(self, target: Callable, *args):
 
 class PortalBridge(Component):
     """
-    Framework component to communicate with the SWIM web portal.
+    Framework component to communicate with the IPS web portal.
     """
 
-    class SimulationData:
-        """
-        Container for simulation data.
-        """
-
-        def __init__(self):
-            self.counter = 0
-            self.monitor_file_name = ''
-            self.portal_runid = None
-            self.parent_portal_runid = None
-            self.sim_name = ''
-            self.sim_root = ''
-            self.monitor_file = None
-            self.json_monitor_file = None
-            self.phys_time_stamp = -1
-            self.monitor_url = None
-            self.mpo_steps = [None]
-            self.mpo_wid = None
-            self.bigbuf = ''
-
     def __init__(self, services, config):
         """
         Declaration of private variables and initialization of
         :py:class:`component.Component` object.
""" super().__init__(services, config) - self.curTime = time.localtime() - self.startTime = self.curTime - self.sim_map = {} - self.portal_url = None + self.sim_map: dict[str, SimulationData] = {} self.done = False + self.local_event_logger = LocalEventLogger() + self.portal_url = '' self.first_event = True self.childProcess = None self.childProcessStop = None @@ -244,15 +201,6 @@ def __init__(self, services, config): self.url_manager_jupyter_notebook = None self.url_manager_jupyter_data = None self.url_manager_ensemble_uploads = None - self.mpo = None - self.mpo_name_counter = defaultdict(lambda: 0) - self.counter = 0 - self.dump_freq = 10 - self.min_dump_interval = 300 # Minimum time interval in Sec for HTML dump operation - self.last_dump_time = time.time() - self.write_to_htmldir = True - self.html_dir = '' - self.first_portal_runid = None def init(self, timestamp=0.0, **keywords): """ @@ -267,31 +215,9 @@ def init(self, timestamp=0.0, **keywords): self.portal_api_key = self._IPS_PORTAL_API_KEY except AttributeError: pass - self.services.subscribe('_IPS_MONITOR', 'process_event') - try: - freq = int(self.services.get_config_param('HTML_DUMP_FREQ', silent=True)) - except Exception: - pass - else: - self.dump_freq = freq - try: - self.html_dir = self.services.get_config_param('USER_W3_DIR', silent=True) or '' - except Exception: - self.services.warning('Missing USER_W3_DIR configuration - disabling web-visible logging') - self.write_to_htmldir = False - else: - if self.html_dir.strip() == '': - self.services.warning('Empty USER_W3_DIR configuration - disabling web-visible logging') - self.write_to_htmldir = False - else: - try: - os.mkdir(self.html_dir) - except FileExistsError: - pass - except Exception: - self.services.warning('Unable to create HTML directory - disabling web-visible logging') - self.write_to_htmldir = False + self.services.subscribe('_IPS_MONITOR', 'process_event') + self.local_event_logger.init(self.services) def step(self, timestamp=0.0, **keywords): """ @@ -302,12 +228,7 @@ def step(self, timestamp=0.0, **keywords): time.sleep(0.5) def finalize(self, timestamp=0.0, **keywords): - for sim_data in self.sim_map.values(): - try: - sim_data.monitor_file.close() - sim_data.json_monitor_file.close() - except Exception: - pass + self.local_event_logger.finalize(self.sim_map) def process_event(self, topicName, theEvent): """ @@ -370,55 +291,7 @@ def process_event(self, topicName, theEvent): if 'trace' in portal_data: portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest() - self.send_event(sim_data, portal_data) - sim_data.counter += 1 - self.counter += 1 - - if portal_data['eventtype'] == 'IPS_END': - del self.sim_map[sim_name] - - if len(self.sim_map) == 0: - if self.childProcess: - self.childProcessStop.set() - self.childProcess.join() - self.check_send_post_responses() - self.done = True - self.services.debug('No more simulation to monitor - exiting') - time.sleep(1) - - def send_event(self, sim_data, event_data): - """ - Send contents of *event_data* and *sim_data* to portal. 
- """ - timestamp = ipsutil.getTimeString() - buf = '%8d %s ' % (sim_data.counter, timestamp) - for k, v in event_data.items(): - if len(str(v).strip()) == 0: - continue - if ' ' in str(v): - buf += "%s='%s' " % (k, str(v)) - else: - buf += '%s=%s ' % (k, str(v)) - buf += '\n' - sim_data.monitor_file.write(bytes(buf, encoding='UTF-8')) - sim_data.bigbuf += buf - - buf = json.dumps(event_data) - sim_data.json_monitor_file.write('%s\n' % buf) - - freq = self.dump_freq - if ((self.counter % freq == 0) and (time.time() - self.last_dump_time > self.min_dump_interval)) or (event_data['eventtype'] == 'IPS_END'): - self.last_dump_time = time.time() - html_filename = sim_data.monitor_file_name.replace('eventlog', 'html') - html_page = convert_logdata_to_html(sim_data.bigbuf) - open(html_filename, 'w').writelines(html_page) - if self.write_to_htmldir: - html_file = os.path.join(self.html_dir, os.path.basename(html_filename)) - try: - open(html_file, 'w').writelines(html_page) - except Exception: - self.services.exception('Error writing html file into USER_W3_DIR directory') - self.write_to_htmldir = False + self.local_event_logger.send_event(self.services, sim_data, portal_data) if self.portal_url: if self.first_event: # First time, launch sendPost.py daemon @@ -429,14 +302,23 @@ def send_event(self, sim_data, event_data): self.first_event = False try: - self.parent_conn.send(event_data) + self.parent_conn.send(portal_data) except OSError: pass self.check_send_post_responses() - if sim_data.mpo_wid: - self.send_mpo_data(event_data, sim_data) + if portal_data['eventtype'] == 'IPS_END': + del self.sim_map[sim_name] + + if len(self.sim_map) == 0: + if self.childProcess: + self.childProcessStop.set() + self.childProcess.join() + self.check_send_post_responses() + self.done = True + self.services.debug('No more simulation to monitor - exiting') + time.sleep(1) def check_send_post_responses(self): while self.parent_conn.poll(): @@ -459,7 +341,7 @@ def check_send_post_responses(self): self.services.error('Portal Error: %d %s', code, msg) elif code == -1: # disable portal, stop trying to send more data - self.portal_url = None + self.portal_url = '' self.services.error('Disabling portal because: %s', msg) else: self.services.debug('Portal Response: %d %s', code, msg) @@ -478,14 +360,14 @@ def http_req_and_response(self, manager: UrlRequestProcessManager, event_data): if code == -1: # disable portal, stop trying to send more data - self.portal_url = None + self.portal_url = '' self.services.error('Disabling portal because: %s', msg) elif code >= 400: self.services.error('Portal Error: %d %s', code, msg) else: self.services.debug('Portal Response: %d %s', code, msg) - def send_jupyter_notebook(self, sim_data, event_data): + def send_jupyter_notebook(self, sim_data: SimulationData, event_data): if self.portal_url and self.portal_api_key: if not self.url_manager_jupyter_notebook: self.url_manager_jupyter_notebook = UrlRequestProcessManager( @@ -493,7 +375,7 @@ def send_jupyter_notebook(self, sim_data, event_data): ) self.http_req_and_response(self.url_manager_jupyter_notebook, event_data) - def send_notebook_data(self, sim_data, event_data): + def send_notebook_data(self, sim_data: SimulationData, event_data): if self.portal_url and self.portal_api_key: if not self.url_manager_jupyter_data: self.url_manager_jupyter_data = UrlRequestProcessManager( @@ -501,7 +383,7 @@ def send_notebook_data(self, sim_data, event_data): ) self.http_req_and_response(self.url_manager_jupyter_data, event_data) - def 
+    def send_ensemble_variables(self, sim_data: SimulationData, event_data):
         if self.portal_url and self.portal_api_key:
             if not self.url_manager_ensemble_uploads:
                 self.url_manager_ensemble_uploads = UrlRequestProcessManager(
@@ -509,205 +391,17 @@ def send_ensemble_variables(self, sim_data, event_data):
                 )
             self.http_req_and_response(self.url_manager_ensemble_uploads, event_data)
 
-    def send_mpo_data(self, event_data, sim_data):  # pragma: no cover
-        def md5(fname):
-            "Courtesy of stackoverflow 3431825"
-            hash_md5 = hashlib.md5()
-            with open(fname, 'rb') as f:
-                for chunk in iter(lambda: f.read(4096), b''):
-                    hash_md5.update(chunk)
-            return hash_md5.hexdigest()
-
-        def mpo_add_file(workflow, parent, file, shortname='Need a name', longdesc='did not add a description.'):
-            """Add a local file to the workflow attaching to parent. Calculate
-            checksum and if the file is already in the mpo database, use the
-            already the UID of the already existing file when adding the data
-            object - this creates a linkage to the original. The checksum and
-            local file path and name are added as metadata.
-
-            This function relies on the user space metadata, ips_checksum
-            and ips_filename. The checksum is the md5 sum and the filename
-            is expected should have at least a relative qualifying path.
-
-            workflow : workflow_id
-            parent : parent_id
-            """
-            # if file exist, look for its checksum in the database
-            try:
-                checksum = md5(file)
-            except Exception:
-                print(('checksum could not find file:', file))
-                raise
-
-            is_checksum = self.mpo.search('metadata', params={'key': 'ips_checksum', 'value': checksum})
-            # search always returns a list of dictionaries
-            # if checksum exists, use first dataobject that has it
-            # api search results are sorted by time
-            # Note, check this with eqdsk dataobject in test-api
-            print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
-            print(len(is_checksum), file)
-            print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
-
-            if len(is_checksum) > 0:
-                # uid is chosen to be first occurrence
-                # parent_uid is uid of object metadata is attached to.
-                file_uid = is_checksum[0]['parent_uid']
-
-                # Create dataobject reference by uid in the workflow
-                dataobject = self.mpo.add(workflow, parent, uid=file_uid, name=shortname, desc=longdesc)
-                self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum)
-                # add filename metadata the dataobject reference
-                self.mpo.meta(dataobject['uid'], 'ips_filename', file)
-            else:
-                print(('file', file))
-                file_uri = file
-                # Create new dataobject by uri and insert reference in to workflow
-                dataobject = self.mpo.add(workflow, parent, uri=file_uri, name=shortname, desc=longdesc)
-                # add checksum metadata to original data object
-                # add function currently only returns uri field, so fetch full record
-                full_dataobject = self.mpo.search('dataobject/' + dataobject['uid'])[0]
-                # add checksum so dataobject and also
-                self.mpo.meta(full_dataobject['do_uid'], 'ips_checksum', checksum)
-                self.mpo.meta(dataobject['uid'], 'ips_filename', file)
-                self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum)
-                dataobject = full_dataobject
-            return dataobject
-
-        recordable_events = ['IPS_CALL_BEGIN', 'IPS_STAGE_INPUTS', 'IPS_STAGE_OUTPUTS', 'IPS_CALL_END']
-        recordable_mpo_activities = ['IPS_CALL_BEGIN']
-        comment = event_data['comment']
-        event_type = event_data['eventtype']
-
-        if event_type not in recordable_events:
-            return
-        inp_objs = []
-        if event_type == 'IPS_CALL_END':
-            del sim_data.mpo_steps[-1]
-            return
-        try:
-            if event_type == 'IPS_STAGE_INPUTS':
-                r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)')
-                o = r.match(comment)
-                (_, path, files) = o.groups()
-                glist = [glob.glob(os.path.join(path, f)) for f in files.split()]
-                for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]:
-                    mpo_data_obj = mpo_add_file(
-                        sim_data.mpo_wid, sim_data.mpo_wid['uid'], os.path.join(path, file_name), shortname=file_name, longdesc='An input file'
-                    )
-                    inp_objs.append(mpo_data_obj['uid'])
-
-            if event_type == 'IPS_STAGE_INPUTS' and not inp_objs:
-                return
-
-            count = self.mpo_name_counter[sim_data.sim_name + event_data['code']]
-            if event_type == 'IPS_CALL_BEGIN':
-                target = event_data['comment'].split()[-1]
-                step_name = '%s %d' % (target, count)
-            else:
-                step_name = '{0:s} {1:s} {2:d}'.format(event_data['code'].split('_')[-1], event_type, count)
-
-            if event_type == 'IPS_STAGE_OUTPUTS':
-                r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)')
-                o = r.match(comment)
-                (_, path, files) = o.groups()
-                if not files:
-                    return
-            activity = self.mpo.step(
-                workflow_ID=sim_data.mpo_wid, parentobj_ID=sim_data.mpo_steps[-1], input_objs=inp_objs, name=step_name, desc='%s' % event_data['comment']
-            )
-            self.mpo_name_counter[sim_data.sim_name + event_data['code']] += 1
-            if event_type == 'IPS_STAGE_OUTPUTS':
-                glist = [glob.glob(os.path.join(path, f)) for f in files.split()]
-                for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]:
-                    """
-                    (f_uid, f_hash) = get_file_uid(path, file_name)
-                    if f_uid:
-                        mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid,
-                                                    parentobj_ID=activity['uid'],
-                                                    name=file_name,
-                                                    desc="An output file",
-                                                    uri='file:' + os.path.join(path, file_name),
-                                                    uid=f_uid,
-                                                    source = f_uid)
-                    else:
-                        mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid,
-                                                    parentobj_ID=activity['uid'],
-                                                    name=file_name,
-                                                    desc="An output file",
-                                                    uri='file:' + os.path.join(path, file_name))
-                    """
-                    mpo_data_obj = mpo_add_file(
-                        sim_data.mpo_wid,
-                        activity['uid'],
-                        # sim_data.mpo_wid['uid'],
-                        os.path.join(path, file_name),
-                        shortname=file_name,
-                        longdesc='An output file',
-                    )
-
-        except Exception as e:
-            print('*************', e)
-        else:
-            if event_type in recordable_mpo_activities:
-                sim_data.mpo_steps.append(activity['uid'])
-
-    def init_simulation(self, sim_name, sim_root):
+    def init_simulation(self, sim_name: str, sim_root: str):
         """
         Create and send information about simulation *sim_name* living in
         *sim_root* so the portal can set up corresponding structures to manage
         data from the sim.
         """
-        self.services.debug('Initializing simulation : %s -- %s ', sim_name, sim_root)
-        sim_data = self.SimulationData()
-        sim_data.sim_name = sim_name
-        sim_data.sim_root = sim_root
+        self.services.debug('Initializing simulation using PortalBridge: %s -- %s ', sim_name, sim_root)
 
         if hasattr(self, '_IPS_PORTAL_API_KEY'):
             self.services.set_config_param('_IPS_PORTAL_API_KEY', self._IPS_PORTAL_API_KEY, target_sim_name=sim_name)
 
-        d = datetime.datetime.now()
-        date_str = '%s.%03d' % (d.strftime('%Y-%m-%dT%H:%M:%S'), int(d.microsecond / 1000))
-        sim_data.portal_runid = f'{sim_name}_{self.HOST}_{self.USER}_{date_str}'
-        try:
-            self.services.set_config_param('PORTAL_RUNID', sim_data.portal_runid, target_sim_name=sim_name)
-        except Exception:
-            self.services.error('Simulation %s is not accessible', sim_name)
-            return
-
-        if self.first_portal_runid:
-            sim_data.parent_portal_runid = self.first_portal_runid
-        else:
-            self.first_portal_runid = sim_data.portal_runid
-
-        if sim_data.sim_root.strip() == '.':
-            sim_data.sim_root = os.environ['IPS_INITIAL_CWD']
-        sim_log_dir = os.path.join(sim_data.sim_root, 'simulation_log')
-        try:
-            os.makedirs(sim_log_dir, exist_ok=True)
-        except OSError as oserr:
-            self.services.exception('Error creating Simulation Log directory %s : %d %s' % (sim_log_dir, oserr.errno, oserr.strerror))
-            raise
-
-        sim_data.monitor_file_name = os.path.join(sim_log_dir, sim_data.portal_runid + '.eventlog')
-        try:
-            sim_data.monitor_file = open(sim_data.monitor_file_name, 'wb', 0)
-        except IOError as oserr:
-            self.services.error('Error opening file %s: error(%s): %s' % (sim_data.monitor_file_name, oserr.errno, oserr.strerror))
-            self.services.error('Using /dev/null instead')
-            sim_data.monitor_file = open('/dev/null', 'w')
-        json_fname = sim_data.monitor_file_name.replace('eventlog', 'json')
-        sim_data.json_monitor_file = open(json_fname, 'w')
-
-        if self.mpo:  # pragma: no cover
-            try:
-                sim_data.mpo_wid = self.mpo.init(name='SWIM Workflow ' + os.environ['USER'], desc=sim_data.sim_name, wtype='SWIM')
-                print('sim_data.mpo_wid = ', sim_data.mpo_wid)
-            except Exception as e:
-                print(e)
-                print('sim_data.mpo_wid = ', sim_data.mpo_wid)
-                sim_data.mpo_wid = None
-            else:
-                sim_data.mpo_steps = [sim_data.mpo_wid['uid']]
-
+        sim_data = self.local_event_logger.init_simulation(self.services, sim_name, sim_root, self.HOST, self.USER)
         self.sim_map[sim_data.sim_name] = sim_data
 
     def terminate(self, status: Literal[0, 1]):
diff --git a/ipsframework/configurationManager.py b/ipsframework/configurationManager.py
index e30eae8b..826ee649 100644
--- a/ipsframework/configurationManager.py
+++ b/ipsframework/configurationManager.py
@@ -358,42 +358,53 @@ def _initialize_fwk_components(self):
 
         # SIMYAN: set up The Portal bridge, allowing for an absence of a portal
         use_portal = True
-        if 'USE_PORTAL' in self.sim_map[self.fwk_sim_name].sim_conf:
+        # If users explicitly disable the Portal via 'USE_PORTAL=false', or do not include a PORTAL_URL, assume the no-portal workflow.
+        # Otherwise, always initialize the Portal workflow
+        if 'PORTAL_URL' not in self.sim_map[self.fwk_sim_name].sim_conf:
+            use_portal = False
+        elif 'USE_PORTAL' in self.sim_map[self.fwk_sim_name].sim_conf:
             use_portal = self.sim_map[self.fwk_sim_name].sim_conf['USE_PORTAL']
             if use_portal.lower() == 'false':
                 use_portal = False
 
-        if use_portal:
-            portal_conf = {}
-            portal_conf['CLASS'] = 'FWK'
-            portal_conf['SUB_CLASS'] = 'COMP'
-            portal_conf['NAME'] = 'PortalBridge'
-            if 'FWK_COMPS_PATH' in self.sim_map[self.fwk_sim_name].sim_conf:
-                portal_conf['BIN_PATH'] = self.sim_map[self.fwk_sim_name].sim_conf['FWK_COMPS_PATH']
-                portal_conf['SCRIPT'] = os.path.join(portal_conf['BIN_PATH'], 'portalBridge.py')
-            else:
-                portal_conf['SCRIPT'] = ''
-                portal_conf['MODULE'] = 'ipsframework.portalBridge'
-            portal_conf['INPUT_DIR'] = '/dev/null'
-            portal_conf['INPUT_FILES'] = ''
-            portal_conf['DATA_FILES'] = ''
-            portal_conf['OUTPUT_FILES'] = ''
-            portal_conf['NPROC'] = 1
-            portal_conf['LOG_LEVEL'] = 'INFO'
-            try:
-                portal_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER']
-            except KeyError:
-                portal_conf['USER'] = self.platform_conf['USER']
-            portal_conf['HOST'] = self.platform_conf['HOST']
-            if self.fwk.log_level == logging.DEBUG:
-                portal_conf['LOG_LEVEL'] = 'DEBUG'
-            portal_conf['PORTAL_URL'] = self.get_platform_parameter('PORTAL_URL', silent=True)
+        if use_portal:
+            bridge_module = 'portal_bridge'
+            bridge_name = 'PortalBridge'
+        else:
+            bridge_module = 'basic_bridge'
+            bridge_name = 'BasicBridge'
+
+        bridge_conf = {}
+        bridge_conf['CLASS'] = 'FWK'
+        bridge_conf['SUB_CLASS'] = 'COMP'
+        bridge_conf['NAME'] = bridge_name
+        if 'FWK_COMPS_PATH' in self.sim_map[self.fwk_sim_name].sim_conf:
+            bridge_conf['BIN_PATH'] = self.sim_map[self.fwk_sim_name].sim_conf['FWK_COMPS_PATH']
+            bridge_conf['SCRIPT'] = os.path.join(bridge_conf['BIN_PATH'], 'bridges', f'{bridge_module}.py')
+        else:
+            bridge_conf['SCRIPT'] = ''
+            bridge_conf['MODULE'] = f'ipsframework.bridges.{bridge_module}'
+        bridge_conf['INPUT_DIR'] = '/dev/null'
+        bridge_conf['INPUT_FILES'] = ''
+        bridge_conf['DATA_FILES'] = ''
+        bridge_conf['OUTPUT_FILES'] = ''
+        bridge_conf['NPROC'] = 1
+        bridge_conf['LOG_LEVEL'] = 'INFO'
+        try:
+            bridge_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER']
+        except KeyError:
+            bridge_conf['USER'] = self.platform_conf['USER']
+        bridge_conf['HOST'] = self.platform_conf['HOST']
+        if self.fwk.log_level == logging.DEBUG:
+            bridge_conf['LOG_LEVEL'] = 'DEBUG'
 
-            if portal_conf['PORTAL_URL']:
-                portal_conf['_IPS_PORTAL_API_KEY'] = self.get_platform_parameter('_IPS_PORTAL_API_KEY', silent=True)
+        if use_portal:
+            bridge_conf['PORTAL_URL'] = self.get_platform_parameter('PORTAL_URL', silent=True)
+            if bridge_conf['PORTAL_URL']:
+                bridge_conf['_IPS_PORTAL_API_KEY'] = self.get_platform_parameter('_IPS_PORTAL_API_KEY', silent=True)
 
-            component_id = self._create_component(portal_conf, self.sim_map[self.fwk_sim_name])
-            self.fwk_components.append(component_id)
+        component_id = self._create_component(bridge_conf, self.sim_map[self.fwk_sim_name])
+        self.fwk_components.append(component_id)
 
     def _initialize_sim(self, sim_data):
         """
diff --git a/ipsframework/services.py b/ipsframework/services.py
index c7005128..115a9bad 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -64,7 +64,7 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
     * `timeout` - The timeout in seconds for the task to complete.
     * `cpus_per_proc` - The number of cpus per process to use for the task.
       This implies that the DVMPlugin has set up a DVM daemon for this node.
     * `oversubscribe` - If `True`, then the number of processes can exceed the number of cores on the node. Default is `False`.
-    
+
     If the worker has the attribute `dvm_uri_file`, then we are running with a DVM (Distributed Virtual Machine)
     so the `binary` needs a `prun` prepended pointing to that.
@@ -139,18 +139,18 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
         worker.logger.info(f'Using DVM URI file: {dvm_uri_file}')
         print(f'Using DVM URI file: {dvm_uri_file}', flush=True)
 
-    if task_env is not None and task_env is not {}:
-        if not 'PMIX_SERVER_URI41' in task_env:
+    if task_env is not None and task_env != {}:
+        if 'PMIX_SERVER_URI41' not in task_env:
             worker.logger.error('DVM environment variable PMIX_SERVER_URI41 not set in task_env')
             print('DVM environment variable PMIX_SERVER_URI41 not set in task_env', flush=True)
         else:
-            worker.logger.info(f"DVM environment variable PMIX_SERVER_URI41 set in task_env to {{task_env['PMIX_SERVER_URI41']}}")
+            worker.logger.info("DVM environment variable PMIX_SERVER_URI41 set in task_env to {task_env['PMIX_SERVER_URI41']}")
             print(f'DVM environment variable PMIX_SERVER_URI41 set in task_env to {task_env["PMIX_SERVER_URI41"]}', flush=True)
-    if not 'PMIX_SERVER_URI41' in os.environ:
+    if 'PMIX_SERVER_URI41' not in os.environ:
         worker.logger.error('DVM environment variable PMIX_SERVER_URI41 not set in os.environ')
         print('DVM environment variable PMIX_SERVER_URI41 not set in os.environ', flush=True)
     else:
-        worker.logger.info(f"DVM environment variable PMIX_SERVER_URI41 set in os.environ to {{os.environ['PMIX_SERVER_URI41']}}")
+        worker.logger.info("DVM environment variable PMIX_SERVER_URI41 set in os.environ to {os.environ['PMIX_SERVER_URI41']}")
         print(f'DVM environment variable PMIX_SERVER_URI41 set in os.environ to {os.environ["PMIX_SERVER_URI41"]}', flush=True)
 
     timeout = float(keywords.get('timeout', 1.0e9))
@@ -1229,7 +1229,8 @@ def get_config_param(self, param: str, silent: bool = False, log: bool = True) -
                 val = self._get_service_response(msg_id, block=True)
             except Exception:
                 if not silent:
-                    self.exception('Error retrieving value of config parameter %s', param)
+                    if log:
+                        self.exception('Error retrieving value of config parameter %s', param)
                     raise
                 return None
         return val
@@ -2525,7 +2526,6 @@ def send_ensemble_instance_to_portal(ensemble_name: str, data_path: Path) -> Non
 
         self.info(f'Preparing to run ensembles in {run_dir}')
 
-
         # Ensure that we create a unique task pool name for this using the
         # instance prefix `name`
         # check this first to ensure uniqueness of `name` parameter
@@ -2651,7 +2651,7 @@ def setup(self, worker: Worker):
         # invoking client.forward_logging() elsewhere, so I shouldn't have to
         # do this.
         self.logger.setLevel(logging.DEBUG)
-        self.logger.info(f'Launching DVM')
+        self.logger.info('Launching DVM')
         # TODO could make this more OS agnostic by using tempfile.mkstemp
         self.worker.dvm_uri_file = f'/tmp/dvm.uri.{os.getpid()}'
         command = ['prte', '--report-uri', self.worker.dvm_uri_file]
@@ -2659,11 +2659,11 @@ def setup(self, worker: Worker):
 
         mapping_policy = 'core'  # by default bind to cores
         if self.hwthreads:  # ... unless you want to bind to hardware threads
-            self.logger.info(f'Binding to hardware threads')
+            self.logger.info('Binding to hardware threads')
             mapping_policy = 'hwtcpus'
 
         if self.oversubscribe:
-            self.logger.info(f'Allowing oversubscription of nodes')
+            self.logger.info('Allowing oversubscription of nodes')
             mapping_policy += ':oversubscribe'
 
         # This environment variable is specific to OpenMPI's PRTE
@@ -2682,7 +2682,6 @@ def setup(self, worker: Worker):
 
         os.environ['PMIX_SERVER_URI41'] = self.worker.dvm_uri
 
-        return
 
     def teardown(self, worker: Worker):
         self.logger.info(f'Shutting down DVM at {self.worker.dvm_uri}')
@@ -2942,8 +2941,7 @@ def _make_worker_args(num_workers: int, num_threads: int, use_shifter: bool, shi
         self.services.debug(f'Dask scheduler pid: {self.dask_sched_pid}')
 
         if not Path(self.dask_scheduler_file).exists():
-            self.services.critical(f'Dask scheduler file '
-                                   f'{self.dask_scheduler_file} does not exist')
+            self.services.critical(f'Dask scheduler file {self.dask_scheduler_file} does not exist')
 
         dask_nodes = 1 if dask_nodes is None else dask_nodes
         if services.get_config_param('MPIRUN') == 'eval':

From 650cd7d2bbcedd6e4cb912a97d753e6f05cf2f13 Mon Sep 17 00:00:00 2001
From: Lance-Drane
Date: Fri, 14 Nov 2025 11:43:55 -0500
Subject: [PATCH 2/4] remove mpo logic

Signed-off-by: Lance-Drane
---
 ipsframework/bridges/local_event_logger.py | 160 ---------------------
 1 file changed, 160 deletions(-)

diff --git a/ipsframework/bridges/local_event_logger.py b/ipsframework/bridges/local_event_logger.py
index ff35de45..97b57242 100644
--- a/ipsframework/bridges/local_event_logger.py
+++ b/ipsframework/bridges/local_event_logger.py
@@ -50,8 +50,6 @@ def __init__(self):
         self.json_monitor_file: FileIO = None  # type: ignore
         self.phys_time_stamp = -1
         self.monitor_url = ''
-        self.mpo_steps = [None]
-        self.mpo_wid = None
         self.bigbuf = ''
 
 
@@ -61,8 +59,6 @@ class LocalEventLogger:
     def __init__(self) -> None:
         # self.curTime = time.localtime()
         # self.startTime = self.curTime
-        self.mpo = None
-        self.mpo_name_counter: defaultdict[str, int] = defaultdict(lambda: 0)
         self.counter = 0
         self.dump_freq = 10
         self.min_dump_interval = 300  # Minimum time interval in Sec for HTML dump operation
@@ -151,17 +147,6 @@ def init_simulation(self, services: ServicesProxy, sim_name: str, sim_root: str,
         json_fname = sim_data.monitor_file_name.replace('eventlog', 'jsonl')
         sim_data.json_monitor_file = open(json_fname, 'w')
 
-        if self.mpo:  # pragma: no cover
-            try:
-                sim_data.mpo_wid = self.mpo.init(name='SWIM Workflow ' + os.environ['USER'], desc=sim_data.sim_name, wtype='SWIM')
-                print('sim_data.mpo_wid = ', sim_data.mpo_wid)
-            except Exception as e:
-                print(e)
-                print('sim_data.mpo_wid = ', sim_data.mpo_wid)
-                sim_data.mpo_wid = None
-            else:
-                sim_data.mpo_steps = [sim_data.mpo_wid['uid']]
-
         return sim_data
 
     def send_event(self, services: ServicesProxy, sim_data: SimulationData, event_data: dict[str, Any]):
@@ -197,148 +182,3 @@ def send_event(self, services: ServicesProxy, sim_data: SimulationData, event_da
             except Exception:
                 services.exception('Error writing html file into USER_W3_DIR directory')
                 self.write_to_htmldir = False
-
-        if sim_data.mpo_wid:
-            self.send_mpo_data(event_data, sim_data)
-
-    def send_mpo_data(self, event_data, sim_data: SimulationData):  # pragma: no cover
-        def md5(fname):
-            "Courtesy of stackoverflow 3431825"
-            hash_md5 = hashlib.md5()
-            with open(fname, 'rb') as f:
-                for chunk in iter(lambda: f.read(4096), b''):
-                    hash_md5.update(chunk)
-            return hash_md5.hexdigest()
-
-        def mpo_add_file(workflow, parent, file, shortname='Need a name', longdesc='did not add a description.'):
-            """Add a local file to the workflow attaching to parent. Calculate
-            checksum and if the file is already in the mpo database, use the
-            already the UID of the already existing file when adding the data
-            object - this creates a linkage to the original. The checksum and
-            local file path and name are added as metadata.
-
-            This function relies on the user space metadata, ips_checksum
-            and ips_filename. The checksum is the md5 sum and the filename
-            is expected should have at least a relative qualifying path.
-
-            workflow : workflow_id
-            parent : parent_id
-            """
-            # if file exist, look for its checksum in the database
-            try:
-                checksum = md5(file)
-            except Exception:
-                print(('checksum could not find file:', file))
-                raise
-
-            is_checksum = self.mpo.search('metadata', params={'key': 'ips_checksum', 'value': checksum})
-            # search always returns a list of dictionaries
-            # if checksum exists, use first dataobject that has it
-            # api search results are sorted by time
-            # Note, check this with eqdsk dataobject in test-api
-            print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
-            print(len(is_checksum), file)
-            print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
-
-            if len(is_checksum) > 0:
-                # uid is chosen to be first occurrence
-                # parent_uid is uid of object metadata is attached to.
-                file_uid = is_checksum[0]['parent_uid']
-
-                # Create dataobject reference by uid in the workflow
-                dataobject = self.mpo.add(workflow, parent, uid=file_uid, name=shortname, desc=longdesc)
-                self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum)
-                # add filename metadata the dataobject reference
-                self.mpo.meta(dataobject['uid'], 'ips_filename', file)
-            else:
-                print(('file', file))
-                file_uri = file
-                # Create new dataobject by uri and insert reference in to workflow
-                dataobject = self.mpo.add(workflow, parent, uri=file_uri, name=shortname, desc=longdesc)
-                # add checksum metadata to original data object
-                # add function currently only returns uri field, so fetch full record
-                full_dataobject = self.mpo.search('dataobject/' + dataobject['uid'])[0]
-                # add checksum so dataobject and also
-                self.mpo.meta(full_dataobject['do_uid'], 'ips_checksum', checksum)
-                self.mpo.meta(dataobject['uid'], 'ips_filename', file)
-                self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum)
-                dataobject = full_dataobject
-            return dataobject
-
-        recordable_events = ['IPS_CALL_BEGIN', 'IPS_STAGE_INPUTS', 'IPS_STAGE_OUTPUTS', 'IPS_CALL_END']
-        recordable_mpo_activities = ['IPS_CALL_BEGIN']
-        comment = event_data['comment']
-        event_type = event_data['eventtype']
-
-        if event_type not in recordable_events:
-            return
-        inp_objs = []
-        if event_type == 'IPS_CALL_END':
-            del sim_data.mpo_steps[-1]
-            return
-        try:
-            if event_type == 'IPS_STAGE_INPUTS':
-                r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)')
-                o = r.match(comment)
-                (_, path, files) = o.groups()
-                glist = [glob.glob(os.path.join(path, f)) for f in files.split()]
-                for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]:
-                    mpo_data_obj = mpo_add_file(
-                        sim_data.mpo_wid, sim_data.mpo_wid['uid'], os.path.join(path, file_name), shortname=file_name, longdesc='An input file'
-                    )
-                    inp_objs.append(mpo_data_obj['uid'])
-
-            if event_type == 'IPS_STAGE_INPUTS' and not inp_objs:
-                return
-
-            count = self.mpo_name_counter[sim_data.sim_name + event_data['code']]
-            if event_type == 'IPS_CALL_BEGIN':
-                target = event_data['comment'].split()[-1]
-                step_name = '%s %d' % (target, count)
-            else:
-                step_name = '{0:s} {1:s} {2:d}'.format(event_data['code'].split('_')[-1], event_type, count)
-
-            if event_type == 'IPS_STAGE_OUTPUTS':
-                r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)')
-                o = r.match(comment)
-                (_, path, files) = o.groups()
-                if not files:
-                    return
-            activity = self.mpo.step(
-                workflow_ID=sim_data.mpo_wid, parentobj_ID=sim_data.mpo_steps[-1], input_objs=inp_objs, name=step_name, desc='%s' % event_data['comment']
-            )
-            self.mpo_name_counter[sim_data.sim_name + event_data['code']] += 1
-            if event_type == 'IPS_STAGE_OUTPUTS':
-                glist = [glob.glob(os.path.join(path, f)) for f in files.split()]
-                for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]:
-                    """
-                    (f_uid, f_hash) = get_file_uid(path, file_name)
-                    if f_uid:
-                        mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid,
-                                                    parentobj_ID=activity['uid'],
-                                                    name=file_name,
-                                                    desc="An output file",
-                                                    uri='file:' + os.path.join(path, file_name),
-                                                    uid=f_uid,
-                                                    source = f_uid)
-                    else:
-                        mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid,
-                                                    parentobj_ID=activity['uid'],
-                                                    name=file_name,
-                                                    desc="An output file",
-                                                    uri='file:' + os.path.join(path, file_name))
-                    """
-                    mpo_data_obj = mpo_add_file(
-                        sim_data.mpo_wid,
-                        activity['uid'],
-                        # sim_data.mpo_wid['uid'],
-                        os.path.join(path, file_name),
-                        shortname=file_name,
-                        longdesc='An output file',
-                    )
-
-        except Exception as e:
-            print('*************', e)
-        else:
-            if event_type in recordable_mpo_activities:
-                sim_data.mpo_steps.append(activity['uid'])

From 8dcde272a57f21654496a59bbe68d138b07a35cf Mon Sep 17 00:00:00 2001
From: Lance-Drane
Date: Fri, 14 Nov 2025 12:02:07 -0500
Subject: [PATCH 3/4] a few linting fixes

Signed-off-by: Lance-Drane
---
 ipsframework/services.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/ipsframework/services.py b/ipsframework/services.py
index 8b988081..4fb87b0c 100644
--- a/ipsframework/services.py
+++ b/ipsframework/services.py
@@ -147,7 +147,7 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a
     # It can be defined in `task_env` or in `os.environ`, so we look in
     # both locations to just echo its presence. The flushes are necessary
     # in some HPC environments to ensure the output appears in the logs.
-    if task_env is not None and task_env is not {}:
+    if task_env is not None and task_env != {}:
         if 'PMIX_SERVER_URI41' in task_env:
             worker.logger.debug(f"DVM environment variable PMIX_SERVER_URI41 "
                                 f"set in task_env to "
@@ -2664,12 +2664,12 @@ def setup(self, worker: Worker):
                            'for Dask worker')
 
         # Necessary to ensure the DVM "sees" all the resources to manage
-        os.environ['PRTE_MCA_ras_slurm_use_entire_allocation'] = "1"
+        os.environ['PRTE_MCA_ras_slurm_use_entire_allocation'] = '1'
 
         self.worker = worker
         worker.logger = self.logger
 
-        self.logger.info(f'Launching DVM')
+        self.logger.info('Launching DVM')
         self.worker.dvm_uri_file = f'/tmp/dvm.uri.{os.getpid()}'
         command = [#'srun', '--mpi=pmix_v4', '-N', os.environ['SLURM_NNODES'], '--ntasks-per-node=1',
                    'prte', #'--no-daemonize',

From 385cb16760b0e9f75f89211374e21d01fc726abb Mon Sep 17 00:00:00 2001
From: Lance-Drane
Date: Thu, 11 Dec 2025 12:59:15 -0500
Subject: [PATCH 4/4] create separate logging component and separate portal
 component

Signed-off-by: Lance-Drane
---
 ipsframework/bridges/__init__.py             |   6 +-
 ipsframework/bridges/basic_bridge.py         | 119 ---------
 ipsframework/bridges/local_event_logger.py   | 184 -------------
 ipsframework/bridges/local_logging_bridge.py | 255 +++++++++++++++++++
 ipsframework/bridges/portal_bridge.py        |  63 +++--
 ipsframework/configurationManager.py         |  76 +++---
 ipsframework/ips.py                          |  10 +-
 ipsframework/services.py                     |   1 -
 8 files changed, 355 insertions(+), 359 deletions(-)
 delete mode 100644 ipsframework/bridges/basic_bridge.py
 delete mode 100644 ipsframework/bridges/local_event_logger.py
 create mode 100644 ipsframework/bridges/local_logging_bridge.py

diff --git a/ipsframework/bridges/__init__.py b/ipsframework/bridges/__init__.py
index 9df9d819..288854b0 100644
--- a/ipsframework/bridges/__init__.py
+++ b/ipsframework/bridges/__init__.py
@@ -1,6 +1,6 @@
-"""Bridges are components which handle supplementary tasks in the IPS Framework. Applications should not need to import these classes directly.
+"""Bridges are components which handle supplementary tasks in the IPS Framework. Applications should not need to import these classes directly, these are automatically initialized by the framework.
 
 There are two bridges available:
-- `BasicBridge`, which provides simple system logging
-- `PortalBridge`, which provides all functionality of `BasicBridge` and additionally allows interfacing with a remote IPS Portal
+- `LocalLoggingBridge`, which provides simple system logging (always created)
+- `PortalBridge`, which allows interfacing with a remote IPS Portal (created if user provides a PORTAL_URL and does not disable the portal)
 """
diff --git a/ipsframework/bridges/basic_bridge.py b/ipsframework/bridges/basic_bridge.py
deleted file mode 100644
index 1b5f5756..00000000
--- a/ipsframework/bridges/basic_bridge.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# -------------------------------------------------------------------------------
-# Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information.
-# -------------------------------------------------------------------------------
-import hashlib
-import time
-from typing import Literal
-
-from ipsframework import Component
-from ipsframework.bridges.local_event_logger import LocalEventLogger, SimulationData
-from ipsframework.cca_es_spec import Event
-
-
-class BasicBridge(Component):
-    """
-    Framework component meant to handle simple event logging.
-
-    This component should not exist in the event that the simulation is interacting with the web framework - see the PortalBridge component instead.
-    """
-
-    def __init__(self, services, config):
-        """
-        Declaration of private variables and initialization of
-        :py:class:`component.Component` object.
-        """
-        super().__init__(services, config)
-        self.sim_map: dict[str, SimulationData] = {}
-        self.done = False
-        self.local_event_logger = LocalEventLogger()
-
-    def init(self, timestamp=0.0, **keywords):
-        """
-        Subscribe to *_IPS_MONITOR* events and register callback :py:meth:`.process_event`.
-        """
-        self.services.subscribe('_IPS_MONITOR', 'process_event')
-        self.local_event_logger.init(self.services)
-
-    def step(self, timestamp=0.0, **keywords):
-        """
-        Poll for events.
-        """
-        while not self.done:
-            self.services.process_events()
-            time.sleep(0.5)
-
-    def finalize(self, timestamp=0.0, **keywords):
-        self.local_event_logger.finalize(self.sim_map)
-
-    def process_event(self, topicName: str, theEvent: Event):
-        """
-        Process a single event *theEvent* on topic *topicName*.
-        """
-        event_body = theEvent.getBody()
-        sim_name = event_body['sim_name']
-        portal_data = event_body['portal_data']
-        try:
-            portal_data['sim_name'] = event_body['real_sim_name']
-        except KeyError:
-            portal_data['sim_name'] = sim_name
-
-        if portal_data['eventtype'] == 'IPS_START':
-            sim_root = event_body['sim_root']
-            self.init_simulation(sim_name, sim_root)
-
-        sim_data = self.sim_map[sim_name]
-        if portal_data['eventtype'] == 'PORTALBRIDGE_UPDATE_TIMESTAMP':
-            sim_data.phys_time_stamp = portal_data['phystimestamp']
-            return
-        else:
-            portal_data['phystimestamp'] = sim_data.phys_time_stamp
-
-        if portal_data['eventtype'] == 'PORTAL_REGISTER_NOTEBOOK':
-            return
-
-        if portal_data['eventtype'] == 'PORTAL_ADD_JUPYTER_DATA':
-            return
-
-        if portal_data['eventtype'] == 'PORTAL_UPLOAD_ENSEMBLE_PARAMS':
-            return
-
-        portal_data['portal_runid'] = sim_data.portal_runid
-
-        if portal_data['eventtype'] == 'IPS_SET_MONITOR_URL':
-            sim_data.monitor_url = portal_data['vizurl']
-        elif sim_data.monitor_url:
-            portal_data['vizurl'] = sim_data.monitor_url
-
-        if portal_data['eventtype'] == 'IPS_START' and 'parent_portal_runid' not in portal_data:
-            portal_data['parent_portal_runid'] = sim_data.parent_portal_runid
-        portal_data['seqnum'] = sim_data.counter
-
-        if 'trace' in portal_data:
-            portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest()
-
-        self.local_event_logger.send_event(self.services, sim_data, portal_data)
-
-        if portal_data['eventtype'] == 'IPS_END':
-            del self.sim_map[sim_name]
-
-        if len(self.sim_map) == 0:
-            self.done = True
-            self.services.debug('No more simulation to monitor - exiting')
-            time.sleep(1)
-
-    def init_simulation(self, sim_name: str, sim_root: str):
-        """
-        Create and send information about simulation *sim_name* living in
-        *sim_root* so the portal can set up corresponding structures to manage
-        data from the sim.
-        """
-        self.services.debug('Initializing simulation using BasicBridge: %s -- %s ', sim_name, sim_root)
-        sim_data = self.local_event_logger.init_simulation(self.services, sim_name, sim_root, self.HOST, self.USER)
-        self.sim_map[sim_data.sim_name] = sim_data
-
-    def terminate(self, status: Literal[0, 1]):
-        """
-        Clean up services and call :py:obj:`sys_exit`.
- """ - - Component.terminate(self, status) diff --git a/ipsframework/bridges/local_event_logger.py b/ipsframework/bridges/local_event_logger.py deleted file mode 100644 index 97b57242..00000000 --- a/ipsframework/bridges/local_event_logger.py +++ /dev/null @@ -1,184 +0,0 @@ -import datetime -import glob -import hashlib -import itertools -import json -import os -import re -import time -from collections import defaultdict -from typing import TYPE_CHECKING, Any - -from ipsframework import ipsutil -from ipsframework.convert_log_function import convert_logdata_to_html -from ipsframework.services import ServicesProxy - -if TYPE_CHECKING: - from io import FileIO - - -def hash_file(file_name): # pragma: no cover - """ - Return the MD5 hash of a file - :rtype: str - :param file_name: Full path to file - :return: MD5 of file_name - """ - BLOCKSIZE = 65536 - hasher = hashlib.md5() - with open(file_name, 'rb') as afile: - buf = afile.read(BLOCKSIZE) - while len(buf) > 0: - hasher.update(buf) - buf = afile.read(BLOCKSIZE) - return hasher.hexdigest() - - -class SimulationData: - """ - Container for simulation data. - """ - - def __init__(self): - self.counter = 0 - self.monitor_file_name = '' - self.portal_runid = '' - self.parent_portal_runid = '' - self.sim_name = '' - self.sim_root = '' - self.monitor_file: FileIO = None # type: ignore - self.json_monitor_file: FileIO = None # type: ignore - self.phys_time_stamp = -1 - self.monitor_url = '' - self.bigbuf = '' - - -class LocalEventLogger: - """The LocalEventLogger class manages event logging for the supplemental bridge components.""" - - def __init__(self) -> None: - # self.curTime = time.localtime() - # self.startTime = self.curTime - self.counter = 0 - self.dump_freq = 10 - self.min_dump_interval = 300 # Minimum time interval in Sec for HTML dump operation - self.last_dump_time = time.time() - self.write_to_htmldir = True - self.html_dir = '' - self.first_portal_runid = None - - def init(self, services: ServicesProxy) -> None: - """Called from the bridge component's `init` function.""" - try: - freq = int(services.get_config_param('HTML_DUMP_FREQ', silent=True)) - except Exception: - pass - else: - self.dump_freq = freq - - try: - self.html_dir = services.get_config_param('USER_W3_DIR', silent=True) or '' - except Exception: - services.warning('Missing USER_W3_DIR configuration - disabling web-visible logging') - self.write_to_htmldir = False - else: - if self.html_dir.strip() == '': - services.warning('Empty USER_W3_DIR configuration - disabling web-visible logging') - self.write_to_htmldir = False - else: - try: - os.mkdir(self.html_dir) - except FileExistsError: - pass - except Exception: - services.warning('Unable to create HTML directory - disabling web-visible logging') - self.write_to_htmldir = False - - def finalize(self, sim_map: dict[str, SimulationData]) -> None: - for sim_data in sim_map.values(): - try: - sim_data.monitor_file.close() - sim_data.json_monitor_file.close() - except Exception: - pass - - def init_simulation(self, services: ServicesProxy, sim_name: str, sim_root: str, hostname: str, username: str) -> SimulationData: - """ - Create and send information about simulation *sim_name* living in - *sim_root* so the portal can set up corresponding structures to manage - data from the sim. 
- - :returns: - """ - sim_data = SimulationData() - sim_data.sim_name = sim_name - sim_data.sim_root = sim_root - - d = datetime.datetime.now() - date_str = '%s.%03d' % (d.strftime('%Y-%m-%dT%H:%M:%S'), int(d.microsecond / 1000)) - sim_data.portal_runid = f'{sim_name}_{hostname}_{username}_{date_str}' - try: - services.set_config_param('PORTAL_RUNID', sim_data.portal_runid, target_sim_name=sim_name) - except Exception: - services.error('Simulation %s is not accessible', sim_name) - return - - if self.first_portal_runid: - sim_data.parent_portal_runid = self.first_portal_runid - else: - self.first_portal_runid = sim_data.portal_runid - - if sim_data.sim_root.strip() == '.': - sim_data.sim_root = os.environ['IPS_INITIAL_CWD'] - sim_log_dir = os.path.join(sim_data.sim_root, 'simulation_log') - try: - os.makedirs(sim_log_dir, exist_ok=True) - except OSError as oserr: - services.exception('Error creating Simulation Log directory %s : %d %s' % (sim_log_dir, oserr.errno, oserr.strerror)) - raise - - sim_data.monitor_file_name = os.path.join(sim_log_dir, sim_data.portal_runid + '.eventlog') - try: - sim_data.monitor_file = open(sim_data.monitor_file_name, 'wb', 0) - except IOError as oserr: - services.error('Error opening file %s: error(%s): %s' % (sim_data.monitor_file_name, oserr.errno, oserr.strerror)) - services.error('Using /dev/null instead') - sim_data.monitor_file = open('/dev/null', 'w') - json_fname = sim_data.monitor_file_name.replace('eventlog', 'jsonl') - sim_data.json_monitor_file = open(json_fname, 'w') - - return sim_data - - def send_event(self, services: ServicesProxy, sim_data: SimulationData, event_data: dict[str, Any]): - """ - Send contents of *event_data* and *sim_data* to portal. - """ - timestamp = ipsutil.getTimeString() - buf = '%8d %s ' % (sim_data.counter, timestamp) - for k, v in event_data.items(): - if len(str(v).strip()) == 0: - continue - if ' ' in str(v): - buf += "%s='%s' " % (k, str(v)) - else: - buf += '%s=%s ' % (k, str(v)) - buf += '\n' - sim_data.monitor_file.write(bytes(buf, encoding='UTF-8')) - sim_data.bigbuf += buf - - buf = json.dumps(event_data) - sim_data.json_monitor_file.write('%s\n' % buf) - - freq = self.dump_freq - if ((self.counter % freq == 0) and (time.time() - self.last_dump_time > self.min_dump_interval)) or (event_data['eventtype'] == 'IPS_END'): - self.last_dump_time = time.time() - html_filename = sim_data.monitor_file_name.replace('eventlog', 'html') - html_page = convert_logdata_to_html(sim_data.bigbuf) - open(html_filename, 'w').writelines(html_page) - if self.write_to_htmldir: - html_file = os.path.join(self.html_dir, os.path.basename(html_filename)) - try: - open(html_file, 'w').writelines(html_page) - except Exception: - services.exception('Error writing html file into USER_W3_DIR directory') - self.write_to_htmldir = False diff --git a/ipsframework/bridges/local_logging_bridge.py b/ipsframework/bridges/local_logging_bridge.py new file mode 100644 index 00000000..09f8f226 --- /dev/null +++ b/ipsframework/bridges/local_logging_bridge.py @@ -0,0 +1,255 @@ +# ------------------------------------------------------------------------------- +# Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information. 
+# -------------------------------------------------------------------------------
+import hashlib
+import json
+import os
+import time
+from typing import TYPE_CHECKING, Any, Literal, Union
+
+from ipsframework import Component, ipsutil
+from ipsframework.cca_es_spec import Event
+from ipsframework.convert_log_function import convert_logdata_to_html
+
+if TYPE_CHECKING:
+    from io import FileIO
+
+
+class SimulationData:
+    """
+    Container for simulation data.
+    """
+
+    def __init__(self):
+        self.counter = 0
+        self.monitor_file_prefix = ''
+        """The name of the log file, minus the extension ('.html', '.jsonl', etc.).
+
+        If this is empty, the event log file could not be created, and no derived
+        file names ('.html', '.jsonl') should be built from it.
+        """
+        self.portal_runid: Union[str, None] = None
+        """Portal RunID, set by the component which publishes the IPS_START event. Only used for logging here."""
+        self.parent_portal_runid: Union[str, None] = None
+        """Parent portal RunID, derived from the locally determined portal RunID. Should explicitly be None (not an empty string) if not set. Only used for logging."""
+        self.sim_name = ''
+        self.sim_root = ''
+        self.monitor_file: FileIO = None  # type: ignore
+        self.json_monitor_file: FileIO = None  # type: ignore
+        self.bigbuf = ''
+        self.phys_time_stamp = -1
+        self.monitor_url = ''
+
+
+class LocalLoggingBridge(Component):
+    """
+    Framework component that handles simple local event logging.
+
+    This component is always created by the framework. It only writes event logs to the local filesystem; interfacing with a remote IPS Portal is handled separately by the PortalBridge component.
+    """
+
+    def __init__(self, services, config):
+        """
+        Declaration of private variables and initialization of
+        :py:class:`component.Component` object.
+        """
+        super().__init__(services, config)
+        self.sim_map: dict[str, SimulationData] = {}
+        self.done = False
+        self.counter = 0
+        self.dump_freq = 10
+        self.min_dump_interval = 300  # minimum time interval in sec between HTML dump operations
+        self.last_dump_time = time.time()
+        self.write_to_htmldir = True
+        self.html_dir = ''
+        self.first_portal_runid = None
+
+    def init(self, timestamp=0.0, **keywords):
+        """
+        Subscribe to *_IPS_MONITOR* events and register callback :py:meth:`.process_event`.
+        """
+        self.services.subscribe('_IPS_MONITOR', 'process_event')
+
+        try:
+            freq = int(self.services.get_config_param('HTML_DUMP_FREQ', silent=True))
+        except Exception:
+            pass
+        else:
+            self.dump_freq = freq
+
+        try:
+            self.html_dir = self.services.get_config_param('USER_W3_DIR', silent=True) or ''
+        except Exception:
+            self.services.warning('Missing USER_W3_DIR configuration - disabling web-visible logging')
+            self.write_to_htmldir = False
+        else:
+            if self.html_dir.strip() == '':
+                self.services.warning('Empty USER_W3_DIR configuration - disabling web-visible logging')
+                self.write_to_htmldir = False
+            else:
+                try:
+                    os.mkdir(self.html_dir)
+                except FileExistsError:
+                    pass
+                except Exception:
+                    self.services.warning('Unable to create HTML directory - disabling web-visible logging')
+                    self.write_to_htmldir = False
+
+    def step(self, timestamp=0.0, **keywords):
+        """
+        Poll for events.
+        """
+        while not self.done:
+            self.services.process_events()
+            time.sleep(0.5)
+
+    def finalize(self, timestamp=0.0, **keywords):
+        for sim_data in self.sim_map.values():
+            try:
+                sim_data.monitor_file.close()
+                sim_data.json_monitor_file.close()
+            except Exception:
+                pass
+
+    def process_event(self, topicName: str, theEvent: Event):
+        """
+        Process a single event *theEvent* on topic *topicName*.
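+
+        A minimal sketch of the payload shape this callback expects (the key
+        names mirror the lookups in the implementation below; any given event
+        only carries the keys relevant to its ``eventtype``, and the values
+        here are hypothetical)::
+
+            event_body = {
+                'sim_name': 'my_sim',
+                'real_sim_name': 'my_sim',   # optional override for sim_name
+                'sim_root': '/path/to/sim',  # read on IPS_START events
+                'portal_data': {
+                    'eventtype': 'IPS_START',
+                    'portal_runid': 'my_sim_myhost_myuser_2025-12-11T12:59:15.000',
+                },
+            }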
+        """
+        event_body = theEvent.getBody()
+        sim_name = event_body['sim_name']
+        portal_data = event_body['portal_data']
+        try:
+            portal_data['sim_name'] = event_body['real_sim_name']
+        except KeyError:
+            portal_data['sim_name'] = sim_name
+
+        if portal_data['eventtype'] == 'IPS_START':
+            sim_root = event_body['sim_root']
+            self.init_simulation(sim_name, sim_root, portal_data['portal_runid'])
+
+        sim_data = self.sim_map[sim_name]
+        if portal_data['eventtype'] == 'PORTALBRIDGE_UPDATE_TIMESTAMP':
+            sim_data.phys_time_stamp = portal_data['phystimestamp']
+            return
+        else:
+            portal_data['phystimestamp'] = sim_data.phys_time_stamp
+
+        if portal_data['eventtype'] == 'PORTAL_REGISTER_NOTEBOOK':
+            return
+
+        if portal_data['eventtype'] == 'PORTAL_ADD_JUPYTER_DATA':
+            return
+
+        if portal_data['eventtype'] == 'PORTAL_UPLOAD_ENSEMBLE_PARAMS':
+            return
+
+        portal_data['portal_runid'] = sim_data.portal_runid
+
+        if portal_data['eventtype'] == 'IPS_SET_MONITOR_URL':
+            sim_data.monitor_url = portal_data['vizurl']
+        elif sim_data.monitor_url:
+            portal_data['vizurl'] = sim_data.monitor_url
+
+        if portal_data['eventtype'] == 'IPS_START' and 'parent_portal_runid' not in portal_data:
+            portal_data['parent_portal_runid'] = sim_data.parent_portal_runid
+        portal_data['seqnum'] = sim_data.counter
+
+        if 'trace' in portal_data:
+            portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest()
+
+        self.send_event(sim_data, portal_data)
+
+        if portal_data['eventtype'] == 'IPS_END':
+            del self.sim_map[sim_name]
+
+            if len(self.sim_map) == 0:
+                self.done = True
+                self.services.debug('No more simulations to monitor - exiting')
+                time.sleep(1)
+
+    def init_simulation(self, sim_name: str, sim_root: str, portal_runid: str):
+        """
+        Create the local logging structures (event log, JSONL file, and HTML
+        output) for simulation *sim_name* living in *sim_root*.
+        """
+        self.services.debug('Initializing simulation using LocalLoggingBridge: %s -- %s ', sim_name, sim_root)
+
+        sim_data = SimulationData()
+        sim_data.sim_name = sim_name
+        sim_data.sim_root = sim_root
+
+        sim_data.portal_runid = portal_runid
+
+        if self.first_portal_runid:
+            sim_data.parent_portal_runid = self.first_portal_runid
+        else:
+            self.first_portal_runid = sim_data.portal_runid
+
+        if sim_data.sim_root.strip() == '.':
+            sim_data.sim_root = os.environ['IPS_INITIAL_CWD']
+        sim_log_dir = os.path.join(sim_data.sim_root, 'simulation_log')
+        try:
+            os.makedirs(sim_log_dir, exist_ok=True)
+        except OSError as oserr:
+            self.services.exception('Error creating Simulation Log directory %s : %d %s' % (sim_log_dir, oserr.errno, oserr.strerror))
+            raise
+
+        sim_data.monitor_file_prefix = os.path.join(sim_log_dir, sim_data.portal_runid)
+        eventlog_fname = f'{sim_data.monitor_file_prefix}.eventlog'
+        try:
+            sim_data.monitor_file = open(eventlog_fname, 'wb', 0)
+        except IOError as oserr:
+            self.services.error('Error opening file %s: error(%s): %s' % (eventlog_fname, oserr.errno, oserr.strerror))
+            self.services.error('Using /dev/null instead')
+            sim_data.monitor_file_prefix = ''
+            sim_data.monitor_file = open('/dev/null', 'wb')  # binary mode, to match the writes in send_event
+
+        if sim_data.monitor_file_prefix:
+            json_fname = os.path.join(sim_log_dir, sim_data.portal_runid + '.jsonl')
+            sim_data.json_monitor_file = open(json_fname, 'w')
+        else:
+            sim_data.json_monitor_file = open('/dev/null', 'w')
+
+        self.sim_map[sim_data.sim_name] = sim_data
+
+    def terminate(self, status: Literal[0, 1]):
+        """
+        Clean up services and call :py:obj:`sys_exit`.
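+
+        This override simply delegates to :py:meth:`Component.terminate`.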
+        """
+
+        Component.terminate(self, status)
+
+    def send_event(self, sim_data: SimulationData, event_data: dict[str, Any]):
+        """
+        Write the contents of *event_data* for *sim_data* to the local event
+        log and JSONL files, and periodically regenerate the HTML view.
+        """
+        timestamp = ipsutil.getTimeString()
+        buf = '%8d %s ' % (sim_data.counter, timestamp)
+        for k, v in event_data.items():
+            if len(str(v).strip()) == 0:
+                continue
+            if ' ' in str(v):
+                buf += "%s='%s' " % (k, str(v))
+            else:
+                buf += '%s=%s ' % (k, str(v))
+        buf += '\n'
+        sim_data.monitor_file.write(bytes(buf, encoding='UTF-8'))
+        sim_data.bigbuf += buf
+
+        buf = json.dumps(event_data)
+        sim_data.json_monitor_file.write('%s\n' % buf)
+
+        freq = self.dump_freq
+        if ((self.counter % freq == 0) and (time.time() - self.last_dump_time > self.min_dump_interval)) or (event_data['eventtype'] == 'IPS_END'):
+            self.last_dump_time = time.time()
+            if sim_data.monitor_file_prefix:
+                html_filename = f'{sim_data.monitor_file_prefix}.html'
+                html_page = convert_logdata_to_html(sim_data.bigbuf)
+                open(html_filename, 'w').writelines(html_page)
+                if self.write_to_htmldir:
+                    html_file = os.path.join(self.html_dir, os.path.basename(html_filename))
+                    try:
+                        open(html_file, 'w').writelines(html_page)
+                    except Exception:
+                        self.services.exception('Error writing html file into USER_W3_DIR directory')
+                        self.write_to_htmldir = False
diff --git a/ipsframework/bridges/portal_bridge.py b/ipsframework/bridges/portal_bridge.py
index 425442c2..aa61b05f 100644
--- a/ipsframework/bridges/portal_bridge.py
+++ b/ipsframework/bridges/portal_bridge.py
@@ -9,12 +9,11 @@
 from multiprocessing import Event, Pipe, Process
 from multiprocessing.connection import Connection
 from multiprocessing.synchronize import Event as EventType
-from typing import Any, Callable, Literal
+from typing import Any, Callable, Literal, Union
 
 import urllib3
 
 from ipsframework import Component
-from ipsframework.bridges.local_event_logger import LocalEventLogger, SimulationData
 
 
 def send_post(conn: Connection, stop: EventType, url: str):
@@ -179,6 +178,28 @@ def __init__(self, target: Callable, *args):
         self.childProcess.start()
 
 
+class PortalSimulationData:
+    """
+    Container for simulation data.
+    """
+
+    def __init__(self):
+        self.counter = 0
+        self.monitor_file_prefix = ''
+        """The name of the log file, minus the extension ('.html', '.jsonl', etc.).
+
+        If this is empty, no log file could be created, and no derived file names should be built from it.
+        """
+        self.portal_runid: Union[str, None] = None
+        """Locally determined portal runid, also sent to the portal."""
+        self.parent_portal_runid: Union[str, None] = None
+        """Parent portal runid, derived from the locally determined portal runid. Should explicitly be None (not an empty string) if not set."""
+        self.sim_name = ''
+        self.sim_root = ''
+        self.phys_time_stamp = -1
+        self.monitor_url = ''
+
+
 class PortalBridge(Component):
     """
     Framework component to communicate with the IPS web portal.
@@ -190,9 +211,9 @@ def __init__(self, services, config):
         :py:class:`component.Component` object.
""" super().__init__(services, config) - self.sim_map: dict[str, SimulationData] = {} + self.sim_map: dict[str, PortalSimulationData] = {} self.done = False - self.local_event_logger = LocalEventLogger() + self.first_portal_runid = None self.portal_url = '' self.first_event = True self.childProcess = None @@ -217,7 +238,6 @@ def init(self, timestamp=0.0, **keywords): pass self.services.subscribe('_IPS_MONITOR', 'process_event') - self.local_event_logger.init(self.services) def step(self, timestamp=0.0, **keywords): """ @@ -228,7 +248,7 @@ def step(self, timestamp=0.0, **keywords): time.sleep(0.5) def finalize(self, timestamp=0.0, **keywords): - self.local_event_logger.finalize(self.sim_map) + pass def process_event(self, topicName, theEvent): """ @@ -244,7 +264,7 @@ def process_event(self, topicName, theEvent): if portal_data['eventtype'] == 'IPS_START': sim_root = event_body['sim_root'] - self.init_simulation(sim_name, sim_root) + self.init_simulation(sim_name, sim_root, portal_data['portal_runid']) sim_data = self.sim_map[sim_name] if portal_data['eventtype'] == 'PORTALBRIDGE_UPDATE_TIMESTAMP': @@ -291,8 +311,6 @@ def process_event(self, topicName, theEvent): if 'trace' in portal_data: portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest() - self.local_event_logger.send_event(self.services, sim_data, portal_data) - if self.portal_url: if self.first_event: # First time, launch sendPost.py daemon self.parent_conn, child_conn = Pipe() @@ -367,7 +385,7 @@ def http_req_and_response(self, manager: UrlRequestProcessManager, event_data): else: self.services.debug('Portal Response: %d %s', code, msg) - def send_jupyter_notebook(self, sim_data: SimulationData, event_data): + def send_jupyter_notebook(self, sim_data: PortalSimulationData, event_data): if self.portal_url and self.portal_api_key: if not self.url_manager_jupyter_notebook: self.url_manager_jupyter_notebook = UrlRequestProcessManager( @@ -375,7 +393,7 @@ def send_jupyter_notebook(self, sim_data: SimulationData, event_data): ) self.http_req_and_response(self.url_manager_jupyter_notebook, event_data) - def send_notebook_data(self, sim_data: SimulationData, event_data): + def send_notebook_data(self, sim_data: PortalSimulationData, event_data): if self.portal_url and self.portal_api_key: if not self.url_manager_jupyter_data: self.url_manager_jupyter_data = UrlRequestProcessManager( @@ -383,7 +401,7 @@ def send_notebook_data(self, sim_data: SimulationData, event_data): ) self.http_req_and_response(self.url_manager_jupyter_data, event_data) - def send_ensemble_variables(self, sim_data: SimulationData, event_data): + def send_ensemble_variables(self, sim_data: PortalSimulationData, event_data): if self.portal_url and self.portal_api_key: if not self.url_manager_ensemble_uploads: self.url_manager_ensemble_uploads = UrlRequestProcessManager( @@ -391,7 +409,7 @@ def send_ensemble_variables(self, sim_data: SimulationData, event_data): ) self.http_req_and_response(self.url_manager_ensemble_uploads, event_data) - def init_simulation(self, sim_name: str, sim_root: str): + def init_simulation(self, sim_name: str, sim_root: str, portal_runid: str): """ Create and send information about simulation *sim_name* living in *sim_root* so the portal can set up corresponding structures to manage @@ -401,8 +419,23 @@ def init_simulation(self, sim_name: str, sim_root: str): if hasattr(self, '_IPS_PORTAL_API_KEY'): self.services.set_config_param('_IPS_PORTAL_API_KEY', self._IPS_PORTAL_API_KEY, target_sim_name=sim_name) - 
sim_data = self.local_event_logger.init_simulation(self.services, sim_name, sim_root, self.HOST, self.USER)
-        self.sim_map[sim_data.sim_name] = sim_data
+        sim_data = PortalSimulationData()
+        sim_data.sim_name = sim_name
+        sim_data.sim_root = sim_root
+
+        sim_data.portal_runid = portal_runid
+        try:
+            self.services.set_config_param('PORTAL_RUNID', sim_data.portal_runid, target_sim_name=sim_name)
+        except Exception:
+            self.services.error('Simulation %s is not accessible', sim_name)
+            return
+
+        if self.first_portal_runid:
+            sim_data.parent_portal_runid = self.first_portal_runid
+        else:
+            self.first_portal_runid = sim_data.portal_runid
+
+        self.sim_map[sim_name] = sim_data
 
     def terminate(self, status: Literal[0, 1]):
         """
diff --git a/ipsframework/configurationManager.py b/ipsframework/configurationManager.py
index 826ee649..d20040e3 100644
--- a/ipsframework/configurationManager.py
+++ b/ipsframework/configurationManager.py
@@ -11,7 +11,7 @@
 import time
 import uuid
 from multiprocessing import Process, Queue, set_start_method
-from typing import Optional, Union
+from typing import Any, Optional, Union
 
 from configobj import ConfigObj
 
@@ -367,44 +367,48 @@ def _initialize_fwk_components(self):
         if use_portal.lower() == 'false':
             use_portal = False
 
-        if use_portal:
-            bridge_module = 'portal_bridge'
-            bridge_name = 'PortalBridge'
-        else:
-            bridge_module = 'basic_bridge'
-            bridge_name = 'BasicBridge'
-
-        bridge_conf = {}
-        bridge_conf['CLASS'] = 'FWK'
-        bridge_conf['SUB_CLASS'] = 'COMP'
-        bridge_conf['NAME'] = bridge_name
-        if 'FWK_COMPS_PATH' in self.sim_map[self.fwk_sim_name].sim_conf:
-            bridge_conf['BIN_PATH'] = self.sim_map[self.fwk_sim_name].sim_conf['FWK_COMPS_PATH']
-            bridge_conf['SCRIPT'] = os.path.join(bridge_conf['BIN_PATH'], 'bridges', f'{bridge_module}.py')
-        else:
-            bridge_conf['SCRIPT'] = ''
-            bridge_conf['MODULE'] = f'ipsframework.bridges.{bridge_module}'
-        bridge_conf['INPUT_DIR'] = '/dev/null'
-        bridge_conf['INPUT_FILES'] = ''
-        bridge_conf['DATA_FILES'] = ''
-        bridge_conf['OUTPUT_FILES'] = ''
-        bridge_conf['NPROC'] = 1
-        bridge_conf['LOG_LEVEL'] = 'INFO'
-        try:
-            bridge_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER']
-        except KeyError:
-            bridge_conf['USER'] = self.platform_conf['USER']
-        bridge_conf['HOST'] = self.platform_conf['HOST']
-        if self.fwk.log_level == logging.DEBUG:
-            bridge_conf['LOG_LEVEL'] = 'DEBUG'
+        fwk_components = [('local_logging_bridge', 'LocalLoggingBridge', lambda _config: None)]
+        """list of (component module, component name, configuration function) tuples; the configuration function receives the component's config dict and may modify it in place"""
 
         if use_portal:
-            bridge_conf['PORTAL_URL'] = self.get_platform_parameter('PORTAL_URL', silent=True)
-            if bridge_conf['PORTAL_URL']:
-                bridge_conf['_IPS_PORTAL_API_KEY'] = self.get_platform_parameter('_IPS_PORTAL_API_KEY', silent=True)
 
-        component_id = self._create_component(bridge_conf, self.sim_map[self.fwk_sim_name])
-        self.fwk_components.append(component_id)
+            def _config_portal(config: dict[str, Any]):
+                config['PORTAL_URL'] = self.get_platform_parameter('PORTAL_URL', silent=True)
+                if config['PORTAL_URL']:
+                    config['_IPS_PORTAL_API_KEY'] = self.get_platform_parameter('_IPS_PORTAL_API_KEY', silent=True)
+
+            fwk_components.append(('portal_bridge', 'PortalBridge', _config_portal))
+
+        for fwk_comp in fwk_components:
+            bridge_conf = {}
+            bridge_conf['CLASS'] = 'FWK'
+            bridge_conf['SUB_CLASS'] = 'COMP'
+            bridge_conf['NAME'] = fwk_comp[1]
+            if 'FWK_COMPS_PATH' in self.sim_map[self.fwk_sim_name].sim_conf:
+                bridge_conf['BIN_PATH'] = 
self.sim_map[self.fwk_sim_name].sim_conf['FWK_COMPS_PATH'] + bridge_conf['SCRIPT'] = os.path.join(bridge_conf['BIN_PATH'], 'bridges', f'{fwk_comp[0]}.py') + else: + bridge_conf['SCRIPT'] = '' + bridge_conf['MODULE'] = f'ipsframework.bridges.{fwk_comp[0]}' + bridge_conf['INPUT_DIR'] = '/dev/null' + bridge_conf['INPUT_FILES'] = '' + bridge_conf['DATA_FILES'] = '' + bridge_conf['OUTPUT_FILES'] = '' + bridge_conf['NPROC'] = 1 + bridge_conf['LOG_LEVEL'] = 'INFO' + try: + bridge_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER'] + except KeyError: + bridge_conf['USER'] = self.platform_conf['USER'] + bridge_conf['HOST'] = self.platform_conf['HOST'] + if self.fwk.log_level == logging.DEBUG: + bridge_conf['LOG_LEVEL'] = 'DEBUG' + + # additional configuration + fwk_comp[2](bridge_conf) + + component_id = self._create_component(bridge_conf, self.sim_map[self.fwk_sim_name]) + self.fwk_components.append(component_id) def _initialize_sim(self, sim_data): """ diff --git a/ipsframework/ips.py b/ipsframework/ips.py index 57e4cda7..6c675f5a 100755 --- a/ipsframework/ips.py +++ b/ipsframework/ips.py @@ -53,6 +53,7 @@ """ import argparse +import datetime import hashlib import inspect import logging @@ -587,8 +588,14 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta # portal_data['phystimestamp'] = self.timeStamp get_config = self.config_manager.get_config_parameter if eventType == 'IPS_START': + user = self.config_manager.get_platform_parameter('USER') + host = self.config_manager.get_platform_parameter('HOST') + d = datetime.datetime.now() + date_str = '%s.%03d' % (d.strftime('%Y-%m-%dT%H:%M:%S'), int(d.microsecond / 1000)) + portal_runid = f'{sim_name}_{host}_{user}_{date_str}' + portal_data['state'] = 'Running' - portal_data['host'] = self.config_manager.get_platform_parameter('HOST') + portal_data['host'] = host try: portal_data['outputprefix'] = get_config(sim_name, 'OUTPUT_PREFIX') except KeyError: @@ -631,6 +638,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta portal_data['startat'] = getTimeString(time.localtime(self.config_manager.sim_map[sim_name].start_time)) portal_data['ips_version'] = get_versions()['version'] + portal_data['portal_runid'] = portal_runid try: portal_data['parent_portal_runid'] = get_config(sim_name, 'PARENT_PORTAL_RUNID') except KeyError: diff --git a/ipsframework/services.py b/ipsframework/services.py index 4fb87b0c..a67f1994 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -2054,7 +2054,6 @@ def add_analysis_data_files(self, current_data_file_paths: list[str], timestamp: portal_data['replace'] = replace portal_data['portal_runid'] = portal_runid event_data['portal_data'] = portal_data - # TODO make sure that we do NOT log the raw data in the IPS log file self.publish('_IPS_MONITOR', 'PORTAL_ADD_JUPYTER_DATA', event_data) self._send_monitor_event('IPS_PORTAL_ADD_JUPYTER_DATA', f'SOURCE = {source} TIMESTAMP = {timestamp} REPLACE = {replace}')
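
Note (illustration only, not part of the patch): both bridges subscribe to the
'_IPS_MONITOR' topic in their init() methods, so a single publish from the
service layer fans out to local logging and, when the portal is configured, to
the portal as well. A rough sketch with hypothetical names and values:

    # hypothetical component-side publisher; 'services' is a ServicesProxy
    event_data = {
        'sim_name': 'my_sim',
        'portal_data': {
            'eventtype': 'IPS_SET_MONITOR_URL',
            'vizurl': 'example.org/viz/my_sim',
        },
    }
    services.publish('_IPS_MONITOR', 'IPS_SET_MONITOR_URL', event_data)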