diff --git a/ipsframework/bridges/__init__.py b/ipsframework/bridges/__init__.py
new file mode 100644
index 00000000..288854b0
--- /dev/null
+++ b/ipsframework/bridges/__init__.py
@@ -0,0 +1,6 @@
+"""Bridges are components which handle supplementary tasks in the IPS Framework. Applications should not need to import these classes directly; they are automatically initialized by the framework.
+
+There are two bridges available:
+- `LocalLoggingBridge`, which provides simple system logging (always created)
+- `PortalBridge`, which allows interfacing with a remote IPS Portal (created if the user provides a PORTAL_URL and does not disable the portal)
+"""
diff --git a/ipsframework/bridges/local_logging_bridge.py b/ipsframework/bridges/local_logging_bridge.py
new file mode 100644
index 00000000..09f8f226
--- /dev/null
+++ b/ipsframework/bridges/local_logging_bridge.py
@@ -0,0 +1,255 @@
+# -------------------------------------------------------------------------------
+# Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information.
+# -------------------------------------------------------------------------------
+import hashlib
+import json
+import os
+import time
+from typing import TYPE_CHECKING, Any, Literal, Union
+
+from ipsframework import Component, ipsutil
+from ipsframework.cca_es_spec import Event
+from ipsframework.convert_log_function import convert_logdata_to_html
+
+if TYPE_CHECKING:
+    from io import FileIO
+
+
+class SimulationData:
+    """
+    Container for simulation data.
+    """
+
+    def __init__(self):
+        self.counter = 0
+        self.monitor_file_prefix = ''
+        """The name of the log files, minus the extension ('.html', '.jsonl', etc.).
+
+        If this is empty, the per-run log files could not be created, and callers must not attempt to write them.
+        """
+        self.portal_runid: Union[str, None] = None
+        """Portal RunID, set by the component which publishes the IPS_START event. Only used for logging here."""
+        self.parent_portal_runid: Union[str, None] = None
+        """Parent portal RunID, derived from the first portal RunID seen by this bridge. Should explicitly be None (not an empty string) if not set. Only used for logging."""
+        self.sim_name = ''
+        self.sim_root = ''
+        self.monitor_file: FileIO = None  # type: ignore
+        self.json_monitor_file: FileIO = None  # type: ignore
+        self.bigbuf = ''
+        self.phys_time_stamp = -1
+        self.monitor_url = ''
+
+
+class LocalLoggingBridge(Component):
+    """
+    Framework component that handles simple event logging.
+
+    This component only handles local logging; interaction with a remote web portal is handled by the PortalBridge component instead.
+    """
+
+    def __init__(self, services, config):
+        """
+        Declaration of private variables and initialization of
+        :py:class:`component.Component` object.
+        """
+        super().__init__(services, config)
+        self.sim_map: dict[str, SimulationData] = {}
+        self.done = False
+        self.counter = 0
+        self.dump_freq = 10
+        self.min_dump_interval = 300  # Minimum time interval in sec for the HTML dump operation
+        self.last_dump_time = time.time()
+        self.write_to_htmldir = True
+        self.html_dir = ''
+        self.first_portal_runid = None
+
+    def init(self, timestamp=0.0, **keywords):
+        """
+        Subscribe to *_IPS_MONITOR* events and register callback :py:meth:`.process_event`.
+        """
+        self.services.subscribe('_IPS_MONITOR', 'process_event')
+
+        try:
+            freq = int(self.services.get_config_param('HTML_DUMP_FREQ', silent=True))
+        except Exception:
+            pass
+        else:
+            self.dump_freq = freq
+
+        try:
+            self.html_dir = self.services.get_config_param('USER_W3_DIR', silent=True) or ''
+        except Exception:
+            self.services.warning('Missing USER_W3_DIR configuration - disabling web-visible logging')
+            self.write_to_htmldir = False
+        else:
+            if self.html_dir.strip() == '':
+                self.services.warning('Empty USER_W3_DIR configuration - disabling web-visible logging')
+                self.write_to_htmldir = False
+            else:
+                try:
+                    os.mkdir(self.html_dir)
+                except FileExistsError:
+                    pass
+                except Exception:
+                    self.services.warning('Unable to create HTML directory - disabling web-visible logging')
+                    self.write_to_htmldir = False
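For orientation, here is a runnable toy version of the publish/subscribe wiring this bridge relies on. `DummyEvent`, `DummyServices`, and `ToyBridge` are invented stand-ins for this sketch only; the real classes live in `ipsframework.services` and `ipsframework.cca_es_spec`, and the real `subscribe()` binds the named method to the calling component itself rather than taking an explicit subscriber argument.

```python
# Toy re-implementation of the '_IPS_MONITOR' wiring: a named method is
# registered against a topic, then invoked for every event published there.
class DummyEvent:
    def __init__(self, body):
        self._body = body

    def getBody(self):  # same accessor the bridge's process_event() calls
        return self._body


class DummyServices:
    def __init__(self):
        self._subs = {}  # topic -> [(subscriber, method_name), ...]

    def subscribe(self, topic, method_name, subscriber):
        self._subs.setdefault(topic, []).append((subscriber, method_name))

    def publish(self, topic, event_name, body):
        for subscriber, method_name in self._subs.get(topic, []):
            getattr(subscriber, method_name)(topic, DummyEvent(body))


class ToyBridge:
    def process_event(self, topicName, theEvent):
        body = theEvent.getBody()
        print('got %s for %s' % (body['portal_data']['eventtype'], body['sim_name']))


services = DummyServices()
services.subscribe('_IPS_MONITOR', 'process_event', ToyBridge())
services.publish('_IPS_MONITOR', 'IPS_START',
                 {'sim_name': 'my_sim', 'portal_data': {'eventtype': 'IPS_START'}})
```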
+ """ + self.services.subscribe('_IPS_MONITOR', 'process_event') + + try: + freq = int(self.services.get_config_param('HTML_DUMP_FREQ', silent=True)) + except Exception: + pass + else: + self.dump_freq = freq + + try: + self.html_dir = self.services.get_config_param('USER_W3_DIR', silent=True) or '' + except Exception: + self.services.warning('Missing USER_W3_DIR configuration - disabling web-visible logging') + self.write_to_htmldir = False + else: + if self.html_dir.strip() == '': + self.services.warning('Empty USER_W3_DIR configuration - disabling web-visible logging') + self.write_to_htmldir = False + else: + try: + os.mkdir(self.html_dir) + except FileExistsError: + pass + except Exception: + self.services.warning('Unable to create HTML directory - disabling web-visible logging') + self.write_to_htmldir = False + + def step(self, timestamp=0.0, **keywords): + """ + Poll for events. + """ + while not self.done: + self.services.process_events() + time.sleep(0.5) + + def finalize(self, timestamp=0.0, **keywords): + for sim_data in self.sim_map.values(): + try: + sim_data.monitor_file.close() + sim_data.json_monitor_file.close() + except Exception: + pass + + def process_event(self, topicName: str, theEvent: Event): + """ + Process a single event *theEvent* on topic *topicName*. + """ + event_body = theEvent.getBody() + sim_name = event_body['sim_name'] + portal_data = event_body['portal_data'] + try: + portal_data['sim_name'] = event_body['real_sim_name'] + except KeyError: + portal_data['sim_name'] = sim_name + + if portal_data['eventtype'] == 'IPS_START': + sim_root = event_body['sim_root'] + self.init_simulation(sim_name, sim_root, portal_data['portal_runid']) + + sim_data = self.sim_map[sim_name] + if portal_data['eventtype'] == 'PORTALBRIDGE_UPDATE_TIMESTAMP': + sim_data.phys_time_stamp = portal_data['phystimestamp'] + return + else: + portal_data['phystimestamp'] = sim_data.phys_time_stamp + + if portal_data['eventtype'] == 'PORTAL_REGISTER_NOTEBOOK': + return + + if portal_data['eventtype'] == 'PORTAL_ADD_JUPYTER_DATA': + return + + if portal_data['eventtype'] == 'PORTAL_UPLOAD_ENSEMBLE_PARAMS': + return + + portal_data['portal_runid'] = sim_data.portal_runid + + if portal_data['eventtype'] == 'IPS_SET_MONITOR_URL': + sim_data.monitor_url = portal_data['vizurl'] + elif sim_data.monitor_url: + portal_data['vizurl'] = sim_data.monitor_url + + if portal_data['eventtype'] == 'IPS_START' and 'parent_portal_runid' not in portal_data: + portal_data['parent_portal_runid'] = sim_data.parent_portal_runid + portal_data['seqnum'] = sim_data.counter + + if 'trace' in portal_data: + portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest() + + self.send_event(sim_data, portal_data) + + if portal_data['eventtype'] == 'IPS_END': + del self.sim_map[sim_name] + + if len(self.sim_map) == 0: + self.done = True + self.services.debug('No more simulation to monitor - exiting') + time.sleep(1) + + def init_simulation(self, sim_name: str, sim_root: str, portal_runid: str): + """ + Create and send information about simulation *sim_name* living in + *sim_root* so the portal can set up corresponding structures to manage + data from the sim. 
+ """ + self.services.debug('Initializing simulation using BasicBridge: %s -- %s ', sim_name, sim_root) + + sim_data = SimulationData() + sim_data.sim_name = sim_name + sim_data.sim_root = sim_root + + sim_data.portal_runid = portal_runid + + if self.first_portal_runid: + sim_data.parent_portal_runid = self.first_portal_runid + else: + self.first_portal_runid = sim_data.portal_runid + + if sim_data.sim_root.strip() == '.': + sim_data.sim_root = os.environ['IPS_INITIAL_CWD'] + sim_log_dir = os.path.join(sim_data.sim_root, 'simulation_log') + try: + os.makedirs(sim_log_dir, exist_ok=True) + except OSError as oserr: + self.services.exception('Error creating Simulation Log directory %s : %d %s' % (sim_log_dir, oserr.errno, oserr.strerror)) + raise + + sim_data.monitor_file_prefix = os.path.join(sim_log_dir, sim_data.portal_runid) + eventlog_fname = f'{sim_data.monitor_file_prefix}.eventlog' + try: + sim_data.monitor_file = open(eventlog_fname, 'wb', 0) + except IOError as oserr: + self.services.error('Error opening file %s: error(%s): %s' % (eventlog_fname, oserr.errno, oserr.strerror)) + self.services.error('Using /dev/null instead') + sim_data.monitor_file_prefix = '' + sim_data.monitor_file = open('/dev/null', 'w') + + if sim_data.monitor_file_prefix: + json_fname = os.path.join(sim_log_dir, sim_data.portal_runid + '.jsonl') + sim_data.json_monitor_file = open(json_fname, 'w') + else: + sim_data.json_monitor_file = open('/dev/null', 'w') + + self.sim_map[sim_data.sim_name] = sim_data + + def terminate(self, status: Literal[0, 1]): + """ + Clean up services and call :py:obj:`sys_exit`. + """ + + Component.terminate(self, status) + + def send_event(self, sim_data: SimulationData, event_data: dict[str, Any]): + """ + Send contents of *event_data* and *sim_data* to portal. + """ + timestamp = ipsutil.getTimeString() + buf = '%8d %s ' % (sim_data.counter, timestamp) + for k, v in event_data.items(): + if len(str(v).strip()) == 0: + continue + if ' ' in str(v): + buf += "%s='%s' " % (k, str(v)) + else: + buf += '%s=%s ' % (k, str(v)) + buf += '\n' + sim_data.monitor_file.write(bytes(buf, encoding='UTF-8')) + sim_data.bigbuf += buf + + buf = json.dumps(event_data) + sim_data.json_monitor_file.write('%s\n' % buf) + + freq = self.dump_freq + if ((self.counter % freq == 0) and (time.time() - self.last_dump_time > self.min_dump_interval)) or (event_data['eventtype'] == 'IPS_END'): + self.last_dump_time = time.time() + if sim_data.monitor_file_prefix: + html_filename = f'{sim_data.monitor_file_prefix}.html' + html_page = convert_logdata_to_html(sim_data.bigbuf) + open(html_filename, 'w').writelines(html_page) + if self.write_to_htmldir: + html_file = os.path.join(self.html_dir, os.path.basename(html_filename)) + try: + open(html_file, 'w').writelines(html_page) + except Exception: + self.services.exception('Error writing html file into USER_W3_DIR directory') + self.write_to_htmldir = False diff --git a/ipsframework/portalBridge.py b/ipsframework/bridges/portal_bridge.py similarity index 52% rename from ipsframework/portalBridge.py rename to ipsframework/bridges/portal_bridge.py index 528924c5..aa61b05f 100644 --- a/ipsframework/portalBridge.py +++ b/ipsframework/bridges/portal_bridge.py @@ -1,42 +1,19 @@ # ------------------------------------------------------------------------------- # Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information. 
diff --git a/ipsframework/portalBridge.py b/ipsframework/bridges/portal_bridge.py
similarity index 52%
rename from ipsframework/portalBridge.py
rename to ipsframework/bridges/portal_bridge.py
index 528924c5..aa61b05f 100644
--- a/ipsframework/portalBridge.py
+++ b/ipsframework/bridges/portal_bridge.py
@@ -1,42 +1,19 @@
 # -------------------------------------------------------------------------------
 # Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information.
 # -------------------------------------------------------------------------------
-import datetime
-import glob
 import hashlib
-import itertools
 import json
 import os
-import re
 import tarfile
 import time
-from collections import defaultdict
 from multiprocessing import Event, Pipe, Process
 from multiprocessing.connection import Connection
 from multiprocessing.synchronize import Event as EventType
-from typing import Any, Callable, Literal
+from typing import Any, Callable, Literal, Union
 
 import urllib3
 
-from ipsframework import Component, ipsutil
-from ipsframework.convert_log_function import convert_logdata_to_html
-
-
-def hash_file(file_name):  # pragma: no cover
-    """
-    Return the MD5 hash of a file
-    :rtype: str
-    :param file_name: Full path to file
-    :return: MD5 of file_name
-    """
-    BLOCKSIZE = 65536
-    hasher = hashlib.md5()
-    with open(file_name, 'rb') as afile:
-        buf = afile.read(BLOCKSIZE)
-        while len(buf) > 0:
-            hasher.update(buf)
-            buf = afile.read(BLOCKSIZE)
-    return hasher.hexdigest()
+from ipsframework import Component
 
 
 def send_post(conn: Connection, stop: EventType, url: str):
@@ -201,30 +178,32 @@ def __init__(self, target: Callable, *args):
         self.childProcess.start()
 
 
-class PortalBridge(Component):
+class PortalSimulationData:
     """
-    Framework component to communicate with the SWIM web portal.
+    Container for simulation data.
     """
 
-    class SimulationData:
-        """
-        Container for simulation data.
+    def __init__(self):
+        self.counter = 0
+        self.monitor_file_prefix = ''
+        """The name of the log files, minus the extension ('.html', '.jsonl', etc.).
+
+        If this is empty, the per-run log files could not be created, and callers must not attempt to write them.
         """
+        self.portal_runid: Union[str, None] = None
+        """Portal RunID, taken from the IPS_START event and sent on to the portal."""
+        self.parent_portal_runid: Union[str, None] = None
+        """Parent portal RunID, derived from the first portal RunID seen by this bridge. Should explicitly be None (not an empty string) if not set."""
+        self.sim_name = ''
+        self.sim_root = ''
+        self.phys_time_stamp = -1
+        self.monitor_url = ''
 
-    def __init__(self):
-        self.counter = 0
-        self.monitor_file_name = ''
-        self.portal_runid = None
-        self.parent_portal_runid = None
-        self.sim_name = ''
-        self.sim_root = ''
-        self.monitor_file = None
-        self.json_monitor_file = None
-        self.phys_time_stamp = -1
-        self.monitor_url = None
-        self.mpo_steps = [None]
-        self.mpo_wid = None
-        self.bigbuf = ''
+
+class PortalBridge(Component):
+    """
+    Framework component to communicate with the IPS web portal.
+    """
 
     def __init__(self, services, config):
         """
@@ -232,11 +211,10 @@ def __init__(self, services, config):
         Declaration of private variables and initialization of
         :py:class:`component.Component` object.
""" super().__init__(services, config) - self.curTime = time.localtime() - self.startTime = self.curTime - self.sim_map = {} - self.portal_url = None + self.sim_map: dict[str, PortalSimulationData] = {} self.done = False + self.first_portal_runid = None + self.portal_url = '' self.first_event = True self.childProcess = None self.childProcessStop = None @@ -244,15 +222,6 @@ def __init__(self, services, config): self.url_manager_jupyter_notebook = None self.url_manager_jupyter_data = None self.url_manager_ensemble_uploads = None - self.mpo = None - self.mpo_name_counter = defaultdict(lambda: 0) - self.counter = 0 - self.dump_freq = 10 - self.min_dump_interval = 300 # Minimum time interval in Sec for HTML dump operation - self.last_dump_time = time.time() - self.write_to_htmldir = True - self.html_dir = '' - self.first_portal_runid = None def init(self, timestamp=0.0, **keywords): """ @@ -267,31 +236,8 @@ def init(self, timestamp=0.0, **keywords): self.portal_api_key = self._IPS_PORTAL_API_KEY except AttributeError: pass - self.services.subscribe('_IPS_MONITOR', 'process_event') - try: - freq = int(self.services.get_config_param('HTML_DUMP_FREQ', silent=True)) - except Exception: - pass - else: - self.dump_freq = freq - try: - self.html_dir = self.services.get_config_param('USER_W3_DIR', silent=True) or '' - except Exception: - self.services.warning('Missing USER_W3_DIR configuration - disabling web-visible logging') - self.write_to_htmldir = False - else: - if self.html_dir.strip() == '': - self.services.warning('Empty USER_W3_DIR configuration - disabling web-visible logging') - self.write_to_htmldir = False - else: - try: - os.mkdir(self.html_dir) - except FileExistsError: - pass - except Exception: - self.services.warning('Unable to create HTML directory - disabling web-visible logging') - self.write_to_htmldir = False + self.services.subscribe('_IPS_MONITOR', 'process_event') def step(self, timestamp=0.0, **keywords): """ @@ -302,12 +248,7 @@ def step(self, timestamp=0.0, **keywords): time.sleep(0.5) def finalize(self, timestamp=0.0, **keywords): - for sim_data in self.sim_map.values(): - try: - sim_data.monitor_file.close() - sim_data.json_monitor_file.close() - except Exception: - pass + pass def process_event(self, topicName, theEvent): """ @@ -323,7 +264,7 @@ def process_event(self, topicName, theEvent): if portal_data['eventtype'] == 'IPS_START': sim_root = event_body['sim_root'] - self.init_simulation(sim_name, sim_root) + self.init_simulation(sim_name, sim_root, portal_data['portal_runid']) sim_data = self.sim_map[sim_name] if portal_data['eventtype'] == 'PORTALBRIDGE_UPDATE_TIMESTAMP': @@ -370,56 +311,6 @@ def process_event(self, topicName, theEvent): if 'trace' in portal_data: portal_data['trace']['traceId'] = hashlib.md5(sim_data.portal_runid.encode()).hexdigest() - self.send_event(sim_data, portal_data) - sim_data.counter += 1 - self.counter += 1 - - if portal_data['eventtype'] == 'IPS_END': - del self.sim_map[sim_name] - - if len(self.sim_map) == 0: - if self.childProcess: - self.childProcessStop.set() - self.childProcess.join() - self.check_send_post_responses() - self.done = True - self.services.debug('No more simulation to monitor - exiting') - time.sleep(1) - - def send_event(self, sim_data, event_data): - """ - Send contents of *event_data* and *sim_data* to portal. 
- """ - timestamp = ipsutil.getTimeString() - buf = '%8d %s ' % (sim_data.counter, timestamp) - for k, v in event_data.items(): - if len(str(v).strip()) == 0: - continue - if ' ' in str(v): - buf += "%s='%s' " % (k, str(v)) - else: - buf += '%s=%s ' % (k, str(v)) - buf += '\n' - sim_data.monitor_file.write(bytes(buf, encoding='UTF-8')) - sim_data.bigbuf += buf - - buf = json.dumps(event_data) - sim_data.json_monitor_file.write('%s\n' % buf) - - freq = self.dump_freq - if ((self.counter % freq == 0) and (time.time() - self.last_dump_time > self.min_dump_interval)) or (event_data['eventtype'] == 'IPS_END'): - self.last_dump_time = time.time() - html_filename = sim_data.monitor_file_name.replace('eventlog', 'html') - html_page = convert_logdata_to_html(sim_data.bigbuf) - open(html_filename, 'w').writelines(html_page) - if self.write_to_htmldir: - html_file = os.path.join(self.html_dir, os.path.basename(html_filename)) - try: - open(html_file, 'w').writelines(html_page) - except Exception: - self.services.exception('Error writing html file into USER_W3_DIR directory') - self.write_to_htmldir = False - if self.portal_url: if self.first_event: # First time, launch sendPost.py daemon self.parent_conn, child_conn = Pipe() @@ -429,14 +320,23 @@ def send_event(self, sim_data, event_data): self.first_event = False try: - self.parent_conn.send(event_data) + self.parent_conn.send(portal_data) except OSError: pass self.check_send_post_responses() - if sim_data.mpo_wid: - self.send_mpo_data(event_data, sim_data) + if portal_data['eventtype'] == 'IPS_END': + del self.sim_map[sim_name] + + if len(self.sim_map) == 0: + if self.childProcess: + self.childProcessStop.set() + self.childProcess.join() + self.check_send_post_responses() + self.done = True + self.services.debug('No more simulation to monitor - exiting') + time.sleep(1) def check_send_post_responses(self): while self.parent_conn.poll(): @@ -459,7 +359,7 @@ def check_send_post_responses(self): self.services.error('Portal Error: %d %s', code, msg) elif code == -1: # disable portal, stop trying to send more data - self.portal_url = None + self.portal_url = '' self.services.error('Disabling portal because: %s', msg) else: self.services.debug('Portal Response: %d %s', code, msg) @@ -478,14 +378,14 @@ def http_req_and_response(self, manager: UrlRequestProcessManager, event_data): if code == -1: # disable portal, stop trying to send more data - self.portal_url = None + self.portal_url = '' self.services.error('Disabling portal because: %s', msg) elif code >= 400: self.services.error('Portal Error: %d %s', code, msg) else: self.services.debug('Portal Response: %d %s', code, msg) - def send_jupyter_notebook(self, sim_data, event_data): + def send_jupyter_notebook(self, sim_data: PortalSimulationData, event_data): if self.portal_url and self.portal_api_key: if not self.url_manager_jupyter_notebook: self.url_manager_jupyter_notebook = UrlRequestProcessManager( @@ -493,7 +393,7 @@ def send_jupyter_notebook(self, sim_data, event_data): ) self.http_req_and_response(self.url_manager_jupyter_notebook, event_data) - def send_notebook_data(self, sim_data, event_data): + def send_notebook_data(self, sim_data: PortalSimulationData, event_data): if self.portal_url and self.portal_api_key: if not self.url_manager_jupyter_data: self.url_manager_jupyter_data = UrlRequestProcessManager( @@ -501,7 +401,7 @@ def send_notebook_data(self, sim_data, event_data): ) self.http_req_and_response(self.url_manager_jupyter_data, event_data) - def send_ensemble_variables(self, sim_data, 
event_data): + def send_ensemble_variables(self, sim_data: PortalSimulationData, event_data): if self.portal_url and self.portal_api_key: if not self.url_manager_ensemble_uploads: self.url_manager_ensemble_uploads = UrlRequestProcessManager( @@ -509,164 +409,21 @@ def send_ensemble_variables(self, sim_data, event_data): ) self.http_req_and_response(self.url_manager_ensemble_uploads, event_data) - def send_mpo_data(self, event_data, sim_data): # pragma: no cover - def md5(fname): - "Courtesy of stackoverflow 3431825" - hash_md5 = hashlib.md5() - with open(fname, 'rb') as f: - for chunk in iter(lambda: f.read(4096), b''): - hash_md5.update(chunk) - return hash_md5.hexdigest() - - def mpo_add_file(workflow, parent, file, shortname='Need a name', longdesc='did not add a description.'): - """Add a local file to the workflow attaching to parent. Calculate - checksum and if the file is already in the mpo database, use the - already the UID of the already existing file when adding the data - object - this creates a linkage to the original. The checksum and - local file path and name are added as metadata. - - This function relies on the user space metadata, ips_checksum - and ips_filename. The checksum is the md5 sum and the filename - is expected should have at least a relative qualifying path. - - workflow : workflow_id - parent : parent_id - """ - # if file exist, look for its checksum in the database - try: - checksum = md5(file) - except Exception: - print(('checksum could not find file:', file)) - raise - - is_checksum = self.mpo.search('metadata', params={'key': 'ips_checksum', 'value': checksum}) - # search always returns a list of dictionaries - # if checksum exists, use first dataobject that has it - # api search results are sorted by time - # Note, check this with eqdsk dataobject in test-api - print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') - print(len(is_checksum), file) - print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%') - - if len(is_checksum) > 0: - # uid is chosen to be first occurrence - # parent_uid is uid of object metadata is attached to. 
- file_uid = is_checksum[0]['parent_uid'] - - # Create dataobject reference by uid in the workflow - dataobject = self.mpo.add(workflow, parent, uid=file_uid, name=shortname, desc=longdesc) - self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum) - # add filename metadata the dataobject reference - self.mpo.meta(dataobject['uid'], 'ips_filename', file) - else: - print(('file', file)) - file_uri = file - # Create new dataobject by uri and insert reference in to workflow - dataobject = self.mpo.add(workflow, parent, uri=file_uri, name=shortname, desc=longdesc) - # add checksum metadata to original data object - # add function currently only returns uri field, so fetch full record - full_dataobject = self.mpo.search('dataobject/' + dataobject['uid'])[0] - # add checksum so dataobject and also - self.mpo.meta(full_dataobject['do_uid'], 'ips_checksum', checksum) - self.mpo.meta(dataobject['uid'], 'ips_filename', file) - self.mpo.meta(dataobject['uid'], 'ips_checksum', checksum) - dataobject = full_dataobject - return dataobject - - recordable_events = ['IPS_CALL_BEGIN', 'IPS_STAGE_INPUTS', 'IPS_STAGE_OUTPUTS', 'IPS_CALL_END'] - recordable_mpo_activities = ['IPS_CALL_BEGIN'] - comment = event_data['comment'] - event_type = event_data['eventtype'] - - if event_type not in recordable_events: - return - inp_objs = [] - if event_type == 'IPS_CALL_END': - del sim_data.mpo_steps[-1] - return - try: - if event_type == 'IPS_STAGE_INPUTS': - r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)') - o = r.match(comment) - (_, path, files) = o.groups() - glist = [glob.glob(os.path.join(path, f)) for f in files.split()] - for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]: - mpo_data_obj = mpo_add_file( - sim_data.mpo_wid, sim_data.mpo_wid['uid'], os.path.join(path, file_name), shortname=file_name, longdesc='An input file' - ) - inp_objs.append(mpo_data_obj['uid']) - - if event_type == 'IPS_STAGE_INPUTS' and not inp_objs: - return - - count = self.mpo_name_counter[sim_data.sim_name + event_data['code']] - if event_type == 'IPS_CALL_BEGIN': - target = event_data['comment'].split()[-1] - step_name = '%s %d' % (target, count) - else: - step_name = '{0:s} {1:s} {2:d}'.format(event_data['code'].split('_')[-1], event_type, count) - - if event_type == 'IPS_STAGE_OUTPUTS': - r = re.compile(r'^Elapsed time = ([0-9]*\.[0-9]*) Path = ([^ ]*) Files = (.*)') - o = r.match(comment) - (_, path, files) = o.groups() - if not files: - return - activity = self.mpo.step( - workflow_ID=sim_data.mpo_wid, parentobj_ID=sim_data.mpo_steps[-1], input_objs=inp_objs, name=step_name, desc='%s' % event_data['comment'] - ) - self.mpo_name_counter[sim_data.sim_name + event_data['code']] += 1 - if event_type == 'IPS_STAGE_OUTPUTS': - glist = [glob.glob(os.path.join(path, f)) for f in files.split()] - for file_name in [os.path.basename(f) for f in itertools.chain(*glist)]: - """ - (f_uid, f_hash) = get_file_uid(path, file_name) - if f_uid: - mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid, - parentobj_ID=activity['uid'], - name=file_name, - desc="An output file", - uri='file:' + os.path.join(path, file_name), - uid=f_uid, - source = f_uid) - else: - mpo_data_obj = self.mpo.add(workflow_ID=sim_data.mpo_wid, - parentobj_ID=activity['uid'], - name=file_name, - desc="An output file", - uri='file:' + os.path.join(path, file_name)) - """ - mpo_data_obj = mpo_add_file( - sim_data.mpo_wid, - activity['uid'], - # sim_data.mpo_wid['uid'], - os.path.join(path, file_name), - shortname=file_name, 
- longdesc='An output file', - ) - - except Exception as e: - print('*************', e) - else: - if event_type in recordable_mpo_activities: - sim_data.mpo_steps.append(activity['uid']) - - def init_simulation(self, sim_name, sim_root): + def init_simulation(self, sim_name: str, sim_root: str, portal_runid: str): """ Create and send information about simulation *sim_name* living in *sim_root* so the portal can set up corresponding structures to manage data from the sim. """ - self.services.debug('Initializing simulation : %s -- %s ', sim_name, sim_root) - sim_data = self.SimulationData() - sim_data.sim_name = sim_name - sim_data.sim_root = sim_root + self.services.debug('Initializing simulation using PortalBridge: %s -- %s ', sim_name, sim_root) if hasattr(self, '_IPS_PORTAL_API_KEY'): self.services.set_config_param('_IPS_PORTAL_API_KEY', self._IPS_PORTAL_API_KEY, target_sim_name=sim_name) - d = datetime.datetime.now() - date_str = '%s.%03d' % (d.strftime('%Y-%m-%dT%H:%M:%S'), int(d.microsecond / 1000)) - sim_data.portal_runid = f'{sim_name}_{self.HOST}_{self.USER}_{date_str}' + sim_data = PortalSimulationData() + sim_data.sim_name = sim_name + sim_data.sim_root = sim_root + + sim_data.portal_runid = portal_runid try: self.services.set_config_param('PORTAL_RUNID', sim_data.portal_runid, target_sim_name=sim_name) except Exception: @@ -678,37 +435,7 @@ def init_simulation(self, sim_name, sim_root): else: self.first_portal_runid = sim_data.portal_runid - if sim_data.sim_root.strip() == '.': - sim_data.sim_root = os.environ['IPS_INITIAL_CWD'] - sim_log_dir = os.path.join(sim_data.sim_root, 'simulation_log') - try: - os.makedirs(sim_log_dir, exist_ok=True) - except OSError as oserr: - self.services.exception('Error creating Simulation Log directory %s : %d %s' % (sim_log_dir, oserr.errno, oserr.strerror)) - raise - - sim_data.monitor_file_name = os.path.join(sim_log_dir, sim_data.portal_runid + '.eventlog') - try: - sim_data.monitor_file = open(sim_data.monitor_file_name, 'wb', 0) - except IOError as oserr: - self.services.error('Error opening file %s: error(%s): %s' % (sim_data.monitor_file_name, oserr.errno, oserr.strerror)) - self.services.error('Using /dev/null instead') - sim_data.monitor_file = open('/dev/null', 'w') - json_fname = sim_data.monitor_file_name.replace('eventlog', 'json') - sim_data.json_monitor_file = open(json_fname, 'w') - - if self.mpo: # pragma: no cover - try: - sim_data.mpo_wid = self.mpo.init(name='SWIM Workflow ' + os.environ['USER'], desc=sim_data.sim_name, wtype='SWIM') - print('sim_data.mpo_wid = ', sim_data.mpo_wid) - except Exception as e: - print(e) - print('sim_data.mpo_wid = ', sim_data.mpo_wid) - sim_data.mpo_wid = None - else: - sim_data.mpo_steps = [sim_data.mpo_wid['uid']] - - self.sim_map[sim_data.sim_name] = sim_data + self.sim_map[sim_name] = sim_data def terminate(self, status: Literal[0, 1]): """ diff --git a/ipsframework/configurationManager.py b/ipsframework/configurationManager.py index e30eae8b..d20040e3 100644 --- a/ipsframework/configurationManager.py +++ b/ipsframework/configurationManager.py @@ -11,7 +11,7 @@ import time import uuid from multiprocessing import Process, Queue, set_start_method -from typing import Optional, Union +from typing import Any, Optional, Union from configobj import ConfigObj @@ -358,41 +358,56 @@ def _initialize_fwk_components(self): # SIMYAN: set up The Portal bridge, allowing for an absence of a portal use_portal = True - if 'USE_PORTAL' in self.sim_map[self.fwk_sim_name].sim_conf: + # If users explicitly disable 
the Portal via 'USE_PORTAL=false' or do not provide a PORTAL_URL, fall back to the no-portal workflow.
+        # Otherwise, always initialize the Portal workflow.
+        if 'PORTAL_URL' not in self.sim_map[self.fwk_sim_name].sim_conf:
+            use_portal = False
+        elif 'USE_PORTAL' in self.sim_map[self.fwk_sim_name].sim_conf:
             use_portal = self.sim_map[self.fwk_sim_name].sim_conf['USE_PORTAL']
             if use_portal.lower() == 'false':
                 use_portal = False
+
+        fwk_components = [('local_logging_bridge', 'LocalLoggingBridge', lambda _config: None)]
+        """List of (module name, class name, configuration function) tuples; each configuration function receives the component's config dict and modifies it in place."""
+
         if use_portal:
-            portal_conf = {}
-            portal_conf['CLASS'] = 'FWK'
-            portal_conf['SUB_CLASS'] = 'COMP'
-            portal_conf['NAME'] = 'PortalBridge'
+
+            def _config_portal(config: dict[str, Any]):
+                config['PORTAL_URL'] = self.get_platform_parameter('PORTAL_URL', silent=True)
+                if config['PORTAL_URL']:
+                    config['_IPS_PORTAL_API_KEY'] = self.get_platform_parameter('_IPS_PORTAL_API_KEY', silent=True)
+
+            fwk_components.append(('portal_bridge', 'PortalBridge', _config_portal))
+
+        for fwk_comp in fwk_components:
+            bridge_conf = {}
+            bridge_conf['CLASS'] = 'FWK'
+            bridge_conf['SUB_CLASS'] = 'COMP'
+            bridge_conf['NAME'] = fwk_comp[1]
             if 'FWK_COMPS_PATH' in self.sim_map[self.fwk_sim_name].sim_conf:
-                portal_conf['BIN_PATH'] = self.sim_map[self.fwk_sim_name].sim_conf['FWK_COMPS_PATH']
-                portal_conf['SCRIPT'] = os.path.join(portal_conf['BIN_PATH'], 'portalBridge.py')
+                bridge_conf['BIN_PATH'] = self.sim_map[self.fwk_sim_name].sim_conf['FWK_COMPS_PATH']
+                bridge_conf['SCRIPT'] = os.path.join(bridge_conf['BIN_PATH'], 'bridges', f'{fwk_comp[0]}.py')
             else:
-                portal_conf['SCRIPT'] = ''
-                portal_conf['MODULE'] = 'ipsframework.portalBridge'
-            portal_conf['INPUT_DIR'] = '/dev/null'
-            portal_conf['INPUT_FILES'] = ''
-            portal_conf['DATA_FILES'] = ''
-            portal_conf['OUTPUT_FILES'] = ''
-            portal_conf['NPROC'] = 1
-            portal_conf['LOG_LEVEL'] = 'INFO'
+                bridge_conf['SCRIPT'] = ''
+                bridge_conf['MODULE'] = f'ipsframework.bridges.{fwk_comp[0]}'
+            bridge_conf['INPUT_DIR'] = '/dev/null'
+            bridge_conf['INPUT_FILES'] = ''
+            bridge_conf['DATA_FILES'] = ''
+            bridge_conf['OUTPUT_FILES'] = ''
+            bridge_conf['NPROC'] = 1
+            bridge_conf['LOG_LEVEL'] = 'INFO'
             try:
-                portal_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER']
+                bridge_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER']
             except KeyError:
-                portal_conf['USER'] = self.platform_conf['USER']
-            portal_conf['HOST'] = self.platform_conf['HOST']
+                bridge_conf['USER'] = self.platform_conf['USER']
+            bridge_conf['HOST'] = self.platform_conf['HOST']
             if self.fwk.log_level == logging.DEBUG:
-                portal_conf['LOG_LEVEL'] = 'DEBUG'
-
-            portal_conf['PORTAL_URL'] = self.get_platform_parameter('PORTAL_URL', silent=True)
+                bridge_conf['LOG_LEVEL'] = 'DEBUG'
 
-            if portal_conf['PORTAL_URL']:
-                portal_conf['_IPS_PORTAL_API_KEY'] = self.get_platform_parameter('_IPS_PORTAL_API_KEY', silent=True)
+            # apply the component-specific configuration
+            fwk_comp[2](bridge_conf)
 
-            component_id = self._create_component(portal_conf, self.sim_map[self.fwk_sim_name])
+            component_id = self._create_component(bridge_conf, self.sim_map[self.fwk_sim_name])
             self.fwk_components.append(component_id)
 
     def _initialize_sim(self, sim_data):
diff --git a/ipsframework/ips.py b/ipsframework/ips.py
index 57e4cda7..6c675f5a 100755
--- a/ipsframework/ips.py
+++ b/ipsframework/ips.py
@@ -53,6 +53,7 @@
 """
 import argparse
+import datetime
 import hashlib
 import inspect
 import logging
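For reference, the loop in `_initialize_fwk_components` above assembles one config dict per bridge. For the portal bridge, with no `FWK_COMPS_PATH` set, it comes out roughly like this; the `USER`, `HOST`, and URL values are illustrative placeholders:

```python
# Approximate bridge_conf produced for the portal bridge; _config_portal
# appends the PORTAL_URL (and API key) only when the platform provides one.
example_bridge_conf = {
    'CLASS': 'FWK',
    'SUB_CLASS': 'COMP',
    'NAME': 'PortalBridge',
    'SCRIPT': '',
    'MODULE': 'ipsframework.bridges.portal_bridge',
    'INPUT_DIR': '/dev/null',
    'INPUT_FILES': '',
    'DATA_FILES': '',
    'OUTPUT_FILES': '',
    'NPROC': 1,
    'LOG_LEVEL': 'INFO',
    'USER': 'jdoe',
    'HOST': 'myhost',
    'PORTAL_URL': 'https://portal.example.org',
}
```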
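The next hunk moves `portal_runid` generation out of the old `PortalBridge.init_simulation` and into the framework's `_send_monitor_event` on `IPS_START`, so every bridge sees the same identifier. A standalone sketch of the format, with placeholder arguments and an invented helper name:

```python
import datetime


def make_portal_runid(sim_name: str, host: str, user: str) -> str:
    # Same construction as _send_monitor_event: ISO-ish timestamp with
    # millisecond precision, joined with sim/host/user by underscores.
    d = datetime.datetime.now()
    date_str = '%s.%03d' % (d.strftime('%Y-%m-%dT%H:%M:%S'), int(d.microsecond / 1000))
    return f'{sim_name}_{host}_{user}_{date_str}'


print(make_portal_runid('my_sim', 'myhost', 'jdoe'))
# e.g. my_sim_myhost_jdoe_2024-01-01T12:34:56.789
```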
@@ -587,8 +588,14 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta # portal_data['phystimestamp'] = self.timeStamp get_config = self.config_manager.get_config_parameter if eventType == 'IPS_START': + user = self.config_manager.get_platform_parameter('USER') + host = self.config_manager.get_platform_parameter('HOST') + d = datetime.datetime.now() + date_str = '%s.%03d' % (d.strftime('%Y-%m-%dT%H:%M:%S'), int(d.microsecond / 1000)) + portal_runid = f'{sim_name}_{host}_{user}_{date_str}' + portal_data['state'] = 'Running' - portal_data['host'] = self.config_manager.get_platform_parameter('HOST') + portal_data['host'] = host try: portal_data['outputprefix'] = get_config(sim_name, 'OUTPUT_PREFIX') except KeyError: @@ -631,6 +638,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta portal_data['startat'] = getTimeString(time.localtime(self.config_manager.sim_map[sim_name].start_time)) portal_data['ips_version'] = get_versions()['version'] + portal_data['portal_runid'] = portal_runid try: portal_data['parent_portal_runid'] = get_config(sim_name, 'PARENT_PORTAL_RUNID') except KeyError: diff --git a/ipsframework/services.py b/ipsframework/services.py index 53bf1a86..a67f1994 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -64,7 +64,7 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a * `timeout` - The timeout in seconds for the task to complete. * `cpus_per_proc` - The number of cpus per process to use for the task. This implies that the DVMPlugin has set up a DVM daemon for this node. * `oversubscribe` - If `True`, then the number of processes can exceed the number of cores on the node. Default is `False`. - + If the worker has the attribute `dvm_uri_file`, then we are running with a DVM (Distributed Virtual Machine) so the `binary` needs a `prun` prepended pointing to that. @@ -147,7 +147,7 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a # It can be defined in `task_env` or in `os.environ`, so we look in # both locations to just echo its presence. The flushes are necessary # in some HPC environments to ensure the output appears in the logs. 
- if task_env is not None and task_env is not {}: + if task_env is not None and task_env != {}: if 'PMIX_SERVER_URI41' in task_env: worker.logger.debug(f"DVM environment variable PMIX_SERVER_URI41 " f"set in task_env to " @@ -1240,7 +1240,8 @@ def get_config_param(self, param: str, silent: bool = False, log: bool = True) - val = self._get_service_response(msg_id, block=True) except Exception: if not silent: - self.exception('Error retrieving value of config parameter %s', param) + if log: + self.exception('Error retrieving value of config parameter %s', param) raise return None return val @@ -2053,7 +2054,6 @@ def add_analysis_data_files(self, current_data_file_paths: list[str], timestamp: portal_data['replace'] = replace portal_data['portal_runid'] = portal_runid event_data['portal_data'] = portal_data - # TODO make sure that we do NOT log the raw data in the IPS log file self.publish('_IPS_MONITOR', 'PORTAL_ADD_JUPYTER_DATA', event_data) self._send_monitor_event('IPS_PORTAL_ADD_JUPYTER_DATA', f'SOURCE = {source} TIMESTAMP = {timestamp} REPLACE = {replace}') @@ -2536,7 +2536,6 @@ def send_ensemble_instance_to_portal(ensemble_name: str, data_path: Path) -> Non self.info(f'Preparing to run ensembles in {run_dir}') - # Ensure that we create a unique task pool name for this using the # instance prefix `name` # check this first to ensure uniqueness of `name` parameter @@ -2664,12 +2663,12 @@ def setup(self, worker: Worker): 'for Dask worker') # Necessary to ensure the DVM "sees" all the resources to manage - os.environ['PRTE_MCA_ras_slurm_use_entire_allocation'] = "1" + os.environ['PRTE_MCA_ras_slurm_use_entire_allocation'] = '1' self.worker = worker worker.logger = self.logger - self.logger.info(f'Launching DVM') + self.logger.info('Launching DVM') self.worker.dvm_uri_file = f'/tmp/dvm.uri.{os.getpid()}' command = [#'srun', '--mpi=pmix_v4', '-N', os.environ['SLURM_NNODES'], '--ntasks-per-node=1', 'prte', #'--no-daemonize', @@ -2678,11 +2677,11 @@ def setup(self, worker: Worker): mapping_policy = 'core' # by default bind to cores if self.hwthreads: # ... unless you want to bind to hardware threads - self.logger.info(f'Binding to hardware threads') + self.logger.info('Binding to hardware threads') mapping_policy = 'hwtcpus' if self.oversubscribe: - self.logger.info(f'Allowing oversubscription of nodes') + self.logger.info('Allowing oversubscription of nodes') mapping_policy += ':oversubscribe' # This environment variable is specific to OpenMPI's PRTE @@ -2703,9 +2702,6 @@ def setup(self, worker: Worker): os.environ['PMIX_SERVER_URI41'] = self.worker.dvm_uri - - return - def teardown(self, worker: Worker): self.logger.info(f'Shutting down DVM at {self.worker.dvm_uri}') command = ['pterm', '--dvm-uri', self.worker.dvm_uri] @@ -2964,8 +2960,7 @@ def _make_worker_args(num_workers: int, num_threads: int, use_shifter: bool, shi self.services.debug(f'Dask scheduler pid: {self.dask_sched_pid}') if not Path(self.dask_scheduler_file).exists(): - self.services.critical(f'Dask scheduler file ' - f'{self.dask_scheduler_file} does not exist') + self.services.critical(f'Dask scheduler file {self.dask_scheduler_file} does not exist') dask_nodes = 1 if dask_nodes is None else dask_nodes if services.get_config_param('MPIRUN') == 'eval':
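A quick illustration of why the `task_env is not {}` comparison fixed above never worked: `is not` tests object identity against a freshly created dict, which is always a distinct object, so the guard was always true (recent Python versions even emit a `SyntaxWarning` for `is` with a literal):

```python
task_env = {}
print(task_env is not {})  # True  -- a fresh {} is a different object
print(task_env != {})      # False -- equality correctly detects "empty"
print(bool(task_env))      # False -- an even simpler truthiness check
```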