diff --git a/pyproject.toml b/pyproject.toml index 5999706..4ec7d12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "VITools" -version = "0.4.0" +version = "0.4.1" authors = [ {name="Brandon Nelson", email="brandon.nelson@fda.hhs.gov"}, ] diff --git a/src/VITools/study.py b/src/VITools/study.py index fb7fa6f..92e738b 100644 --- a/src/VITools/study.py +++ b/src/VITools/study.py @@ -13,10 +13,12 @@ import re import sys from datetime import datetime +import os +import re from pathlib import Path from tqdm import tqdm import shutil -from time import sleep +from time import sleep, time import ast from subprocess import run from argparse import ArgumentParser @@ -388,23 +390,78 @@ def _monitor_progress(self, log_dir, scans_queued=None): output_df = self.get_scans_completed() scans_completed = len(np.unique(output_df.get('case_id', []))) errors = {} + + # Define error log path + try: + study_root = Path(self.metadata.iloc[0]['output_directory']).parent + error_log_path = study_root / 'study_errors.log' + except (IndexError, KeyError): + error_log_path = None + with tqdm(total=scans_queued, initial=scans_completed, desc='Scans completed in parallel') as pbar: while scans_completed < scans_queued: sleep(1) temp_errors = scan_logs_for_errors(log_dir, verbose=False) output_df = self.get_scans_completed() + # Update errors from logs + for task_id, msg in temp_errors.items(): + if task_id not in errors: + errors[task_id] = msg + error_msg = f"--- ERROR FOUND IN: {task_id} ---\nError Message: {msg}\n\n" + print(error_msg) + if error_log_path: + with open(error_log_path, 'a') as f: + f.write(error_msg) + # Count successful scans - successful_scans = len(np.unique(output_df.get('case_id', []))) - - # Check for errors - new_errors_found = False - if len(temp_errors) > len(errors): - errors = temp_errors - new_errors_found = True - for task_id in errors: - print(f"--- ERROR FOUND IN: {task_id} ---") - print(f"Error Message: {errors[task_id]}\n") + successful_cases = set(output_df.get('case_id', [])) + successful_scans = len(successful_cases) + + # Get set of completed IDs as integers to handle varying string formats (case_0001 vs case_1) + successful_ids = set() + for cid in successful_cases: + try: + successful_ids.add(int(str(cid).split('_')[-1])) + except (ValueError, IndexError): + pass + + # Check for stalled logs (timeout) + # Iterate over log files in log_dir + try: + for entry in os.scandir(log_dir): + if entry.is_file() and entry.name.startswith('task_') and entry.name.endswith('.log'): + # Extract task id from filename "task_0.log" + try: + # Regex match to be safe + match = re.search(r'task_(\d+)\.log', entry.name) + if match: + task_id_str = match.group(1) + task_id = int(task_id_str) + + # Check if already completed + if task_id in successful_ids: + continue + + # Check if already errored + if entry.name in errors: + continue + + # Check modification time + mtime = entry.stat().st_mtime + if time() - mtime > 8 * 3600: + msg = "Timeout: Exceeded limit of 8 hours" + errors[entry.name] = msg + error_msg = f"--- ERROR FOUND IN: {entry.name} ---\nError Message: {msg}\n\n" + print(error_msg) + if error_log_path: + with open(error_log_path, 'a') as f: + f.write(error_msg) + + except Exception: + pass + except FileNotFoundError: + pass # Total completed = successes + failures current_total_completed = successful_scans + len(errors)