From c98d98afeb2bf99bb1231279ae0ca28d026dae31 Mon Sep 17 00:00:00 2001 From: Garrett Birkel Date: Mon, 5 Jan 2026 12:46:19 -0800 Subject: [PATCH 1/6] Minor edit: making the extension match. --- docs/mkdocs/docs/configuration.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/mkdocs/docs/configuration.md b/docs/mkdocs/docs/configuration.md index 529fe1f9..d7cd8db6 100644 --- a/docs/mkdocs/docs/configuration.md +++ b/docs/mkdocs/docs/configuration.md @@ -1,6 +1,6 @@ -### Summary of `config.yaml` +### Summary of `config.yml` -The `config.yaml` file contains configurations for various components involved in the data management, processing, and orchestration workflows related to the ALS beamlines. +The `config.yml` file contains configurations for various components involved in the data management, processing, and orchestration workflows related to the ALS beamlines. #### **Globus Endpoints** - **globus_endpoints**: Defines multiple Globus endpoints used for data transfer between various systems. From aaffdcfda265c0e72e0161b5c35ad2d714e4dd56 Mon Sep 17 00:00:00 2001 From: Garrett Birkel Date: Mon, 5 Jan 2026 12:46:40 -0800 Subject: [PATCH 2/6] Minor: Slightly more accurate function description --- orchestration/flows/bl733/dispatcher.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/orchestration/flows/bl733/dispatcher.py b/orchestration/flows/bl733/dispatcher.py index bd5f558d..f2f82bcc 100644 --- a/orchestration/flows/bl733/dispatcher.py +++ b/orchestration/flows/bl733/dispatcher.py @@ -22,9 +22,8 @@ def dispatcher( :param is_export_control: Flag indicating if export control measures should be applied. (Not used in the current BL733 processing) :param config: Configuration settings for processing. - Expected to be an instance of Config733 or a dict that can be converted. - :raises ValueError: If no configuration is provided. - :raises TypeError: If the provided configuration is not a dict or Config733. 
+ If not provided, a default Config733 is instantiated. + :raises ValueError: If no file_path is provided. """ logger.info("Starting dispatcher flow for BL 7.3.3") From 096136412c0b7fce3cde158f4f9421d895bf52c6 Mon Sep 17 00:00:00 2001 From: Garrett Birkel Date: Mon, 5 Jan 2026 12:48:26 -0800 Subject: [PATCH 3/6] Calling the SciCat ingestion flow with the 733 SAXS spec, as part of process_new_733_file_task. Not sure if we need to do anything special with file_path... --- orchestration/flows/bl733/move.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py index 9480b043..f6900a83 100644 --- a/orchestration/flows/bl733/move.py +++ b/orchestration/flows/bl733/move.py @@ -1,10 +1,12 @@ import datetime import logging +from pathlib import Path from typing import Optional from prefect import flow, get_run_logger, task from prefect.variables import Variable +from orchestration.flows.scicat.ingest import scicat_ingest_flow from orchestration.flows.bl733.config import Config733 from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe from orchestration.prefect import schedule_prefect_flow @@ -192,10 +194,13 @@ def process_new_733_file_task( destination=config.nersc733_alsdev_raw ) - # Waiting for PR #62 to be merged (prune_controller) + try: + scicat_ingest_flow(dataset_path=Path(file_path), ingester_spec="als733_saxs") + except Exception as e: + logger.error(f"SciCat ingest failed with {e}") + # Waiting for PR #62 to be merged (prune_controller) bl733_settings = Variable.get("bl733-settings", _sync=True) - prune( file_path=file_path, source_endpoint=config.data733_raw, @@ -206,9 +211,6 @@ def process_new_733_file_task( # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? 
# Waiting for PR #62 to be merged (transfer_controller) - # TODO: Ingest file path in SciCat - # Waiting for PR #62 to be merged (scicat_controller) - @flow(name="move_733_flight_check", flow_run_name="move_733_flight_check-{file_path}") def move_733_flight_check( From 30de84288a93314f2c04c81d224ec7da82b082ff Mon Sep 17 00:00:00 2001 From: Garrett Birkel Date: Mon, 5 Jan 2026 12:50:52 -0800 Subject: [PATCH 4/6] This should call the task, I assume, not call itself in a loop :D --- orchestration/flows/bl733/move.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py index f6900a83..4159f636 100644 --- a/orchestration/flows/bl733/move.py +++ b/orchestration/flows/bl733/move.py @@ -149,7 +149,7 @@ def process_new_733_file_flow( :param config: Configuration settings for processing. :return: None """ - process_new_733_file_flow( + process_new_733_file_task( file_path=file_path, config=config ) From 9937375a0689b549d9dd1b4b754a88da01f68167 Mon Sep 17 00:00:00 2001 From: Garrett Birkel Date: Mon, 5 Jan 2026 12:57:57 -0800 Subject: [PATCH 5/6] Revised process_new_733_file_task description --- orchestration/flows/bl733/move.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py index 4159f636..296bf875 100644 --- a/orchestration/flows/bl733/move.py +++ b/orchestration/flows/bl733/move.py @@ -161,11 +161,12 @@ def process_new_733_file_task( config: Optional[Config733] = None ) -> None: """ - Task to process a new file at BL 7.3.3 - 1. Copy the file from the data733 to NERSC CFS. Ingest file path in SciCat. - 2. Schedule pruning from data733. 6 months from now. - 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. - 4. Schedule pruning from NERSC CFS. + Task to process new data at BL 7.3.3 + 1. Copy the data from data733 to Lamarr (our common staging area). + 2. 
Copy the file from the data733 to NERSC CFS. + 3. Ingest the data from Lamarr into SciCat. + 4. Schedule pruning from data733 for 6 months from now. + 5. Archive the file from NERSC CFS to NERSC HPSS at some point in the future. :param file_path: Path to the new file to be processed. :param config: Configuration settings for processing. @@ -194,6 +195,7 @@ def process_new_733_file_task( destination=config.nersc733_alsdev_raw ) + # Note that the SciCat ingester assumes the data is on Lamarr. try: scicat_ingest_flow(dataset_path=Path(file_path), ingester_spec="als733_saxs") except Exception as e: From 6f35b14f041354cfb86b1b8999159c22769b283c Mon Sep 17 00:00:00 2001 From: Garrett Birkel Date: Mon, 5 Jan 2026 13:03:54 -0800 Subject: [PATCH 6/6] Moving the charts down near the bottom of the doc, and updating some of the description. I think the 'sequence diagram' chart needs revising? --- docs/mkdocs/docs/bl733.md | 119 +++++++++++++++++++------------------- 1 file changed, 59 insertions(+), 60 deletions(-) diff --git a/docs/mkdocs/docs/bl733.md b/docs/mkdocs/docs/bl733.md index 1a76de38..a8630d95 100644 --- a/docs/mkdocs/docs/bl733.md +++ b/docs/mkdocs/docs/bl733.md @@ -1,6 +1,62 @@ # Beamline 7.3.3 Flows -This page documents the workflows supported by Splash Flows Globus at [ALS Beamline 7.3.3 (SAXS/WAXS/GISAXS)](https://saxswaxs.lbl.gov/user-information). Beamline 7.3.3 supports hard x-ray scattering techniques include small- and wide-angle x-ray scattering (SAXS/WAXS) and grazing-incidence SAXS/WAXS (GISAXS/GIWAXS). +This page documents the workflows supported by Splash Flows Globus at [ALS Beamline 7.3.3 (SAXS/WAXS/GISAXS)](https://saxswaxs.lbl.gov/user-information). + +Beamline 7.3.3 supports hard x-ray scattering techniques, including small- and wide-angle x-ray scattering (SAXS/WAXS) and grazing-incidence SAXS/WAXS (GISAXS/GIWAXS). 
+ +## Data at 7.3.3 + +The data collected from 7.3.3 are typically 2D scattering images, where each pixel records scattering intensity as a function of scattering angle. + +## File Watcher + +There is a file watcher on the system `data733` that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call `dispatcher` kicks off the downstream steps in a `new_733_file_task` task. + +## Prefect Configuration + +### Registered Flows + +#### `dispatcher.py` + +The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. As soon as the File Watcher detects that a new file is written, it calls the `dispatcher()` Flow. In this case, the dispatcher handles the synchronous call to `move.py`, with a potential to add additional steps (e.g. scheduling remote HPC analysis code). + +#### `move.py` + +Flow to process a new file at BL 7.3.3 +1. Copy the data from data733 to Lamarr (our common staging area). +2. Copy the file from the data733 to NERSC CFS. +3. Ingest the data from Lamarr into SciCat. +4. Schedule pruning from data733 for 6 months from now. +5. Archive the file from NERSC CFS to NERSC HPSS at some point in the future. + +### Prefect Server + Deployments + +This beamline is starting fresh with `Prefect==3.4.2` (an upgrade from `2.19.5`). With the latest Prefect versions, we can define deployments in a `yaml` file rather than build/apply steps in a shell script. `create_deployments_733.sh` is the legacy way we support registering flows. Now, flows are defined in `orchestration/flows/bl733/prefect.yaml`. Keeping the prefect config for the beamline within the flows folder makes it easier to keep track of different Prefect deployments for different beamlines. + +Note that we still must create work pools manually before we can register flows to them. 
+ +For example, here is how we can now create our deployments: + +```bash +# cd to the directory +cd orchestration/flows/bl733/ + +# add Prefect API URL + Key to the environment (if not already present) +export PREFECT_API_URL=http://:4200/api + +# create the work-pools +prefect work-pool create new_file_733_pool +prefect work-pool create dispatcher_733_pool +prefect work-pool create prune_733_pool + +prefect deploy +``` + +We can also preview a deployment: `prefect deploy --output yaml`, or deploy only one flow `prefect deploy --name run_733_dispatcher`. + +The following script follows the above logic for deploying the flows in a streamlined fashion for the latest version of Prefect: +`splash_flows/init_work_pools.py` + ## Diagrams @@ -13,7 +69,7 @@ sequenceDiagram %% Initial Trigger T->>T: Detector → File Watcher - T->>F: File Watcher triggers Dispatcher + T->>F: File Watcher
triggers Dispatcher F->>F: Dispatcher coordinates downstream Flows %% Flow 1: new_file_733 @@ -50,7 +106,6 @@ sequenceDiagram end ``` - ### Data Infrastructure Workflows ```mermaid --- @@ -59,7 +114,7 @@ config: layout: elk look: neo --- -flowchart LR +flowchart subgraph s1["new_file_733 Flow"] n20["data733"] n21["NERSC CFS"] @@ -133,64 +188,8 @@ flowchart LR style s1 stroke:#757575 style s2 stroke:#757575 style s3 stroke:#757575 - -``` - -## Data at 7.3.3 - -The data collected from 7.3.3 are typically 2D scattering images, where each pixel records scattering intensity as a function of scattering angle. - -## File Watcher - -There is a file watcher on the system `data733` that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call `dispatcher` kicks off the downstream steps: -- Copy scans in real time to `NERSC CFS` using Globus Transfer. -- Copy project data to `NERSC HPSS` for long-term storage. -- Analysis on HPC systems (TBD). -- Schedule data pruning from `data733` and `NERSC CFS`. - -## Prefect Configuration - -### Registered Flows - -#### `dispatcher.py` - -The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. As as soon as the File Watcher detects that a new file is written, it calls the `dispatcher()` Flow. In this case, the dispatcher handles the synchronous call to `move.py`, with a potential to add additional steps (e.g. scheduling remote HPC analysis code). - -#### `move.py` - -Flow to process a new file at BL 7.3.3 -1. Copy the file from the data733 to NERSC CFS. Ingest file path in SciCat. -2. Schedule pruning from data733 (ensuring that data is on NERSC before deletion). -3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. -4. Schedule pruning from NERSC CFS (ensuring data is on HPSS before deletion). - -### Prefect Server + Deployments - -This beamline is starting fresh with `Prefect==3.4.2` (an upgrade from `2.19.5`). 
With the latest Prefect versions, we can define deployments in a `yaml` file rather than build/apply steps in a shell script. `create_deployments_733.sh` is the legacy way we support registering flows. Now, flows are defined in `orchestration/flows/bl733/prefect.yaml`. Keeping the prefect config for the beamline within the flows folder makes it easier to keep track of different Prefect deployments for different beamlines. - -Note that we still must create work pools manually before we can register flows to them. - -For example, here is how we can now create our deployments: - -```bash -# cd to the directory -cd orchestration/flows/bl733/ - -# add Prefect API URL + Key to the environment (if not already present) -export PREFECT_API_URL=http://:4200/api - -# create the work-pools -prefect work-pool create new_file_733_pool -prefect work-pool create dispatcher_733_pool -prefect work-pool create prune_733_pool - -prefect deploy ``` -We can also preview a deployment: `prefect deploy --output yaml`, or deploy only one flow `prefect deploy --name run_733_dispatcher`. - -The following script follows the above logic for deploying the flows in a streamlined fashion for the latest version of Prefect: -`splash_flows/init_work_pools.py` ## VM Details