From 6ac26f7aad2d226156fad6e750e7c6f44e4062be Mon Sep 17 00:00:00 2001 From: Matt Mallory Date: Thu, 5 Jun 2025 23:49:13 -0700 Subject: [PATCH 1/3] fix: tolerate whitespaces and special char in filenames --- .../projection_matrix_from_swc_directory.py | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/morph_utils/executable_scripts/projection_matrix_from_swc_directory.py b/morph_utils/executable_scripts/projection_matrix_from_swc_directory.py index 1c9e4b1..bf43e39 100644 --- a/morph_utils/executable_scripts/projection_matrix_from_swc_directory.py +++ b/morph_utils/executable_scripts/projection_matrix_from_swc_directory.py @@ -88,17 +88,17 @@ def main(ccf_swc_directory, results.append(res) else: - this_output_projection_csv = os.path.join(single_sp_proj_dir, swc_fn.replace(".swc",".csv")) + this_output_projection_csv_check = os.path.join(single_sp_proj_dir, swc_fn.replace(".swc",f"_{mask_method}.csv")) - if not os.path.exists(this_output_projection_csv): + if not os.path.exists(this_output_projection_csv_check): job_file = os.path.join(job_dir,swc_fn.replace(".swc",".sh")) log_file = os.path.join(job_dir,swc_fn.replace(".swc",".log")) command = "morph_utils_extract_projection_matrix_single_cell " command = command+ f" --input_swc_file '{swc_pth}'" - command = command+ f" --output_projection_csv {this_output_projection_csv}" + command = command+ f" --output_projection_csv '{this_output_projection_csv}'" command = command+ f" --projection_threshold {projection_threshold}" command = command+ f" --normalize_proj_mat {normalize_proj_mat}" command = command+ f" --mask_method {mask_method}" @@ -114,7 +114,7 @@ def main(ccf_swc_directory, command_list = [activate_command, command] slurm_kwargs = { - "--job-name": f"{swc_fn}", + "--job-name": f"'{swc_fn}'", "--mail-type": "NONE", "--cpus-per-task": "1", "--nodes": "1", @@ -122,7 +122,7 @@ def main(ccf_swc_directory, "--mem": "24gb", "--time": "1:00:00", "--partition": "celltypes", - "--output": log_file + "--output": f"'{log_file}'" } dag_node = { @@ -147,15 +147,20 @@ def main(ccf_swc_directory, job_f.write(val) job_f.write('\n') - - command = "sbatch {}".format(job_file) - command_list = command.split(" ") - result = subprocess.run(command_list, stdout=subprocess.PIPE) + # print("Going to submit this job file") + # print(job_file) + command_list = ["sbatch", job_file] + result = subprocess.run(command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + std_out = result.stdout.decode('utf-8') + std_err = result.stderr.decode('utf-8') - job_id = std_out.split("Submitted batch job ")[-1].replace("\n", "") - single_cell_job_ids.append(job_id) - # time.sleep(0.1) + if result.returncode != 0: + print("Error submitting job:", std_err) + else: + job_id = std_out.split("Submitted batch job ")[-1].strip() + single_cell_job_ids.append(job_id) + if run_host!='local': # aggregate single projection files into proj mat @@ -206,18 +211,19 @@ def main(ccf_swc_directory, job_f.write(val) job_f.write('\n') - command = "sbatch --dependency=afterany" - for p_jid in single_cell_job_ids: - command = command + f":{p_jid}" - command = command + " {}".format(job_file) - command_list = command.split(" ") - # print(command) - result = subprocess.run(command_list, stdout=subprocess.PIPE) + dependency_str = f"afterany:{':'.join(single_cell_job_ids)}" + command_list = ["sbatch", f"--dependency={dependency_str}", job_file] + result = subprocess.run(command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE) std_out = result.stdout.decode('utf-8') + std_err = result.stderr.decode('utf-8') + + if result.returncode != 0: + print("Error submitting dependent job:", std_err) + else: + job_id = std_out.split("Submitted batch job ")[-1].strip() + print(f"Submitted aggregation job ID: {job_id}") + - job_id = std_out.split("Submitted batch job ")[-1].replace("\n", "") - - if results != []: output_projection_csv = output_projection_csv.replace(".csv", f"_{mask_method}.csv") From 34cc47550ed3433da3c24b09b2c2fe518c5f35a0 Mon Sep 17 00:00:00 2001 From: Matt Mallory Date: Thu, 5 Jun 2025 23:49:38 -0700 Subject: [PATCH 2/3] add: roll up to proj mat agg --- .../aggregate_single_cell_projection_csvs.py | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/morph_utils/executable_scripts/aggregate_single_cell_projection_csvs.py b/morph_utils/executable_scripts/aggregate_single_cell_projection_csvs.py index 944670b..b59ba57 100644 --- a/morph_utils/executable_scripts/aggregate_single_cell_projection_csvs.py +++ b/morph_utils/executable_scripts/aggregate_single_cell_projection_csvs.py @@ -2,7 +2,10 @@ from tqdm import tqdm import pandas as pd import argschema as ags +import numpy as np from morph_utils.ccf import projection_matrix_for_swc +from morph_utils.ccf import STRUCTURE_DESCENDANTS_ACRONYM + class IO_Schema(ags.ArgSchema): output_directory = ags.fields.OutputDir(description="output directory") @@ -12,6 +15,68 @@ class IO_Schema(ags.ArgSchema): normalize_proj_mat = ags.fields.Boolean(default=True) +def de_layer(st): + CTX_STRUCTS = STRUCTURE_DESCENDANTS_ACRONYM['CTX'] + sub_st = st.replace("ipsi_","").replace("contra","") + if sub_st in CTX_STRUCTS: + + for l in ["1","2/3","4","5","6a","6b"]: + st = st.replace(l,"") + + if "ENT" in st: + for l in ["2", "3","6"]: + st = st.replace(l,"") + + return st + else: + return st + + +def roll_up_proj_mat(infile, outfile): + + df = pd.read_csv(infile, index_col=0) + df.index = df.index.map(os.path.basename) + + non_proj_cols = [f for f in df.columns if not any([i in f for i in ["ipsi","contra"]])] + new_df = df[non_proj_cols].copy() + + proj_cols = [f for f in df.columns if any([i in f for i in ["ipsi","contra"]])] + de_layer_dict = {p:de_layer(p) for p in proj_cols} + + parent_names = list(de_layer_dict.values()) + unique_parent_names = np.unique(parent_names) + unique_parent_names = sorted(unique_parent_names, key=lambda x:parent_names.index(x)) + + roll_up_records = {} + for low_res_struct in unique_parent_names: + children = [k for k,v in de_layer_dict.items() if v==low_res_struct ] + roll_up_records[low_res_struct] = children + + + + # for parent, child_list in roll_up_records.items(): + # new_df[parent] = df[child_list].sum(axis=1) + new_cols = { + parent: df[child_list].sum(axis=1) + for parent, child_list in roll_up_records.items() + } + new_cols_df = pd.DataFrame(new_cols) + new_df = pd.concat([new_df, new_cols_df], axis=1) + + # sanity check + for n_struct,old_list in roll_up_records.items(): + sum_old = df[old_list].sum(axis=1) + sum_new = new_df[n_struct] + assert sum(sum_old==sum_new) == len(df) + + + + # print(outfile) + # print() + assert os.path.abspath(outfile) != os.path.abspath(infile) + new_df.to_csv(outfile) + + def normalize_projection_columns_per_cell(input_df, projection_column_identifiers=['ipsi', 'contra']): """ :param input_df: input projection df @@ -54,6 +119,7 @@ def main(output_directory, # proj_df_mask = pd.DataFrame(branch_and_tip_projection_records).T.fillna(0) proj_df.to_csv(output_projection_csv) + roll_up_proj_mat(infile=output_projection_csv, outfile=output_projection_csv.replace(".csv",'_rollup.csv')) # proj_df_mask.to_csv(output_projection_csv_tip_branch_mask) if projection_threshold != 0: @@ -79,6 +145,7 @@ def main(output_directory, proj_df = normalize_projection_columns_per_cell(proj_df) proj_df.to_csv(output_projection_csv) + roll_up_proj_mat(infile=output_projection_csv, outfile=output_projection_csv.replace(".csv",'_rollup.csv')) # proj_df_mask = normalize_projection_columns_per_cell(proj_df_mask) # proj_df_mask.to_csv(output_projection_csv_tip_branch_mask) From e3f1de5f96cf9ce5a75431ca7906165e07517524 Mon Sep 17 00:00:00 2001 From: Matt Mallory Date: Thu, 5 Jun 2025 23:57:20 -0700 Subject: [PATCH 3/3] fix: local and hpc proj mat runs roll-up --- .../aggregate_single_cell_projection_csvs.py | 81 +------------------ .../projection_matrix_from_swc_directory.py | 5 ++ morph_utils/proj_mat_utils.py | 0 3 files changed, 7 insertions(+), 79 deletions(-) create mode 100644 morph_utils/proj_mat_utils.py diff --git a/morph_utils/executable_scripts/aggregate_single_cell_projection_csvs.py b/morph_utils/executable_scripts/aggregate_single_cell_projection_csvs.py index b59ba57..16aeb18 100644 --- a/morph_utils/executable_scripts/aggregate_single_cell_projection_csvs.py +++ b/morph_utils/executable_scripts/aggregate_single_cell_projection_csvs.py @@ -3,8 +3,7 @@ import pandas as pd import argschema as ags import numpy as np -from morph_utils.ccf import projection_matrix_for_swc -from morph_utils.ccf import STRUCTURE_DESCENDANTS_ACRONYM +from morph_utils.proj_mat_utils import roll_up_proj_mat,normalize_projection_columns_per_cell class IO_Schema(ags.ArgSchema): @@ -15,83 +14,6 @@ class IO_Schema(ags.ArgSchema): normalize_proj_mat = ags.fields.Boolean(default=True) -def de_layer(st): - CTX_STRUCTS = STRUCTURE_DESCENDANTS_ACRONYM['CTX'] - sub_st = st.replace("ipsi_","").replace("contra","") - if sub_st in CTX_STRUCTS: - - for l in ["1","2/3","4","5","6a","6b"]: - st = st.replace(l,"") - - if "ENT" in st: - for l in ["2", "3","6"]: - st = st.replace(l,"") - - return st - else: - return st - - -def roll_up_proj_mat(infile, outfile): - - df = pd.read_csv(infile, index_col=0) - df.index = df.index.map(os.path.basename) - - non_proj_cols = [f for f in df.columns if not any([i in f for i in ["ipsi","contra"]])] - new_df = df[non_proj_cols].copy() - - proj_cols = [f for f in df.columns if any([i in f for i in ["ipsi","contra"]])] - de_layer_dict = {p:de_layer(p) for p in proj_cols} - - parent_names = list(de_layer_dict.values()) - unique_parent_names = np.unique(parent_names) - unique_parent_names = sorted(unique_parent_names, key=lambda x:parent_names.index(x)) - - roll_up_records = {} - for low_res_struct in unique_parent_names: - children = [k for k,v in de_layer_dict.items() if v==low_res_struct ] - roll_up_records[low_res_struct] = children - - - - # for parent, child_list in roll_up_records.items(): - # new_df[parent] = df[child_list].sum(axis=1) - new_cols = { - parent: df[child_list].sum(axis=1) - for parent, child_list in roll_up_records.items() - } - new_cols_df = pd.DataFrame(new_cols) - new_df = pd.concat([new_df, new_cols_df], axis=1) - - # sanity check - for n_struct,old_list in roll_up_records.items(): - sum_old = df[old_list].sum(axis=1) - sum_new = new_df[n_struct] - assert sum(sum_old==sum_new) == len(df) - - - - # print(outfile) - # print() - assert os.path.abspath(outfile) != os.path.abspath(infile) - new_df.to_csv(outfile) - - -def normalize_projection_columns_per_cell(input_df, projection_column_identifiers=['ipsi', 'contra']): - """ - :param input_df: input projection df - :param projection_column_identifiers: list of identifiers for projection columns. i.e. strings that identify projection columns from metadata columns - :return: normalized projection matrix - """ - proj_cols = [c for c in input_df.columns if any([ider in c for ider in projection_column_identifiers])] - input_df[proj_cols] = input_df[proj_cols].fillna(0) - - res = input_df[proj_cols].T / input_df[proj_cols].sum(axis=1) - input_df[proj_cols] = res.T - - return input_df - - def main(output_directory, output_projection_csv, projection_threshold, @@ -133,6 +55,7 @@ def main(output_directory, proj_df_arr[proj_df_arr < projection_threshold] = 0 proj_df = pd.DataFrame(proj_df_arr, columns=proj_df.columns, index=proj_df.index) proj_df.to_csv(output_projection_csv) + roll_up_proj_mat(output_projection_csv, output_projection_csv.replace(".csv","_rollup.csv")) # proj_df_mask_arr = proj_df_mask.values # proj_df_mask_arr[proj_df_mask_arr < projection_threshold] = 0 diff --git a/morph_utils/executable_scripts/projection_matrix_from_swc_directory.py b/morph_utils/executable_scripts/projection_matrix_from_swc_directory.py index bf43e39..683bcdd 100644 --- a/morph_utils/executable_scripts/projection_matrix_from_swc_directory.py +++ b/morph_utils/executable_scripts/projection_matrix_from_swc_directory.py @@ -5,6 +5,7 @@ import time import subprocess from morph_utils.ccf import projection_matrix_for_swc +from morph_utils.proj_mat_utils import roll_up_proj_mat class IO_Schema(ags.ArgSchema): @@ -241,6 +242,7 @@ def main(ccf_swc_directory, # proj_df_mask = pd.DataFrame(branch_and_tip_projection_records).T.fillna(0) proj_df.to_csv(output_projection_csv) + roll_up_proj_mat(output_projection_csv, output_projection_csv.replace(".csv","_rollup.csv")) # proj_df_mask.to_csv(output_projection_csv_tip_branch_mask) if projection_threshold != 0: @@ -254,6 +256,8 @@ def main(ccf_swc_directory, proj_df_arr[proj_df_arr < projection_threshold] = 0 proj_df = pd.DataFrame(proj_df_arr, columns=proj_df.columns, index=proj_df.index) proj_df.to_csv(output_projection_csv) + roll_up_proj_mat(output_projection_csv, output_projection_csv.replace(".csv","_rollup.csv")) + # proj_df_mask_arr = proj_df_mask.values # proj_df_mask_arr[proj_df_mask_arr < projection_threshold] = 0 @@ -266,6 +270,7 @@ def main(ccf_swc_directory, proj_df = normalize_projection_columns_per_cell(proj_df) proj_df.to_csv(output_projection_csv) + roll_up_proj_mat(output_projection_csv, output_projection_csv.replace(".csv","_rollup.csv")) # proj_df_mask = normalize_projection_columns_per_cell(proj_df_mask) # proj_df_mask.to_csv(output_projection_csv_tip_branch_mask) diff --git a/morph_utils/proj_mat_utils.py b/morph_utils/proj_mat_utils.py new file mode 100644 index 0000000..e69de29