Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions data_validation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import time as ttime
import time

from prefect import flow, get_run_logger, task
from prefect.blocks.system import Secret

from bluesky_tiled_plugins.writing.validator import validate
from tiled.client import from_profile


Expand All @@ -12,13 +14,45 @@ def read_all_streams(beamline_acronym, uid):
tiled_client = from_profile("nsls2", api_key=api_key)
run = tiled_client[beamline_acronym]["raw"][uid]
logger.info(f"Validating uid {run.start['uid']}")
start_time = ttime.monotonic()
start_time = time.monotonic()
for stream in run:
logger.info(f"{stream}:")
stream_start_time = ttime.monotonic()
stream_start_time = time.monotonic()
stream_data = run[stream].read()
stream_elapsed_time = ttime.monotonic() - stream_start_time
stream_elapsed_time = time.monotonic() - stream_start_time
logger.info(f"{stream} elapsed_time = {stream_elapsed_time}")
logger.info(f"{stream} nbytes = {stream_data.nbytes:_}")
elapsed_time = ttime.monotonic() - start_time
elapsed_time = time.monotonic() - start_time
logger.info(f"{elapsed_time = }")


@task(retries=3, retry_delay_seconds=20)
def data_validation_task(uid, beamline_acronym="cms"):
"""Task to validate the data structure and accessibility in Tiled

Parameters
----------
uid : str
The UID of the run to validate
beamline_acronym : str, optional
The acronym of the beamline (default is "cms")
"""

logger = get_run_logger()

api_key = Secret.load(f"tiled-{beamline_acronym}-api-key", _sync=True).get()
tiled_client = from_profile("nsls2", api_key=api_key)

logger.info(f"Connecting to Tiled client for beamline '{beamline_acronym}'")
run_client = tiled_client[f"{beamline_acronym}/migration"][uid]

logger.info(f"Validating uid {uid}")
start_time = time.monotonic()
validate(run_client, fix_errors=True, try_reading=True, raise_on_error=True)
elapsed_time = time.monotonic() - start_time
logger.info(f"Finished validating data; {elapsed_time = }")


@flow(log_prints=True)
def data_validation_flow(uid):
data_validation_task(uid)
8 changes: 5 additions & 3 deletions end_of_run_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from prefect.task_runners import ConcurrentTaskRunner

#from analysis import run_analysis
from data_validation import read_all_streams
from data_validation import read_all_streams, data_validation_task
from linker import create_symlinks


Expand All @@ -21,15 +21,17 @@ def end_of_run_workflow(stop_doc):
linker_task = create_symlinks.submit(uid)
logger.info("Launched linker task")

validation_task = read_all_streams.submit("cms", uid)
logger.info("Launched validation task")
read_streams_task = read_all_streams.submit("cms", uid)
validation_task = data_validation_task.submit(uid)
logger.info("Launched validation tasks")

# analysis_task = run_analysis(raw_ref=uid)
# logger.info("Launched analysis task")

# Wait for all tasks to comple
logger.info("Waiting for tasks to complete")
linker_task.result()
read_streams_task.result()
validation_task.result()
# analysis_task.result()
log_completion()
2 changes: 1 addition & 1 deletion pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ platforms = ["linux-64"]

[dependencies]
prefect = "3.*"
tiled-client = ">=0.2.3"
tiled = ">=0.2.3"
python = "<3.12"
bluesky-tiled-plugins = ">=2"
prefect-docker = "*"
Expand Down