diff --git a/MC/bin/o2_dpg_workflow_runner.py b/MC/bin/o2_dpg_workflow_runner.py
index 02d25588f..33c875782 100755
--- a/MC/bin/o2_dpg_workflow_runner.py
+++ b/MC/bin/o2_dpg_workflow_runner.py
@@ -14,6 +14,7 @@
 import traceback
 import platform
 import tarfile
+from copy import deepcopy
 try:
     from graphviz import Digraph
     havegraphviz=True
@@ -65,6 +66,9 @@
 parser.add_argument('--retry-on-failure', help=argparse.SUPPRESS, default=0) # number of times a failing task is retried
 parser.add_argument('--no-rootinit-speedup', help=argparse.SUPPRESS, action='store_true') # disable init of ROOT environment vars to speedup init/startup
+parser.add_argument('--remove-files-early', type=str, default="", help="Delete intermediate files early (using the file graph information in the given file)")
+
+
 # Logging
 parser.add_argument('--action-logfile', help='Logfilename for action logs. If none given, pipeline_action_#PID.log will be used')
 parser.add_argument('--metric-logfile', help='Logfilename for metric logs. If none given, pipeline_metric_#PID.log will be used')
@@ -894,6 +898,38 @@ def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5)
         break
 
+def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict:
+    """
+    A utility function for the file-access logic. Takes the per-timeframe
+    template entries and duplicates them for the multi-timeframe structure.
+    """
+    tf_entries = [
+        entry for entry in data.get("file_report", [])
+        if re.match(r"^\./tf\d+/", entry["file"])
+    ]
+
+    result = {}
+    for i in timeframes:
+        if i == -1:
+            continue
+        # deepcopy to avoid modifying the original template entries
+        new_entries = deepcopy(tf_entries)
+        for entry in new_entries:
+            # fix the filepath
+            entry["file"] = re.sub(r"^\./tf\d+/", f"./tf{i}/", entry["file"])
+            # fix written_by and read_by (preserve prefix, change numeric suffix)
+            entry["written_by"] = [
+                re.sub(r"_\d+$", f"_{i}", w) for w in entry["written_by"]
+            ]
+            entry["read_by"] = [
+                re.sub(r"_\d+$", f"_{i}", r) for r in entry["read_by"]
+            ]
+        result[f"timeframe-{i}"] = new_entries
+
+    return result
+
+
 class WorkflowExecutor:
     # Constructor
     def __init__(self, workflowfile, args, jmax=100):
@@ -929,6 +965,7 @@ def __init__(self, workflowfile, args, jmax=100):
         # construct task ID <-> task name lookup
         self.idtotask = [ 0 for _ in self.taskuniverse ]
         self.tasktoid = {}
+        self.idtotf = [ l['timeframe'] for l in self.workflowspec['stages'] ] # task ID -> timeframe lookup
         for i, name in enumerate(self.taskuniverse):
             self.tasktoid[name]=i
             self.idtotask[i]=name
@@ -970,6 +1007,72 @@ def __init__(self, workflowfile, args, jmax=100):
         # init alternative software environments
         self.init_alternative_software_environments()
+
+        # initialize container to keep track of file-task relationships
+        self.file_removal_candidates = {}
+        self.do_early_file_removal = False
+        self.timeframeset = { task["timeframe"] for task in self.workflowspec['stages'] }
+        if args.remove_files_early != "":
+            with open(args.remove_files_early) as f:
+                filegraph_data = json.load(f)
+            self.do_early_file_removal = True
+            self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset)
+
+
+    def perform_early_file_removal(self, taskids):
+        """
+        Checks which files are no longer needed after completion of the given
+        tasks and deletes them.
+        """
+
+        def remove_if_exists(filepath: str) -> bool:
+            """
+            Check if a file exists and remove it if found.
+            Returns True if the file was removed.
+            """
+            if os.path.exists(filepath):
+                fsize = os.path.getsize(filepath)
+                os.remove(filepath)
+                actionlogger.info(f"Removing {filepath} since no longer needed. Freeing {fsize/1024./1024.:.2f} MB.")
+                return True
+
+            return False
+
+        def remove_for_task_id(taskname, file_dict, timeframe_id, listofalltimeframes):
+            timeframestoscan = [ timeframe_id ]
+            if timeframe_id == -1:
+                timeframestoscan = [ i for i in listofalltimeframes if i != -1 ]
+
+            # TODO: Note that this traversal of files is certainly not optimal.
+            # We should (and will) keep a mapping of tasks -> potential files and just
+            # scan these. This is already provided by the FileIOGraph analysis tool.
+            for tid in timeframestoscan:
+                entries = file_dict.get(f"timeframe-{tid}", [])
+                marked_for_removal = []
+                for file_entry in entries:
+                    filename = file_entry['file']
+                    if taskname in file_entry['read_by']:
+                        file_entry['read_by'].remove(taskname)
+                    if taskname in file_entry['written_by']:
+                        file_entry['written_by'].remove(taskname)
+
+                    # TODO: in principle the written_by criterion might not be needed
+                    if len(file_entry['read_by']) == 0 and len(file_entry['written_by']) == 0:
+                        # the file mentioned here is no longer needed;
+                        # make sure it is there and then delete it
+                        if remove_if_exists(filename):
+                            # also take the file entry out of the dict altogether
+                            marked_for_removal.append(file_entry)
+
+                for k in marked_for_removal:
+                    entries.remove(k)
+
+        for tid in taskids:
+            taskname = self.idtotask[tid]
+            timeframe_id = self.idtotf[tid]
+            remove_for_task_id(taskname, self.file_removal_candidates, timeframe_id, self.timeframeset)
+
     def SIGHandler(self, signum, frame):
         """
         basically forcing shut down of all child processes
@@ -1737,6 +1840,10 @@ def speedup_ROOT_Init():
         actionlogger.debug("finished now :" + str(finished_from_started))
         finishedtasks = finishedtasks + finished
+
+        # perform file cleanup
+        if self.do_early_file_removal:
+            self.perform_early_file_removal(finished_from_started)
+
         if self.is_productionmode:
             # we can do some generic cleanup of finished tasks in non-interactive/GRID mode
             # TODO: this can run asynchronously
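For reference, a minimal sketch of the file-graph input consumed via `--remove-files-early`, inferred from the fields that `filegraph_expand_timeframes` reads. The concrete file and task names (`tpcdigits.root`, `digi_1`, `reco_1`) are hypothetical placeholders, and the snippet assumes the function from this patch is importable:

```python
# Hypothetical file-graph content: each "file_report" entry names a file
# together with the tasks that write and read it.
filegraph_data = {
    "file_report": [
        {
            "file": "./tf1/tpcdigits.root",  # hypothetical file path
            "written_by": ["digi_1"],        # hypothetical producer task
            "read_by": ["reco_1"],           # hypothetical consumer task
        }
    ]
}

# Expand the timeframe template over timeframes 1 and 2 (-1 is skipped):
expanded = filegraph_expand_timeframes(filegraph_data, {1, 2, -1})
assert expanded["timeframe-2"][0]["file"] == "./tf2/tpcdigits.root"
assert expanded["timeframe-2"][0]["read_by"] == ["reco_2"]
```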
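Assuming the runner's usual invocation, early removal would then be switched on with something like `o2_dpg_workflow_runner.py -f workflow.json --remove-files-early filegraph.json`, where both file names are illustrative. The deletion criterion is conservative: a file is removed only once its `read_by` and `written_by` lists are both drained, meaning the producer has finished and no pending consumer remains; a task with timeframe `-1` triggers a scan over all timeframes, since such global tasks may touch files in any of them.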