From 67aaa3937910e55cfba220c1bff8e21541efba8d Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 7 Oct 2025 11:50:19 -0700 Subject: [PATCH 1/8] Updating ALCF Globus UUID to be the IRIBeta collection --- config.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config.yml b/config.yml index d0eec943..5701a1c7 100644 --- a/config.yml +++ b/config.yml @@ -27,13 +27,13 @@ globus: alcf832_raw: root_path: /data/raw uri: alcf.anl.gov - uuid: 2f9e7035-f4d8-4aa3-a911-d110bc2c8110 + uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 name: alcf_raw alcf832_scratch: root_path: /data/scratch uri: alcf.anl.gov - uuid: 2f9e7035-f4d8-4aa3-a911-d110bc2c8110 + uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 name: alcf_scratch alcf_eagle832: From 13d1b6d98ddcb953fcb34939b3c939ede9449e9c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 7 Oct 2025 11:52:35 -0700 Subject: [PATCH 2/8] Updating the raw_path when building multi-resolution pyramid. Adjusting the TIFF reconstruction file transfer to happen before Zarr generation for more immediate results. --- orchestration/flows/bl832/alcf.py | 57 ++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 4638d9c1..c631167d 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -135,7 +135,7 @@ def build_multi_resolution( folder_name = Path(file_path).parent.name tiff_scratch_path = f"{self.allocation_root}/data/scratch/{folder_name}/rec{file_name}/" - raw_path = f"{self.allocation_root}/raw/{folder_name}/{file_name}.h5" + raw_path = f"{self.allocation_root}/data/raw/{folder_name}/{file_name}.h5" iri_als_bl832_rundir = f"{self.allocation_root}/data/raw" iri_als_bl832_conversion_script = f"{self.allocation_root}/scripts/tiff_to_zarr.py" @@ -413,6 +413,15 @@ def alcf_recon_flow( else: logger.info("Reconstruction Successful.") + # Transfer A: Send reconstructed data (tiff) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + f"at ALCF to {config.data832_scratch} at data832") + data832_tiff_transfer_success = transfer_controller.copy( + file_path=scratch_path_tiff, + source=config.alcf832_scratch, + destination=config.data832_scratch + ) + # STEP 2B: Run the Tiff to Zarr Globus Flow logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") alcf_multi_res_success = tomography_controller.build_multi_resolution( @@ -423,27 +432,35 @@ def alcf_recon_flow( raise ValueError("Tiff to Zarr at ALCF Failed") else: logger.info("Tiff to Zarr Successful.") + # Transfer B: Send reconstructed data (zarr) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + f"at ALCF to {config.data832_scratch} at data832") + data832_zarr_transfer_success = transfer_controller.copy( + file_path=scratch_path_zarr, + source=config.alcf832_scratch, + destination=config.data832_scratch + ) # STEP 3: Send reconstructed data (tiffs and zarr) to data832 - if alcf_reconstruction_success: - # Transfer A: Send reconstructed data (tiff) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " - f"at ALCF to {config.data832_scratch} at data832") - data832_tiff_transfer_success = transfer_controller.copy( - file_path=scratch_path_tiff, - source=config.alcf832_scratch, - destination=config.data832_scratch - ) - - if alcf_multi_res_success: - # Transfer B: Send reconstructed data (zarr) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " - f"at ALCF to {config.data832_scratch} at data832") - data832_zarr_transfer_success = transfer_controller.copy( - file_path=scratch_path_zarr, - source=config.alcf832_scratch, - destination=config.data832_scratch - ) + # if alcf_reconstruction_success: + # # Transfer A: Send reconstructed data (tiff) to data832 + # logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + # f"at ALCF to {config.data832_scratch} at data832") + # data832_tiff_transfer_success = transfer_controller.copy( + # file_path=scratch_path_tiff, + # source=config.alcf832_scratch, + # destination=config.data832_scratch + # ) + + # if alcf_multi_res_success: + # # Transfer B: Send reconstructed data (zarr) to data832 + # logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + # f"at ALCF to {config.data832_scratch} at data832") + # data832_zarr_transfer_success = transfer_controller.copy( + # file_path=scratch_path_zarr, + # source=config.alcf832_scratch, + # destination=config.data832_scratch + # ) # Place holder in case we want to transfer to NERSC for long term storage nersc_transfer_success = False From 7307a72ba8ca0b30628600ddea9cd8df922aa67f Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 7 Oct 2025 16:07:31 -0700 Subject: [PATCH 3/8] removing commented out old code --- orchestration/flows/bl832/alcf.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index c631167d..2adc77cd 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -441,27 +441,6 @@ def alcf_recon_flow( destination=config.data832_scratch ) - # STEP 3: Send reconstructed data (tiffs and zarr) to data832 - # if alcf_reconstruction_success: - # # Transfer A: Send reconstructed data (tiff) to data832 - # logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " - # f"at ALCF to {config.data832_scratch} at data832") - # data832_tiff_transfer_success = transfer_controller.copy( - # file_path=scratch_path_tiff, - # source=config.alcf832_scratch, - # destination=config.data832_scratch - # ) - - # if alcf_multi_res_success: - # # Transfer B: Send reconstructed data (zarr) to data832 - # logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " - # f"at ALCF to {config.data832_scratch} at data832") - # data832_zarr_transfer_success = transfer_controller.copy( - # file_path=scratch_path_zarr, - # source=config.alcf832_scratch, - # destination=config.data832_scratch - # ) - # Place holder in case we want to transfer to NERSC for long term storage nersc_transfer_success = False From 925dcb7e93a96826c5fbbce40e14e0e06d6f350f Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 7 Oct 2025 16:18:14 -0700 Subject: [PATCH 4/8] Fixing pytest to reflect the restructured copy order after TIFF recon stage is complete --- orchestration/_tests/test_globus_flow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index b71be618..caae3279 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -307,7 +307,7 @@ def test_alcf_recon_flow(mocker: MockFixture): mock_hpc_multires.assert_called_once() # HPC is done, so there's 1 successful transfer (data832->alcf). # We have not transferred tiff or zarr => total 1 copy - assert mock_transfer_controller.copy.call_count == 1 + assert mock_transfer_controller.copy.call_count == 2 mock_schedule_pruning.assert_not_called() # Reset From 4be538055ee959bf62159d6573ff1b042c106b6c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 8 Oct 2025 10:30:09 -0700 Subject: [PATCH 5/8] adding get_run_logger() to the alcf recon flow --- orchestration/flows/bl832/alcf.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 2adc77cd..8aa0d9b0 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -7,7 +7,7 @@ from globus_compute_sdk import Client, Executor from globus_compute_sdk.serialize import CombinedCode -from prefect import flow, task +from prefect import flow, task, get_run_logger from prefect.blocks.system import Secret from prefect.variables import Variable @@ -363,6 +363,7 @@ def alcf_recon_flow( Returns: bool: True if the flow completed successfully, False otherwise. """ + logger = get_run_logger() if config is None: config = Config832() From f0db167d9b6114f80dbb8291b4099326dc8ac49c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 22 Oct 2025 15:57:03 -0700 Subject: [PATCH 6/8] Adding a walltime of 20 minutes to _wait_for_globus_compute_future() to address https://github.com/als-computing/splash_flows/issues/45 --- orchestration/flows/bl832/alcf.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 8aa0d9b0..6a8d8124 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -190,7 +190,8 @@ def _build_multi_resolution_wrapper( def _wait_for_globus_compute_future( future: Future, task_name: str, - check_interval: int = 20 + check_interval: int = 20, + walltime: int = 1200 # seconds = 20 minutes ) -> bool: """ Wait for a Globus Compute task to complete, assuming that if future.done() is False, the task is running. @@ -199,9 +200,10 @@ def _wait_for_globus_compute_future( future: The future object returned from the Globus Compute Executor submit method. task_name: A descriptive name for the task being executed (used for logging). check_interval: The interval (in seconds) between status checks. + walltime: The maximum time (in seconds) to wait for the task to complete. Returns: - bool: True if the task completed successfully, False otherwise. + bool: True if the task completed successfully within walltime, False otherwise. """ start_time = time.time() success = False @@ -209,6 +211,13 @@ def _wait_for_globus_compute_future( try: previous_state = None while not future.done(): + elapsed_time = time.time() - start_time + if elapsed_time > walltime: + logger.error(f"The {task_name} task exceeded the walltime of {walltime} seconds." + "Cancelling the Globus Compute job.") + future.cancel() + return False + # Check if the task was cancelled if future.cancelled(): logger.warning(f"The {task_name} task was cancelled.") From b1080f3e74f414ce20157a9707ac088e2fc78397 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 5 Jan 2026 14:04:20 -0800 Subject: [PATCH 7/8] Fixing comment in test_globus_flows since there are 2 copies in the test case now --- orchestration/_tests/test_globus_flow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index caae3279..c5e93b6c 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -305,8 +305,8 @@ def test_alcf_recon_flow(mocker: MockFixture): mock_hpc_reconstruct.assert_called_once() mock_hpc_multires.assert_called_once() - # HPC is done, so there's 1 successful transfer (data832->alcf). - # We have not transferred tiff or zarr => total 1 copy + # HPC is done, so there's 2 successful transfer (data832->alcf). + # We have not transferred tiff or zarr => total 2 copies assert mock_transfer_controller.copy.call_count == 2 mock_schedule_pruning.assert_not_called() From 0221930bc8bbb37025d72c371e8142e48eaa6e12 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 5 Jan 2026 14:24:03 -0800 Subject: [PATCH 8/8] removing python logger and ensuring all tasks/flows use prefect's get_run_logger --- orchestration/flows/bl832/alcf.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 6a8d8124..bdf96ac2 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -1,6 +1,5 @@ from concurrent.futures import Future import datetime -import logging from pathlib import Path import time from typing import Optional @@ -16,9 +15,6 @@ from orchestration.transfer_controller import get_transfer_controller, CopyMethod from orchestration.prefect import schedule_prefect_flow -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) - class ALCFTomographyHPCController(TomographyHPCController): """ @@ -37,6 +33,7 @@ def __init__( super().__init__(config) # Load allocation root from the Prefect JSON block # The block must be registered with the name "alcf-allocation-root-path" + logger = get_run_logger() allocation_data = Variable.get("alcf-allocation-root-path", _sync=True) self.allocation_root = allocation_data.get("alcf-allocation-root-path") if not self.allocation_root: @@ -56,7 +53,7 @@ def reconstruct( Returns: bool: True if the task completed successfully, False otherwise. """ - + logger = get_run_logger() file_name = Path(file_path).stem + ".h5" folder_name = Path(file_path).parent.name @@ -131,6 +128,8 @@ def build_multi_resolution( Returns: bool: True if the task completed successfully, False otherwise. """ + logger = get_run_logger() + file_name = Path(file_path).stem folder_name = Path(file_path).parent.name @@ -205,6 +204,8 @@ def _wait_for_globus_compute_future( Returns: bool: True if the task completed successfully within walltime, False otherwise. """ + logger = get_run_logger() + start_time = time.time() success = False @@ -277,6 +278,8 @@ def schedule_prune_task( Returns: bool: True if the task was scheduled successfully, False otherwise. """ + logger = get_run_logger() + try: flow_name = f"delete {location}: {Path(path).name}" schedule_prefect_flow( @@ -324,6 +327,8 @@ def schedule_pruning( Returns: bool: True if the tasks were scheduled successfully, False otherwise. """ + logger = get_run_logger() + pruning_config = Variable.get("pruning-config", _sync=True) if one_minute: