diff --git a/MC/bin/o2_dpg_workflow_runner.py b/MC/bin/o2_dpg_workflow_runner.py index 14cc866ff..02d25588f 100755 --- a/MC/bin/o2_dpg_workflow_runner.py +++ b/MC/bin/o2_dpg_workflow_runner.py @@ -85,7 +85,8 @@ def setup_logger(name, log_file, level=logging.INFO): return logger # first file logger -actionlogger = setup_logger('pipeline_action_logger', ('pipeline_action_' + str(os.getpid()) + '.log', args.action_logfile)[args.action_logfile!=None], level=logging.DEBUG) +actionlogger_file = ('pipeline_action_' + str(os.getpid()) + '.log', args.action_logfile)[args.action_logfile!=None] +actionlogger = setup_logger('pipeline_action_logger', actionlogger_file, level=logging.DEBUG) # second file logger metriclogger = setup_logger('pipeline_metric_logger', ('pipeline_metric_' + str(os.getpid()) + '.log', args.action_logfile)[args.action_logfile!=None]) @@ -1808,5 +1809,46 @@ def speedup_ROOT_Init(): exit(code) actionlogger.info("Running in cgroup") -executor=WorkflowExecutor(args.workflowfile,jmax=int(args.maxjobs),args=args) -exit (executor.execute()) + +# This starts the fanotify fileaccess monitoring process +# if asked for +o2dpg_filegraph_exec = os.getenv("O2DPG_PRODUCE_FILEGRAPH") # switches filegraph monitoring on and contains the executable name +if o2dpg_filegraph_exec: + env = os.environ.copy() + env["FILEACCESS_MON_ROOTPATH"] = os.getcwd() + env["MAXMOTHERPID"] = f"{os.getpid()}" + + fileaccess_log_file_name = f"pipeline_fileaccess_{os.getpid()}.log" + fileaccess_log_file = open(fileaccess_log_file_name, "w") + fileaccess_monitor_proc = subprocess.Popen( + [o2dpg_filegraph_exec], + stdout=fileaccess_log_file, + stderr=subprocess.STDOUT, + env=env) +else: + fileaccess_monitor_proc = None + +try: + # This is core workflow runner invocation + executor=WorkflowExecutor(args.workflowfile,jmax=int(args.maxjobs),args=args) + rc = executor.execute() +finally: + if fileaccess_monitor_proc: + fileaccess_monitor_proc.terminate() # sends SIGTERM + try: + fileaccess_monitor_proc.wait(timeout=5) + except subprocess.TimeoutExpired: + fileaccess_monitor_proc.kill() # force kill if not stopping + # now produce the final filegraph output + o2dpg_root = os.getenv("O2DPG_ROOT") + analyse_cmd = [ + sys.executable, # runs with same Python interpreter + f"{o2dpg_root}/UTILS/FileIOGraph/analyse_FileIO.py", + "--actionFile", actionlogger_file, + "--monitorFile", fileaccess_log_file_name, + "-o", f"pipeline_fileaccess_report_{os.getpid()}.json", + "--basedir", os.getcwd() ] + print (f"Producing FileIOGraph with command {analyse_cmd}") + subprocess.run(analyse_cmd, check=True) + +sys.exit(rc) \ No newline at end of file diff --git a/UTILS/FileIOGraph/analyse_FileIO.py b/UTILS/FileIOGraph/analyse_FileIO.py index 5ad4523f9..f2e35c736 100755 --- a/UTILS/FileIOGraph/analyse_FileIO.py +++ b/UTILS/FileIOGraph/analyse_FileIO.py @@ -22,7 +22,7 @@ # the run-number of data taking or default if unanchored parser.add_argument('--actionFile', type=str, help="O2DPG pipeline runner action file") parser.add_argument('--monitorFile', type=str, help="monitoring file provided by fanotify tool. See O2DPG/UTILS/FileIOGraph.") -parser.add_argument('--basedir', type=str, help="O2DPG workflow dir") +parser.add_argument('--basedir', default="/", type=str, help="O2DPG workflow dir") parser.add_argument('--file-filters', nargs='+', default=[r'.*'], help="Filters (regular expressions) to select files (default all = '.*')") parser.add_argument('--graphviz', type=str, help="Produce a graphviz plot") parser.add_argument('-o','--output', type=str, help="Output JSON report") @@ -60,7 +60,8 @@ file_written_task = {} file_consumed_task = {} -pattern = re.compile(args.basedir + r'([^,]+),((?:read|write)),(.*)') +pattern = re.compile(r'"?([^"]+)"?,((?:read|write)),(.*)') +basedir_pattern = re.compile("^" + args.basedir) # neglecting some framework file names file_exclude_filter = re.compile(r'(.*)\.log(.*)|(ccdb/log)|(.*)dpl-config\.json') @@ -76,6 +77,13 @@ mode = match.group(2) pids = match.group(3).split(";") + # see if matches the workdir + if not basedir_pattern.match(file_name): + continue + + # remove basedir from file_name + file_name = file_name.replace(args.basedir + '/', "./", 1) + # implement file name filter if file_exclude_filter.match(file_name): continue diff --git a/UTILS/FileIOGraph/monitor_fileaccess_v2.cpp b/UTILS/FileIOGraph/monitor_fileaccess_v2.cpp index e12e3c6b9..2f3750bc6 100644 --- a/UTILS/FileIOGraph/monitor_fileaccess_v2.cpp +++ b/UTILS/FileIOGraph/monitor_fileaccess_v2.cpp @@ -129,8 +129,8 @@ int main(int argc, char **argv) auto ROOT_PATH_ENV = getenv("FILEACCESS_MON_ROOTPATH"); std::string root_path = "/"; if (ROOT_PATH_ENV) { - std::cerr << "Observing file access below " << root_path << "\n"; root_path = std::string(ROOT_PATH_ENV); + std::cerr << "Observing file access below " << root_path << "\n"; } CHK(fan = fanotify_init(FAN_CLASS_NOTIF, O_RDONLY), -1);