From 230831924c26188870e846ad1b54627374c8995c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:20:25 -0800 Subject: [PATCH 01/13] Updating ALCF endpoints to include the synaps-i allocation (to be set up) --- config.yml | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/config.yml b/config.yml index 3f26a4f..0d24832 100644 --- a/config.yml +++ b/config.yml @@ -72,17 +72,23 @@ globus: uuid: 75b478b2-37af-46df-bfbd-71ed692c6506 name: data832_scratch - alcf832_raw: + alcf832_synaps: + root_path: / + uri: alcf.anl.gov + uuid: TBD + name: alcf832_synaps + + alcf832_iri_raw: root_path: /data/raw uri: alcf.anl.gov uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 - name: alcf_raw + name: alcf_iri_raw - alcf832_scratch: + alcf832_iri_scratch: root_path: /data/scratch uri: alcf.anl.gov uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 - name: alcf_scratch + name: alcf_iri_scratch alcf_eagle832: root_path: /IRIBeta/als/example From dc330441b76be53ca5f76c65760a7b20199ca25f Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:21:44 -0800 Subject: [PATCH 02/13] Updating bl832 config.py to distinguish IRI and SYNAPS-I ALCF endpoints --- orchestration/flows/bl832/config.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 788eef4..7294b0a 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -24,7 +24,8 @@ def _beam_specific_config(self) -> None: self.nersc832_alsdev_pscratch_raw = self.endpoints["nersc832_alsdev_pscratch_raw"] self.nersc832_alsdev_pscratch_scratch = self.endpoints["nersc832_alsdev_pscratch_scratch"] self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"] - self.alcf832_raw = self.endpoints["alcf832_raw"] - self.alcf832_scratch = self.endpoints["alcf832_scratch"] - self.scicat = self.config["scicat"] - self.ghcr_images832 = self.config["ghcr_images832"] + self.alcf832_synaps = self.endpoints["alcf832_synaps"] + self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] + self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] + self.scicat = config["scicat"] + self.ghcr_images832 = config["ghcr_images832"] From 99e87418d877b7a8541fab1427b69c8eb1241ff7 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:22:11 -0800 Subject: [PATCH 03/13] Adding the config.yaml file for setting up the globus compute endpoint for reconstruction on ALCF --- .../polaris/globus_compute_recon_config.yaml | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 scripts/polaris/globus_compute_recon_config.yaml diff --git a/scripts/polaris/globus_compute_recon_config.yaml b/scripts/polaris/globus_compute_recon_config.yaml new file mode 100644 index 0000000..66ffd33 --- /dev/null +++ b/scripts/polaris/globus_compute_recon_config.yaml @@ -0,0 +1,39 @@ +engine: + type: GlobusComputeEngine # This engine uses the HighThroughputExecutor + max_retries_on_system_failure: 2 + max_workers: 1 # Sets one worker per node + prefetch_capacity: 0 # Increase if you have many more tasks than workers + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: MpiExecLauncher + # Ensures 1 manger per node, work on all 64 cores + bind_cmd: --cpu-bind + overrides: --depth=64 --ppn 1 + + account: SYNAPS-I + queue: debug + cpus_per_node: 64 + + # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" + scheduler_options: "#PBS -l filesystems=home:eagle" + + # Node setup: activate necessary conda environment and such + worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPSE-I/; cd $HOME/.globus_compute/globus_compute_reconstruction" + + walltime: 00:60:00 # Jobs will end after 60 minutes + nodes_per_block: 2 # All jobs will have 1 node + init_blocks: 0 + min_blocks: 0 + max_blocks: 2 # No more than 1 job will be scheduled at a time From 523fc1938efda1e0b91d6823cad2bf91bb276df7 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:22:54 -0800 Subject: [PATCH 04/13] Adding the config.yaml file for setting up the globus compute endpoint for segmentation on ALCF. Still needs to be configured for GPU and the environment with dependencies --- .../globus_compute_segment_config.yaml | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 scripts/polaris/globus_compute_segment_config.yaml diff --git a/scripts/polaris/globus_compute_segment_config.yaml b/scripts/polaris/globus_compute_segment_config.yaml new file mode 100644 index 0000000..07bced0 --- /dev/null +++ b/scripts/polaris/globus_compute_segment_config.yaml @@ -0,0 +1,41 @@ +# This needs to be updated to use GPUs and a segmentation environment + +engine: + type: GlobusComputeEngine # This engine uses the HighThroughputExecutor + max_retries_on_system_failure: 2 + max_workers: 1 # Sets one worker per node + prefetch_capacity: 0 # Increase if you have many more tasks than workers + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: MpiExecLauncher + # Ensures 1 manger per node, work on all 64 cores + bind_cmd: --cpu-bind + overrides: --depth=64 --ppn 1 + + account: SYNAPS-I + queue: debug + cpus_per_node: 64 + + # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" + scheduler_options: "#PBS -l filesystems=home:eagle" + + # Node setup: activate necessary conda environment and such + worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPSE-I/; cd $HOME/.globus_compute/globus_compute_reconstruction" + + walltime: 00:60:00 # Jobs will end after 60 minutes + nodes_per_block: 2 # All jobs will have 1 node + init_blocks: 0 + min_blocks: 0 + max_blocks: 2 # No more than 1 job will be scheduled at a time From 951b9d92c5c1a67b369ff896f1e8c4bb63c02118 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:23:53 -0800 Subject: [PATCH 05/13] Adding segmentation Prefect task, and segmentation globus compute code for the TomographyController. Turning off TIFF to ZARR on ALCF for the demo --- orchestration/flows/bl832/alcf.py | 195 +++++++++++++++++++++++++----- 1 file changed, 166 insertions(+), 29 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index bdf96ac..c012698 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -35,7 +35,7 @@ def __init__( # The block must be registered with the name "alcf-allocation-root-path" logger = get_run_logger() allocation_data = Variable.get("alcf-allocation-root-path", _sync=True) - self.allocation_root = allocation_data.get("alcf-allocation-root-path") + self.allocation_root = allocation_data.get("alcf-allocation-root-path") # eagle/SYNAPS-I/ if not self.allocation_root: raise ValueError("Allocation root not found in JSON block 'alcf-allocation-root-path'") logger.info(f"Allocation root loaded: {self.allocation_root}") @@ -57,17 +57,19 @@ def reconstruct( file_name = Path(file_path).stem + ".h5" folder_name = Path(file_path).parent.name - iri_als_bl832_rundir = f"{self.allocation_root}/data/raw" - iri_als_bl832_recon_script = f"{self.allocation_root}/scripts/globus_reconstruction.py" + rundir = f"{self.allocation_root}/data/bl832/raw" + recon_script = f"{self.allocation_root}/reconstruction/scripts/globus_reconstruction.py" gcc = Client(code_serialization_strategy=CombinedCode()) + # TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID + # We will probably have 2 endpoints, one for recon, one for segmentation with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF") future = fxe.submit( self._reconstruct_wrapper, - iri_als_bl832_rundir, - iri_als_bl832_recon_script, + rundir, + recon_script, file_name, folder_name ) @@ -76,8 +78,8 @@ def reconstruct( @staticmethod def _reconstruct_wrapper( - rundir: str = "/eagle/IRIProd/ALS/data/raw", - script_path: str = "/eagle/IRIProd/ALS/scripts/globus_reconstruction.py", + rundir: str = "/eagle/SYNAPS-I/data/bl832/raw", + script_path: str = "/eagle/SYNAPS-I/reconstruction/scripts/globus_reconstruction.py", h5_file_name: str = None, folder_path: str = None ) -> str: @@ -185,6 +187,101 @@ def _build_multi_resolution_wrapper( f"Converted tiff files to zarr;\n {zarr_res}" ) + def segmentation( + self, + folder_path: str = "", + ) -> bool: + """ + Run tomography segmentation at ALCF through Globus Compute. + + :param folder_path: Path to the TIFF folder to be processed. + + :return: True if the task completed successfully, False otherwise. + """ + logger = get_run_logger() + + # Operate on reconstructed data + rundir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{Path(folder_path).name}" + output_dir = f"{self.allocation_root}/data/bl832/scratch/segmentation/{Path(folder_path).name}" + segmentation_script = f"{self.allocation_root}/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py" + + gcc = Client(code_serialization_strategy=CombinedCode()) + + # TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID + # We will probably have 2 endpoints, one for recon, one for segmentation + with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: + logger.info(f"Running segmentation on {folder_path} at ALCF") + future = fxe.submit( + self._segmentation_wrapper, + input_dir=rundir, + output_dir=output_dir, + script_path=segmentation_script, + output_dir=folder_path, + ) + result = self._wait_for_globus_compute_future(future, "segmentation", check_interval=10) + return result + + @staticmethod + def _segmentation_wrapper( + input_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/reconstruction/", + output_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/", + script_path: str = "/eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py", + nproc_per_node: int = 4, + nnodes: int = 1, + nnode_rank: int = 0, + master_addr: str = "localhost", + master_port: str = "29500", + patch_size: int = 512, + batch_size: int = 1, + num_workers: int = 4, + confidence: float = 0.5, + prompts: list[str] = ["background", "cell"], + ) -> str: + """ + Python function that wraps around the application call for segmentation on ALCF + + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will run the segmentation + :param folder_path: the path to the folder containing the TIFF data to be segmented + :return: confirmation message + """ + import os + import subprocess + import time + + seg_start = time.time() + + # Move to directory where data are located + os.chdir(input_dir) + + # Run segmentation.py + command = [ + "torchrun", + f"--nproc_per_node={nproc_per_node}", + f"--nnodes={nnodes}", + f"--node_rank={nnode_rank}", + f"--master_addr={master_addr}", + f"--master_port={master_port}", + "-m", script_path, + "--input-dir", input_dir, + "--output-dir", output_dir, + "--patch-size", str(patch_size), + "--batch-size", str(batch_size), + "--num-workers", str(num_workers), + "--confidence", str(confidence), + "--prompts", *prompts, + ] + + segment_res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + seg_end = time.time() + + print(f"Segmented data in {input_dir} in {seg_end-seg_start} seconds;\n {segment_res}") + return ( + f"Segmented data specified in {input_dir} in {seg_end-seg_start} seconds;\n" + f"{segment_res}" + ) + @staticmethod def _wait_for_globus_compute_future( future: Future, @@ -368,7 +465,7 @@ def alcf_recon_flow( config: Optional[Config832] = None, ) -> bool: """ - Process and transfer a file from a source to the ALCF. + Process and transfer a file from bl832 to ALCF and run reconstruction and segmentation. Args: file_path (str): The path to the file to be processed. @@ -437,51 +534,91 @@ def alcf_recon_flow( destination=config.data832_scratch ) - # STEP 2B: Run the Tiff to Zarr Globus Flow - logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") - alcf_multi_res_success = tomography_controller.build_multi_resolution( - file_path=file_path, + # STEP 3: Run the Segmentation Task at ALCF + logger.info(f"Starting ALCF segmentation task for {scratch_path_tiff=}") + alcf_segmentation_success = alcf_segmentation_task( + recon_folder_path=scratch_path_tiff, + config=config ) - if not alcf_multi_res_success: - logger.error("Tiff to Zarr Failed.") - raise ValueError("Tiff to Zarr at ALCF Failed") + if not alcf_segmentation_success: + logger.warning("Segmentation at ALCF Failed") else: - logger.info("Tiff to Zarr Successful.") - # Transfer B: Send reconstructed data (zarr) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " - f"at ALCF to {config.data832_scratch} at data832") - data832_zarr_transfer_success = transfer_controller.copy( - file_path=scratch_path_zarr, - source=config.alcf832_scratch, - destination=config.data832_scratch - ) + logger.info("Segmentation at ALCF Successful") + + # Not running TIFF to Zarr conversion at ALCF for now + # STEP 2B: Run the Tiff to Zarr Globus Flow + # logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") + # alcf_multi_res_success = tomography_controller.build_multi_resolution( + # file_path=file_path, + # ) + # if not alcf_multi_res_success: + # logger.error("Tiff to Zarr Failed.") + # raise ValueError("Tiff to Zarr at ALCF Failed") + # else: + # logger.info("Tiff to Zarr Successful.") + # # Transfer B: Send reconstructed data (zarr) to data832 + # logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + # f"at ALCF to {config.data832_scratch} at data832") + # data832_zarr_transfer_success = transfer_controller.copy( + # file_path=scratch_path_zarr, + # source=config.alcf832_scratch, + # destination=config.data832_scratch + # ) # Place holder in case we want to transfer to NERSC for long term storage nersc_transfer_success = False - data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success + # data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success schedule_pruning( alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None, - alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, + # alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, # Commenting out zarr for now nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None, nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None, data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None, - data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, + # data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, # Commenting out zarr one_minute=False, # Set to False for production durations config=config ) # TODO: ingest to scicat - if alcf_reconstruction_success and alcf_multi_res_success: + if alcf_reconstruction_success and alcf_segmentation_success: # and alcf_multi_res_success: return True else: return False -if __name__ == "__main__": +@task(name="alcf_segmentation_task") +def alcf_segmentation_task( + recon_folder_path: str, + config: Optional[Config832] = None, +): + logger = get_run_logger() + if config is None: + logger.info("No config provided, using default Config832.") + config = Config832() + + # Initialize the Tomography Controller and run the segmentation + logger.info("Initializing ALCF Tomography HPC Controller.") + tomography_controller = get_controller( + hpc_type=HPC.ALCF, + config=config + ) + logger.info(f"Starting ALCF segmentation task for {recon_folder_path=}") + alcf_segmentation_success = tomography_controller.segmentation( + recon_folder_path=recon_folder_path, + ) + if not alcf_segmentation_success: + logger.error("Segmentation Failed.") + else: + logger.info("Segmentation Successful.") + return alcf_segmentation_success + + +@flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test") +def alcf_segmentation_integration_test(): folder_name = 'dabramov' file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast' flow_success = alcf_recon_flow( From dd155f7d21bd04987a03d3c806c16a43a623f167 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:30:02 -0800 Subject: [PATCH 06/13] ensuring self.config for scicat and ghcr images --- orchestration/flows/bl832/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 7294b0a..17a2afb 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -27,5 +27,5 @@ def _beam_specific_config(self) -> None: self.alcf832_synaps = self.endpoints["alcf832_synaps"] self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] - self.scicat = config["scicat"] - self.ghcr_images832 = config["ghcr_images832"] + self.scicat = self.config["scicat"] + self.ghcr_images832 = self.config["ghcr_images832"] \ No newline at end of file From 719d4c6b6f68fd3d122a04bfd85aab3cbd656df2 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:30:39 -0800 Subject: [PATCH 07/13] linting --- orchestration/flows/bl832/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 17a2afb..da75341 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -28,4 +28,4 @@ def _beam_specific_config(self) -> None: self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] self.scicat = self.config["scicat"] - self.ghcr_images832 = self.config["ghcr_images832"] \ No newline at end of file + self.ghcr_images832 = self.config["ghcr_images832"] From 05341fc85b25f4803b6fbee6206022ff39f166ff Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:57:25 -0800 Subject: [PATCH 08/13] Making separate ALCF SYNAPS-I endpoint configs for raw, reconstructed, and segmented data --- config.yml | 19 ++++++++++++++++--- orchestration/flows/bl832/alcf.py | 18 ++++++++++++------ orchestration/flows/bl832/config.py | 4 +++- 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/config.yml b/config.yml index 0d24832..b4d29a5 100644 --- a/config.yml +++ b/config.yml @@ -46,6 +46,7 @@ globus: uri: beegfs.als.lbl.gov uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a name: bl733-beegfs-data + # 8.3.2 ENDPOINTS spot832: @@ -72,11 +73,23 @@ globus: uuid: 75b478b2-37af-46df-bfbd-71ed692c6506 name: data832_scratch - alcf832_synaps: - root_path: / + alcf832_synaps_raw: + root_path: /data/bl832/raw + uri: alcf.anl.gov + uuid: TBD + name: alcf832_synaps_raw + + alcf832_synaps_recon: + root_path: /data/bl832/scratch/reconstruction/ + uri: alcf.anl.gov + uuid: TBD + name: alcf832_synaps_recon + + alcf832_synaps_segment: + root_path: /data/bl832/scratch/segmentation/ uri: alcf.anl.gov uuid: TBD - name: alcf832_synaps + name: alcf832_synaps_segment alcf832_iri_raw: root_path: /data/raw diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index c012698..766ae21 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -195,7 +195,6 @@ def segmentation( Run tomography segmentation at ALCF through Globus Compute. :param folder_path: Path to the TIFF folder to be processed. - :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() @@ -484,6 +483,7 @@ def alcf_recon_flow( file_name = path.stem h5_file_name = file_name + '.h5' scratch_path_tiff = folder_name + '/rec' + file_name + '/' + scratch_path_segment = folder_name + '/seg' + file_name + '/' scratch_path_zarr = folder_name + '/rec' + file_name + '.zarr/' # initialize transfer_controller with globus @@ -498,7 +498,7 @@ def alcf_recon_flow( alcf_transfer_success = transfer_controller.copy( file_path=data832_raw_path, source=config.data832_raw, - destination=config.alcf832_raw + destination=config.alcf832_synaps_raw ) logger.info(f"Transfer status: {alcf_transfer_success}") @@ -526,11 +526,11 @@ def alcf_recon_flow( logger.info("Reconstruction Successful.") # Transfer A: Send reconstructed data (tiff) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " f"at ALCF to {config.data832_scratch} at data832") data832_tiff_transfer_success = transfer_controller.copy( file_path=scratch_path_tiff, - source=config.alcf832_scratch, + source=config.alcf832_synaps_recon, destination=config.data832_scratch ) @@ -544,6 +544,12 @@ def alcf_recon_flow( logger.warning("Segmentation at ALCF Failed") else: logger.info("Segmentation at ALCF Successful") + segment_transfer_success = transfer_controller.copy( + file_path=scratch_path_segment, + source=config.alcf832_synaps_segment, + destination=config.data832_scratch + ) + logger.info(f"Transfer segmented data to data832 success: {segment_transfer_success}") # Not running TIFF to Zarr conversion at ALCF for now # STEP 2B: Run the Tiff to Zarr Globus Flow @@ -621,8 +627,8 @@ def alcf_segmentation_task( def alcf_segmentation_integration_test(): folder_name = 'dabramov' file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast' - flow_success = alcf_recon_flow( - file_path=f"/{folder_name}/{file_name}.h5", + flow_success = alcf_segmentation_task( + recon_folder_path=f"/{folder_name}/{file_name}", config=Config832() ) print(flow_success) diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index da75341..d523952 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -24,7 +24,9 @@ def _beam_specific_config(self) -> None: self.nersc832_alsdev_pscratch_raw = self.endpoints["nersc832_alsdev_pscratch_raw"] self.nersc832_alsdev_pscratch_scratch = self.endpoints["nersc832_alsdev_pscratch_scratch"] self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"] - self.alcf832_synaps = self.endpoints["alcf832_synaps"] + self.alcf832_synaps_raw = self.endpoints["alcf832_synaps_raw"] + self.alcf832_synaps_recon = self.endpoints["alcf832_synaps_recon"] + self.alcf832_synaps_segment = self.endpoints["alcf832_synaps_segment"] self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] self.scicat = self.config["scicat"] From d74137a76ca07d5cc83572b6faed7d61955fe156 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 26 Jan 2026 11:53:46 -0800 Subject: [PATCH 09/13] Refactoring ALCF reconstruction flow to use the prune_controller class --- orchestration/flows/bl832/alcf.py | 356 +++++++++++++++++++----------- 1 file changed, 222 insertions(+), 134 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 766ae21..0794be9 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -1,5 +1,5 @@ from concurrent.futures import Future -import datetime +# import datetime from pathlib import Path import time from typing import Optional @@ -12,8 +12,9 @@ from orchestration.flows.bl832.config import Config832 from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController +# from orchestration.prefect import schedule_prefect_flow +from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.transfer_controller import get_transfer_controller, CopyMethod -from orchestration.prefect import schedule_prefect_flow class ALCFTomographyHPCController(TomographyHPCController): @@ -189,33 +190,36 @@ def _build_multi_resolution_wrapper( def segmentation( self, - folder_path: str = "", + recon_folder_path: str = "", ) -> bool: """ Run tomography segmentation at ALCF through Globus Compute. - :param folder_path: Path to the TIFF folder to be processed. + :param recon_folder_path: Path to the reconstructed data folder to be processed. :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() # Operate on reconstructed data - rundir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{Path(folder_path).name}" - output_dir = f"{self.allocation_root}/data/bl832/scratch/segmentation/{Path(folder_path).name}" - segmentation_script = f"{self.allocation_root}/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py" + rundir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{recon_folder_path}" + output_dir = f"{self.allocation_root}/data/bl832/scratch/segmentation/{recon_folder_path}" + segmentation_module = "src.inference" + workdir = f"{self.allocation_root}/segmentation/scripts/forge_feb_seg_model_demo" gcc = Client(code_serialization_strategy=CombinedCode()) # TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID # We will probably have 2 endpoints, one for recon, one for segmentation - with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: - logger.info(f"Running segmentation on {folder_path} at ALCF") + endpoint_id = "168c595b-9493-42db-9c6a-aad960913de2" + # with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: + with Executor(endpoint_id=endpoint_id, client=gcc) as fxe: + logger.info(f"Running segmentation on {recon_folder_path} at ALCF") future = fxe.submit( self._segmentation_wrapper, input_dir=rundir, output_dir=output_dir, - script_path=segmentation_script, - output_dir=folder_path, + script_module=segmentation_module, + workdir=workdir ) result = self._wait_for_globus_compute_future(future, "segmentation", check_interval=10) return result @@ -224,7 +228,8 @@ def segmentation( def _segmentation_wrapper( input_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/reconstruction/", output_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/", - script_path: str = "/eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py", + script_module: str = "src.inference", + workdir: str = "/eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo", nproc_per_node: int = 4, nnodes: int = 1, nnode_rank: int = 0, @@ -250,18 +255,18 @@ def _segmentation_wrapper( seg_start = time.time() - # Move to directory where data are located - os.chdir(input_dir) + # Move to directory where the segmentation code is located + os.chdir(workdir) # Run segmentation.py command = [ - "torchrun", + "python", "-m", "torch.distributed.run", f"--nproc_per_node={nproc_per_node}", f"--nnodes={nnodes}", f"--node_rank={nnode_rank}", f"--master_addr={master_addr}", f"--master_port={master_port}", - "-m", script_path, + "-m", script_module, "--input-dir", input_dir, "--output-dir", output_dir, "--patch-size", str(patch_size), @@ -353,109 +358,109 @@ def _wait_for_globus_compute_future( return success -@task(name="schedule_prune_task") -def schedule_prune_task( - path: str, - location: str, - schedule_days: datetime.timedelta, - source_endpoint=None, - check_endpoint=None -) -> bool: - """ - Schedules a Prefect flow to prune files from a specified location. - - Args: - path (str): The file path to the folder containing the files. - location (str): The server location (e.g., 'alcf832_raw') where the files will be pruned. - schedule_days (int): The number of days after which the file should be deleted. - source_endpoint (str): The source endpoint for the files. - check_endpoint (str): The endpoint to check for the existence of the files. - - Returns: - bool: True if the task was scheduled successfully, False otherwise. - """ - logger = get_run_logger() - - try: - flow_name = f"delete {location}: {Path(path).name}" - schedule_prefect_flow( - deployment_name=f"prune_{location}/prune_{location}", - flow_run_name=flow_name, - parameters={ - "relative_path": path, - "source_endpoint": source_endpoint, - "check_endpoint": check_endpoint - }, - duration_from_now=schedule_days - ) - return True - except Exception as e: - logger.error(f"Failed to schedule prune task: {e}") - return False - - -@task(name="schedule_pruning") -def schedule_pruning( - alcf_raw_path: str = None, - alcf_scratch_path_tiff: str = None, - alcf_scratch_path_zarr: str = None, - nersc_scratch_path_tiff: str = None, - nersc_scratch_path_zarr: str = None, - data832_raw_path: str = None, - data832_scratch_path_tiff: str = None, - data832_scratch_path_zarr: str = None, - one_minute: bool = False, - config: Config832 = None -) -> bool: - """ - This function schedules the deletion of files from specified locations on ALCF, NERSC, and data832. - - Args: - alcf_raw_path (str, optional): The raw path of the h5 file on ALCF. - alcf_scratch_path_tiff (str, optional): The scratch path for TIFF files on ALCF. - alcf_scratch_path_zarr (str, optional): The scratch path for Zarr files on ALCF. - nersc_scratch_path_tiff (str, optional): The scratch path for TIFF files on NERSC. - nersc_scratch_path_zarr (str, optional): The scratch path for Zarr files on NERSC. - data832_scratch_path (str, optional): The scratch path on data832. - one_minute (bool, optional): Defaults to False. Whether to schedule the deletion after one minute. - config (Config832, optional): Configuration object for the flow. - - Returns: - bool: True if the tasks were scheduled successfully, False otherwise. - """ - logger = get_run_logger() - - pruning_config = Variable.get("pruning-config", _sync=True) - - if one_minute: - alcf_delay = datetime.timedelta(minutes=1) - nersc_delay = datetime.timedelta(minutes=1) - data832_delay = datetime.timedelta(minutes=1) - else: - alcf_delay = datetime.timedelta(days=pruning_config["delete_alcf832_files_after_days"]) - nersc_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"]) - data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"]) - - # (path, location, days, source_endpoint, check_endpoint) - delete_schedules = [ - (alcf_raw_path, "alcf832_raw", alcf_delay, config.alcf832_raw, config.data832_raw), - (alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), - (alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), - (nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), - (nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), - (data832_raw_path, "data832_raw", data832_delay, config.data832_raw, None), - (data832_scratch_path_tiff, "data832_scratch", data832_delay, config.data832_scratch, None), - (data832_scratch_path_zarr, "data832_scratch", data832_delay, config.data832_scratch, None) - ] - - for path, location, days, source_endpoint, check_endpoint in delete_schedules: - if path: - schedule_prune_task(path, location, days, source_endpoint, check_endpoint) - logger.info(f"Scheduled delete from {location} at {days} days") - else: - logger.info(f"Path not provided for {location}, skipping scheduling of deletion task.") - - return True +# @task(name="schedule_prune_task") +# def schedule_prune_task( +# path: str, +# location: str, +# schedule_days: datetime.timedelta, +# source_endpoint=None, +# check_endpoint=None +# ) -> bool: +# """ +# Schedules a Prefect flow to prune files from a specified location. + +# Args: +# path (str): The file path to the folder containing the files. +# location (str): The server location (e.g., 'alcf832_raw') where the files will be pruned. +# schedule_days (int): The number of days after which the file should be deleted. +# source_endpoint (str): The source endpoint for the files. +# check_endpoint (str): The endpoint to check for the existence of the files. + +# Returns: +# bool: True if the task was scheduled successfully, False otherwise. +# """ +# logger = get_run_logger() + +# try: +# flow_name = f"delete {location}: {Path(path).name}" +# schedule_prefect_flow( +# deployment_name=f"prune_{location}/prune_{location}", +# flow_run_name=flow_name, +# parameters={ +# "relative_path": path, +# "source_endpoint": source_endpoint, +# "check_endpoint": check_endpoint +# }, +# duration_from_now=schedule_days +# ) +# return True +# except Exception as e: +# logger.error(f"Failed to schedule prune task: {e}") +# return False + + +# @task(name="schedule_pruning") +# def schedule_pruning( +# alcf_raw_path: str = None, +# alcf_scratch_path_tiff: str = None, +# alcf_scratch_path_zarr: str = None, +# nersc_scratch_path_tiff: str = None, +# nersc_scratch_path_zarr: str = None, +# data832_raw_path: str = None, +# data832_scratch_path_tiff: str = None, +# data832_scratch_path_zarr: str = None, +# one_minute: bool = False, +# config: Config832 = None +# ) -> bool: +# """ +# This function schedules the deletion of files from specified locations on ALCF, NERSC, and data832. + +# Args: +# alcf_raw_path (str, optional): The raw path of the h5 file on ALCF. +# alcf_scratch_path_tiff (str, optional): The scratch path for TIFF files on ALCF. +# alcf_scratch_path_zarr (str, optional): The scratch path for Zarr files on ALCF. +# nersc_scratch_path_tiff (str, optional): The scratch path for TIFF files on NERSC. +# nersc_scratch_path_zarr (str, optional): The scratch path for Zarr files on NERSC. +# data832_scratch_path (str, optional): The scratch path on data832. +# one_minute (bool, optional): Defaults to False. Whether to schedule the deletion after one minute. +# config (Config832, optional): Configuration object for the flow. + +# Returns: +# bool: True if the tasks were scheduled successfully, False otherwise. +# """ +# logger = get_run_logger() + +# pruning_config = Variable.get("pruning-config", _sync=True) + +# if one_minute: +# alcf_delay = datetime.timedelta(minutes=1) +# nersc_delay = datetime.timedelta(minutes=1) +# data832_delay = datetime.timedelta(minutes=1) +# else: +# alcf_delay = datetime.timedelta(days=pruning_config["delete_alcf832_files_after_days"]) +# nersc_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"]) +# data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"]) + +# # (path, location, days, source_endpoint, check_endpoint) +# delete_schedules = [ +# (alcf_raw_path, "alcf832_raw", alcf_delay, config.alcf832_raw, config.data832_raw), +# (alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), +# (alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), +# (nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), +# (nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), +# (data832_raw_path, "data832_raw", data832_delay, config.data832_raw, None), +# (data832_scratch_path_tiff, "data832_scratch", data832_delay, config.data832_scratch, None), +# (data832_scratch_path_zarr, "data832_scratch", data832_delay, config.data832_scratch, None) +# ] + +# for path, location, days, source_endpoint, check_endpoint in delete_schedules: +# if path: +# schedule_prune_task(path, location, days, source_endpoint, check_endpoint) +# logger.info(f"Scheduled delete from {location} at {days} days") +# else: +# logger.info(f"Path not provided for {location}, skipping scheduling of deletion task.") + +# return True @flow(name="alcf_recon_flow", flow_run_name="alcf_recon-{file_path}") @@ -533,6 +538,7 @@ def alcf_recon_flow( source=config.alcf832_synaps_recon, destination=config.data832_scratch ) + logger.info(f"Transfer reconstructed TIFF data to data832 success: {data832_tiff_transfer_success}") # STEP 3: Run the Segmentation Task at ALCF logger.info(f"Starting ALCF segmentation task for {scratch_path_tiff=}") @@ -552,6 +558,8 @@ def alcf_recon_flow( logger.info(f"Transfer segmented data to data832 success: {segment_transfer_success}") # Not running TIFF to Zarr conversion at ALCF for now + alcf_multi_res_success = False + data832_zarr_transfer_success = False # STEP 2B: Run the Tiff to Zarr Globus Flow # logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") # alcf_multi_res_success = tomography_controller.build_multi_resolution( @@ -572,22 +580,99 @@ def alcf_recon_flow( # ) # Place holder in case we want to transfer to NERSC for long term storage - nersc_transfer_success = False + # nersc_transfer_success = False - # data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success - schedule_pruning( - alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None, - # alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, # Commenting out zarr for now - nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None, - nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None, - data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None, - # data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, # Commenting out zarr - one_minute=False, # Set to False for production durations + # STEP 4: Schedule Pruning of files + logger.info("Scheduling file pruning tasks.") + prune_controller = get_prune_controller( + prune_type=PruneMethod.GLOBUS, config=config ) + # Prune from ALCF raw + if alcf_transfer_success: + logger.info("Scheduling pruning of ALCF raw data.") + prune_controller.prune( + file_path=data832_raw_path, + source_endpoint=config.alcf832_synaps_raw, + check_endpoint=None, + days_from_now=2.0 + ) + + # Prune TIFFs from ALCF scratch/reconstruction + if alcf_reconstruction_success: + logger.info("Scheduling pruning of ALCF scratch reconstruction data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune TIFFs from ALCF scratch/segmentation + if alcf_segmentation_success: + logger.info("Scheduling pruning of ALCF scratch segmentation data.") + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.alcf832_synaps_segment, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune ZARR from ALCF scratch/reconstruction + if alcf_multi_res_success: + logger.info("Scheduling pruning of ALCF scratch zarr reconstruction data.") + prune_controller.prune( + file_path=scratch_path_zarr, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune reconstructed TIFFs from data832 scratch + if data832_tiff_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction TIFF data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # Prune reconstructed ZARR from data832 scratch + if data832_zarr_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction ZARR data.") + prune_controller.prune( + file_path=scratch_path_zarr, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # Prune segmented data from data832 scratch + if alcf_segmentation_success: + logger.info("Scheduling pruning of data832 scratch segmentation data.") + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success + # schedule_pruning( + # alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, + # alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None, + # # alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, # Commenting out zarr for now + # nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None, + # nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None, + # data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, + # data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None, + # # data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, # Commenting out zarr + # one_minute=False, # Set to False for production durations + # config=config + # ) + # TODO: ingest to scicat if alcf_reconstruction_success and alcf_segmentation_success: # and alcf_multi_res_success: @@ -625,10 +710,13 @@ def alcf_segmentation_task( @flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test") def alcf_segmentation_integration_test(): - folder_name = 'dabramov' - file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast' + recon_folder_path = 'rec20211222_125057_petiole4' flow_success = alcf_segmentation_task( - recon_folder_path=f"/{folder_name}/{file_name}", + recon_folder_path=recon_folder_path, config=Config832() ) print(flow_success) + + +if __name__ == "__main__": + alcf_segmentation_integration_test() From 18d470c68d189cb3c0cc204c3025ecf80dcd974c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 26 Jan 2026 11:54:47 -0800 Subject: [PATCH 10/13] Removing old commented out prune code --- orchestration/flows/bl832/alcf.py | 121 ------------------------------ 1 file changed, 121 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 0794be9..086a366 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -1,5 +1,4 @@ from concurrent.futures import Future -# import datetime from pathlib import Path import time from typing import Optional @@ -12,7 +11,6 @@ from orchestration.flows.bl832.config import Config832 from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController -# from orchestration.prefect import schedule_prefect_flow from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.transfer_controller import get_transfer_controller, CopyMethod @@ -358,111 +356,6 @@ def _wait_for_globus_compute_future( return success -# @task(name="schedule_prune_task") -# def schedule_prune_task( -# path: str, -# location: str, -# schedule_days: datetime.timedelta, -# source_endpoint=None, -# check_endpoint=None -# ) -> bool: -# """ -# Schedules a Prefect flow to prune files from a specified location. - -# Args: -# path (str): The file path to the folder containing the files. -# location (str): The server location (e.g., 'alcf832_raw') where the files will be pruned. -# schedule_days (int): The number of days after which the file should be deleted. -# source_endpoint (str): The source endpoint for the files. -# check_endpoint (str): The endpoint to check for the existence of the files. - -# Returns: -# bool: True if the task was scheduled successfully, False otherwise. -# """ -# logger = get_run_logger() - -# try: -# flow_name = f"delete {location}: {Path(path).name}" -# schedule_prefect_flow( -# deployment_name=f"prune_{location}/prune_{location}", -# flow_run_name=flow_name, -# parameters={ -# "relative_path": path, -# "source_endpoint": source_endpoint, -# "check_endpoint": check_endpoint -# }, -# duration_from_now=schedule_days -# ) -# return True -# except Exception as e: -# logger.error(f"Failed to schedule prune task: {e}") -# return False - - -# @task(name="schedule_pruning") -# def schedule_pruning( -# alcf_raw_path: str = None, -# alcf_scratch_path_tiff: str = None, -# alcf_scratch_path_zarr: str = None, -# nersc_scratch_path_tiff: str = None, -# nersc_scratch_path_zarr: str = None, -# data832_raw_path: str = None, -# data832_scratch_path_tiff: str = None, -# data832_scratch_path_zarr: str = None, -# one_minute: bool = False, -# config: Config832 = None -# ) -> bool: -# """ -# This function schedules the deletion of files from specified locations on ALCF, NERSC, and data832. - -# Args: -# alcf_raw_path (str, optional): The raw path of the h5 file on ALCF. -# alcf_scratch_path_tiff (str, optional): The scratch path for TIFF files on ALCF. -# alcf_scratch_path_zarr (str, optional): The scratch path for Zarr files on ALCF. -# nersc_scratch_path_tiff (str, optional): The scratch path for TIFF files on NERSC. -# nersc_scratch_path_zarr (str, optional): The scratch path for Zarr files on NERSC. -# data832_scratch_path (str, optional): The scratch path on data832. -# one_minute (bool, optional): Defaults to False. Whether to schedule the deletion after one minute. -# config (Config832, optional): Configuration object for the flow. - -# Returns: -# bool: True if the tasks were scheduled successfully, False otherwise. -# """ -# logger = get_run_logger() - -# pruning_config = Variable.get("pruning-config", _sync=True) - -# if one_minute: -# alcf_delay = datetime.timedelta(minutes=1) -# nersc_delay = datetime.timedelta(minutes=1) -# data832_delay = datetime.timedelta(minutes=1) -# else: -# alcf_delay = datetime.timedelta(days=pruning_config["delete_alcf832_files_after_days"]) -# nersc_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"]) -# data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"]) - -# # (path, location, days, source_endpoint, check_endpoint) -# delete_schedules = [ -# (alcf_raw_path, "alcf832_raw", alcf_delay, config.alcf832_raw, config.data832_raw), -# (alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), -# (alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), -# (nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), -# (nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), -# (data832_raw_path, "data832_raw", data832_delay, config.data832_raw, None), -# (data832_scratch_path_tiff, "data832_scratch", data832_delay, config.data832_scratch, None), -# (data832_scratch_path_zarr, "data832_scratch", data832_delay, config.data832_scratch, None) -# ] - -# for path, location, days, source_endpoint, check_endpoint in delete_schedules: -# if path: -# schedule_prune_task(path, location, days, source_endpoint, check_endpoint) -# logger.info(f"Scheduled delete from {location} at {days} days") -# else: -# logger.info(f"Path not provided for {location}, skipping scheduling of deletion task.") - -# return True - - @flow(name="alcf_recon_flow", flow_run_name="alcf_recon-{file_path}") def alcf_recon_flow( file_path: str, @@ -659,20 +552,6 @@ def alcf_recon_flow( days_from_now=30.0 ) - # data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success - # schedule_pruning( - # alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - # alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None, - # # alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, # Commenting out zarr for now - # nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None, - # nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None, - # data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - # data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None, - # # data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, # Commenting out zarr - # one_minute=False, # Set to False for production durations - # config=config - # ) - # TODO: ingest to scicat if alcf_reconstruction_success and alcf_segmentation_success: # and alcf_multi_res_success: From c1d7e2d52894a8db74aeb5778018c5ef168e3dd0 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 26 Jan 2026 11:58:37 -0800 Subject: [PATCH 11/13] linting and docstrings --- orchestration/flows/bl832/alcf.py | 69 ++++++++++++++++--------------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 086a366..57874b3 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -21,14 +21,18 @@ class ALCFTomographyHPCController(TomographyHPCController): There is a @staticmethod wrapper for each compute task submitted via Globus Compute. Also, there is a shared wait_for_globus_compute_future method that waits for the task to complete. - Args: - TomographyHPCController (ABC): Abstract class for tomography HPC controllers. + :param TomographyHPCController: Abstract class for tomography HPC controllers. """ def __init__( self, config: Config832 ) -> None: + """ + Initialize the ALCF Tomography HPC Controller. + + :param config: Configuration object for the controller. + """ super().__init__(config) # Load allocation root from the Prefect JSON block # The block must be registered with the name "alcf-allocation-root-path" @@ -46,11 +50,8 @@ def reconstruct( """ Run tomography reconstruction at ALCF through Globus Compute. - Args: - file_path (str): Path to the file to be processed. - - Returns: - bool: True if the task completed successfully, False otherwise. + :param file_path : Path to the file to be processed. + :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() file_name = Path(file_path).stem + ".h5" @@ -85,14 +86,11 @@ def _reconstruct_wrapper( """ Python function that wraps around the application call for Tomopy reconstruction on ALCF - Args: - rundir (str): the directory on the eagle file system (ALCF) where the input data are located - script_path (str): the path to the script that will run the reconstruction - h5_file_name (str): the name of the h5 file to be reconstructed - folder_path (str): the path to the folder containing the h5 file - - Returns: - str: confirmation message + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will run the reconstruction + :param h5_file_name: the name of the h5 file to be reconstructed + :param folder_path: the path to the folder containing the h5 file + :return: confirmation message """ import os import subprocess @@ -123,11 +121,8 @@ def build_multi_resolution( """ Tiff to Zarr code that is executed using Globus Compute - Args: - file_path (str): Path to the file to be processed. - - Returns: - bool: True if the task completed successfully, False otherwise. + :param file_path: Path to the file to be processed. + :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() @@ -294,14 +289,11 @@ def _wait_for_globus_compute_future( """ Wait for a Globus Compute task to complete, assuming that if future.done() is False, the task is running. - Args: - future: The future object returned from the Globus Compute Executor submit method. - task_name: A descriptive name for the task being executed (used for logging). - check_interval: The interval (in seconds) between status checks. - walltime: The maximum time (in seconds) to wait for the task to complete. - - Returns: - bool: True if the task completed successfully within walltime, False otherwise. + :param future: The future object returned from the Globus Compute Executor submit method. + :param task_name: A descriptive name for the task being executed (used for logging). + :param check_interval: The interval (in seconds) between status checks. + :param walltime: The maximum time (in seconds) to wait for the task to complete. + :return: True if the task completed successfully within walltime, False otherwise. """ logger = get_run_logger() @@ -364,12 +356,9 @@ def alcf_recon_flow( """ Process and transfer a file from bl832 to ALCF and run reconstruction and segmentation. - Args: - file_path (str): The path to the file to be processed. - config (Config832): Configuration object for the flow. - - Returns: - bool: True if the flow completed successfully, False otherwise. + :param file_path: The path to the file to be processed. + :param config: Configuration object for the flow. + :return: True if the flow completed successfully, False otherwise. """ logger = get_run_logger() @@ -565,6 +554,13 @@ def alcf_segmentation_task( recon_folder_path: str, config: Optional[Config832] = None, ): + """ + Run segmentation task at ALCF. + + :param recon_folder_path: Path to the reconstructed data folder to be processed. + :param config: Configuration object for the flow. + :return: True if the task completed successfully, False otherwise. + """ logger = get_run_logger() if config is None: logger.info("No config provided, using default Config832.") @@ -589,6 +585,11 @@ def alcf_segmentation_task( @flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test") def alcf_segmentation_integration_test(): + """ + Integration test for the ALCF segmentation task. + + :return: None + """ recon_folder_path = 'rec20211222_125057_petiole4' flow_success = alcf_segmentation_task( recon_folder_path=recon_folder_path, From 8e0947f504babadd66ab46a17bf0504ab72c539a Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 26 Jan 2026 12:04:49 -0800 Subject: [PATCH 12/13] Docstrings, linting, and type hints --- orchestration/flows/bl832/alcf.py | 44 ++++++++++++++++++------------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 57874b3..ecdb84a 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -159,13 +159,11 @@ def _build_multi_resolution_wrapper( """ Python function that wraps around the application call for Tiff to Zarr on ALCF - Args: - rundir (str): the directory on the eagle file system (ALCF) where the input data are located - script_path (str): the path to the script that will convert the tiff files to zarr - recon_path (str): the path to the reconstructed data - raw_path (str): the path to the raw data - Returns: - str: confirmation message + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will convert the tiff files to zarr + :param recon_path: the path to the reconstructed data + :param raw_path: the path to the raw data + :return: confirmation message """ import os import subprocess @@ -374,13 +372,14 @@ def alcf_recon_flow( scratch_path_zarr = folder_name + '/rec' + file_name + '.zarr/' # initialize transfer_controller with globus + logger.info("Initializing Globus Transfer Controller.") transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, config=config ) # STEP 1: Transfer data from data832 to ALCF - logger.info("Copying data to ALCF.") + logger.info("Copying raw data to ALCF.") data832_raw_path = f"{folder_name}/{h5_file_name}" alcf_transfer_success = transfer_controller.copy( file_path=data832_raw_path, @@ -395,14 +394,16 @@ def alcf_recon_flow( else: logger.info("Transfer to ALCF Successful.") - # STEP 2A: Run the Tomopy Reconstruction Globus Flow + # STEP 2: Run the Tomopy Reconstruction Globus Flow logger.info(f"Starting ALCF reconstruction flow for {file_path=}") # Initialize the Tomography Controller and run the reconstruction + logger.info("Initializing ALCF Tomography HPC Controller.") tomography_controller = get_controller( hpc_type=HPC.ALCF, config=config ) + logger.info(f"Starting ALCF reconstruction task for {file_path=}") alcf_reconstruction_success = tomography_controller.reconstruct( file_path=file_path, ) @@ -412,7 +413,7 @@ def alcf_recon_flow( else: logger.info("Reconstruction Successful.") - # Transfer A: Send reconstructed data (tiff) to data832 + # STEP 3: Send reconstructed data (tiff) to data832 logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " f"at ALCF to {config.data832_scratch} at data832") data832_tiff_transfer_success = transfer_controller.copy( @@ -422,7 +423,7 @@ def alcf_recon_flow( ) logger.info(f"Transfer reconstructed TIFF data to data832 success: {data832_tiff_transfer_success}") - # STEP 3: Run the Segmentation Task at ALCF + # STEP 4: Run the Segmentation Task at ALCF logger.info(f"Starting ALCF segmentation task for {scratch_path_tiff=}") alcf_segmentation_success = alcf_segmentation_task( recon_folder_path=scratch_path_tiff, @@ -432,6 +433,10 @@ def alcf_recon_flow( logger.warning("Segmentation at ALCF Failed") else: logger.info("Segmentation at ALCF Successful") + + # STEP 5: Send segmented data to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_segment} " + f"at ALCF to {config.data832_scratch} at data832") segment_transfer_success = transfer_controller.copy( file_path=scratch_path_segment, source=config.alcf832_synaps_segment, @@ -442,7 +447,7 @@ def alcf_recon_flow( # Not running TIFF to Zarr conversion at ALCF for now alcf_multi_res_success = False data832_zarr_transfer_success = False - # STEP 2B: Run the Tiff to Zarr Globus Flow + # STEP 6: Run the Tiff to Zarr Globus Flow # logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") # alcf_multi_res_success = tomography_controller.build_multi_resolution( # file_path=file_path, @@ -452,7 +457,7 @@ def alcf_recon_flow( # raise ValueError("Tiff to Zarr at ALCF Failed") # else: # logger.info("Tiff to Zarr Successful.") - # # Transfer B: Send reconstructed data (zarr) to data832 + # # STEP 7: Send reconstructed data (zarr) to data832 # logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " # f"at ALCF to {config.data832_scratch} at data832") # data832_zarr_transfer_success = transfer_controller.copy( @@ -464,7 +469,7 @@ def alcf_recon_flow( # Place holder in case we want to transfer to NERSC for long term storage # nersc_transfer_success = False - # STEP 4: Schedule Pruning of files + # STEP 8: Schedule Pruning of files logger.info("Scheduling file pruning tasks.") prune_controller = get_prune_controller( prune_type=PruneMethod.GLOBUS, @@ -553,7 +558,7 @@ def alcf_recon_flow( def alcf_segmentation_task( recon_folder_path: str, config: Optional[Config832] = None, -): +) -> bool: """ Run segmentation task at ALCF. @@ -584,18 +589,21 @@ def alcf_segmentation_task( @flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test") -def alcf_segmentation_integration_test(): +def alcf_segmentation_integration_test() -> bool: """ Integration test for the ALCF segmentation task. - :return: None + :return: True if the segmentation task completed successfully, False otherwise. """ + logger = get_run_logger() + logger.info("Starting ALCF segmentation integration test.") recon_folder_path = 'rec20211222_125057_petiole4' flow_success = alcf_segmentation_task( recon_folder_path=recon_folder_path, config=Config832() ) - print(flow_success) + logger.info(f"Flow success: {flow_success}") + return flow_success if __name__ == "__main__": From 72f49f0fe7e4ce95c152384d7fc46bbcf7cdc041 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 26 Jan 2026 12:05:42 -0800 Subject: [PATCH 13/13] Updating globus compute config for segmentation --- .../globus_compute_segment_config.yaml | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/scripts/polaris/globus_compute_segment_config.yaml b/scripts/polaris/globus_compute_segment_config.yaml index 07bced0..15f150e 100644 --- a/scripts/polaris/globus_compute_segment_config.yaml +++ b/scripts/polaris/globus_compute_segment_config.yaml @@ -1,9 +1,8 @@ -# This needs to be updated to use GPUs and a segmentation environment - engine: type: GlobusComputeEngine # This engine uses the HighThroughputExecutor max_retries_on_system_failure: 2 max_workers: 1 # Sets one worker per node + max_workers_per_node: 4 prefetch_capacity: 0 # Increase if you have many more tasks than workers address: @@ -25,16 +24,24 @@ engine: overrides: --depth=64 --ppn 1 account: SYNAPS-I - queue: debug - cpus_per_node: 64 + queue: debug # debug (1-2 nodes), debug-scaling (1-10 nodes), or some other queue, probably want demand (1-56 nodes) for real-time things, prod (496 nodes) + # minimum node 1, max 56 nodes. Max time 59 minutes + cpus_per_node: 32 # may want to change to 4 (only 4 GPUs per node) # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" scheduler_options: "#PBS -l filesystems=home:eagle" - # Node setup: activate necessary conda environment and such - worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPSE-I/; cd $HOME/.globus_compute/globus_compute_reconstruction" - - walltime: 00:60:00 # Jobs will end after 60 minutes + # worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/segmentation/env/; export PATH=$PATH:/eagle/SYNAPS-I/; cd $HOME/.globus_compute/globus_compute_segmentation" + worker_init: | + module use /soft/modulefiles + module load conda + conda activate base + source /eagle/SYNAPS-I/segmentation/env/bin/activate + export HF_HUB_CACHE=/eagle/SYNAPS-I/segmentation/.cache/huggingface + export HF_HOME=$HF_HUB_CACHE + cd /eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo + + walltime: 59:00 # Jobs will end after 59 minutes nodes_per_block: 2 # All jobs will have 1 node init_blocks: 0 min_blocks: 0