Merged
119 changes: 59 additions & 60 deletions docs/mkdocs/docs/bl733.md
@@ -1,6 +1,62 @@
# Beamline 7.3.3 Flows

This page documents the workflows supported by Splash Flows Globus at [ALS Beamline 7.3.3 (SAXS/WAXS/GISAXS)](https://saxswaxs.lbl.gov/user-information). Beamline 7.3.3 supports hard x-ray scattering techniques include small- and wide-angle x-ray scattering (SAXS/WAXS) and grazing-incidence SAXS/WAXS (GISAXS/GIWAXS).
This page documents the workflows supported by Splash Flows Globus at [ALS Beamline 7.3.3 (SAXS/WAXS/GISAXS)](https://saxswaxs.lbl.gov/user-information).

Beamline 7.3.3 supports hard x-ray scattering techniques, including small- and wide-angle x-ray scattering (SAXS/WAXS) and grazing-incidence SAXS/WAXS (GISAXS/GIWAXS).

## Data at 7.3.3

The data collected from 7.3.3 are typically 2D scattering images, where each pixel records scattering intensity as a function of scattering angle.
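As a rough illustration of that data model, here is a sketch mapping each pixel of a 2D detector image to its scattering angle. This is not beamline code; the geometry numbers (beam center, pixel size, sample–detector distance) are placeholders, not real 7.3.3 values:

```python
import numpy as np

def scattering_angle_map(shape, beam_center, pixel_size_mm, det_dist_mm):
    """Map each detector pixel to its scattering angle 2-theta (radians).

    All geometry parameters here are illustrative placeholders,
    not calibrated 7.3.3 values.
    """
    rows, cols = np.indices(shape)
    # radial distance of each pixel from the beam center, in mm
    r_mm = np.hypot(
        (rows - beam_center[0]) * pixel_size_mm,
        (cols - beam_center[1]) * pixel_size_mm,
    )
    # small-angle geometry: tan(2-theta) = r / distance
    return np.arctan2(r_mm, det_dist_mm)

two_theta = scattering_angle_map((1024, 1024), (512, 512), 0.172, 2000.0)
```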

## File Watcher

There is a file watcher on the system `data733` that listens for new scans that have finished writing to disk. When one appears, a Prefect Flow we call `dispatcher` kicks off the downstream steps via the `process_new_733_file_task` task.
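A minimal sketch of the watcher's core check — this is not the actual data733 implementation; the `.edf` extension and the settle time are assumptions:

```python
import time
from pathlib import Path

def find_finished_files(watch_dir: Path, seen: set, settle_seconds: float = 5.0):
    """Return files not yet dispatched whose mtime is old enough that
    writing has plausibly finished.

    A simplified stand-in for the real watcher on data733; the file
    extension and settle window here are illustrative assumptions.
    """
    now = time.time()
    finished = []
    for f in watch_dir.glob("*.edf"):
        if f not in seen and now - f.stat().st_mtime > settle_seconds:
            finished.append(f)
    return finished
```

In a real loop, each returned path would be added to `seen` and handed to the dispatcher flow.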

## Prefect Configuration

### Registered Flows

#### `dispatcher.py`

The Dispatcher Prefect Flow manages the order and execution of data tasks. As soon as the File Watcher detects that a new file is written, it calls the `dispatcher()` Flow. In this case, the dispatcher handles the synchronous call to `move.py`, with the potential to add further steps (e.g. scheduling remote HPC analysis code).
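The control flow can be sketched as plain Python — the real `dispatcher()` is a Prefect `@flow` in `orchestration/flows/bl733/dispatcher.py`, and the step list here is purely illustrative:

```python
def dispatch(file_path: str, steps=None) -> list:
    """Sketch of the dispatch logic: validate the input, then run each
    downstream step synchronously, in order. The default step is a
    placeholder standing in for the move.py flow."""
    if not file_path:
        raise ValueError("file_path is required")
    steps = steps or [lambda p: f"moved:{p}"]  # placeholder for move.py
    results = []
    for step in steps:
        # synchronous: each step finishes before the next begins
        results.append(step(file_path))
    return results
```

The synchronous loop is the point: adding an HPC-analysis step later means appending one more callable.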

#### `move.py`

Flow to process a new file at BL 7.3.3:
1. Copy the data from data733 to Lamarr (our common staging area).
2. Copy the file from data733 to NERSC CFS.
3. Ingest the data from Lamarr into SciCat.
4. Schedule pruning from data733 for 6 months from now.
5. Archive the file from NERSC CFS to NERSC HPSS at some point in the future.
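Step 4's six-month prune window can be sketched as follows. Treating "6 months" as 182 days is an assumption; the real flow schedules the deletion through Prefect rather than computing it inline:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# ~6 months; the real retention window may differ or be configurable
PRUNE_DELAY = timedelta(days=182)

def prune_eligible_at(now: Optional[datetime] = None) -> datetime:
    """Earliest time the raw copy on data733 may be deleted."""
    now = now or datetime.now(timezone.utc)
    return now + PRUNE_DELAY
```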

### Prefect Server + Deployments

This beamline is starting fresh with `Prefect==3.4.2` (an upgrade from `2.19.5`). With recent Prefect versions, we can define deployments in a `yaml` file rather than in build/apply steps in a shell script. `create_deployments_733.sh` is the legacy way we support registering flows; now, flows are defined in `orchestration/flows/bl733/prefect.yaml`. Keeping the Prefect config for the beamline within the flows folder makes it easier to track the Prefect deployments for each beamline.
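A hypothetical excerpt of what such a `prefect.yaml` can look like — the deployment names and entrypoints below are illustrative, not copied from the real file in the repo:

```yaml
# Illustrative shape only; see orchestration/flows/bl733/prefect.yaml
deployments:
  - name: run_733_dispatcher
    entrypoint: orchestration/flows/bl733/dispatcher.py:dispatcher
    work_pool:
      name: dispatcher_733_pool
  - name: new_file_733
    entrypoint: orchestration/flows/bl733/move.py:process_new_733_file_flow
    work_pool:
      name: new_file_733_pool
```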

Note that we still must create work pools manually before we can register flows to them.

For example, here is how we can now create our deployments:

```bash
# cd to the directory
cd orchestration/flows/bl733/

# point Prefect at the beamline's server (add an API key to the
# environment as well, if your server requires one)
export PREFECT_API_URL=http://<your-prefect-server-for-bl733>:4200/api

# create the work-pools
prefect work-pool create new_file_733_pool
prefect work-pool create dispatcher_733_pool
prefect work-pool create prune_733_pool

prefect deploy
```

We can also preview a deployment (`prefect deploy --output yaml`), or deploy a single flow: `prefect deploy --name run_733_dispatcher`.

The script `splash_flows/init_work_pools.py` wraps the above steps in a streamlined fashion for the latest version of Prefect.
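A sketch of what such a script can do by shelling out to the Prefect CLI — the real `init_work_pools.py` may differ (e.g. by using the Prefect client API instead), and `runner` is injectable purely to make the sketch testable:

```python
import subprocess

POOLS = ["new_file_733_pool", "dispatcher_733_pool", "prune_733_pool"]

def init_work_pools(pools=POOLS, runner=subprocess.run):
    """Create each work pool via the Prefect CLI, tolerating pools that
    already exist (the CLI typically errors on duplicates, which we
    treat as a skip). A sketch, not the real init_work_pools.py."""
    created = []
    for name in pools:
        result = runner(
            ["prefect", "work-pool", "create", name],
            capture_output=True, text=True,
        )
        if result.returncode == 0:
            created.append(name)
    return created
```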


## Diagrams

@@ -13,7 +69,7 @@ sequenceDiagram

%% Initial Trigger
T->>T: Detector → File Watcher
T->>F: File Watcher triggers Dispatcher
T->>F: File Watcher<br/>triggers Dispatcher
F->>F: Dispatcher coordinates downstream Flows

%% Flow 1: new_file_733
@@ -50,7 +106,6 @@ sequenceDiagram
end
```


### Data Infrastructure Workflows
```mermaid
---
@@ -59,7 +114,7 @@ config:
layout: elk
look: neo
---
flowchart LR
flowchart
subgraph s1["new_file_733 Flow"]
n20["data733"]
n21["NERSC CFS"]
@@ -133,64 +188,8 @@ flowchart LR
style s1 stroke:#757575
style s2 stroke:#757575
style s3 stroke:#757575

```

## Data at 7.3.3

The data collected from 7.3.3 are typically 2D scattering images, where each pixel records scattering intensity as a function of scattering angle.

## File Watcher

There is a file watcher on the system `data733` that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call `dispatcher` kicks off the downstream steps:
- Copy scans in real time to `NERSC CFS` using Globus Transfer.
- Copy project data to `NERSC HPSS` for long-term storage.
- Analysis on HPC systems (TBD).
- Schedule data pruning from `data733` and `NERSC CFS`.

## Prefect Configuration

### Registered Flows

#### `dispatcher.py`

The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. As as soon as the File Watcher detects that a new file is written, it calls the `dispatcher()` Flow. In this case, the dispatcher handles the synchronous call to `move.py`, with a potential to add additional steps (e.g. scheduling remote HPC analysis code).

#### `move.py`

Flow to process a new file at BL 7.3.3
1. Copy the file from the data733 to NERSC CFS. Ingest file path in SciCat.
2. Schedule pruning from data733 (ensuring that data is on NERSC before deletion).
3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat.
4. Schedule pruning from NERSC CFS (ensuring data is on HPSS before deletion).

### Prefect Server + Deployments

This beamline is starting fresh with `Prefect==3.4.2` (an upgrade from `2.19.5`). With the latest Prefect versions, we can define deployments in a `yaml` file rather than build/apply steps in a shell script. `create_deployments_733.sh` is the legacy way we support registering flows. Now, flows are defined in `orchestration/flows/bl733/prefect.yaml`. Keeping the prefect config for the beamline within the flows folder makes it easier to keep track of different Prefect deployments for different beamlines.

Note that we still must create work pools manually before we can register flows to them.

For example, here is how we can now create our deployments:

```bash
# cd to the directory
cd orchestration/flows/bl733/

# add Prefect API URL + Key to the environment (if not already present)
export PREFECT_API_URL=http://<your-prefect-server-for-bl733>:4200/api

# create the work-pools
prefect work-pool create new_file_733_pool
prefect work-pool create dispatcher_733_pool
prefect work-pool create prune_733_pool

prefect deploy
```

We can also preview a deployment: `prefect deploy --output yaml`, or deploy only one flow `prefect deploy --name run_733_dispatcher`.

The following script follows the above logic for deploying the flows in a streamlined fashion for the latest version of Prefect:
`splash_flows/init_work_pools.py`

## VM Details

4 changes: 2 additions & 2 deletions docs/mkdocs/docs/configuration.md
@@ -1,6 +1,6 @@
### Summary of `config.yaml`
### Summary of `config.yml`

The `config.yaml` file contains configurations for various components involved in the data management, processing, and orchestration workflows related to the ALS beamlines.
The `config.yml` file contains configurations for various components involved in the data management, processing, and orchestration workflows related to the ALS beamlines.

#### **Globus Endpoints**
- **globus_endpoints**: Defines multiple Globus endpoints used for data transfer between various systems.
5 changes: 2 additions & 3 deletions orchestration/flows/bl733/dispatcher.py
@@ -22,9 +22,8 @@ def dispatcher(
:param is_export_control: Flag indicating if export control measures should be applied.
(Not used in the current BL733 processing)
:param config: Configuration settings for processing.
Expected to be an instance of Config733 or a dict that can be converted.
:raises ValueError: If no configuration is provided.
:raises TypeError: If the provided configuration is not a dict or Config733.
If not provided, a default Config733 is instantiated.
:raises ValueError: If no file_path is provided.
"""

logger.info("Starting dispatcher flow for BL 7.3.3")
26 changes: 15 additions & 11 deletions orchestration/flows/bl733/move.py
@@ -1,10 +1,12 @@
import datetime
import logging
from pathlib import Path
from typing import Optional

from prefect import flow, get_run_logger, task
from prefect.variables import Variable

from orchestration.flows.scicat.ingest import scicat_ingest_flow
from orchestration.flows.bl733.config import Config733
from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe
from orchestration.prefect import schedule_prefect_flow
@@ -147,7 +149,7 @@ def process_new_733_file_flow(
:param config: Configuration settings for processing.
:return: None
"""
process_new_733_file_flow(
process_new_733_file_task(
file_path=file_path,
config=config
)
@@ -159,11 +161,12 @@ def process_new_733_file_task(
config: Optional[Config733] = None
) -> None:
"""
Task to process a new file at BL 7.3.3
1. Copy the file from the data733 to NERSC CFS. Ingest file path in SciCat.
2. Schedule pruning from data733. 6 months from now.
3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat.
4. Schedule pruning from NERSC CFS.
Task to process new data at BL 7.3.3:
1. Copy the data from data733 to Lamarr (our common staging area).
2. Copy the file from data733 to NERSC CFS.
3. Ingest the data from Lamarr into SciCat.
4. Schedule pruning from data733 for 6 months from now.
5. Archive the file from NERSC CFS to NERSC HPSS at some point in the future.

:param file_path: Path to the new file to be processed.
:param config: Configuration settings for processing.
@@ -192,10 +195,14 @@ def process_new_733_file_task(
destination=config.nersc733_alsdev_raw
)

# Waiting for PR #62 to be merged (prune_controller)
# Note that the SciCat ingester assumes the data is on Lamarr.
try:
scicat_ingest_flow(dataset_path=Path(file_path), ingester_spec="als733_saxs")
except Exception as e:
logger.error(f"SciCat ingest failed with {e}")

# Waiting for PR #62 to be merged (prune_controller)
bl733_settings = Variable.get("bl733-settings", _sync=True)

prune(
file_path=file_path,
source_endpoint=config.data733_raw,
@@ -206,9 +213,6 @@
# TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years?
# Waiting for PR #62 to be merged (transfer_controller)

# TODO: Ingest file path in SciCat
# Waiting for PR #62 to be merged (scicat_controller)


@flow(name="move_733_flight_check", flow_run_name="move_733_flight_check-{file_path}")
def move_733_flight_check(