diff --git a/MC/bin/o2_dpg_workflow_runner.py b/MC/bin/o2_dpg_workflow_runner.py
index 33c875782..30f3462fa 100755
--- a/MC/bin/o2_dpg_workflow_runner.py
+++ b/MC/bin/o2_dpg_workflow_runner.py
@@ -100,6 +100,7 @@ def setup_logger(name, log_file, level=logging.INFO):
 meta["cpu_limit"] = args.cpu_limit
 meta["mem_limit"] = args.mem_limit
 meta["workflow_file"] = os.path.abspath(args.workflowfile)
+args.target_tasks = [f.strip('"').strip("'") for f in args.target_tasks] # strip quotes from the shell
 meta["target_task"] = args.target_tasks
 meta["rerun_from"] = args.rerun_from
 meta["target_labels"] = args.target_labels
@@ -321,12 +322,12 @@ def load_json(workflowfile):

 # filters the original workflowspec according to wanted targets or labels
-# returns a new workflowspec
+# returns a new workflowspec and the list of "final" workflow targets
 def filter_workflow(workflowspec, targets=[], targetlabels=[]):
     if len(targets)==0:
-        return workflowspec
+        return workflowspec, []
     if len(targetlabels)==0 and len(targets)==1 and targets[0]=="*":
-        return workflowspec
+        return workflowspec, []

     transformedworkflowspec = workflowspec
@@ -334,7 +335,7 @@ def task_matches(t):
         for filt in targets:
             if filt=="*":
                 return True
-            if re.match(filt, t)!=None:
+            if re.match(filt, t) != None:
                 return True
         return False
@@ -372,6 +373,8 @@ def canBeDone(t,cache={}):
                 ok = False
                 break
         cache[t['name']] = ok
+        if not ok:
+            print(f"Disabling target {t['name']} due to unsatisfied requirements")
         return ok

     okcache = {}
@@ -404,7 +407,7 @@ def needed_by_targets(name):
     # we finaly copy everything matching the targets as well
     # as all their requirements
     transformedworkflowspec['stages']=[ l for l in workflowspec['stages'] if needed_by_targets(l['name']) ]
-    return transformedworkflowspec
+    return transformedworkflowspec, full_target_name_list

 # builds topological orderings (for each timeframe)
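
With this change, filter_workflow returns a pair: the filtered spec plus the resolved names of the requested targets, which later feed the file-removal logic below. A minimal sketch of the new calling convention (the stub is illustrative only, not the real implementation):

    import re

    def filter_workflow_stub(workflowspec, targets=[]):
        # no filtering requested -> unchanged spec, empty target list
        if len(targets) == 0 or targets == ["*"]:
            return workflowspec, []
        matched = [s["name"] for s in workflowspec["stages"]
                   if any(re.match(filt, s["name"]) for filt in targets)]
        return workflowspec, matched

    spec = {"stages": [{"name": "aodmerge_1"}, {"name": "tpctimes_1"}]}
    _, final_targets = filter_workflow_stub(spec, targets=["aodmerge.*"])
    print(final_targets)  # ['aodmerge_1']
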
@@ -898,7 +901,7 @@ def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5)
             break

-def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict:
+def filegraph_expand_timeframes(data: dict, timeframes: set, target_namelist) -> dict:
     """
     A utility function for the fileaccess logic. Takes a template and duplicates for the multi-timeframe structure.
@@ -921,6 +924,12 @@ def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict:
             entry["written_by"] = [
                 re.sub(r"_\d+$", f"_{i}", w) for w in entry["written_by"]
             ]
+            # for now we mark files as "keep" if they are written
+            # by a target in the runner's target list. TODO: add other mechanisms
+            # to ask for file keeping (such as via a regex or the like)
+            for e in entry["written_by"]:
+                if e in target_namelist:
+                    entry["keep"] = True
             entry["read_by"] = [
                 re.sub(r"_\d+$", f"_{i}", r) for r in entry["read_by"]
             ]
@@ -945,7 +954,8 @@ def __init__(self, workflowfile, args, jmax=100):
                 os.environ[e] = str(value)

         # only keep those tasks that are necessary to be executed based on user's filters
-        self.workflowspec = filter_workflow(self.workflowspec, args.target_tasks, args.target_labels)
+        self.full_target_namelist = []
+        self.workflowspec, self.full_target_namelist = filter_workflow(self.workflowspec, args.target_tasks, args.target_labels)

         if not self.workflowspec['stages']:
             if args.target_tasks:
@@ -1015,7 +1025,7 @@ def __init__(self, workflowfile, args, jmax=100):
             with open(args.remove_files_early) as f:
                 filegraph_data = json.load(f)
                 self.do_early_file_removal = True
-                self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset)
+                self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset, self.full_target_namelist)

     def perform_early_file_removal(self, taskids):
@@ -1031,7 +1041,7 @@ def remove_if_exists(filepath: str) -> None:
             if os.path.exists(filepath):
                 fsize = os.path.getsize(filepath)
                 os.remove(filepath)
-                actionlogger.info(f"Removing {filepath} since no longer needed. Freeing {fsize/1024.} MB.")
+                actionlogger.info(f"Removing {filepath} since no longer needed. Freeing {fsize/1024./1024.} MB.")
                 return True
             return False
@@ -1057,7 +1067,7 @@ def remove_for_task_id(taskname, file_dict, timeframe_id, listofalltimeframes):
                 file_entry['written_by'].remove(taskname)

             # TODO: in principle the written_by criterion might not be needed
-            if len(file_entry['read_by']) == 0 and len(file_entry['written_by']) == 0:
+            if len(file_entry['read_by']) == 0 and len(file_entry['written_by']) == 0 and not file_entry.get('keep', False):
                 # the filename mentioned here is no longer needed and we can remove it
                 # make sure it is there and then delete it
                 if remove_if_exists(filename):
@@ -1329,6 +1339,17 @@ def monitor(self, process_list):
         globalPSS=0.
         resources_per_task = {}

+        # On the global level, we are interested in the total disc space used (not differential per task).
+        # We call the system "du" as the fastest implementation.
+        def disk_usage_du(path: str) -> int:
+            """Use system du to get total size in bytes."""
+            out = subprocess.check_output(['du', '-sb', path], text=True)
+            return int(out.split()[0])
+
+        disc_usage = -1
+        if os.getenv("MONITOR_DISC_USAGE"):
+            disc_usage = disk_usage_du(os.getcwd()) / 1024. / 1024  # in MB
+
         for tid, proc in process_list:
             # proc is Popen object
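
The disc-usage probe is opt-in via the MONITOR_DISC_USAGE environment variable and shells out to du once per monitoring iteration. A self-contained version of the same idea for illustration; note that du -sb (byte-exact totals) assumes GNU coreutils, so BSD/macOS du would need a different flag:

    import os
    import subprocess

    def disk_usage_du(path: str) -> int:
        """Total size of 'path' in bytes via one 'du -sb' fork, avoiding a Python tree walk."""
        out = subprocess.check_output(['du', '-sb', path], text=True)
        return int(out.split()[0])

    if os.getenv("MONITOR_DISC_USAGE"):
        print(f"cwd uses {disk_usage_du(os.getcwd()) / 1024. / 1024:.1f} MB")
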
@@ -1399,7 +1420,15 @@ def monitor(self, process_list):
             totalUSS = totalUSS / 1024 / 1024
             totalPSS = totalPSS / 1024 / 1024
             nice_value = proc.nice()
-            resources_per_task[tid]={'iter':self.internalmonitorid, 'name':self.idtotask[tid], 'cpu':totalCPU, 'uss':totalUSS, 'pss':totalPSS, 'nice':nice_value, 'swap':totalSWAP, 'label':self.workflowspec['stages'][tid]['labels']}
+            resources_per_task[tid]={'iter':self.internalmonitorid,
+                                     'name':self.idtotask[tid],
+                                     'cpu':totalCPU,
+                                     'uss':totalUSS,
+                                     'pss':totalPSS,
+                                     'nice':nice_value,
+                                     'swap':totalSWAP,
+                                     'label':self.workflowspec['stages'][tid]['labels'],
+                                     'disc': disc_usage}

             self.resource_manager.add_monitored_resources(tid, time_delta, totalCPU / 100, totalPSS)
             if nice_value == self.resource_manager.nice_default:
                 globalCPU += totalCPU
diff --git a/MC/bin/o2dpg_qc_finalization_workflow.py b/MC/bin/o2dpg_qc_finalization_workflow.py
index 78792b424..e05bf83d1 100755
--- a/MC/bin/o2dpg_qc_finalization_workflow.py
+++ b/MC/bin/o2dpg_qc_finalization_workflow.py
@@ -46,7 +46,7 @@ def add_QC_finalization(taskName, qcConfigPath, needs=None):
   if standalone == True:
     needs = []
   elif needs == None:
-    needs = [taskName + '_local' + str(tf) for tf in range(1, ntimeframes + 1)]
+    needs = [taskName + '_local_' + str(tf) for tf in range(1, ntimeframes + 1)]

   task = createTask(name=QC_finalize_name(taskName), needs=needs, cwd=qcdir, lab=["QC"], cpu=1, mem='2000')

 def remove_json_prefix(path):
diff --git a/MC/bin/o2dpg_sim_workflow.py b/MC/bin/o2dpg_sim_workflow.py
index 795545199..6923a50cc 100755
--- a/MC/bin/o2dpg_sim_workflow.py
+++ b/MC/bin/o2dpg_sim_workflow.py
@@ -1726,7 +1726,7 @@ def getDigiTaskName(det):
   if includeFullQC or includeLocalQC:

     def addQCPerTF(taskName, needs, readerCommand, configFilePath, objectsFile=''):
-      task = createTask(name=taskName + '_local' + str(tf), needs=needs, tf=tf, cwd=timeframeworkdir, lab=["QC"], cpu=1, mem='2000')
+      task = createTask(name=taskName + '_local_' + str(tf), needs=needs, tf=tf, cwd=timeframeworkdir, lab=["QC"], cpu=1, mem='2000')
       objectsFile = objectsFile if len(objectsFile) > 0 else taskName + '.root'

 def remove_json_prefix(path):
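
The '_local' -> '_local_' rename keeps the per-timeframe QC task names consistent between the producer (o2dpg_sim_workflow.py) and the finalization step (o2dpg_qc_finalization_workflow.py), and it gives those names the trailing '_<digits>' suffix that the runner's timeframe logic keys on (cf. the re.sub(r"_\d+$", ...) calls above). A quick check with a hypothetical task name:

    import re

    # only the new spelling carries a recognizable '_<timeframe>' suffix
    for name in ["emcDigitsQC_local2", "emcDigitsQC_local_2"]:
        print(name, "->", re.sub(r"_\d+$", "_7", name))
    # emcDigitsQC_local2 -> emcDigitsQC_local2    (unchanged: no '_<digits>' suffix)
    # emcDigitsQC_local_2 -> emcDigitsQC_local_7
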
diff --git a/MC/run/ANCHOR/anchorMC.sh b/MC/run/ANCHOR/anchorMC.sh
index 70ac64a87..77fb37af3 100755
--- a/MC/run/ANCHOR/anchorMC.sh
+++ b/MC/run/ANCHOR/anchorMC.sh
@@ -338,6 +338,10 @@ fi
 TIMESTAMP=`grep "Determined timestamp to be" ${anchoringLogFile} | awk '//{print $6}'`
 echo_info "TIMESTAMP IS ${TIMESTAMP}"

+if [ "${ONLY_WORKFLOW_CREATION}" ]; then
+  exit 0
+fi
+
 # check if this job is exluded because it falls inside a bad data-taking period
 ISEXCLUDED=$(grep "TIMESTAMP IS EXCLUDED IN RUN" ${anchoringLogFile})
 if [ "${ISEXCLUDED}" ]; then
@@ -383,30 +387,36 @@ export FAIRMQ_IPC_PREFIX=./

 echo_info "Ready to start main workflow"

-${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt ${ALIEN_JDL_O2DPGWORKFLOWTARGET:-aod} --cpu-limit ${ALIEN_JDL_CPULIMIT:-8} --dynamic-resources
-MCRC=$? # <--- we'll report back this code
-if [[ "${MCRC}" == "0" && "${ALIEN_JDL_ADDTIMESERIESINMC}" != "0" ]]; then
-  # Default value is 1 so this is run by default.
-  echo_info "Running TPC time series"
-  ${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt tpctimes
-  # Note: We could maybe avoid this if-else by including `tpctimes` directly in the workflow-targets above
-fi
-
-if [[ "${MCRC}" == "0" && "${ALIEN_JDL_DOTPCRESIDUALEXTRACTION}" = "1" ]]; then
-  echo_info "Running TPC residuals extraction, aggregation and merging"
-  ${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt tpcresidmerge
+# Let us construct the workflow targets
+targetString=""
+if [ "${ALIEN_JDL_O2DPGWORKFLOWTARGET}" ]; then
+  # The user gave ${ALIEN_JDL_O2DPGWORKFLOWTARGET}. This is an expert mode not used in production.
+  # In this case, we will build just that. No QC, no TPC timeseries, ...
+  targetString=${ALIEN_JDL_O2DPGWORKFLOWTARGET}
+else
+  targetString="'aodmerge.*'"
+  # Now add more targets depending on options
+  # -) The TPC timeseries targets
+  if [[ "${ALIEN_JDL_ADDTIMESERIESINMC}" == "1" ]]; then
+    targetString="${targetString} 'tpctimes.*'"
+  fi
+  # -) TPC residual calibration
+  if [ "${ALIEN_JDL_DOTPCRESIDUALSEXTRACTION}" ]; then
+    targetString="${targetString} 'tpcresidmerge.*'"
+  fi
+  # -) QC tasks
+  if [[ -z "${DISABLE_QC}" && "${remainingargs}" == *"--include-local-qc"* ]]; then
+    targetString="${targetString} '^.*QC.*'" # QC tasks should have QC in the name
+  fi
 fi
+echo_info "Workflow will run with target specification ${targetString}"

-[[ -n "${DISABLE_QC}" ]] && echo_info "QC is disabled, skip it."
+${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt ${targetString} \
+    --cpu-limit ${ALIEN_JDL_CPULIMIT:-8} --dynamic-resources \
+    ${ALIEN_O2DPG_FILEGRAPH:+--remove-files-early ${ALIEN_O2DPG_FILEGRAPH}} \
+    ${ALIEN_O2DPG_ADDITIONAL_WORKFLOW_RUNNER_ARGS}

-if [[ -z "${DISABLE_QC}" && "${MCRC}" == "0" && "${remainingargs}" == *"--include-local-qc"* ]] ; then
-  # do QC tasks
-  echo_info "Doing QC"
-  ${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --target-labels QC --cpu-limit ${ALIEN_JDL_CPULIMIT:-8} -k
-  # NOTE that with the -k|--keep-going option, the runner will try to keep on executing even if some tasks fail.
-  # That means, even if there is a failing QC task, the return code will be 0
-  MCRC=$?
-fi
+MCRC=$? # <--- we'll report back this code

 #
 # full logs tar-ed for output, regardless the error code or validation - to catch also QC logs...
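
The shell-side construction also explains the quote-stripping added at the top of o2_dpg_workflow_runner.py: ${targetString} is expanded unquoted, so the shell word-splits it into tokens but never performs quote removal on quotes that come from a variable's value, and each regex reaches argv still wrapped in literal single quotes. A small reproduction of what argparse then sees:

    # tokens as word-split by the shell from "'aodmerge.*' 'tpctimes.*' '^.*QC.*'"
    target_tasks = ["'aodmerge.*'", "'tpctimes.*'", "'^.*QC.*'"]
    target_tasks = [f.strip('"').strip("'") for f in target_tasks]
    print(target_tasks)  # ['aodmerge.*', 'tpctimes.*', '^.*QC.*']
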