diff --git a/docs/mkdocs/docs/bl733.md b/docs/mkdocs/docs/bl733.md
index 1a76de38..a8630d95 100644
--- a/docs/mkdocs/docs/bl733.md
+++ b/docs/mkdocs/docs/bl733.md
@@ -1,6 +1,62 @@
 # Beamline 7.3.3 Flows
 
-This page documents the workflows supported by Splash Flows Globus at [ALS Beamline 7.3.3 (SAXS/WAXS/GISAXS)](https://saxswaxs.lbl.gov/user-information). Beamline 7.3.3 supports hard x-ray scattering techniques include small- and wide-angle x-ray scattering (SAXS/WAXS) and grazing-incidence SAXS/WAXS (GISAXS/GIWAXS).
+This page documents the workflows supported by Splash Flows Globus at [ALS Beamline 7.3.3 (SAXS/WAXS/GISAXS)](https://saxswaxs.lbl.gov/user-information).
+
+Beamline 7.3.3 supports hard x-ray scattering techniques, including small- and wide-angle x-ray scattering (SAXS/WAXS) and grazing-incidence SAXS/WAXS (GISAXS/GIWAXS).
+
+## Data at 7.3.3
+
+The data collected from 7.3.3 are typically 2D scattering images, where each pixel records scattering intensity as a function of scattering angle.
+
+## File Watcher
+
+There is a file watcher on the system `data733` that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call `dispatcher` kicks off the downstream steps in a `new_733_file_task` task.
+
+## Prefect Configuration
+
+### Registered Flows
+
+#### `dispatcher.py`
+
+The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. As soon as the File Watcher detects that a new file is written, it calls the `dispatcher()` Flow. In this case, the dispatcher handles the synchronous call to `move.py`, with the potential to add additional steps (e.g. scheduling remote HPC analysis code).
+
+#### `move.py`
+
+Flow to process a new file at BL 7.3.3:
+1. Copy the data from data733 to Lamarr (our common staging area).
+2. Copy the file from data733 to NERSC CFS.
+3. Ingest the data from Lamarr into SciCat.
+4. Schedule pruning from data733 for 6 months from now.
+5. 
Archive the file from NERSC CFS to NERSC HPSS at some point in the future.
+
+### Prefect Server + Deployments
+
+This beamline is starting fresh with `Prefect==3.4.2` (an upgrade from `2.19.5`). With the latest Prefect versions, we can define deployments in a `yaml` file rather than build/apply steps in a shell script. `create_deployments_733.sh` is the legacy script for registering flows. Now, flows are defined in `orchestration/flows/bl733/prefect.yaml`. Keeping the Prefect config for the beamline within the flows folder makes it easier to keep track of different Prefect deployments for different beamlines.
+
+Note that we still must create work pools manually before we can register flows to them.
+
+For example, here is how we can now create our deployments:
+
+```bash
+# cd to the directory
+cd orchestration/flows/bl733/
+
+# add Prefect API URL + Key to the environment (if not already present)
+export PREFECT_API_URL=http://:4200/api
+
+# create the work-pools
+prefect work-pool create new_file_733_pool
+prefect work-pool create dispatcher_733_pool
+prefect work-pool create prune_733_pool
+
+prefect deploy
+```
+
+We can also preview a deployment with `prefect deploy --output yaml`, or deploy a single flow with `prefect deploy --name run_733_dispatcher`.
+
+The script `splash_flows/init_work_pools.py` applies the same logic to deploy the flows in a streamlined fashion on the latest version of Prefect.
+
 ## Diagrams
@@ -13,7 +69,7 @@ sequenceDiagram
     %% Initial Trigger
     T->>T: Detector → File Watcher
-    T->>F: File Watcher triggers Dispatcher
+    T->>F: File Watcher
triggers Dispatcher
     F->>F: Dispatcher coordinates downstream Flows
 
     %% Flow 1: new_file_733
@@ -50,7 +106,6 @@ sequenceDiagram
     end
 ```
-
 ### Data Infrastructure Workflows
 ```mermaid
 ---
@@ -59,7 +114,7 @@ config:
   layout: elk
   look: neo
 ---
-flowchart LR
+flowchart
 subgraph s1["new_file_733 Flow"]
     n20["data733"]
     n21["NERSC CFS"]
@@ -133,64 +188,8 @@ flowchart LR
 style s1 stroke:#757575
 style s2 stroke:#757575
 style s3 stroke:#757575
-
-```
-
-## Data at 7.3.3
-
-The data collected from 7.3.3 are typically 2D scattering images, where each pixel records scattering intensity as a function of scattering angle.
-
-## File Watcher
-
-There is a file watcher on the system `data733` that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call `dispatcher` kicks off the downstream steps:
-- Copy scans in real time to `NERSC CFS` using Globus Transfer.
-- Copy project data to `NERSC HPSS` for long-term storage.
-- Analysis on HPC systems (TBD).
-- Schedule data pruning from `data733` and `NERSC CFS`.
-
-## Prefect Configuration
-
-### Registered Flows
-
-#### `dispatcher.py`
-
-The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. As as soon as the File Watcher detects that a new file is written, it calls the `dispatcher()` Flow. In this case, the dispatcher handles the synchronous call to `move.py`, with a potential to add additional steps (e.g. scheduling remote HPC analysis code).
-
-#### `move.py`
-
-Flow to process a new file at BL 7.3.3
-1. Copy the file from the data733 to NERSC CFS. Ingest file path in SciCat.
-2. Schedule pruning from data733 (ensuring that data is on NERSC before deletion).
-3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat.
-4. Schedule pruning from NERSC CFS (ensuring data is on HPSS before deletion).
-
-### Prefect Server + Deployments
-
-This beamline is starting fresh with `Prefect==3.4.2` (an upgrade from `2.19.5`).
With the latest Prefect versions, we can define deployments in a `yaml` file rather than build/apply steps in a shell script. `create_deployments_733.sh` is the legacy way we support registering flows. Now, flows are defined in `orchestration/flows/bl733/prefect.yaml`. Keeping the prefect config for the beamline within the flows folder makes it easier to keep track of different Prefect deployments for different beamlines.
-
-Note that we still must create work pools manually before we can register flows to them.
-
-For example, here is how we can now create our deployments:
-
-```bash
-# cd to the directory
-cd orchestration/flows/bl733/
-
-# add Prefect API URL + Key to the environment (if not already present)
-export PREFECT_API_URL=http://:4200/api
-
-# create the work-pools
-prefect work-pool create new_file_733_pool
-prefect work-pool create dispatcher_733_pool
-prefect work-pool create prune_733_pool
-
-prefect deploy
-```
-
-We can also preview a deployment: `prefect deploy --output yaml`, or deploy only one flow `prefect deploy --name run_733_dispatcher`.
-
-The following script follows the above logic for deploying the flows in a streamlined fashion for the latest version of Prefect:
-`splash_flows/init_work_pools.py`
 
 ## VM Details
diff --git a/docs/mkdocs/docs/configuration.md b/docs/mkdocs/docs/configuration.md
index 529fe1f9..d7cd8db6 100644
--- a/docs/mkdocs/docs/configuration.md
+++ b/docs/mkdocs/docs/configuration.md
@@ -1,6 +1,6 @@
-### Summary of `config.yaml`
+### Summary of `config.yml`
 
-The `config.yaml` file contains configurations for various components involved in the data management, processing, and orchestration workflows related to the ALS beamlines.
+The `config.yml` file contains configurations for various components involved in the data management, processing, and orchestration workflows related to the ALS beamlines.
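To make the shape of this file concrete, one section might look like the following sketch (the endpoint names, UUIDs, and paths below are hypothetical placeholders, not the real beamline configuration):

```yaml
# Hypothetical excerpt of config.yml — illustrates structure only.
globus_endpoints:
  data733:
    root_path: /data
    uri: example-host.als.lbl.gov          # placeholder hostname
    uuid: 00000000-0000-0000-0000-000000000000
  nersc733:
    root_path: /global/cfs/cdirs/exampleproj  # placeholder CFS path
    uri: nersc.gov
    uuid: 00000000-0000-0000-0000-000000000000
```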
 #### **Globus Endpoints**
 - **globus_endpoints**: Defines multiple Globus endpoints used for data transfer between various systems.
diff --git a/orchestration/flows/bl733/dispatcher.py b/orchestration/flows/bl733/dispatcher.py
index bd5f558d..f2f82bcc 100644
--- a/orchestration/flows/bl733/dispatcher.py
+++ b/orchestration/flows/bl733/dispatcher.py
@@ -22,9 +22,8 @@ def dispatcher(
     :param is_export_control: Flag indicating if export control measures should be applied.
         (Not used in the current BL733 processing)
     :param config: Configuration settings for processing.
-        Expected to be an instance of Config733 or a dict that can be converted.
-    :raises ValueError: If no configuration is provided.
-    :raises TypeError: If the provided configuration is not a dict or Config733.
+        If not provided, a default Config733 is instantiated.
+    :raises ValueError: If no file_path is provided.
     """
 
     logger.info("Starting dispatcher flow for BL 7.3.3")
diff --git a/orchestration/flows/bl733/move.py b/orchestration/flows/bl733/move.py
index 9480b043..296bf875 100644
--- a/orchestration/flows/bl733/move.py
+++ b/orchestration/flows/bl733/move.py
@@ -1,10 +1,12 @@
 import datetime
 import logging
+from pathlib import Path
 from typing import Optional
 
 from prefect import flow, get_run_logger, task
 from prefect.variables import Variable
 
+from orchestration.flows.scicat.ingest import scicat_ingest_flow
 from orchestration.flows.bl733.config import Config733
 from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe
 from orchestration.prefect import schedule_prefect_flow
@@ -147,7 +149,7 @@ def process_new_733_file_flow(
     :param config: Configuration settings for processing.
     :return: None
     """
-    process_new_733_file_flow(
+    process_new_733_file_task(
         file_path=file_path,
         config=config
     )
@@ -159,11 +161,12 @@ def process_new_733_file_task(
     config: Optional[Config733] = None
 ) -> None:
     """
-    Task to process a new file at BL 7.3.3
-    1. Copy the file from the data733 to NERSC CFS.
Ingest file path in SciCat.
-    2. Schedule pruning from data733. 6 months from now.
-    3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat.
-    4. Schedule pruning from NERSC CFS.
+    Task to process new data at BL 7.3.3:
+    1. Copy the data from data733 to Lamarr (our common staging area).
+    2. Copy the file from data733 to NERSC CFS.
+    3. Ingest the data from Lamarr into SciCat.
+    4. Schedule pruning from data733 for 6 months from now.
+    5. Archive the file from NERSC CFS to NERSC HPSS at some point in the future.
 
     :param file_path: Path to the new file to be processed.
     :param config: Configuration settings for processing.
@@ -192,10 +195,14 @@ def process_new_733_file_task(
         destination=config.nersc733_alsdev_raw
     )
 
-    # Waiting for PR #62 to be merged (prune_controller)
+    # Note that the SciCat ingester assumes the data is on Lamarr.
+    try:
+        scicat_ingest_flow(dataset_path=Path(file_path), ingester_spec="als733_saxs")
+    except Exception as e:
+        logger.error(f"SciCat ingest failed with {e}")
 
+    # Waiting for PR #62 to be merged (prune_controller)
     bl733_settings = Variable.get("bl733-settings", _sync=True)
-
     prune(
         file_path=file_path,
         source_endpoint=config.data733_raw,
@@ -206,9 +213,6 @@ def process_new_733_file_task(
 
     # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years?
     # Waiting for PR #62 to be merged (transfer_controller)
 
-    # TODO: Ingest file path in SciCat
-    # Waiting for PR #62 to be merged (scicat_controller)
-
 @flow(name="move_733_flight_check", flow_run_name="move_733_flight_check-{file_path}")
 def move_733_flight_check(
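The five-step sequence in the new `process_new_733_file_task` docstring can be sketched in plain Python. This is a conceptual sketch only: the function and dictionary key names below are hypothetical, and the real task performs Globus transfers and schedules Prefect flows rather than returning a plan.

```python
# Conceptual sketch of the order of operations in process_new_733_file_task.
# All names here are hypothetical stand-ins for the real orchestration code.
from datetime import datetime, timedelta

PRUNE_DELAY_DAYS = 182  # roughly the "6 months from now" in the docstring


def plan_new_file(file_path: str, now: datetime) -> dict:
    """Describe what the real Prefect task would do for one new file."""
    return {
        "copy_to_lamarr": file_path,                                 # step 1: staging area
        "copy_to_nersc_cfs": file_path,                              # step 2: long-term filesystem
        "scicat_ingest_from": "lamarr",                              # step 3: ingester assumes data on Lamarr
        "prune_data733_at": now + timedelta(days=PRUNE_DELAY_DAYS),  # step 4: scheduled pruning
        "archive_to_hpss": "deferred",                               # step 5: future HPSS archival
    }


plan = plan_new_file("/data/scan_0001.edf", datetime(2025, 1, 1))
print(plan["prune_data733_at"].date())  # → 2025-07-02
```

Modeling the task as "build a plan, then execute it" also makes the ordering easy to unit-test without a running Prefect server.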