diff --git a/examples/lcmodel/README.md b/examples/lcmodel/README.md
new file mode 100644
index 0000000..cfb7d54
--- /dev/null
+++ b/examples/lcmodel/README.md
@@ -0,0 +1,11 @@
+### LCModel examples
+
+This folder contains example code to launch the LCModel pipeline using the VIP Python client. It provides three configurations depending on the data source:
+
+- **Data on VIP**: The notebook `launch_lcmodel.ipynb` uses inputs already stored on VIP and writes the results back to VIP.
+- **Data on Girder**: The notebook `launch_with_girder.ipynb` uses inputs stored on Girder and saves the produced outputs there.
+- **Local data**: The script `script_for_local_data/exec_LCModel.py` runs the LCModel pipeline on local inputs and downloads the results at the end of the session.
+
+Note that `exec_LCModel.py` relies on `script_for_local_data/session_utils.py` to provide additional flexibility (e.g., launching multiple sessions in parallel, queueing upcoming sessions, and deduplicating uploads for inputs shared across sessions). Some of these features use internal mechanisms of the Python client and are not official examples of the standard usage of the client; a sketch of the upload-deduplication pattern is shown below.
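+
+For illustration, a minimal sketch of that pattern, using only calls that appear in these examples (all paths and names below are placeholders):
+
+```python
+import os
+from vip_client.classes import VipSession
+
+# Upload every shared input once, in a "zero" session
+session_0 = VipSession.init(
+    api_key=os.environ["VIP_API_KEY"],
+    session_name="Session-Zero",       # placeholder
+    input_dir="./inputs",              # placeholder
+).upload_inputs()
+
+# Later sessions point to the same files and reuse the uploaded inputs
+session = VipSession(
+    session_name="LCMODEL_DKNTMN_30",  # placeholder
+    pipeline_id="LCModel/0.2",
+    input_settings={
+        "signal_file": ["./inputs/signals/db/sig1.RAW"],  # placeholders
+        "zipped_folder": "./inputs/basis.zip",
+        "makebasis_file": "./inputs/makeBasis_3T_Mac_VIP.in",
+        "control_file": "./inputs/controls/fit_DKNTMN_30.control",
+    },
+    output_dir="./outputs/LCMODEL_DKNTMN_30",
+)
+session.get_inputs(session_0)  # reuse Session-Zero uploads instead of re-uploading
+session.launch_pipeline(nb_runs=1)
+```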
+
+The notebook `launch_with_girder.ipynb` and the script `exec_LCModel.py` were created in collaboration with researchers from the CREATIS laboratory to study the variability of results as the DKNTMN parameter (specified in `.control` files) changes.
\ No newline at end of file
diff --git a/examples/lcmodel/launch_with_girder.ipynb b/examples/lcmodel/launch_with_girder.ipynb
new file mode 100644
index 0000000..fda6bce
--- /dev/null
+++ b/examples/lcmodel/launch_with_girder.ipynb
@@ -0,0 +1,438 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "9d71b4e0",
+   "metadata": {},
+   "source": [
+    "## 1. Description\n",
+    "\n",
+    "This notebook automates running and aggregating LCModel quantification results. It can launch executions over multiple signal files while varying parameter files. It uses the VIP platform to execute jobs and Girder to manage files. Finally, it can aggregate the results into a tabular file to facilitate analysis.\n",
+    "\n",
+    "- Configure and authenticate against the VIP and Girder APIs\n",
+    "- Run LCModel jobs on signal and parameter files with VIP\n",
+    "- Download and extract result `.tgz` archives\n",
+    "- Parse LCModel `.table` files into DataFrames\n",
+    "- Aggregate data and add metadata (DKNTMN, file name)\n",
+    "- Compare quantifications (`Rate_Cr`) using boxplots"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "4157109a",
+   "metadata": {},
+   "source": [
+    "## 2. Imports & Configuration\n",
+    "\n",
+    "This cell includes:\n",
+    "\n",
+    "- **Standard libraries**: file, path and archive handling\n",
+    "- **Data science libraries**: pandas, numpy, matplotlib\n",
+    "- **VIP & Girder clients** for data access\n",
+    "- **User parameters**: API keys, URL, Girder folder paths, local paths"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d329905e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# 2.1 — Standard Libraries\n",
+    "import os\n",
+    "import tarfile\n",
+    "import shutil\n",
+    "from pathlib import Path\n",
+    "\n",
+    "# 2.2 — Data Science\n",
+    "import pandas as pd\n",
+    "import numpy as np\n",
+    "import matplotlib.pyplot as plt\n",
+    "\n",
+    "# 2.3 — VIP & Girder\n",
+    "from vip_client.classes import VipGirder\n",
+    "from girder_client import GirderClient\n",
+    "\n",
+    "# 2.4 — User parameters (to fill)\n",
+    "VIP_API_KEY = os.environ.get('VIP_API_KEY')\n",
+    "GIRDER_API_KEY = os.environ.get('GIRDER_API_KEY')\n",
+    "\n",
+    "GIRDER_URL = \"https://srmnopt.creatis.insa-lyon.fr/api/v1\"\n",
+    "PIPELINE_ID = \"LCModel/0.2\"\n",
+    "LAUNCH_EXECUTION = True\n",
+    "\n",
+    "# 2.5 — Pipeline parameters\n",
+    "CONTROL_FOLDER_PATH = '/collection/Schizemo/DATA/DATA_REPRO_VIP/inputs/parameters-DKNTMN-from-10-to-150'\n",
+    "SIGNAL_FOLDER_PATH = '/collection/Schizemo/DATA/DATA_REPRO_VIP'\n",
+    "OUTPUT_FOLDER_PATH = '/collection/Schizemo/DATA/DATA_REPRO_VIP/outputs-DKNTMN-from-10-to-150/old'\n",
+    "BASIS_FOLDER_PATH = '/collection/Schizemo/DATA/DATA_REPRO_VIP/inputs'\n",
+    "\n",
+    "# 2.6 — Local directory for downloaded files\n",
+    "EXTRACTION_FOLDER = Path('./downloaded_outputs')\n",
+    "EXTRACTION_FOLDER.mkdir(exist_ok=True)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "43f532dd",
+   "metadata": {},
+   "source": [
+    "## 3. VIP and Girder: Initialization\n",
+    "\n",
+    "Authenticate and prepare the clients:\n",
+    "\n",
+    "- **VIP** to launch sessions and retrieve results\n",
+    "- **GirderClient** to browse collections and download files"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c7e53689",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# 3.1 — VIP\n",
+    "if LAUNCH_EXECUTION:\n",
+    "    VipGirder.init(\n",
+    "        vip_key=VIP_API_KEY,\n",
+    "        girder_key=GIRDER_API_KEY,\n",
+    "        girder_api_url=GIRDER_URL,\n",
+    "        girder_id_prefix=\"magicsGirder\"\n",
+    "    )\n",
+    "\n",
+    "# 3.2 — GirderClient\n",
+    "gc = GirderClient(apiUrl=GIRDER_URL)\n",
+    "gc.authenticate(apiKey=GIRDER_API_KEY)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "12777015",
+   "metadata": {},
+   "source": [
+    "## 4. Running LCModel jobs (Optional)\n",
+    "\n",
+    "If `LAUNCH_EXECUTION` is `True`, this cell runs LCModel jobs on the specified signal and parameter files. It uses the paths provided in the configuration cell (section 2)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "fb6a4279",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "if LAUNCH_EXECUTION:\n",
+    "    control_files_list = list(gc.listItem(gc.resourceLookup(CONTROL_FOLDER_PATH)['_id']))\n",
+    "    signal_files_list = list(gc.listItem(gc.resourceLookup(SIGNAL_FOLDER_PATH)['_id']))\n",
+    "\n",
+    "    for control_item in control_files_list:\n",
+    "        INPUTS_SETTINGS = {\n",
+    "            \"signal_file\": [SIGNAL_FOLDER_PATH + \"/\" + item[\"name\"] for item in signal_files_list],\n",
+    "            \"zipped_folder\": BASIS_FOLDER_PATH + \"/basis.zip\",\n",
+    "            \"makebasis_file\": BASIS_FOLDER_PATH + \"/makeBasis_3T_Mac_VIP.in\",\n",
+    "            \"control_file\": CONTROL_FOLDER_PATH + \"/\" + control_item[\"name\"],\n",
+    "        }\n",
+    "        # Keep only the DKNTMN value at the end of the control file name\n",
+    "        session_name = \"asupr_LCMODEL_DKNTMN_\" + control_item[\"name\"].split(\"_\")[-1].split(\".\")[0]\n",
+    "        # Create a folder named after the session in the output path\n",
+    "        subfolder = gc.createFolder(gc.resourceLookup(OUTPUT_FOLDER_PATH)['_id'], session_name, public=True)\n",
+    "        new_session = VipGirder(\n",
+    "            session_name = session_name,\n",
+    "            pipeline_id = PIPELINE_ID,\n",
+    "            input_settings = INPUTS_SETTINGS,\n",
+    "            output_dir = OUTPUT_FOLDER_PATH + '/' + session_name,\n",
+    "        ).run_session(nb_runs=1)\n",
+    "else:\n",
+    "    print(\"LAUNCH_EXECUTION is set to False, skipping the pipeline\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "73b0e950",
+   "metadata": {},
+   "source": [
+    "## 5. Utility Functions\n",
+    "\n",
+    "We define here the routines for downloading, extracting and parsing:\n",
+    "- **download_and_extract_tgz**: downloads a result item from Girder and extracts the `.tgz` archive it contains\n",
+    "- **get_table**: reads an LCModel `.table` file, returns a DataFrame and diagnostics\n",
+    "- **parse_lcmodel**: reformats raw data, computes CRLB and convergence status"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "2922f579",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def download_and_extract_tgz(item: dict, dest_dir: Path):\n",
+    "    \"\"\"\n",
+    "    Download a `.tgz` item from Girder into dest_dir,\n",
+    "    extract its contents, then remove the archive.\n",
+    "    \"\"\"\n",
+    "    tgz_path = dest_dir / f\"{item['name']}.tgz\"\n",
+    "    gc.downloadItem(item['_id'], str(tgz_path))\n",
+    "\n",
+    "    # downloadItem creates a directory at tgz_path containing the archive file\n",
+    "    file = os.listdir(tgz_path)[0]\n",
+    "    tgz_path = os.path.join(tgz_path, file)\n",
+    "\n",
+    "    # Extraction\n",
+    "    with tarfile.open(tgz_path, mode='r:gz') as tar:\n",
+    "        tar.extractall(path=str(dest_dir))\n",
+    "\n",
+    "    # Remove the downloaded archive\n",
+    "    Path(tgz_path).unlink()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "8fda0234",
+   "metadata": {},
+   "source": [
+    "### 5.2 Reading an LCModel `.table` file"
+   ]
+  },
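+  {
+   "cell_type": "markdown",
+   "id": "b7c31a02",
+   "metadata": {},
+   "source": [
+    "For orientation: a `.table` file contains a concentration block introduced by `$$CONC` (a header line, then one whitespace-separated row per metabolite) and a diagnostics block introduced by `$$DIAG`, which is what `get_table` below looks for. The excerpt is illustrative only; actual values, columns and spacing vary:\n",
+    "\n",
+    "```text\n",
+    "$$CONC ...\n",
+    "  Conc.    %SD    /Cr+PCr   Metabolite\n",
+    "  5.978     5%      0.744   NAA\n",
+    "$$DIAG ...\n",
+    " 1 info: ...\n",
+    "```"
+   ]
+  },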
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "984f9fde",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def get_table(table_path) -> tuple[pd.DataFrame, list]:\n",
+    "    \"\"\"Read a .table file produced by an LCModel execution.\"\"\"\n",
+    "\n",
+    "    # Read the result file\n",
+    "    with open(table_path, 'r') as f:\n",
+    "        # Look for the concentration data\n",
+    "        while f.readline().split(' ')[0] != \"$$CONC\":\n",
+    "            pass\n",
+    "        # Get the headers\n",
+    "        line = f.readline()[:-1]\n",
+    "        headers = list(filter(None, line.split()))\n",
+    "        lh = len(headers)\n",
+    "        # Get the data\n",
+    "        rawdata = []\n",
+    "        line = f.readline()[:-1]\n",
+    "        while line:\n",
+    "            # Split the line by an unknown number of spaces\n",
+    "            l = list(filter(None, line.split()))\n",
+    "            if len(l) < lh:\n",
+    "                # A +/- sign may replace the space for macromolecules\n",
+    "                for sep in [\"+\", \"-\"]:\n",
+    "                    if sep in l[-1]:\n",
+    "                        l = l[:-1] + l[-1].split(sep)\n",
+    "            # Update the result matrix\n",
+    "            rawdata.append(l)\n",
+    "            # Read a new line\n",
+    "            line = f.readline()[:-1]\n",
+    "        # Look for the diagnostics table\n",
+    "        while f.readline().split(' ')[0] != \"$$DIAG\":\n",
+    "            pass\n",
+    "        # Record each line after $$DIAG\n",
+    "        line = f.readline()[:-1]\n",
+    "        diag = []\n",
+    "        while line:\n",
+    "            diag.append(line)\n",
+    "            line = f.readline()[:-1]\n",
+    "\n",
+    "    # Convert the data to a DataFrame (one column per header)\n",
+    "    data = pd.DataFrame(rawdata, columns=headers)\n",
+    "    # Return the results\n",
+    "    return data, diag"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "1c2d089e",
+   "metadata": {},
+   "source": [
+    "### 5.3 Parsing LCModel results"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "064e706c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def parse_lcmodel(data_raw, diag) -> pd.DataFrame:\n",
+    "    \"\"\"Reformat LCModel output and add the convergence flag.\"\"\"\n",
+    "    data_exec = pd.DataFrame({\n",
+    "        'Metabolite': data_raw['Metabolite'],\n",
+    "        'Rate_Raw': data_raw['Conc.'].astype(float),\n",
+    "        'Rate_Cr': data_raw['/Cr+PCr'].astype(float),\n",
+    "    })\n",
+    "    # Absolute CRLB = relative %SD x raw concentration / 100\n",
+    "    data_exec['CRLB_Raw'] = (\n",
+    "        data_raw['%SD'].str.rstrip('%').astype(float) *\n",
+    "        data_exec['Rate_Raw'] / 100\n",
+    "    )\n",
+    "    # Converged when Cr+PCr is quantified and no diagnostic is worse than a warning\n",
+    "    cr_val = data_exec.loc[data_exec['Metabolite'] == 'Cr+PCr', 'Rate_Cr'].iat[0]\n",
+    "    fit_ok = cr_val != 0\n",
+    "    diag_ok = all(\n",
+    "        d.split()[1].lower() in ('info', 'warning') for d in diag if d.strip()\n",
+    "    )\n",
+    "    return data_exec.assign(Convergence=fit_ok and diag_ok)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "9b65f5b8",
+   "metadata": {},
+   "source": [
+    "## 6. Downloading & Extracting Results\n",
+    "\n",
+    "Iterate over each `LCMODEL_DKNTMN_` folder on Girder, then:\n",
+    "\n",
+    "1. Download the `.tgz` archives\n",
+    "2. Extract them locally using the `download_and_extract_tgz` function\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "257ae77a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# 6.1 — List DKNTMN folders on Girder\n",
+    "folders = list(gc.listFolder(gc.resourceLookup(OUTPUT_FOLDER_PATH)['_id']))\n",
+    "dkntmn_folders = [\n",
+    "    f for f in folders if f['name'].startswith('LCMODEL_DKNTMN_')\n",
+    "]\n",
+    "\n",
+    "# 6.2 — For each folder, download + extract\n",
+    "for folder in dkntmn_folders:\n",
+    "    # Each session folder holds a single timestamped subfolder\n",
+    "    sub = next(gc.listFolder(folder['_id']))\n",
+    "    for item in gc.listItem(sub['_id']):\n",
+    "        if item['name'].endswith('.tgz'):\n",
+    "            dest = EXTRACTION_FOLDER / folder['name'] / item['name']\n",
+    "            download_and_extract_tgz(item, dest)\n",
+    "            # Move the .table file next to the extracted directory\n",
+    "            table_path = dest / 'result.table'\n",
+    "            if table_path.exists():\n",
+    "                new_table_path = dest.with_suffix('.table')\n",
+    "                table_path.rename(new_table_path)\n",
+    "            else:\n",
+    "                print(f\"Table file not found for {item['name']} in {dest}.\")\n",
+    "            # Remove the extracted directory\n",
+    "            shutil.rmtree(dest, ignore_errors=True)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "d80238bd",
+   "metadata": {},
+   "source": [
+    "## 7. Aggregation & Cleanup of `.table` files\n",
+    "Aggregate LCModel quantification results into a single DataFrame, written to a CSV file. This centralizes data for later analysis, avoiding repeated downloads and parsing of individual files.\n",
+    "\n",
+    "1. Collect all extracted `.table` files\n",
+    "2. Parse each table and add `DKNTMN` and `File` columns\n",
+    "3. Concatenate into a single DataFrame, sorted by `DKNTMN`"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "28c5a9a6",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# 7.1 — List `.table` files\n",
+    "tables = []\n",
+    "for wf in EXTRACTION_FOLDER.iterdir():\n",
+    "    for file in wf.glob('**/*.table'):\n",
+    "        tables.append((wf.name, file))\n",
+    "\n",
+    "# 7.2 — Concatenate all results\n",
+    "all_df = []\n",
+    "for dkntmn, path in tables:\n",
+    "    raw, diag = get_table(path)\n",
+    "    df = parse_lcmodel(raw, diag)\n",
+    "    # The folder name ends with the DKNTMN value (assumed numeric)\n",
+    "    df['DKNTMN'] = int(dkntmn.split('_')[-1])\n",
+    "    df['File'] = path.stem\n",
+    "    df['Group'] = path.stem.split('_')[1]\n",
+    "    all_df.append(df)\n",
+    "\n",
+    "# Sort once by DKNTMN after concatenation\n",
+    "data_set = pd.concat(all_df, ignore_index=True).sort_values(by='DKNTMN', ignore_index=True)\n",
+    "data_set.to_csv('data_set.csv', index=False)"
+   ]
+  },
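+  {
+   "cell_type": "markdown",
+   "id": "7a4e8c20",
+   "metadata": {},
+   "source": [
+    "### 7.3 Comparing `Rate_Cr` across DKNTMN values\n",
+    "\n",
+    "A quick visual check of the variability announced in section 1. This is a minimal sketch: it assumes the aggregated `data_set` DataFrame from section 7 and draws one boxplot of `Rate_Cr` per DKNTMN value."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9c5d1b37",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# One boxplot of Rate_Cr per DKNTMN value (sketch; filter metabolites as needed)\n",
+    "ax = data_set.boxplot(column='Rate_Cr', by='DKNTMN', figsize=(10, 5))\n",
+    "ax.set_xlabel('DKNTMN')\n",
+    "ax.set_ylabel('Rate_Cr')\n",
+    "plt.suptitle('')  # drop the automatic pandas title\n",
+    "plt.title('LCModel quantification (Rate_Cr) vs DKNTMN')\n",
+    "plt.show()"
+   ]
+  },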
+  {
+   "cell_type": "markdown",
+   "id": "e7b9a4b6",
+   "metadata": {},
+   "source": [
+    "## 8. Save the aggregated CSV to Girder\n",
+    "- **Export** the aggregated DataFrame to a CSV file\n",
+    "- **Upload** the CSV file to Girder into the specified folder\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "cfb4b301",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "data_set.to_csv('data_set.csv', index=False)\n",
+    "item = gc.createItem(\n",
+    "    parentFolderId=gc.resourceLookup(OUTPUT_FOLDER_PATH)['_id'],\n",
+    "    name='data_set.csv',\n",
+    "    description='Aggregated LCModel results from multiple DKNTMN runs.'\n",
+    ")\n",
+    "gc.uploadFileToItem(\n",
+    "    itemId=item['_id'],\n",
+    "    filename='data_set.csv',\n",
+    "    filepath='./data_set.csv'\n",
+    ")"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.6"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/examples/lcmodel/script_for_local_data/exec_LCModel.py b/examples/lcmodel/script_for_local_data/exec_LCModel.py
new file mode 100644
index 0000000..87897c8
--- /dev/null
+++ b/examples/lcmodel/script_for_local_data/exec_LCModel.py
@@ -0,0 +1,177 @@
+# 1. Imports
+import os
+import shutil
+from itertools import product
+from pathlib import Path
+
+from vip_client.classes import VipSession
+from session_utils import (
+    create_session_for_control_file,
+    manage_concurrent_workflows,
+    cleanup_vip_sessions
+)
+
+# 2. User parameters (to edit)
+VIP_API_KEY = os.environ.get('VIP_API_KEY')
+
+PIPELINE_ID = "LCModel/0.2"
+LAUNCH_EXECUTION = True
+
+# 3. Pipeline parameters (to edit)
+CONTROL_FOLDER_PATH = './tests_slim/inputs/controls'
+SIGNAL_FOLDER_PATH = './tests_slim/inputs/signals'
+MAKE_BASIS_FILE_PATH = './tests_slim/inputs/makeBasis_3T_Mac_VIP.in'
+ZIPPED_FOLDER_FILE_PATH = './tests_slim/inputs/basis.zip'
+INPUT_FOLDER_PATH = './tests_slim/inputs'
+
+OUTPUT_FOLDER_PATH = './tests_slim/outputs/data'
+
+# 4. Results extraction folder
+EXTRACTION_FOLDER = './tests_slim/outputs/extracted_files'
+os.makedirs(EXTRACTION_FOLDER, exist_ok=True)
+
+# 5. Execution parameters (to edit)
+MAX_CONCURRENT_WORKFLOWS = 3  # Number of concurrent workflows (probably max 3 for now)
+REFRESH_TIME = 10             # Workflow status check interval in seconds
+MAX_WAIT_TIME = 3600          # Maximum wait time for running sessions, in seconds
+
+# 6. VIP initialization and global upload of inputs
+session_0 = None
+if LAUNCH_EXECUTION:
+    # Get all .control files and signal databases
+    control_files_list = os.listdir(CONTROL_FOLDER_PATH)
+    signal_database_list = os.listdir(SIGNAL_FOLDER_PATH)
+
+    print(f"Initializing VIP and uploading inputs for {len(control_files_list)} control files...")
+
+    # Upload the zero session (containing all inputs)
+    session_0 = VipSession.init(
+        api_key=VIP_API_KEY,
+        session_name="Session-Zero-LCModel",
+        input_dir=INPUT_FOLDER_PATH
+    ).upload_inputs()
+
+    print("Global upload completed!")
+
+if LAUNCH_EXECUTION:
+    print(f"Preparing {len(control_files_list) * len(signal_database_list)} sessions")
+
+    # Phase 1: Create executions
+    all_sessions = []
+
+    for control_file, signal_database in product(control_files_list, signal_database_list):
+        signal_files_list = [f for f in os.listdir(f"{SIGNAL_FOLDER_PATH}/{signal_database}") if f.endswith('.RAW')]
+        try:
+            db_name = signal_database.split("_")[0]
+            session, session_name, control_file_ref = create_session_for_control_file(
+                control_file, db_name, signal_files_list, CONTROL_FOLDER_PATH,
+                SIGNAL_FOLDER_PATH, ZIPPED_FOLDER_FILE_PATH, MAKE_BASIS_FILE_PATH,
+                PIPELINE_ID, OUTPUT_FOLDER_PATH, session_0
+            )
+            all_sessions.append(session)
+            print(f"Created session for control file: {control_file} with signals from {signal_database}")
+
+        except Exception as e:
+            print(f"ERROR: Failed to create session for {control_file}: {e}")
+
+    print(f"Created {len(all_sessions)} sessions successfully")
+
+    # Phase 2: Manage executions
+    final_status = {}
+    if all_sessions:
+        final_status = manage_concurrent_workflows(
+            all_sessions,
+            max_concurrent=MAX_CONCURRENT_WORKFLOWS,
+            refresh_time=REFRESH_TIME,
+            max_wait_time=MAX_WAIT_TIME
+        )
+
+    # Phase 3: Download results
+    print("\nDownloading results...")
+    successful_downloads = 0
+
+    for session in all_sessions:
+        session_name = session.session_name
+        status = final_status.get(session_name, "Unknown")
+
+        if status == "Finished":
+            try:
+                session.download_outputs(get_status=["Finished"], unzip=True)
+                successful_downloads += 1
+                print(f"  Downloaded: {session_name}")
+            except Exception as e:
+                print(f"  ERROR: Download failed for {session_name}: {e}")
+
+    # Summary
+    print("\nFinal Summary:")
+    print(f"  Sessions created: {len(all_sessions)}")
+    print(f"  Successfully completed: {sum(1 for s in final_status.values() if s == 'Finished')}")
+    print(f"  Failed: {sum(1 for s in final_status.values() if s == 'Failed')}")
+    print(f"  Downloaded: {successful_downloads}")
+
+    # Cleanup: Delete temporary data from VIP.
+    # WARNING: if the script is interrupted before this step, you will need to manually delete outputs produced on VIP.
+    cleanup_vip_sessions(session_0, all_sessions)
+
+else:
+    print("LAUNCH_EXECUTION is set to False, skipping pipeline execution")
+
+
+# 7. Retrieve output folders
+if os.path.exists(OUTPUT_FOLDER_PATH):
+    folders = os.listdir(OUTPUT_FOLDER_PATH)
+    print(f"Processing output directory: {OUTPUT_FOLDER_PATH}")
+    dkntmn_folders = [f for f in folders if os.path.isdir(Path(OUTPUT_FOLDER_PATH) / f)]
+
+    output_path = Path(OUTPUT_FOLDER_PATH)
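+
+    # Expected layout under OUTPUT_FOLDER_PATH after download_outputs(unzip=True),
+    # as assumed by the extraction loop below (names are illustrative):
+    #
+    #   <OUTPUT_FOLDER_PATH>/
+    #     LCMODEL_DKNTMN_<value>_<db>/    # one folder per session
+    #       <timestamp>/                  # one subfolder per run
+    #         <result>.tgz/               # unzipped archive, kept as a directory
+    #           result.table              # LCModel quantification table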
+    # 8. Extract .table files
+    extracted_count = 0
+    for folder in dkntmn_folders:
+        folder_path = output_path / folder
+
+        if not folder_path.exists():
+            continue
+
+        sub_folders = [d for d in os.listdir(folder_path) if os.path.isdir(folder_path / d)]
+        if not sub_folders:
+            continue
+
+        # Keep only the first timestamped subfolder (adapt this if you rerun the
+        # same parameters and need every run, e.g. for reproducibility studies)
+        sub = sub_folders[0]
+        sub_path = folder_path / sub
+
+        # Look for directories named *.tgz (archives already unzipped by download_outputs)
+        items_in_sub = os.listdir(sub_path)
+        tgz_folders = [f for f in items_in_sub if f.endswith('.tgz') and os.path.isdir(sub_path / f)]
+
+        for item in tgz_folders:
+            item_path = sub_path / item
+            table_path = item_path / 'result.table'
+            dest_folder = Path(EXTRACTION_FOLDER) / folder
+
+            if table_path.exists():
+                os.makedirs(dest_folder, exist_ok=True)
+
+                identifier = Path(item).stem + '.table'
+                new_table_path = dest_folder / identifier
+
+                # Copy the .table into the extraction folder
+                try:
+                    shutil.copy(table_path, new_table_path)
+                    extracted_count += 1
+
+                except Exception as e:
+                    print(f"ERROR during copy: {e}")
+
+    print(f"Extracted {extracted_count} result files to: {EXTRACTION_FOLDER}")
+
+    # Final check
+    if os.path.exists(EXTRACTION_FOLDER):
+        total_files = sum(len(files) for _, _, files in os.walk(EXTRACTION_FOLDER))
+        print(f"Total files in extraction folder: {total_files}")
+    else:
+        print("Extraction folder does not exist or is empty")
+
+else:
+    print(f"Output directory {OUTPUT_FOLDER_PATH} does not exist. Skipping file extraction.")
diff --git a/examples/lcmodel/script_for_local_data/session_utils.py b/examples/lcmodel/script_for_local_data/session_utils.py
new file mode 100644
index 0000000..a008954
--- /dev/null
+++ b/examples/lcmodel/script_for_local_data/session_utils.py
@@ -0,0 +1,194 @@
+"""
+Utilities for VIP session management and workflow execution.
+Contains helper functions for creating, launching, and monitoring LCModel sessions.
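+
+Typical usage (a sketch mirroring exec_LCModel.py; paths and names below are
+placeholders):
+
+    session_0 = VipSession.init(api_key=..., session_name="Session-Zero",
+                                input_dir="./inputs").upload_inputs()
+    session, name, ctrl = create_session_for_control_file(
+        "fit_DKNTMN_30.control", "db1", ["sig1.RAW"], "./inputs/controls",
+        "./inputs/signals", "./inputs/basis.zip", "./inputs/makeBasis.in",
+        "LCModel/0.2", "./outputs", session_0)
+    final_status = manage_concurrent_workflows([session], max_concurrent=3)
+    cleanup_vip_sessions(session_0, [session])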
+""" + +import os +import time +import sys +from io import StringIO +from vip_client.classes import VipSession + + +def create_session_for_control_file(control_file, db_name, signal_files_list, control_folder_path, + signal_folder_path, zipped_folder_file_path, + make_basis_file_path, pipeline_id, output_folder_path, + session_0): + """Creates and configures a VIP session for a given control file using pre-uploaded inputs""" + # Individual settings for this specific session (referencing already uploaded files) + input_settings = { + "signal_file": [signal_folder_path + "/" + db_name + "/" + signal_file for signal_file in signal_files_list], + "zipped_folder": zipped_folder_file_path, + "makebasis_file": make_basis_file_path, + "control_file": control_folder_path + "/" + control_file, + } + + print(control_file) + session_name = "LCMODEL_DKNTMN_" + control_file.split("_")[-1].split(".")[0] + "_" + db_name + + os.makedirs(output_folder_path + '/' + session_name, exist_ok=True) + + session = VipSession( + session_name=session_name, + pipeline_id=pipeline_id, + input_settings=input_settings, + output_dir=output_folder_path + '/' + session_name, + ) + + # Access the inputs of Session-Zero + session.get_inputs(session_0) + + return session, session_name, control_file + + +def launch_session(session): + """Launches the pipeline for a session using pre-uploaded inputs""" + try: + session.launch_pipeline(nb_runs=1) + return True + except Exception as e: + print(f" ERROR: Failed to launch {session.session_name}: {e}") + return False + + +def check_session_status(session): + """ + Check the current status of a session by examining its workflows + Returns: "Running", "Finished", "Failed", or "Unknown" + """ + try: + if not hasattr(session, 'workflows') or not session.workflows: + return "Queued" + + # Update workflow status from VIP + session._update_workflows() + + # Check if any workflows are still running + running_count = 0 + finished_count = 0 + failed_count = 0 + + for workflow_id, workflow_info in session.workflows.items(): + status = workflow_info.get("status", "Unknown") + if status == "Running": + running_count += 1 + elif status == "Finished": + finished_count += 1 + elif status in ["Failed", "Error"]: + failed_count += 1 + + # Determine overall session status + if failed_count > 0: + return "Failed" + elif running_count > 0: + return "Running" + elif finished_count > 0 and running_count == 0: + return "Finished" + else: + return "Unknown" + + except Exception as e: + print(f" Warning: Status check failed for {session.session_name}: {e}") + return "Unknown" + + +def manage_concurrent_workflows(all_sessions, max_concurrent=3, refresh_time=30, max_wait_time=3600): + """ + Manages execution of sessions with a maximum number of concurrent workflows + Uses a simple polling approach based on the VIP library source code + + Args: + all_sessions: List of all sessions to execute + max_concurrent: Maximum number of simultaneous workflows + refresh_time: Time between status checks in seconds + max_wait_time: Maximum total wait time in seconds + + Returns: + Dict with final status of each session + """ + print(f"Managing {len(all_sessions)} sessions with max {max_concurrent} concurrent workflows") + + # Initialize queues and tracking + pending_sessions = all_sessions.copy() + active_sessions = [] + completed_sessions = {} + + start_time = time.time() + check_count = 0 + + while pending_sessions or active_sessions: + current_time = time.time() + elapsed_time = current_time - start_time + check_count += 
+
+        print(f"\n--- Check #{check_count} ({elapsed_time:.0f}s) | Pending: {len(pending_sessions)}, Active: {len(active_sessions)}, Completed: {len(completed_sessions)} ---")
+
+        # Check timeout: mark every remaining session, then stop polling
+        if elapsed_time > max_wait_time:
+            print(f"TIMEOUT: Maximum wait time ({max_wait_time}s) reached.")
+            for session in active_sessions + pending_sessions:
+                if session.session_name not in completed_sessions:
+                    completed_sessions[session.session_name] = "Timeout"
+            break
+
+        # Check status of active sessions
+        newly_completed = []
+        for session in active_sessions[:]:
+            status = check_session_status(session)
+
+            if status in ["Finished", "Failed"]:
+                completed_sessions[session.session_name] = status
+                active_sessions.remove(session)
+                newly_completed.append(session)
+                print(f"  {session.session_name}: {status}")
+            elif status == "Unknown":
+                print(f"  {session.session_name}: Status unknown, treating as still running")
+
+        # Launch new sessions if slots are available
+        available_slots = max_concurrent - len(active_sessions)
+        sessions_to_launch = min(available_slots, len(pending_sessions))
+
+        for _ in range(sessions_to_launch):
+            session = pending_sessions.pop(0)
+            if launch_session(session):
+                active_sessions.append(session)
+                print(f"  Launched: {session.session_name}")
+            else:
+                completed_sessions[session.session_name] = "Failed"
+                print(f"  Failed to launch: {session.session_name}")
+
+        # Show sessions completed during this cycle
+        if newly_completed:
+            print(f"  Completed: {[s.session_name for s in newly_completed]}")
+
+        # Wait before the next check if there are still active or pending sessions
+        if active_sessions or pending_sessions:
+            print(f"  Waiting {refresh_time} seconds before next check...")
+            time.sleep(refresh_time)
+
+    total_time = time.time() - start_time
+    print(f"\nAll sessions completed in {total_time:.0f}s")
+    return completed_sessions
+
+
+def cleanup_vip_sessions(session_0, all_sessions):
+    """Clean up VIP data for session_0 and all individual sessions."""
+    print("\nCleaning up VIP data...")
+    try:
+        session_0.finish()  # removes the INPUT data shared by all sessions
+        print("  Session-Zero cleaned up")
+
+        for session in all_sessions:
+            try:
+                VipSession(session.session_name).finish()  # removes the OUTPUT data for this session
+            except Exception:
+                pass
+        print("  Individual sessions cleaned up")
+        return True
+    except Exception as e:
+        print(f"  Warning: Cleanup failed: {e}")
+        return False