Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "VITools"
version = "0.4.0"
version = "0.4.1"
authors = [
{name="Brandon Nelson", email="brandon.nelson@fda.hhs.gov"},
]
Expand Down
79 changes: 68 additions & 11 deletions src/VITools/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import re
import sys
from datetime import datetime
import os
import re
from pathlib import Path
from tqdm import tqdm
import shutil
from time import sleep
from time import sleep, time
import ast
from subprocess import run
from argparse import ArgumentParser
Expand Down Expand Up @@ -388,23 +390,78 @@ def _monitor_progress(self, log_dir, scans_queued=None):
output_df = self.get_scans_completed()
scans_completed = len(np.unique(output_df.get('case_id', [])))
errors = {}

# Define error log path
try:
study_root = Path(self.metadata.iloc[0]['output_directory']).parent
error_log_path = study_root / 'study_errors.log'
except (IndexError, KeyError):
error_log_path = None

with tqdm(total=scans_queued, initial=scans_completed, desc='Scans completed in parallel') as pbar:
while scans_completed < scans_queued:
sleep(1)
temp_errors = scan_logs_for_errors(log_dir, verbose=False)
output_df = self.get_scans_completed()

# Update errors from logs
for task_id, msg in temp_errors.items():
if task_id not in errors:
errors[task_id] = msg
error_msg = f"--- ERROR FOUND IN: {task_id} ---\nError Message: {msg}\n\n"
print(error_msg)
if error_log_path:
with open(error_log_path, 'a') as f:
f.write(error_msg)

# Count successful scans
successful_scans = len(np.unique(output_df.get('case_id', [])))

# Check for errors
new_errors_found = False
if len(temp_errors) > len(errors):
errors = temp_errors
new_errors_found = True
for task_id in errors:
print(f"--- ERROR FOUND IN: {task_id} ---")
print(f"Error Message: {errors[task_id]}\n")
successful_cases = set(output_df.get('case_id', []))
successful_scans = len(successful_cases)

# Get set of completed IDs as integers to handle varying string formats (case_0001 vs case_1)
successful_ids = set()
for cid in successful_cases:
try:
successful_ids.add(int(str(cid).split('_')[-1]))
except (ValueError, IndexError):
pass

# Check for stalled logs (timeout)
# Iterate over log files in log_dir
try:
for entry in os.scandir(log_dir):
if entry.is_file() and entry.name.startswith('task_') and entry.name.endswith('.log'):
# Extract task id from filename "task_0.log"
try:
# Regex match to be safe
match = re.search(r'task_(\d+)\.log', entry.name)
if match:
task_id_str = match.group(1)
task_id = int(task_id_str)

# Check if already completed
if task_id in successful_ids:
continue

# Check if already errored
if entry.name in errors:
continue

# Check modification time
mtime = entry.stat().st_mtime
if time() - mtime > 8 * 3600:
msg = "Timeout: Exceeded limit of 8 hours"
errors[entry.name] = msg
error_msg = f"--- ERROR FOUND IN: {entry.name} ---\nError Message: {msg}\n\n"
print(error_msg)
if error_log_path:
with open(error_log_path, 'a') as f:
f.write(error_msg)

except Exception:
pass
except FileNotFoundError:
pass

# Total completed = successes + failures
current_total_completed = successful_scans + len(errors)
Expand Down