From 440d39e8239eb32efb6cabc5d335784f42cdab59 Mon Sep 17 00:00:00 2001 From: "Eugene M." Date: Thu, 12 Feb 2026 12:54:46 -0500 Subject: [PATCH 1/2] ENH: Add data validation in Tiled --- data_validation.py | 44 +++++++++++++++++++++++++++++++++++++----- end_of_run_workflow.py | 8 +++++--- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/data_validation.py b/data_validation.py index 7219f74..9c549e3 100644 --- a/data_validation.py +++ b/data_validation.py @@ -1,7 +1,9 @@ -import time as ttime +import time from prefect import flow, get_run_logger, task from prefect.blocks.system import Secret + +from bluesky_tiled_plugins.writing.validator import validate from tiled.client import from_profile @@ -12,13 +14,45 @@ def read_all_streams(beamline_acronym, uid): tiled_client = from_profile("nsls2", api_key=api_key) run = tiled_client[beamline_acronym]["raw"][uid] logger.info(f"Validating uid {run.start['uid']}") - start_time = ttime.monotonic() + start_time = time.monotonic() for stream in run: logger.info(f"{stream}:") - stream_start_time = ttime.monotonic() + stream_start_time = time.monotonic() stream_data = run[stream].read() - stream_elapsed_time = ttime.monotonic() - stream_start_time + stream_elapsed_time = time.monotonic() - stream_start_time logger.info(f"{stream} elapsed_time = {stream_elapsed_time}") logger.info(f"{stream} nbytes = {stream_data.nbytes:_}") - elapsed_time = ttime.monotonic() - start_time + elapsed_time = time.monotonic() - start_time logger.info(f"{elapsed_time = }") + + +@task(retries=3, retry_delay_seconds=20) +def data_validation_task(uid, beamline_acronym="cms"): + """Task to validate the data structure and accessibility in Tiled + + Parameters + ---------- + uid : str + The UID of the run to validate + beamline_acronym : str, optional + The acronym of the beamline (default is "cms") + """ + + logger = get_run_logger() + + api_key = Secret.load(f"tiled-{beamline_acronym}-api-key", _sync=True).get() + tiled_client = from_profile("nsls2", api_key=api_key) + + logger.info(f"Connecting to Tiled client for beamline '{beamline_acronym}'") + run_client = tiled_client[f"{beamline_acronym}/migration"][uid] + + logger.info(f"Validating uid {uid}") + start_time = time.monotonic() + validate(run_client, fix_errors=True, try_reading=True, raise_on_error=True) + elapsed_time = time.monotonic() - start_time + logger.info(f"Finished validating data; {elapsed_time = }") + + +@flow(log_prints=True) +def data_validation_flow(uid): + data_validation_task(uid) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 2eab7fb..f00b621 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -2,7 +2,7 @@ from prefect.task_runners import ConcurrentTaskRunner #from analysis import run_analysis -from data_validation import read_all_streams +from data_validation import read_all_streams, data_validation_task from linker import create_symlinks @@ -21,8 +21,9 @@ def end_of_run_workflow(stop_doc): linker_task = create_symlinks.submit(uid) logger.info("Launched linker task") - validation_task = read_all_streams.submit("cms", uid) - logger.info("Launched validation task") + read_streams_task = read_all_streams.submit("cms", uid) + validation_task = data_validation_task.submit(uid) + logger.info("Launched validation tasks") # analysis_task = run_analysis(raw_ref=uid) # logger.info("Launched analysis task") @@ -30,6 +31,7 @@ def end_of_run_workflow(stop_doc): # Wait for all tasks to comple logger.info("Waiting for tasks to complete") linker_task.result() + read_streams_task.result() validation_task.result() # analysis_task.result() log_completion() From 795ac74752ab0969dbb2a75987ea28a031dc4b7e Mon Sep 17 00:00:00 2001 From: "Eugene M." Date: Fri, 13 Feb 2026 09:42:50 -0500 Subject: [PATCH 2/2] DEP: use full tiled instead of just client --- pixi.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pixi.toml b/pixi.toml index c5df2f6..0fc415a 100644 --- a/pixi.toml +++ b/pixi.toml @@ -5,7 +5,7 @@ platforms = ["linux-64"] [dependencies] prefect = "3.*" -tiled-client = ">=0.2.3" +tiled = ">=0.2.3" python = "<3.12" bluesky-tiled-plugins = ">=2" prefect-docker = "*"