diff --git a/src/dve/core_engine/backends/base/auditing.py b/src/dve/core_engine/backends/base/auditing.py index 37b1774..596a22a 100644 --- a/src/dve/core_engine/backends/base/auditing.py +++ b/src/dve/core_engine/backends/base/auditing.py @@ -31,6 +31,7 @@ QueueType, SubmissionResult, ) +from dve.pipeline.utils import SubmissionStatus AuditReturnType = TypeVar("AuditReturnType") # pylint: disable=invalid-name @@ -329,7 +330,7 @@ def mark_business_rules(self, submissions: list[tuple[str, bool]], **kwargs): ProcessingStatusRecord( submission_id=submission_id, processing_status="business_rules", - submission_result="failed" if failed else None, + submission_result="validation_failed" if failed else None, **kwargs, ) for submission_id, failed in submissions @@ -379,7 +380,10 @@ def mark_failed(self, submissions: list[str], **kwargs): """Update submission processing_status to failed.""" recs = [ ProcessingStatusRecord( - submission_id=submission_id, processing_status="failed", **kwargs + submission_id=submission_id, + processing_status="failed", + submission_result="processing_failed", + **kwargs ) for submission_id in submissions ] @@ -493,6 +497,27 @@ def get_submission_statistics(self, submission_id: str) -> Optional[SubmissionSt ) except StopIteration: return None + def get_submission_status(self, submission_id: str) -> Optional[SubmissionStatus]: + """Get the latest submission status for a submission""" + + try: + processing_rec: ProcessingStatusRecord = next(self._processing_status.conv_to_records( + self._processing_status.get_most_recent_records( + order_criteria=[OrderCriteria("time_updated", True)], + pre_filter_criteria=[FilterCriteria("submission_id", submission_id)] + ))) + except StopIteration: + return None + sub_status = SubmissionStatus() + sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics(submission_id) + if processing_rec.submission_result == "processing_failed": + sub_status.processing_failed = True + if 
processing_rec.submission_result == "validation_failed": + sub_status.validation_failed = True + if sub_stats_rec: + sub_status.number_of_records = sub_stats_rec.record_count + + return sub_status def __enter__(self): """Use audit table as context manager""" diff --git a/src/dve/core_engine/backends/utilities.py b/src/dve/core_engine/backends/utilities.py index 1a993e9..64a9d11 100644 --- a/src/dve/core_engine/backends/utilities.py +++ b/src/dve/core_engine/backends/utilities.py @@ -15,7 +15,6 @@ import dve.parser.file_handling as fh from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type from dve.core_engine.type_hints import URI, Messages -from dve.reporting.error_report import conditional_cast # We need to rely on a Python typing implementation detail in Python <= 3.7. if sys.version_info[:2] <= (3, 7): @@ -179,38 +178,3 @@ def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType: if polars_type: return polars_type raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}") - - -def dump_errors( - working_folder: URI, - step_name: str, - messages: Messages, - key_fields: Optional[dict[str, list[str]]] = None, -): - """Write out to disk captured feedback error messages.""" - if not working_folder: - raise AttributeError("processed files path not passed") - - if not key_fields: - key_fields = {} - - errors = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json") - processed = [] - - for message in messages: - primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", []) - error = message.to_dict( - key_field=primary_keys, - value_separator=" -- ", - max_number_of_values=10, - record_converter=None, - ) - error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ") - processed.append(error) - - with fh.open_stream(errors, "a") as f: - json.dump( - processed, - f, - default=str, - ) diff --git a/src/dve/core_engine/exceptions.py 
b/src/dve/core_engine/exceptions.py index d2faaf5..1e68d79 100644 --- a/src/dve/core_engine/exceptions.py +++ b/src/dve/core_engine/exceptions.py @@ -25,18 +25,12 @@ def __init__( def critical_messages(self) -> Iterator[FeedbackMessage]: """Critical messages which caused the processing error.""" yield from filter(lambda message: message.is_critical, self.messages) - - def to_feedback_message(self) -> FeedbackMessage: - "Convert to feedback message to write to json file" - return FeedbackMessage( - entity=None, - record=None, - failure_type="integrity", - error_type="processing", - error_location="Whole File", - error_message=self.error_message, - ) - + + @classmethod + def from_exception(cls, exc:Exception): + return cls(error_message = repr(exc), + entities=None, + messages=[]) class EntityTypeMismatch(TypeError): """An exception emitted if entity type outputs from two collaborative objects are different.""" diff --git a/src/dve/core_engine/type_hints.py b/src/dve/core_engine/type_hints.py index 991f46f..0be3763 100644 --- a/src/dve/core_engine/type_hints.py +++ b/src/dve/core_engine/type_hints.py @@ -236,7 +236,7 @@ PROCESSING_STATUSES: tuple[ProcessingStatus, ...] = tuple(list(get_args(ProcessingStatus))) """List of all possible DVE submission statuses""" -SubmissionResult = Literal["success", "failed", "failed_xml_generation", "archived"] +SubmissionResult = Literal["success", "validation_failed", "archived", "processing_failed"] """Allowed DVE submission results""" SUBMISSION_RESULTS: tuple[SubmissionResult, ...] 
= tuple(list(get_args(SubmissionResult))) diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index d47b4bd..06f2727 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -2,17 +2,18 @@ from typing import Optional from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet -from dve.core_engine.backends.utilities import dump_errors +from dve.core_engine.exceptions import CriticalProcessingError from dve.core_engine.models import SubmissionInfo from dve.core_engine.type_hints import URI, Failed from dve.pipeline.duckdb_pipeline import DDBDVEPipeline from dve.pipeline.utils import SubmissionStatus from dve.parser import file_handling as fh +from dve.reporting.utils import dump_processing_errors +@duckdb_get_entity_count +@duckdb_write_parquet class FoundryDDBPipeline(DDBDVEPipeline): - """DuckDB pipeline for running on Foundry Platform. 
- Polymorphed to allow for exception handling when processing - single files sequentially through services.""" + """DuckDB pipeline for running on Foundry Platform""" def persist_audit_records(self, submission_info: SubmissionInfo) -> URI: """Write out key audit relations to parquet for persisting to datasets""" @@ -29,66 +30,106 @@ def persist_audit_records(self, submission_info: SubmissionInfo) -> URI: def file_transformation( self, submission_info: SubmissionInfo - ) -> SubmissionInfo | dict[str, str]: + ) -> tuple[SubmissionInfo, SubmissionStatus]: try: return super().file_transformation(submission_info) except Exception as exc: # pylint: disable=W0718 self._logger.error(f"File transformation raised exception: {exc}") self._logger.exception(exc) - return submission_info.dict() + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "file_transformation", + [CriticalProcessingError.from_exception(exc)] + ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) + return submission_info, SubmissionStatus(processing_failed=True) - def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo | bool]: + def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None) -> tuple[SubmissionInfo, SubmissionStatus]: try: - return super().apply_data_contract(submission_info) + return super().apply_data_contract(submission_info, submission_status) except Exception as exc: # pylint: disable=W0718 self._logger.error(f"Apply data contract raised exception: {exc}") self._logger.exception(exc) - return submission_info, True + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "contract", + [CriticalProcessingError.from_exception(exc)] + ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) + return submission_info, SubmissionStatus(processing_failed=True) - def
apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed): + def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None): try: - return super().apply_business_rules(submission_info, failed) + return super().apply_business_rules(submission_info, submission_status) except Exception as exc: # pylint: disable=W0718 self._logger.error(f"Apply business rules raised exception: {exc}") self._logger.exception(exc) - return submission_info, SubmissionStatus(failed=True) + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "business_rules", + [CriticalProcessingError.from_exception(exc)] + ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) + return submission_info, SubmissionStatus(processing_failed=True) + + def error_report(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None): + try: + return super().error_report(submission_info, submission_status) + except Exception as exc: # pylint: disable=W0718 + self._logger.error(f"Error reports raised exception: {exc}") + self._logger.exception(exc) + sub_stats = None + report_uri = None + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "error_report", + [CriticalProcessingError.from_exception(exc)] + ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) + return submission_info, submission_status, sub_stats, report_uri - def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]: + def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], Optional[URI], URI]: """Sequential single submission pipeline runner""" try: sub_id: str = submission_info.submission_id + report_uri, sub_status = None, SubmissionStatus(processing_failed=True)  # defaults in case an exception is raised before assignment below self._audit_tables.add_new_submissions(submissions=[submission_info]) self._audit_tables.mark_transform(submission_ids=[sub_id]) - sub_info = 
self.file_transformation(submission_info=submission_info) - if isinstance(sub_info, SubmissionInfo): + sub_info, sub_status = self.file_transformation(submission_info=submission_info) + if not (sub_status.validation_failed or sub_status.processing_failed): self._audit_tables.mark_data_contract(submission_ids=[sub_id]) - sub_info, failed = self.apply_data_contract(submission_info=submission_info) - self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)]) + sub_info, sub_status = self.apply_data_contract(submission_info=sub_info, submission_status=sub_status) + self._audit_tables.mark_business_rules(submissions=[(sub_id, sub_status.validation_failed)]) sub_info, sub_status = self.apply_business_rules( - submission_info=submission_info, failed=failed + submission_info=submission_info, submission_status=sub_status ) - else: - sub_status = SubmissionStatus(failed=True) - self._audit_tables.mark_error_report( - submissions=[(sub_id, sub_status.submission_result)] - ) - sub_info, sub_status, sub_stats, report_uri = self.error_report( - submission_info=submission_info, status=sub_status - ) - self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) + + if not sub_status.processing_failed: + self._audit_tables.mark_error_report( + submissions=[(sub_id, sub_status.submission_result)] + ) + sub_info, sub_status, sub_stats, report_uri = self.error_report( + submission_info=submission_info, submission_status=sub_status + ) + self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) except Exception as err: # pylint: disable=W0718 self._logger.error( f"During processing of submission_id: {sub_id}, the following exception was raised: {err}" ) + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "run_pipeline", + [CriticalProcessingError.from_exception(err)] + ) self._audit_tables.mark_failed(submissions=[sub_id]) finally: audit_files_uri = 
self.persist_audit_records(submission_info=submission_info) - return ( - ( - None - if sub_status.failed - else fh.joinuri(self.processed_files_path, sub_id, "business_rules") - ), - report_uri, - audit_files_uri, - ) + return ( + ( + None + if (sub_status.validation_failed or sub_status.processing_failed) + else fh.joinuri(self.processed_files_path, sub_id, "business_rules") + ), + report_uri if report_uri else None, + audit_files_uri, + ) diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 15008a9..6270bdd 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -1,5 +1,6 @@ # pylint: disable=protected-access,too-many-instance-attributes,too-many-arguments,line-too-long """Generic Pipeline object to define how DVE should be interacted with.""" +from itertools import starmap import json import re from collections import defaultdict @@ -28,13 +29,14 @@ ) from dve.core_engine.backends.readers import BaseFileReader from dve.core_engine.backends.types import EntityType -from dve.core_engine.backends.utilities import dump_errors, stringify_model +from dve.core_engine.backends.utilities import stringify_model from dve.core_engine.loggers import get_logger from dve.core_engine.models import SubmissionInfo, SubmissionStatisticsRecord from dve.core_engine.type_hints import URI, Failed, FileURI, InfoURI from dve.parser import file_handling as fh from dve.pipeline.utils import SubmissionStatus, deadletter_file, load_config, load_reader from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates +from dve.reporting.utils import dump_feedback_errors, dump_processing_errors PERMISSIBLE_EXCEPTIONS: tuple[type[Exception]] = ( FileNotFoundError, # type: ignore @@ -236,6 +238,11 @@ def audit_received_file_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"audit_received_file raised exception: {exc}") self._logger.exception(exc) + dump_processing_errors( + fh.joinuri(self.processed_files_path, 
submission_info.submission_id), + "audit_received", + [CriticalProcessingError.from_exception(exc)] + ) # sub_info should at least # be populated with file_name and file_extension failed.append( @@ -266,12 +273,13 @@ def audit_received_file_step( def file_transformation( self, submission_info: SubmissionInfo - ) -> Union[SubmissionInfo, dict[str, str]]: + ) -> tuple[SubmissionInfo, SubmissionStatus]: """Transform a file from its original format into a 'stringified' parquet file""" if not self.processed_files_path: raise AttributeError("processed files path not provided") errors: list[FeedbackMessage] = [] + submission_status: SubmissionStatus = SubmissionStatus() submission_file_uri: URI = fh.joinuri( self.processed_files_path, submission_info.submission_id, @@ -287,29 +295,19 @@ def file_transformation( self._logger.exception(exc) errors.extend(exc.messages) - except BackendError as exc: # pylint: disable=broad-except - self._logger.error(f"Unexpected file transformation error: {exc}") - self._logger.exception(exc) - errors.extend([ - CriticalProcessingError( - entities=None, - error_message=repr(exc), - messages=[], - ).to_feedback_message() - ]) - if errors: - dump_errors( + dump_feedback_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "file_transformation", errors, ) - return submission_info.dict() - return submission_info + submission_status.validation_failed = True + return submission_info, submission_status + return submission_info, submission_status def file_transformation_step( self, pool: Executor, submissions_to_process: list[SubmissionInfo] - ) -> tuple[list[SubmissionInfo], list[SubmissionInfo]]: + ) -> tuple[list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]]: """Step to transform files from their original format into parquet files""" file_transform_futures: list[tuple[SubmissionInfo, Future]] = [] @@ -318,15 +316,21 @@ def file_transformation_step( future = 
pool.submit(self.file_transformation, submission_info) file_transform_futures.append((submission_info, future)) - success: list[SubmissionInfo] = [] - failed: list[SubmissionInfo] = [] - failed_processing: list[SubmissionInfo] = [] + success: list[tuple[SubmissionInfo, SubmissionStatus]] = [] + failed: list[tuple[SubmissionInfo, SubmissionStatus]] = [] + failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] for sub_info, future in file_transform_futures: try: # sub_info passed here either return SubInfo or dict. If SubInfo, not actually # modified in anyway during this step. - result = future.result() + submission_info: SubmissionInfo + submission_status: SubmissionStatus + submission_info, submission_status = future.result() + if submission_status.validation_failed: + failed.append((submission_info, submission_status)) + else: + success.append((submission_info, submission_status)) except AttributeError as exc: self._logger.error(f"File transformation raised exception: {exc}") raise exc @@ -338,27 +342,27 @@ def file_transformation_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"File transformation raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "file_transformation", + [CriticalProcessingError.from_exception(exc)] + ) + submission_status = SubmissionStatus(processing_failed=True) + failed_processing.append((sub_info, submission_status)) continue - if isinstance(result, SubmissionInfo): - success.append(sub_info) - else: - failed.append(sub_info) - if len(success) > 0: self._audit_tables.mark_data_contract( - list(map(lambda x: x.submission_id, success)), job_run_id=self.job_run_id + list(starmap(lambda x, _: x.submission_id, success)), job_run_id=self.job_run_id ) if len(failed) > 0: 
self._audit_tables.mark_error_report( list( - map( - lambda x: ( + starmap( + lambda x, _: ( x.submission_id, - "failed", + "validation_failed", ), failed, ) @@ -368,14 +372,15 @@ def file_transformation_step( if len(failed_processing) > 0: self._audit_tables.mark_failed( - [si.submission_id for si in failed_processing], job_run_id=self.job_run_id + [si.submission_id for si, _ in failed_processing], job_run_id=self.job_run_id ) return success, failed - def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo, Failed]: + def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None) -> tuple[SubmissionInfo, SubmissionStatus]: """Method for applying the data contract given a submission_info""" - + if not submission_status: + submission_status = self._audit_tables.get_submission_status(submission_info.submission_id) if not self.processed_files_path: raise AttributeError("processed files path not provided") @@ -403,33 +408,39 @@ def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[Submissi key_fields = {model: conf.reporting_fields for model, conf in model_config.items()} if messages: - dump_errors( + dump_feedback_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "contract", messages, key_fields=key_fields, ) - failed = any(not rule_message.is_informational for rule_message in messages) + validation_failed = any(not rule_message.is_informational for rule_message in messages) + + if validation_failed: + submission_status.validation_failed = True - return submission_info, failed + return submission_info, submission_status def data_contract_step( - self, pool: Executor, file_transform_results: list[SubmissionInfo] - ) -> tuple[list[tuple[SubmissionInfo, Failed]], list[SubmissionInfo]]: + self, pool: Executor, file_transform_results: list[tuple[SubmissionInfo, Optional[SubmissionStatus]]] + ) -> tuple[list[tuple[SubmissionInfo, 
SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]]: """Step to validate the types of an untyped (stringly typed) parquet file""" - processed_files: list[tuple[SubmissionInfo, Failed]] = [] - failed_processing: list[SubmissionInfo] = [] - dc_futures: list[tuple[SubmissionInfo, Future]] = [] + processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = [] + failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] + dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] - for info in file_transform_results: - dc_futures.append((info, pool.submit(self.apply_data_contract, info))) + for info, sub_status in file_transform_results: + sub_status = ( + sub_status if sub_status + else self._audit_tables.get_submission_status(info.submission_id)) + dc_futures.append((info, sub_status, pool.submit(self.apply_data_contract, info, sub_status))) - for sub_info, future in dc_futures: + for sub_info, sub_status, future in dc_futures: try: submission_info: SubmissionInfo - failed: Failed - submission_info, failed = future.result() + submission_status: SubmissionStatus + submission_info, submission_status = future.result() except AttributeError as exc: self._logger.error(f"Data Contract raised exception: {exc}") raise exc @@ -441,33 +452,41 @@ def data_contract_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Data Contract raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "contract", + [CriticalProcessingError.from_exception(exc)] + ) + sub_status.processing_failed=True + failed_processing.append((sub_info, sub_status)) continue - processed_files.append((submission_info, failed)) + processed_files.append((submission_info, submission_status)) if len(processed_files) > 0: 
self._audit_tables.mark_business_rules( [ - (sub_info.submission_id, failed) # type: ignore - for sub_info, failed in processed_files + (sub_info.submission_id, submission_status.validation_failed) # type: ignore + for sub_info, submission_status in processed_files ], job_run_id=self.job_run_id, ) if len(failed_processing) > 0: self._audit_tables.mark_failed( - [sub_info.submission_id for sub_info in failed_processing], + [sub_info.submission_id for sub_info, _ in failed_processing], job_run_id=self.job_run_id, ) return processed_files, failed_processing - def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool): + def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None): """Apply the business rules to a given submission, the submission may have failed at the data_contract step so this should be passed in as a bool """ + if not submission_status: + submission_status = self._audit_tables.get_submission_status(submission_info.submission_id) + if not self.rules_path: raise AttributeError("business rules path not provided.") @@ -506,14 +525,17 @@ def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool): key_fields = {model: conf.reporting_fields for model, conf in model_config.items()} if rule_messages: - dump_errors( + dump_feedback_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "business_rules", rule_messages, key_fields, ) - failed = any(not rule_message.is_informational for rule_message in rule_messages) or failed + submission_status.validation_failed = ( + any(not rule_message.is_informational for rule_message in rule_messages) + or submission_status.validation_failed + ) for entity_name, entity in entity_manager.entities.items(): projected = self._step_implementations.write_parquet( # type: ignore @@ -529,47 +551,53 @@ def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool): projected ) - status = 
SubmissionStatus( - failed=failed, - number_of_records=self.get_entity_count( + submission_status.number_of_records = self.get_entity_count( entity=entity_manager.entities[ f"""Original{rules.global_variables.get( 'entity', submission_info.dataset_id)}""" ] - ), - ) + ) - return submission_info, status + return submission_info, submission_status def business_rule_step( self, pool: Executor, - files: list[tuple[SubmissionInfo, Failed]], + files: list[tuple[SubmissionInfo, Optional[SubmissionStatus]]], ) -> tuple[ list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]], - list[SubmissionInfo], + list[tuple[SubmissionInfo, SubmissionStatus]], ]: """Step to apply business rules (Step impl) to a typed parquet file""" - future_files: list[tuple[SubmissionInfo, Future]] = [] + future_files: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] - for submission_info, submission_failed in files: + for submission_info, submission_status in files: + submission_status = ( + submission_status if submission_status + else self._audit_tables.get_submission_status(submission_info.submission_id)) future_files.append( ( submission_info, - pool.submit(self.apply_business_rules, submission_info, submission_failed), + submission_status, + pool.submit(self.apply_business_rules, submission_info, submission_status), ) ) - failed_processing: list[SubmissionInfo] = [] + failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] unsucessful_files: list[tuple[SubmissionInfo, SubmissionStatus]] = [] successful_files: list[tuple[SubmissionInfo, SubmissionStatus]] = [] - for sub_info, future in future_files: - status: SubmissionStatus + for sub_info, sub_status, future in future_files: try: - submission_info, status = future.result() + submission_info: SubmissionInfo + submission_status: SubmissionStatus + submission_info, submission_status = future.result() + if submission_status.validation_failed: + unsucessful_files.append((submission_info, 
submission_status)) + else: + successful_files.append((submission_info, submission_status)) except AttributeError as exc: self._logger.error(f"Business Rules raised exception: {exc}") raise exc @@ -581,14 +609,16 @@ def business_rule_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Business Rules raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "business_rules", + [CriticalProcessingError.from_exception(exc)] + ) + sub_status.processing_failed = True + failed_processing.append((sub_info, sub_status)) continue - if status.failed: - unsucessful_files.append((submission_info, status)) - else: - successful_files.append((submission_info, status)) + if len(unsucessful_files + successful_files) > 0: self._audit_tables.mark_error_report( @@ -601,7 +631,7 @@ def business_rule_step( if len(failed_processing) > 0: self._audit_tables.mark_failed( - [si.submission_id for si in failed_processing], job_run_id=self.job_run_id + [si.submission_id for si, _ in failed_processing], job_run_id=self.job_run_id ) return successful_files, unsucessful_files, failed_processing @@ -657,14 +687,20 @@ def _get_error_dataframes(self, submission_id: str): return errors_df, aggregates - def error_report(self, submission_info: SubmissionInfo, status: SubmissionStatus): + def error_report(self, + submission_info: SubmissionInfo, + submission_status: Optional[SubmissionStatus] = None) -> tuple[SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]]: """Creates the error reports given a submission info and submission status""" + + if not submission_status: + submission_status = self._audit_tables.get_submission_status(submission_info.submission_id) + if not self.processed_files_path: raise AttributeError("processed files 
path not provided") errors_df, aggregates = self._get_error_dataframes(submission_info.submission_id) - if not status.number_of_records: + if not submission_status.number_of_records: sub_stats = None else: err_types = { @@ -675,7 +711,7 @@ def error_report(self, submission_info: SubmissionInfo, status: SubmissionStatus } sub_stats = SubmissionStatisticsRecord( submission_id=submission_info.submission_id, - record_count=status.number_of_records, + record_count=submission_status.number_of_records, number_record_rejections=err_types.get("Submission Failure", 0), number_warnings=err_types.get("Warning", 0), ) @@ -703,35 +739,40 @@ def error_report(self, submission_info: SubmissionInfo, status: SubmissionStatus with fh.open_stream(report_uri, "wb") as stream: stream.write(er.ExcelFormat.convert_to_bytes(workbook)) - return submission_info, status, sub_stats, report_uri + return submission_info, submission_status, sub_stats, report_uri def error_report_step( self, pool: Executor, - processed: Iterable[tuple[SubmissionInfo, SubmissionStatus]] = tuple(), - failed_file_transformation: Iterable[SubmissionInfo] = tuple(), + processed: Iterable[tuple[SubmissionInfo, Optional[SubmissionStatus]]] = tuple(), + failed_file_transformation: Iterable[tuple[SubmissionInfo, SubmissionStatus]] = tuple(), ) -> list[ tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI] ]: """Step to produce error reports takes processed files and files that failed file transformation """ - futures: list[tuple[SubmissionInfo, Future]] = [] + futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] reports: list[ tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI] ] = [] - failed_processing: list[SubmissionInfo] = [] + failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] for info, status in processed: - futures.append((info, pool.submit(self.error_report, info, status))) + status = ( + status if status + else 
self._audit_tables.get_submission_status(info.submission_id) + ) + futures.append((info, status, pool.submit(self.error_report, info, status))) - for info_dict in failed_file_transformation: - status = SubmissionStatus(True, 0) - futures.append((info_dict, pool.submit(self.error_report, info_dict, status))) + for info_dict, status in failed_file_transformation: + status.number_of_records = 0 + futures.append((info_dict, status, pool.submit(self.error_report, info_dict, status))) - for sub_info, future in futures: + for sub_info, status, future in futures: try: - submission_info, status, stats, feedback_uri = future.result() + submission_info, submission_status, submission_stats, feedback_uri = future.result() + reports.append((submission_info, submission_status, submission_stats, feedback_uri)) except AttributeError as exc: self._logger.error(f"Error reports raised exception: {exc}") raise exc @@ -743,9 +784,15 @@ def error_report_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Error reports raised exception: {exc}") self._logger.exception(exc) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "error_report", + [CriticalProcessingError.from_exception(exc)] + ) + status.processing_failed = True + failed_processing.append((sub_info, status)) continue - reports.append((submission_info, status, stats, feedback_uri)) + if reports: self._audit_tables.mark_finished( @@ -761,7 +808,7 @@ def error_report_step( if failed_processing: self._audit_tables.mark_failed( - [submission_info.submission_id for submission_info in failed_processing], + [submission_info.submission_id for submission_info, _ in failed_processing], job_run_id=self.job_run_id, ) diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py index ae77cfd..57f112c 100644 --- a/src/dve/pipeline/spark_pipeline.py +++ b/src/dve/pipeline/spark_pipeline.py @@ -13,7 +13,7 @@ from 
dve.core_engine.models import SubmissionInfo from dve.core_engine.type_hints import URI, Failed from dve.pipeline.pipeline import BaseDVEPipeline -from dve.pipeline.utils import unpersist_all_rdds +from dve.pipeline.utils import SubmissionStatus, unpersist_all_rdds # pylint: disable=abstract-method @@ -56,7 +56,7 @@ def write_file_to_parquet( # type: ignore def business_rule_step( self, pool: Executor, - files: list[tuple[SubmissionInfo, Failed]], + files: list[tuple[SubmissionInfo, SubmissionStatus]], ): successful_files, unsucessful_files, failed_processing = super().business_rule_step( pool, files diff --git a/src/dve/pipeline/utils.py b/src/dve/pipeline/utils.py index 4fa9a02..e7d0d64 100644 --- a/src/dve/pipeline/utils.py +++ b/src/dve/pipeline/utils.py @@ -73,23 +73,18 @@ class SubmissionStatus: # _logger = get_logger("submission_status") - def __init__(self, failed: bool, number_of_records: Optional[int] = None): - self._failed = failed - self._number_of_records = number_of_records - - @property - def failed(self): - """Whether the submission was successfully processed.""" - return self._failed - - @property - def number_of_records(self) -> Union[int, None]: - """Number of records within a submission.""" - return self._number_of_records - + def __init__(self, + validation_failed: bool = False, + number_of_records: Optional[int] = None, + processing_failed: bool = False): + self.validation_failed = validation_failed + self.number_of_records = number_of_records + self.processing_failed = processing_failed + @property def submission_result(self) -> SubmissionResult: - """The result of the submission, either succes, failed or failed_xml_generation""" - if self.failed: - return "failed" - return "success" + if self.processing_failed: + return "processing_failed" + if self.validation_failed: + return "validation_failed" + return "success" \ No newline at end of file diff --git a/src/dve/reporting/utils.py b/src/dve/reporting/utils.py new file mode 100644 index 
0000000..0bac248 --- /dev/null +++ b/src/dve/reporting/utils.py @@ -0,0 +1,69 @@ +import json +from typing import Optional +from dve.core_engine.exceptions import CriticalProcessingError +from dve.core_engine.type_hints import URI, Messages +import dve.parser.file_handling as fh +from dve.reporting.error_report import conditional_cast + +def dump_feedback_errors( + working_folder: URI, + step_name: str, + messages: Messages, + key_fields: Optional[dict[str, list[str]]] = None, +): + """Write out captured feedback error messages.""" + if not working_folder: + raise AttributeError("processed files path not passed") + + if not key_fields: + key_fields = {} + + errors = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json") + processed = [] + + for message in messages: + primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", []) + error = message.to_dict( + key_field=primary_keys, + value_separator=" -- ", + max_number_of_values=10, + record_converter=None, + ) + error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ") + processed.append(error) + + with fh.open_stream(errors, "a") as f: + json.dump( + processed, + f, + default=str, + ) + +def dump_processing_errors( + working_folder: URI, + step_name: str, + errors: list[CriticalProcessingError] +): + """Write out critical processing errors""" + if not working_folder: + raise AttributeError("processed files path not passed") + if not step_name: + raise AttributeError("step name not passed") + if not errors: + raise AttributeError("errors list not passed") + + error_file: URI = fh.joinuri(working_folder, "errors", f"processing_errors.json") + processed = [] + + for error in errors: + processed.append({"step_name": step_name, + "error_location": "processing", + "error_level": "integrity", + "error_message": error.error_message}) + + with fh.open_stream(error_file, "a") as f: + json.dump( + processed, + f, + default=str, + ) \ No newline at end of file diff 
--git a/tests/features/books.feature b/tests/features/books.feature index 6313a07..9bc0611 100644 --- a/tests/features/books.feature +++ b/tests/features/books.feature @@ -64,9 +64,8 @@ Feature: Pipeline tests using the books dataset And I add initial audit entries for the submission Then the latest audit record for the submission is marked with processing status file_transformation When I run the file transformation phase - Then the latest audit record for the submission is marked with processing status error_report - When I run the error report phase - Then An error report is produced + Then the latest audit record for the submission is marked with processing status failed + # TODO - handle above within the stream xml reader - specific Scenario: Handle a file that fails XSD validation (duckdb) Given I submit the books file books_xsd_fail.xml for processing diff --git a/tests/features/planets.feature b/tests/features/planets.feature index 321f047..c469a92 100644 --- a/tests/features/planets.feature +++ b/tests/features/planets.feature @@ -42,9 +42,7 @@ Feature: Pipeline tests using the planets dataset And I add initial audit entries for the submission Then the latest audit record for the submission is marked with processing status file_transformation When I run the file transformation phase - Then the latest audit record for the submission is marked with processing status error_report - When I run the error report phase - Then An error report is produced + Then the latest audit record for the submission is marked with processing status failed Scenario: Handle a file with duplicated extension provided (spark) Given I submit the planets file planets.csv.csv for processing diff --git a/tests/features/steps/steps_pipeline.py b/tests/features/steps/steps_pipeline.py index 3dbc2b2..fda37bc 100644 --- a/tests/features/steps/steps_pipeline.py +++ b/tests/features/steps/steps_pipeline.py @@ -108,15 +108,18 @@ def run_file_transformation_step(context: Context): 
pool=ThreadPoolExecutor(1), submissions_to_process=[ctxt.get_submission_info(context)] ) if failed: - ctxt.set_failed_file_transformation(context, failed[0]) + ctxt.set_failed_file_transformation(context, failed[0][0]) + @when("I run the data contract phase") def apply_data_contract_with_error(context: Context): """Apply the data contract stage""" pipeline = ctxt.get_pipeline(context) + sub_info = ctxt.get_submission_info(context) + sub_status = pipeline._audit_tables.get_submission_status(sub_info.submission_id) pipeline.data_contract_step( - pool=ThreadPoolExecutor(1), file_transform_results=[ctxt.get_submission_info(context)] + pool=ThreadPoolExecutor(1), file_transform_results=[(sub_info, sub_status)] ) @@ -126,10 +129,9 @@ def apply_business_rules(context: Context): pipeline = ctxt.get_pipeline(context) sub_info = ctxt.get_submission_info(context) - proc_rec = pipeline._audit_tables.get_current_processing_info(sub_info.submission_id) - # add row count here so sub stats calculated + sub_status = pipeline._audit_tables.get_submission_status(sub_info.submission_id) success, failed, _ = pipeline.business_rule_step( - pool=ThreadPoolExecutor(1), files=[(sub_info, proc_rec.submission_result == "failed")] + pool=ThreadPoolExecutor(1), files=[(sub_info, sub_status)] ) assert len(success + failed) == 1 sub_status = (success + failed)[0][1] @@ -142,19 +144,28 @@ def create_error_report(context: Context): pipeline = ctxt.get_pipeline(context) try: - failed_file_transformation = [ctxt.get_failed_file_transformation(context)] - processed = [] + failed_ft = ctxt.get_failed_file_transformation(context) + sub_status = pipeline._audit_tables.get_submission_status(failed_ft.submission_id) + + pipeline.error_report_step( + pool=ThreadPoolExecutor(1), + processed=[], + failed_file_transformation=[(failed_ft, sub_status)] + + ) + except AttributeError: sub_info = ctxt.get_submission_info(context) processed = [(sub_info, ctxt.get_submission_status(context))] - 
failed_file_transformation = [] - - pipeline.error_report_step( + pipeline.error_report_step( pool=ThreadPoolExecutor(1), processed=processed, - failed_file_transformation=failed_file_transformation, + failed_file_transformation=[] + ) + + @then("there are {expected_num_errors:d} record rejections from the {service} phase") @then("there is {expected_num_errors:d} record rejection from the {service} phase") diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py index 989a928..b0598eb 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py @@ -10,7 +10,8 @@ from duckdb import ColumnExpression, ConstantExpression, DuckDBPyConnection from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager -from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo +from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord +from dve.pipeline.utils import SubmissionStatus from .....fixtures import temp_ddb_conn # pylint: disable=unused-import @@ -115,14 +116,14 @@ def test_audit_table_update_status(ddb_audit_manager: DDBAuditingManager): == "business_rules" ) - ddb_audit_manager.mark_error_report([(_sub_info.submission_id, "failed")]) + ddb_audit_manager.mark_error_report([(_sub_info.submission_id, "validation_failed")]) assert ( ddb_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status == "error_report" ) - ddb_audit_manager.mark_finished([(_sub_info.submission_id, "failed")]) + ddb_audit_manager.mark_finished([(_sub_info.submission_id, "validation_failed")]) assert ( ddb_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status @@ -350,3 +351,94 @@ def 
test_get_error_report_submissions(ddb_audit_manager_threaded: DDBAuditingMan assert len(processed) == 2 assert len(dodgy) == 0 assert processed == expected + +def test_get_submission_status(ddb_audit_manager_threaded: DDBAuditingManager): + with ddb_audit_manager_threaded as aud: + # add four submissions + sub_1 = SubmissionInfo( + submission_id="1", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + sub_2 = SubmissionInfo( + submission_id="2", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + sub_3 = SubmissionInfo( + submission_id="3", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + sub_4 = SubmissionInfo( + submission_id="4", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + # mark 1 as failed validation, 2 as failed processing, 3 as null and 4 as successful + aud.add_new_submissions([sub_1, sub_2, sub_3, sub_4]) + aud.add_processing_records( + [ + ProcessingStatusRecord( + submission_id=sub_1.submission_id, processing_status="error_report", submission_result="validation_failed" + ), + ProcessingStatusRecord( + submission_id=sub_2.submission_id, processing_status="failed", submission_result="processing_failed" + ), + ProcessingStatusRecord( + submission_id=sub_3.submission_id, processing_status="business_rules" + ), + ProcessingStatusRecord( + submission_id=sub_4.submission_id, processing_status="error_report", 
submission_result="success" + ), + ] + ) + aud.add_submission_statistics_records([ + SubmissionStatisticsRecord(submission_id=sub_1.submission_id, record_count=5, number_record_rejections=2, number_warnings=3), + SubmissionStatisticsRecord(submission_id=sub_4.submission_id, record_count=20, number_record_rejections=0, number_warnings=1) + ]) + + while not aud.queue.empty(): + time.sleep(0.05) + + sub_stats_1 = aud.get_submission_status(sub_1.submission_id) + assert sub_stats_1.submission_result == "validation_failed" + assert sub_stats_1.validation_failed + assert not sub_stats_1.processing_failed + assert sub_stats_1.number_of_records == 5 + sub_stats_2 = aud.get_submission_status(sub_2.submission_id) + assert sub_stats_2.submission_result == "processing_failed" + assert not sub_stats_2.validation_failed + assert sub_stats_2.processing_failed + sub_stats_3 = aud.get_submission_status(sub_3.submission_id) + assert not sub_stats_3.validation_failed + assert not sub_stats_3.processing_failed + sub_stats_4 = aud.get_submission_status(sub_4.submission_id) + assert sub_stats_4.submission_result == "success" + assert not sub_stats_4.validation_failed + assert not sub_stats_4.processing_failed + assert sub_stats_4.number_of_records == 20 + assert not aud.get_submission_status("5") \ No newline at end of file diff --git a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py index 4045a90..a5146f9 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py @@ -10,8 +10,8 @@ from pyspark.sql.functions import col, lit from dve.core_engine.backends.implementations.spark.auditing import SparkAuditingManager -from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo -from dve.pipeline.utils import 
unpersist_all_rdds +from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord +from dve.pipeline.utils import SubmissionStatus, unpersist_all_rdds from dve.core_engine.backends.implementations.spark.spark_helpers import PYTHON_TYPE_TO_SPARK_TYPE from .....conftest import get_test_file_path @@ -123,14 +123,14 @@ def test_audit_table_update_status(spark_audit_manager: SparkAuditingManager): == "business_rules" ) - spark_audit_manager.mark_error_report([(_sub_info.submission_id, "failed")]) + spark_audit_manager.mark_error_report([(_sub_info.submission_id, "validation_failed")]) assert ( spark_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status == "error_report" ) - spark_audit_manager.mark_finished([(_sub_info.submission_id, "failed")]) + spark_audit_manager.mark_finished([(_sub_info.submission_id, "validation_failed")]) assert ( spark_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status @@ -367,3 +367,91 @@ def test_get_error_report_submissions(spark_audit_manager: SparkAuditingManager) assert len(processed) == 2 assert len(dodgy) == 0 assert processed == expected + +def test_get_submission_status(spark_audit_manager: SparkAuditingManager): + with spark_audit_manager as aud: + # add three submissions + sub_1 = SubmissionInfo( + submission_id="1", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + sub_2 = SubmissionInfo( + submission_id="2", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + sub_3 = SubmissionInfo( + submission_id="3", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + 
file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + sub_4 = SubmissionInfo( + submission_id="4", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + # mark 1 as failed validation, 2 as failed processing, 3 as null and 4 as successful + aud.add_new_submissions([sub_1, sub_2, sub_3, sub_4]) + aud.add_processing_records( + [ + ProcessingStatusRecord( + submission_id=sub_1.submission_id, processing_status="error_report", submission_result="validation_failed" + ), + ProcessingStatusRecord( + submission_id=sub_2.submission_id, processing_status="failed", submission_result="processing_failed" + ), + ProcessingStatusRecord( + submission_id=sub_3.submission_id, processing_status="business_rules" + ), + ProcessingStatusRecord( + submission_id=sub_4.submission_id, processing_status="error_report", submission_result="success" + ), + ] + ) + aud.add_submission_statistics_records([ + SubmissionStatisticsRecord(submission_id=sub_1.submission_id, record_count=5, number_record_rejections=2, number_warnings=3), + SubmissionStatisticsRecord(submission_id=sub_4.submission_id, record_count=20, number_record_rejections=0, number_warnings=1) + ]) + + sub_stats_1 = aud.get_submission_status(sub_1.submission_id) + assert sub_stats_1.submission_result == "validation_failed" + assert sub_stats_1.validation_failed + assert not sub_stats_1.processing_failed + assert sub_stats_1.number_of_records == 5 + sub_stats_2 = aud.get_submission_status(sub_2.submission_id) + assert sub_stats_2.submission_result == "processing_failed" + assert not sub_stats_2.validation_failed + assert sub_stats_2.processing_failed + sub_stats_3 = aud.get_submission_status(sub_3.submission_id) + assert not sub_stats_3.validation_failed + assert not sub_stats_3.processing_failed + sub_stats_4 = 
aud.get_submission_status(sub_4.submission_id) + assert sub_stats_4.submission_result == "success" + assert not sub_stats_4.validation_failed + assert not sub_stats_4.processing_failed + assert sub_stats_4.number_of_records == 20 + assert not aud.get_submission_status("5") diff --git a/tests/test_pipeline/test_duckdb_pipeline.py b/tests/test_pipeline/test_duckdb_pipeline.py index 8512494..81660dd 100644 --- a/tests/test_pipeline/test_duckdb_pipeline.py +++ b/tests/test_pipeline/test_duckdb_pipeline.py @@ -17,6 +17,7 @@ from dve.core_engine.models import SubmissionInfo import dve.parser.file_handling as fh from dve.pipeline.duckdb_pipeline import DDBDVEPipeline +from dve.pipeline.utils import SubmissionStatus from ..conftest import get_test_file_path from ..fixtures import temp_ddb_conn # pylint: disable=unused-import @@ -122,10 +123,11 @@ def test_data_contract_step( ) success, failed = dve_pipeline.data_contract_step( - pool=ThreadPoolExecutor(2), file_transform_results=[sub_info] + pool=ThreadPoolExecutor(2), file_transform_results=[(sub_info, SubmissionStatus())] ) assert len(success) == 1 + assert not success[0][1].validation_failed assert len(failed) == 0 assert Path(processed_file_path, sub_info.submission_id, "contract", "planets").exists() @@ -159,10 +161,11 @@ def test_business_rule_step( audit_manager.add_new_submissions([sub_info], job_run_id=1) successful_files, unsuccessful_files, failed_processing = dve_pipeline.business_rule_step( - pool=ThreadPoolExecutor(2), files=[(sub_info, None)] + pool=ThreadPoolExecutor(2), files=[(sub_info, SubmissionStatus())] ) assert len(successful_files) == 1 + assert not successful_files[0][1].validation_failed assert len(unsuccessful_files) == 0 assert len(failed_processing) == 0 diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py index dcdb59b..df58b59 100644 --- a/tests/test_pipeline/test_foundry_ddb_pipeline.py +++ 
b/tests/test_pipeline/test_foundry_ddb_pipeline.py @@ -113,6 +113,6 @@ def test_foundry_runner_error(planet_test_files, temp_ddb_conn): reference_data_loader=DuckDBRefDataLoader, ) output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) - assert fh.get_resource_exists(report_uri) + assert not fh.get_resource_exists(report_uri) assert not output_loc assert len(list(fh.iter_prefix(audit_files))) == 2 diff --git a/tests/test_pipeline/test_pipeline.py b/tests/test_pipeline/test_pipeline.py index 429e947..38418d6 100644 --- a/tests/test_pipeline/test_pipeline.py +++ b/tests/test_pipeline/test_pipeline.py @@ -90,7 +90,8 @@ def test_file_transformation(planet_test_files): # pylint: disable=redefined-ou output_path = Path(tdir, submitted_file_info.submission_id, "transform") - result = dve_pipeline.file_transformation(submitted_file_info) + sub_info, sub_status = dve_pipeline.file_transformation(submitted_file_info) - assert isinstance(result, SubmissionInfo) + assert isinstance(sub_info, SubmissionInfo) + assert not sub_status.processing_failed assert output_path.joinpath("planets").exists() diff --git a/tests/test_pipeline/test_spark_pipeline.py b/tests/test_pipeline/test_spark_pipeline.py index c3a7fb2..79d5815 100644 --- a/tests/test_pipeline/test_spark_pipeline.py +++ b/tests/test_pipeline/test_spark_pipeline.py @@ -126,10 +126,10 @@ def test_apply_data_contract_success( reference_data_loader=None, spark=spark, ) + sub_status = SubmissionStatus() + sub_info, sub_status = dve_pipeline.apply_data_contract(sub_info, sub_status) - _, failed = dve_pipeline.apply_data_contract(sub_info) - - assert not failed + assert not sub_status.validation_failed assert Path(Path(processed_file_path), sub_info.submission_id, "contract", "planets").exists() @@ -147,9 +147,10 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name reference_data_loader=None, spark=spark, ) + sub_status = SubmissionStatus() - _, failed = 
dve_pipeline.apply_data_contract(sub_info) - assert failed + sub_info, sub_status = dve_pipeline.apply_data_contract(sub_info, sub_status) + assert sub_status.validation_failed output_path = Path(processed_file_path) / sub_info.submission_id assert Path(output_path, "contract", "planets").exists() @@ -210,6 +211,7 @@ def test_data_contract_step( spark_test_database, ): # pylint: disable=redefined-outer-name sub_info, processed_file_path = planet_data_after_file_transformation + sub_status = SubmissionStatus() with SparkAuditingManager(spark_test_database, ThreadPoolExecutor(1), spark) as audit_manager: dve_pipeline = SparkDVEPipeline( audit_tables=audit_manager, @@ -221,10 +223,11 @@ def test_data_contract_step( ) success, failed = dve_pipeline.data_contract_step( - pool=ThreadPoolExecutor(2), file_transform_results=[sub_info] + pool=ThreadPoolExecutor(2), file_transform_results=[(sub_info, sub_status)] ) assert len(success) == 1 + assert not success[0][1].validation_failed assert len(failed) == 0 assert Path(processed_file_path, sub_info.submission_id, "contract", "planets").exists() @@ -254,9 +257,9 @@ def test_apply_business_rules_success( spark=spark, ) - _, status = dve_pipeline.apply_business_rules(sub_info, False) + _, status = dve_pipeline.apply_business_rules(sub_info, SubmissionStatus()) - assert not status.failed + assert not status.validation_failed assert status.number_of_records == 1 planets_entity_path = Path( @@ -299,9 +302,9 @@ def test_apply_business_rules_with_data_errors( # pylint: disable=redefined-out spark=spark, ) - _, status = dve_pipeline.apply_business_rules(sub_info, False) + _, status = dve_pipeline.apply_business_rules(sub_info, SubmissionStatus()) - assert status.failed + assert status.validation_failed assert status.number_of_records == 1 br_path = Path( @@ -382,7 +385,7 @@ def test_business_rule_step( audit_manager.add_new_submissions([sub_info], job_run_id=1) successful_files, unsuccessful_files, failed_processing = 
dve_pipeline.business_rule_step( - pool=ThreadPoolExecutor(2), files=[(sub_info, None)] + pool=ThreadPoolExecutor(2), files=[(sub_info, SubmissionStatus())] ) assert len(successful_files) == 1 @@ -418,7 +421,7 @@ def test_error_report_where_report_is_expected( # pylint: disable=redefined-out sub_info, SubmissionStatus(True, 9) ) - assert status.failed + assert status.validation_failed expected = { "submission_id": submission_info.submission_id,