From 4a2999fef085d5136434295af6ea3e9c5d7d8291 Mon Sep 17 00:00:00 2001
From: Tim Fisher
Date: Mon, 18 Jun 2018 13:59:28 -0500
Subject: [PATCH] Issue #12189 Added mechanism to propagate tripped QC test
 flags to all affected parameters

---
 RELEASE_NOTES.md                |   9 +-
 util/metadata_service/stream.py |  24 +++-
 util/stream_request.py          | 212 +++++++++++++++++++++++++++++++-
 3 files changed, 239 insertions(+), 6 deletions(-)

diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 29cff76fc..e662afd37 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,6 +1,13 @@
 # Stream Engine
 
-# Development Release 1.6.0 2018-06-12
+# Development Release 1.6.0 2018-06-18
+
+Issue #12189 - Add mechanism to propagate tripped QC test flags to all affected parameters
+- Modified StreamRequest._run_qc to recognize failed QC tests for a parameter in a dataset,
+  determine related and nearby sensors and affected streams, and propagate those failures to
+  the corresponding particles of the affected parameters in the appropriate datasets
+- Modified the stream metadata service to provide calls to obtain the streams for a reference
+  designator and to obtain the reference designators and streams for a subsite and node
 
 Issue #13299 - Asynchronous download of specific parameters to CSV and NetCDF is inconsistent
 - Implemented parameter filtering as part of CSV packaging of data requests.

diff --git a/util/metadata_service/stream.py b/util/metadata_service/stream.py
index d3614e761..8a51b866f 100644
--- a/util/metadata_service/stream.py
+++ b/util/metadata_service/stream.py
@@ -10,10 +10,30 @@
 
 
 @log_timing(_log)
+def _get_stream_metadata_list():
+    return metadata_service_api.get_stream_metadata_records()
+
+
 def _get_stream_metadata():
-    stream_metadata_record_list = metadata_service_api.get_stream_metadata_records()
     return [_RecordInfo(method=rec['method'], stream=rec['stream'], **rec['referenceDesignator'])
-            for rec in stream_metadata_record_list]
+            for rec in _get_stream_metadata_list()]
+
+
+def get_refdes_streams_for_subsite_node(subsite, node):
+    return [(rec['referenceDesignator']['subsite'] + '-' + rec['referenceDesignator']['node'] +
+             '-' + rec['referenceDesignator']['sensor'], rec['stream'])
+            for rec in _get_stream_metadata_list()
+            if rec['referenceDesignator']['subsite'] == subsite and rec['referenceDesignator']['node'] == node]
+
+
+def get_streams_for_subsite_node_sensor(subsite, node, sensor):
+    return [rec['stream'] for rec in _get_stream_metadata_list()
+            if rec['referenceDesignator']['subsite'] == subsite and rec['referenceDesignator']['node'] == node
+            and rec['referenceDesignator']['sensor'] == sensor]
+
+
+def get_streams_for_refdes(refdes):
+    subsite, node, sensor = refdes.split('-', 2)
+    return get_streams_for_subsite_node_sensor(subsite, node, sensor)
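+
+
+# Illustrative usage of the helper above (hypothetical reference designator);
+# split('-', 2) keeps any later dashes with the sensor, e.g.
+#     get_streams_for_refdes('CE04OSPS-SF01B-2A-CTDPFA107')
+# parses subsite='CE04OSPS', node='SF01B', sensor='2A-CTDPFA107' and returns
+# the stream names the metadata service has recorded for that sensor.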
 
 
 @timed_cache(engine.app.config['METADATA_CACHE_SECONDS'])

diff --git a/util/stream_request.py b/util/stream_request.py
index c4ba98dc5..342769685 100644
--- a/util/stream_request.py
+++ b/util/stream_request.py
@@ -6,16 +6,29 @@
 import util.provenance_metadata_store
 from util.annotation import AnnotationStore
 from engine import app
-from ooi_data.postgres.model import Parameter, Stream, NominalDepth
+from ooi_data.postgres.model import NominalDepth, Parameter, ParameterFunction, Stream
 from util.asset_management import AssetManagement
 from util.cass import fetch_l0_provenance
 from util.common import log_timing, StreamEngineException, StreamKey, MissingDataException, read_size_config
-from util.metadata_service import build_stream_dictionary, get_available_time_range
+from util.metadata_service import build_stream_dictionary, get_available_time_range, get_streams_for_refdes, \
+    get_refdes_streams_for_subsite_node
 from util.qc_executor import QcExecutor
 from util.stream_dataset import StreamDataset
+from numbers import Number
 
 log = logging.getLogger()
 
+CC = 'CC'
+DATA_QC_PROPAGATE_FLAGS = 'dataqc_propagateflags'
+DPI = 'dpi_'
+NEARBY = 'N'
+PD = 'PD'
+QC_EXECUTED = '_qc_executed'
+QC_RESULTS = '_qc_results'
+RELATED = 'R'
+# these tests were deemed not ready for propagation by the Data Team
+QC_TEST_FAIL_SKIP_PROPAGATE = ('dataqc_gradienttest', 'dataqc_localrangetest', 'dataqc_polytrendtest')
+
 PRESSURE_DPI = app.config.get('PRESSURE_DPI')
 GPS_STREAM_ID = app.config.get('GPS_STREAM_ID')
 LATITUDE_PARAM_ID = app.config.get('LATITUDE_PARAM_ID')
@@ -64,6 +77,9 @@ def __init__(self, stream_key, parameters, time_range, uflags, qc_parameters=Non
         self.datasets = {}
         self.external_includes = {}
         self.annotation_store = AnnotationStore()
+        self.qc_affects = {}
+        self.qc_propagate_flag = None
+        self.qc_test_flag_map = None
 
         self._initialize()
 
@@ -193,11 +209,129 @@ def insert_provenance(self):
 
     @log_timing(log)
     def _run_qc(self):
+        propagate_failures = {}
         # execute any QC
         for sk, stream_dataset in self.datasets.iteritems():
+            sk_related_refdes_streams = {}
+            sk_related_stream_refdes = {}
+            sk_nearby_refdes_streams = {}
             for param in sk.stream.parameters:
+                param_qc_executed = param.name + QC_EXECUTED
+                param_qc_results = param.name + QC_RESULTS
                 for dataset in stream_dataset.datasets.itervalues():
                     self.qc_executor.qc_check(param, dataset)
+                    # check for a QC test failure on one or more data particles for the parameter
+                    if (param_qc_executed in dataset) and (param_qc_results in dataset) and \
+                            (dataset[param_qc_executed].values.min() > dataset[param_qc_results].values.min()):
+                        qc_executed_values = dataset[param_qc_executed].values
+                        qc_results_values = dataset[param_qc_results].values
+                        # list of tuples pairing each position where a QC variance exists with the variance
+                        qc_variances = [(position, executed - results)
+                                        for position, (executed, results)
+                                        in enumerate(zip(qc_executed_values, qc_results_values))
+                                        if executed != results]
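+                        # illustrative bit arithmetic (hypothetical values): if
+                        # qc_executed = 0b011111 (31, five tests ran) and
+                        # qc_results = 0b011101 (29, all but one test passed), the
+                        # variance is 31 - 29 = 2, the flag bit of the lone failed
+                        # test; ANDing a test flag against the variance during
+                        # propagation below recovers that failed test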
+                        # populate these collections as needed, once per sk
+                        if not sk_related_refdes_streams:
+                            # capture related (by subsite, node) reference designators and streams in separate
+                            # maps, one keyed by reference designator and one keyed by stream
+                            for refdes, stream in get_refdes_streams_for_subsite_node(sk.subsite, sk.node):
+                                sk_related_refdes_streams.setdefault(refdes, set()).add(stream)
+                                sk_related_stream_refdes.setdefault(stream, set()).add(refdes)
+                            if not sk.is_mobile:
+                                # capture nearby (by subsite, depth nearness) reference designators
+                                # and streams if they are not already considered "related"
+                                max_depth_var = self.derive_max_depth(sk.sensor)
+                                nd = NominalDepth.get_nominal_depth(sk.subsite, sk.node, sk.sensor)
+                                for nearby_nd in nd.get_depth_within(max_depth_var):
+                                    if not sk_related_refdes_streams.get(nearby_nd.reference_designator):
+                                        nearby_streams = set(get_streams_for_refdes(nearby_nd.reference_designator))
+                                        if nearby_streams:
+                                            sk_nearby_refdes_streams[nearby_nd.reference_designator] = nearby_streams
+
+                        # affected_streams: map of streams (and their parameters) affected by param
+                        affected_streams = self._get_affected_streams(param)
+
+                        # gather the related streams (the keys of sk_related_stream_refdes), match them
+                        # against affected_streams, and prepare to propagate the QC test failure
+                        sk_related_streams = set(sk_related_stream_refdes.keys())
+                        for matched_stream in sk_related_streams.intersection(affected_streams):
+                            # get the parameters from the matching affected_streams without param
+                            new_matched_parameters = affected_streams.get(matched_stream, set()).difference({param})
+                            if not new_matched_parameters:
+                                continue
+                            # get the corresponding refdes for use in failure propagation
+                            for matched_refdes in sk_related_stream_refdes.get(matched_stream):
+                                matching_propagation = propagate_failures.get(
+                                    (RELATED, matched_refdes, matched_stream, sk.method), {}).get(param)
+                                current_matched_parameters = matching_propagation[1] if matching_propagation else set()
+                                matched_parameters = current_matched_parameters.union(new_matched_parameters)
+                                if matched_parameters > current_matched_parameters:
+                                    propagate_failures.setdefault(
+                                        (RELATED, matched_refdes, matched_stream, sk.method), {})[param] = \
+                                        (qc_variances, matched_parameters)
+
+                        # match nearby streams to affected_streams and prepare to propagate the QC test failure
+                        for review_refdes, review_streams in sk_nearby_refdes_streams.items():
+                            for matched_stream in review_streams.intersection(affected_streams):
+                                new_matched_parameters = affected_streams.get(matched_stream, set()).\
+                                    difference({param})
+                                if not new_matched_parameters:
+                                    continue
+                                matching_propagation = propagate_failures.get(
+                                    (NEARBY, review_refdes, matched_stream, sk.method), {}).get(param)
+                                current_matched_parameters = matching_propagation[1] if matching_propagation else set()
+                                matched_parameters = current_matched_parameters.union(new_matched_parameters)
+                                if matched_parameters > current_matched_parameters:
+                                    propagate_failures.setdefault(
+                                        (NEARBY, review_refdes, matched_stream, sk.method), {})[param] = \
+                                        (qc_variances, matched_parameters)
+
+        if not propagate_failures:
+            return
+
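+        # propagate_failures is keyed by (scope, refdes, stream, method), where scope is
+        # RELATED ('R') or NEARBY ('N'); each value maps a failed source parameter to its
+        # (qc_variances, affected_parameters) tuple. A hypothetical entry might look like:
+        #     ('R', 'CE04OSPS-SF01B-2A-CTDPFA107', 'ctdpf_sbe43_sample', 'streamed'):
+        #         {<Parameter temperature>: ([(0, 2)], {<Parameter salinity>})}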
+        # process all propagation failures on the tested streams and data sets
+        for sk, stream_dataset in self.datasets.iteritems():
+            stream_parameters = set(sk.stream.parameters)
+            failed_nearby_params_map = propagate_failures.get((NEARBY, sk.as_three_part_refdes(), sk.stream_name,
+                                                               sk.method), {})
+            # make a set of the failed input parameters for the sk for matching stream_parameters
+            failed_nearby_parameters = set(failed_nearby_params_map.keys())
+            # only propagate nearby failures for source parameters that are not themselves
+            # inputs to this stream (those are already flagged directly)
+            for matched_parameter in failed_nearby_parameters.difference(stream_parameters):
+                propagate_input = failed_nearby_params_map[matched_parameter]
+                qc_variances = propagate_input[0]
+                affected_parameters = propagate_input[1]
+                for propagate_affected in affected_parameters.intersection(stream_parameters):
+                    param_qc_executed = propagate_affected.name + QC_EXECUTED
+                    for dataset in stream_dataset.datasets.itervalues():
+                        if param_qc_executed in dataset:
+                            qc_executed_values = dataset[param_qc_executed].values
+                            for position, variance in qc_variances:
+                                # flag this particle if any propagating test failed
+                                if any(flag & variance and propagates
+                                       for flag, (_, propagates) in self.qc_test_flag_map.items()):
+                                    qc_executed_values[position] |= self.qc_propagate_flag
+
+            failed_related_params_map = propagate_failures.get((RELATED, sk.as_three_part_refdes(), sk.stream_name,
+                                                                sk.method), {})
+            for propagate_input in failed_related_params_map.values():
+                qc_variances = propagate_input[0]
+                affected_parameters = propagate_input[1]
+                for propagate_affected in affected_parameters.intersection(stream_parameters):
+                    param_qc_executed = propagate_affected.name + QC_EXECUTED
+                    for dataset in stream_dataset.datasets.itervalues():
+                        if param_qc_executed in dataset:
+                            qc_executed_values = dataset[param_qc_executed].values
+                            for position, variance in qc_variances:
+                                # flag this particle if any propagating test failed
+                                if any(flag & variance and propagates
+                                       for flag, (_, propagates) in self.qc_test_flag_map.items()):
+                                    qc_executed_values[position] |= self.qc_propagate_flag
 
     # noinspection PyTypeChecker
     def _insert_provenance(self):
@@ -225,6 +359,30 @@ def insert_annotations(self):
         for stream_key in self.stream_parameters:
             self.annotation_store.add_query_annotations(stream_key, self.time_range)
 
+    # builds a map, keyed by stream, of the parameters that param affects in that
+    # stream, directly or transitively
+    def _get_affected_streams(self, param):
+        affected_params = {param}
+        # parameters that can be affected by this one; copy the set so the traversal
+        # does not mutate the cached qc_affects entry
+        params_to_visit = set(self.qc_affects.get(param, set()))
+
+        while params_to_visit:
+            visit_param = params_to_visit.pop()
+            affected_params.add(visit_param)
+            # capture and prepare to visit any parameters this one can affect
+            for review_param in self.qc_affects.get(visit_param, []):
+                if review_param in affected_params:
+                    continue
+                affected_params.add(review_param)
+                params_to_visit.add(review_param)
+
+        # build the streams map keyed by stream and tied to the affected parameters
+        streams = {}
+        for affected_param in affected_params:
+            for stream in affected_param.streams:
+                streams.setdefault(stream.name, set()).add(affected_param)
+
+        return streams
+
     def _exclude_flagged_data(self):
         """
         Exclude data from datasets based on annotations
@@ -363,6 +521,50 @@ def _initialize(self):
         log.debug('<%s> primary stream internal needs: %r', self.request_id, primary_internals)
         self.stream_parameters[self.stream_key] = primary_internals
 
+        # the QC parameter functions (function_type_id 3) carry the binary QC flag strings
+        qc_functions = ParameterFunction.query.filter_by(function_type_id=3).all()
+
+        # the decimal value for the dataqc_propagateflags entry in the parameter_function table
+        self.qc_propagate_flag = [int(param_func.qc_flag, 2) for param_func in qc_functions
+                                  if DATA_QC_PROPAGATE_FLAGS == param_func.name][0]
+
+        # map of the decimal value for each of the 6 QC tests to a tuple of the test name and
+        # a boolean indicating whether failures of that test are propagated
+        self.qc_test_flag_map = dict([(int(param_func.qc_flag, 2),
+                                       (param_func.name, param_func.name not in QC_TEST_FAIL_SKIP_PROPAGATE))
+                                      for param_func in qc_functions
+                                      if DATA_QC_PROPAGATE_FLAGS != param_func.name])
+
+        # map of key=Parameter.data_product_identifier, value=set of corresponding Parameters,
+        # built from each Parameter with a populated data_product_identifier
+        dpi = {}
+        for p in Parameter.query:
+            if p.data_product_identifier:
+                dpi.setdefault(p.data_product_identifier, set()).add(p)
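+
+        # a parameter_function_map ties a derived parameter's function arguments to their
+        # sources; a hypothetical example: {'p': 'PD908', 't': 'dpi_TEMPWAT_L1', 'a0': 'CC_a0',
+        # 'n': 2}. PD and dpi_ values reference other parameters, while CC values
+        # (calibration coefficients) and plain numbers affect no other parameters.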
+
+        # review each Parameter having a populated parameter_function_map
+        # qc_affects key=the referenced Parameter (via the "dpi_" or "PD" value)
+        # value=the Parameters being reviewed
+        for p in Parameter.query:
+            if p.is_function:
+                for values in p.parameter_function_map.values():
+                    # normalize a scalar "values" entry to a list
+                    if not isinstance(values, list):
+                        values = [values]
+                    for value in values:
+                        # values that are numbers or calibration coefficients affect no other parameters
+                        if isinstance(value, Number) or value.startswith(CC):
+                            continue
+                        # DPI values reference parameters that are affected
+                        # the key is each Parameter found in the dpi map via the "dpi_" value
+                        if value.startswith(DPI):
+                            dpi_value = value.split(DPI)[-1]
+                            for param in dpi.get(dpi_value, []):
+                                self.qc_affects.setdefault(param, set()).add(p)
+                        # PD numbers reference parameters that are affected
+                        # the key is the Parameter found after the "PD" literal
+                        elif PD in value:
+                            param = Parameter.query.get(value.split(PD)[-1])
+                            self.qc_affects.setdefault(param, set()).add(p)
+
         if self.execute_dpa:
             # Identify external parameters needed to support this query
             external_to_process = self.stream_key.stream.needs_external(internal_requested)
@@ -457,7 +659,7 @@ def find_stream(self, stream_key, poss_params, stream=None):
         if not stream_key.is_mobile:
             nominal_depth = NominalDepth.get_nominal_depth(subsite, node, sensor)
             if nominal_depth is not None:
-                max_depth_var = MAX_DEPTH_VARIANCE_METBK if 'METBK' in sensor else MAX_DEPTH_VARIANCE
+                max_depth_var = self.derive_max_depth(sensor)
                 nearby = nominal_depth.get_depth_within(max_depth_var)
         for param, search_streams in param_streams:
             sk = self._find_stream_from_list(stream_key, search_streams, nearby, stream_dictionary)
@@ -562,3 +764,7 @@ def compute_request_size(self, size_estimates=SIZE_ESTIMATES):
     @staticmethod
     def compute_request_time(file_size):
         return max(MINIMUM_REPORTED_TIME, file_size * SECONDS_PER_BYTE)
+
+    @staticmethod
+    def derive_max_depth(sensor):
+        return MAX_DEPTH_VARIANCE_METBK if 'METBK' in sensor else MAX_DEPTH_VARIANCE