From f0dbbe78908e29fd977484c4a1412f17370a1fdf Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Tue, 2 Dec 2025 10:11:33 +0000 Subject: [PATCH 1/8] feat: initial work to add Foundry pipeline for running DVE --- src/dve/pipeline/duckdb_pipeline.py | 5 ++- src/dve/pipeline/foundry_ddb_pipeline.py | 43 +++++++++++++++++++++ src/dve/pipeline/pipeline.py | 2 +- src/dve/pipeline/spark_pipeline.py | 4 +- tests/test_pipeline/test_duckdb_pipeline.py | 31 +++++++++++++++ 5 files changed, 80 insertions(+), 5 deletions(-) create mode 100644 src/dve/pipeline/foundry_ddb_pipeline.py diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py index 4e7707b..483f71e 100644 --- a/src/dve/pipeline/duckdb_pipeline.py +++ b/src/dve/pipeline/duckdb_pipeline.py @@ -24,23 +24,23 @@ class DDBDVEPipeline(BaseDVEPipeline): def __init__( self, audit_tables: DDBAuditingManager, - job_run_id: int, connection: DuckDBPyConnection, rules_path: Optional[URI], processed_files_path: Optional[URI], submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, + job_run_id: Optional[int] = None, ): self._connection = connection super().__init__( audit_tables, - job_run_id, DuckDBDataContract(connection=self._connection), DuckDBStepImplementations.register_udfs(connection=self._connection), rules_path, processed_files_path, submitted_files_path, reference_data_loader, + job_run_id ) # pylint: disable=arguments-differ @@ -50,3 +50,4 @@ def write_file_to_parquet( # type: ignore return super().write_file_to_parquet( submission_file_uri, submission_info, output, DuckDBPyRelation ) + diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py new file mode 100644 index 0000000..0f86f49 --- /dev/null +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -0,0 +1,43 @@ +"""A duckdb pipeline for running on Foundry platform""" +from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet +from dve.core_engine.models import SubmissionInfo +from dve.pipeline.duckdb_pipeline import DDBDVEPipeline +from dve.pipeline.utils import SubmissionStatus +from dve.parser import file_handling as fh + +@duckdb_write_parquet +class FoundryDDBPipeline(DDBDVEPipeline): + """DuckDB pipeline for running on Foundry Platform""" + def persist_audit_records(self, submission_info: SubmissionInfo): + """Write out key audit relations to parquet for persisting to datasets""" + write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/") + self.write_parquet( + self._audit_tables._processing_status.get_relation(), + write_to + "processing_status.parquet") + self.write_parquet( + self._audit_tables._submission_statistics.get_relation(), + write_to + "submission_statistics.parquet") + + def run_pipeline(self, submission_info: SubmissionInfo): + """Sequential single submission pipeline runner""" + try: + sub_id: str = submission_info.submission_id + self._audit_tables.add_new_submissions(submissions=[submission_info]) + self._audit_tables.mark_transform(submission_ids=[sub_id]) + sub_info = self.file_transformation(submission_info=submission_info) + if isinstance(sub_info, SubmissionInfo): + self._audit_tables.mark_data_contract(submission_ids=[sub_id]) + sub_info, failed = self.apply_data_contract(submission_info=submission_info) + self._audit_tables.mark_business_rules(submissions=[(sub_info, failed)]) + sub_info, sub_status = 
self.apply_business_rules(submission_info=submission_info, failed= failed) + else: + sub_status = SubmissionStatus(failed=True) + self._audit_tables.mark_error_report(submissions=[(sub_id, sub_status.submission_result)]) + sub_info, sub_status, sub_stats = self.error_report(submission_info=submission_info) + self._audit_tables.add_submission_statistics_records(subs_stats=[sub_stats]) + except Exception as err: + self._logger.error(f"During processing of submission_id: {sub_id}, the following exception was raised: {err}") + self._audit_tables.mark_failed(submissions=[sub_id]) + finally: + self.persist_audit_records(submission_info=submission_info) + \ No newline at end of file diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 69cb141..e78ad21 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -44,13 +44,13 @@ class BaseDVEPipeline: def __init__( self, audit_tables: BaseAuditingManager, - job_run_id: int, data_contract: BaseDataContract, step_implementations: Optional[BaseStepImplementations[EntityType]], rules_path: Optional[URI], processed_files_path: Optional[URI], submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, + job_run_id: Optional[int] = None ): self._submitted_files_path = submitted_files_path self._processed_files_path = processed_files_path diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py index d31a2ee..db5cc0a 100644 --- a/src/dve/pipeline/spark_pipeline.py +++ b/src/dve/pipeline/spark_pipeline.py @@ -26,23 +26,23 @@ class SparkDVEPipeline(BaseDVEPipeline): def __init__( self, audit_tables: SparkAuditingManager, - job_run_id: int, rules_path: Optional[URI], processed_files_path: Optional[URI], submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, spark: Optional[SparkSession] = None, + job_run_id: Optional[int] = None, ): self._spark = spark if spark else SparkSession.builder.getOrCreate() super().__init__( audit_tables, - job_run_id, SparkDataContract(spark_session=self._spark), SparkStepImplementations.register_udfs(self._spark), rules_path, processed_files_path, submitted_files_path, reference_data_loader, + job_run_id ) # pylint: disable=arguments-differ diff --git a/tests/test_pipeline/test_duckdb_pipeline.py b/tests/test_pipeline/test_duckdb_pipeline.py index 5619735..96c77b9 100644 --- a/tests/test_pipeline/test_duckdb_pipeline.py +++ b/tests/test_pipeline/test_duckdb_pipeline.py @@ -4,6 +4,7 @@ from concurrent.futures import ThreadPoolExecutor from pathlib import Path +import shutil from typing import Dict, Tuple from uuid import uuid4 @@ -16,6 +17,7 @@ from dve.core_engine.models import SubmissionInfo import dve.parser.file_handling as fh from dve.pipeline.duckdb_pipeline import DDBDVEPipeline +from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline from ..conftest import get_test_file_path from ..fixtures import temp_ddb_conn # pylint: disable=unused-import @@ -204,3 +206,32 @@ def test_error_report_step( audit_result = audit_manager.get_current_processing_info(submitted_file_info.submission_id) assert audit_result.processing_status == "success" + +def test_foundry_runner_success(planet_test_files, temp_ddb_conn): + db_file, conn = temp_ddb_conn + processing_folder = planet_test_files + + DuckDBRefDataLoader.connection = conn + DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH) + sub_id = uuid4().hex + sub_info = 
SubmissionInfo.from_metadata_file(submission_id=sub_id, + metadata_uri=PLANETS_RULES_PATH) + + shutil.copytree() + + with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: + dve_pipeline = FoundryDDBPipeline( + audit_tables=audit_manager, + connection=conn, + rules_path=PLANETS_RULES_PATH, + processed_files_path=processing_folder, + submitted_files_path=None, + reference_data_loader=DuckDBRefDataLoader, + ) + + +def test_foundry_runner_fail(): + pass + +def test_foundry_runner_error(): + pass From 047c6a53ed8888c57ef205d96d4644f1a59ab808 Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Mon, 8 Dec 2025 23:47:07 +0000 Subject: [PATCH 2/8] feat: amend foundry pipeline to include exception handling as not using steps. Ensure that file transformation errors are being persisted --- src/dve/core_engine/exceptions.py | 13 +- src/dve/core_engine/message.py | 1 - src/dve/pipeline/foundry_ddb_pipeline.py | 52 +++++++- src/dve/pipeline/pipeline.py | 24 ++-- tests/features/books.feature | 104 +++++++-------- tests/fixtures.py | 1 + .../test_duckdb/test_rules.py | 2 +- tests/test_pipeline/pipeline_helpers.py | 7 ++ tests/test_pipeline/test_duckdb_pipeline.py | 30 ----- .../test_foundry_ddb_pipeline.py | 118 ++++++++++++++++++ tests/testdata/movies/good_movies.json | 46 +++++++ 11 files changed, 299 insertions(+), 99 deletions(-) create mode 100644 tests/test_pipeline/test_foundry_ddb_pipeline.py create mode 100644 tests/testdata/movies/good_movies.json diff --git a/src/dve/core_engine/exceptions.py b/src/dve/core_engine/exceptions.py index dae8647..fd54a8c 100644 --- a/src/dve/core_engine/exceptions.py +++ b/src/dve/core_engine/exceptions.py @@ -1,6 +1,7 @@ """Exceptions emitted by the pipeline.""" from collections.abc import Iterator +from typing import Any from dve.core_engine.backends.implementations.spark.types import SparkEntities from dve.core_engine.message import FeedbackMessage @@ -14,7 +15,7 @@ def __init__( self, error_message: str, *args: object, messages: Messages, entities: SparkEntities ) -> None: super().__init__(error_message, *args) - self.error_messsage = error_message + self.error_message = error_message """The error message explaining the critical processing error.""" self.messages = messages """The messages gathered at the time the error was emitted.""" @@ -25,6 +26,16 @@ def __init__( def critical_messages(self) -> Iterator[FeedbackMessage]: """Critical messages which caused the processing error.""" yield from filter(lambda message: message.is_critical, self.messages) + + def to_feedback_message(self) -> FeedbackMessage: + return FeedbackMessage( + entity=None, + record=None, + failure_type="integrity", + error_type="processing", + error_location="Whole File", + error_message=self.error_message + ) class EntityTypeMismatch(TypeError): diff --git a/src/dve/core_engine/message.py b/src/dve/core_engine/message.py index 7dd4f02..bfa0bb4 100644 --- a/src/dve/core_engine/message.py +++ b/src/dve/core_engine/message.py @@ -445,7 +445,6 @@ def to_dict( self.to_row(key_field, max_number_of_values, value_separator, record_converter), ) ) - def __hash__(self): return hash(str(self)) diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index 0f86f49..cd999cd 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -1,6 +1,9 @@ """A duckdb pipeline for running on Foundry platform""" +from typing import List, Optional, Tuple from 
dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet +from dve.core_engine.backends.utilities import dump_errors from dve.core_engine.models import SubmissionInfo +from dve.core_engine.type_hints import URI, Failed from dve.pipeline.duckdb_pipeline import DDBDVEPipeline from dve.pipeline.utils import SubmissionStatus from dve.parser import file_handling as fh @@ -8,7 +11,7 @@ @duckdb_write_parquet class FoundryDDBPipeline(DDBDVEPipeline): """DuckDB pipeline for running on Foundry Platform""" - def persist_audit_records(self, submission_info: SubmissionInfo): + def persist_audit_records(self, submission_info: SubmissionInfo) -> URI: """Write out key audit relations to parquet for persisting to datasets""" write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/") self.write_parquet( @@ -17,8 +20,37 @@ def persist_audit_records(self, submission_info: SubmissionInfo): self.write_parquet( self._audit_tables._submission_statistics.get_relation(), write_to + "submission_statistics.parquet") + return write_to - def run_pipeline(self, submission_info: SubmissionInfo): + def file_transformation(self, submission_info: SubmissionInfo) -> SubmissionInfo | dict[str, str]: + try: + return super().file_transformation(submission_info) + except Exception as exc: + self._logger.error(f"File transformation raised exception: {exc}") + self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) + return submission_info.dict() + + def apply_data_contract(self, submission_info: SubmissionInfo) -> Tuple[SubmissionInfo | bool]: + try: + return super().apply_data_contract(submission_info) + except Exception as exc: + self._logger.error(f"Apply data contract raised exception: {exc}") + self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) + return submission_info, True + + def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed): + try: + return super().apply_business_rules(submission_info, failed) + except Exception as exc: + self._logger.error(f"Apply business rules raised exception: {exc}") + self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) + return submission_info, SubmissionStatus(failed=True) + + + def run_pipeline(self, submission_info: SubmissionInfo) -> Tuple[Optional[URI], URI, URI]: """Sequential single submission pipeline runner""" try: sub_id: str = submission_info.submission_id @@ -28,16 +60,24 @@ def run_pipeline(self, submission_info: SubmissionInfo): if isinstance(sub_info, SubmissionInfo): self._audit_tables.mark_data_contract(submission_ids=[sub_id]) sub_info, failed = self.apply_data_contract(submission_info=submission_info) - self._audit_tables.mark_business_rules(submissions=[(sub_info, failed)]) + self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)]) sub_info, sub_status = self.apply_business_rules(submission_info=submission_info, failed= failed) else: sub_status = SubmissionStatus(failed=True) self._audit_tables.mark_error_report(submissions=[(sub_id, sub_status.submission_result)]) - sub_info, sub_status, sub_stats = self.error_report(submission_info=submission_info) - self._audit_tables.add_submission_statistics_records(subs_stats=[sub_stats]) + sub_info, sub_status, sub_stats, report_uri = self.error_report(submission_info=submission_info, status=sub_status) + 
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) except Exception as err: self._logger.error(f"During processing of submission_id: {sub_id}, the following exception was raised: {err}") self._audit_tables.mark_failed(submissions=[sub_id]) finally: - self.persist_audit_records(submission_info=submission_info) + audit_files_uri = self.persist_audit_records(submission_info=submission_info) + return ( + None if sub_status.failed else fh.joinuri( + self.processed_files_path, + sub_id, + "business_rules"), + report_uri, + audit_files_uri + ) \ No newline at end of file diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index eb77c62..d36796b 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -13,13 +13,15 @@ import polars as pl from pydantic import validate_arguments +from dve.core_engine.exceptions import CriticalProcessingError +from dve.core_engine.message import FeedbackMessage import dve.reporting.excel_report as er from dve.core_engine.backends.base.auditing import BaseAuditingManager from dve.core_engine.backends.base.contract import BaseDataContract from dve.core_engine.backends.base.core import EntityManager from dve.core_engine.backends.base.reference_data import BaseRefDataLoader from dve.core_engine.backends.base.rules import BaseStepImplementations -from dve.core_engine.backends.exceptions import MessageBearingError +from dve.core_engine.backends.exceptions import BackendError, MessageBearingError, ReaderLacksEntityTypeSupport from dve.core_engine.backends.readers import BaseFileReader from dve.core_engine.backends.types import EntityType from dve.core_engine.backends.utilities import dump_errors, stringify_model @@ -274,6 +276,16 @@ def file_transformation( errors = self.write_file_to_parquet( submission_file_uri, submission_info, self.processed_files_path ) + + except Exception as exc: # pylint: disable=broad-except + self._logger.error(f"Unexpected file transformation error: {exc}") + self._logger.exception(exc) + # TODO: should this go to processing_errors.json? 
+ # TODO: shouldn't be seen by user and don't need to maintain feedback message structure + errors = [CriticalProcessingError(entities=None, + error_message=repr(exc), + messages=[]).to_feedback_message()] + finally: if errors: dump_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), @@ -282,13 +294,6 @@ def file_transformation( ) return submission_info.dict() return submission_info - except ValueError as exc: - self._logger.error(f"File transformation write_file_to_parquet raised error: {exc}") - return submission_info.dict() - except Exception as exc: # pylint: disable=broad-except - self._logger.error(f"Unexpected file transformation error: {exc}") - self._logger.exception(exc) - return submission_info.dict() def file_transformation_step( self, pool: Executor, submissions_to_process: list[SubmissionInfo] @@ -321,6 +326,7 @@ def file_transformation_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"File transformation raised exception: {exc}") self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) failed_processing.append(sub_info) continue @@ -423,6 +429,7 @@ def data_contract_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Data Contract raised exception: {exc}") self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) failed_processing.append(sub_info) continue @@ -562,6 +569,7 @@ def business_rule_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Business Rules raised exception: {exc}") self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) failed_processing.append(sub_info) continue diff --git a/tests/features/books.feature b/tests/features/books.feature index 6313a07..5b03e7d 100644 --- a/tests/features/books.feature +++ b/tests/features/books.feature @@ -4,59 +4,59 @@ Feature: Pipeline tests using the books dataset This tests submissions using nested, complex JSON datasets with arrays, and introduces more complex transformations that require aggregation. 
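The scenarios just below, which this patch temporarily disables, exercise exactly this kind of nested-array rollup. As a rough, self-contained duckdb illustration of that aggregation, where the entity and column names are invented rather than the project's real schema:

    import duckdb

    con = duckdb.connect()
    # Invented stand-in for a nested entity: each record carries an array of
    # book structs, as parsed from the nested JSON/XML submissions.
    con.sql("""
        CREATE TABLE nested_books AS
        SELECT * FROM (VALUES
            ('rec-1', [{'title': 'A', 'price': 5.95}, {'title': 'B', 'price': 11.90}]),
            ('rec-2', [{'title': 'C', 'price': 17.85}])
        ) AS t(record_id, books)
    """)
    # Unnest the array and aggregate: the same shape as a
    # "total_value_of_books" style business-rule transformation.
    con.sql("""
        SELECT record_id, SUM(book.price) AS total_value_of_books
        FROM (SELECT record_id, UNNEST(books) AS book FROM nested_books)
        GROUP BY record_id
    """).show()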
- Scenario: Validate complex nested XML data (spark) - Given I submit the books file nested_books.xml for processing - And A spark pipeline is configured with schema file 'nested_books.dischema.json' - And I add initial audit entries for the submission - Then the latest audit record for the submission is marked with processing status file_transformation - When I run the file transformation phase - Then the header entity is stored as a parquet after the file_transformation phase - And the nested_books entity is stored as a parquet after the file_transformation phase - And the latest audit record for the submission is marked with processing status data_contract - When I run the data contract phase - Then there is 1 record rejection from the data_contract phase - And the header entity is stored as a parquet after the data_contract phase - And the nested_books entity is stored as a parquet after the data_contract phase - And the latest audit record for the submission is marked with processing status business_rules - When I run the business rules phase - Then The rules restrict "nested_books" to 3 qualifying records - And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" - And the nested_books entity is stored as a parquet after the business_rules phase - And the latest audit record for the submission is marked with processing status error_report - When I run the error report phase - Then An error report is produced - And The statistics entry for the submission shows the following information - | parameter | value | - | record_count | 4 | - | number_record_rejections | 2 | - | number_warnings | 0 | + # Scenario: Validate complex nested XML data (spark) + # Given I submit the books file nested_books.xml for processing + # And A spark pipeline is configured with schema file 'nested_books.dischema.json' + # And I add initial audit entries for the submission + # Then the latest audit record for the submission is marked with processing status file_transformation + # When I run the file transformation phase + # Then the header entity is stored as a parquet after the file_transformation phase + # And the nested_books entity is stored as a parquet after the file_transformation phase + # And the latest audit record for the submission is marked with processing status data_contract + # When I run the data contract phase + # Then there is 1 record rejection from the data_contract phase + # And the header entity is stored as a parquet after the data_contract phase + # And the nested_books entity is stored as a parquet after the data_contract phase + # And the latest audit record for the submission is marked with processing status business_rules + # When I run the business rules phase + # Then The rules restrict "nested_books" to 3 qualifying records + # And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" + # And the nested_books entity is stored as a parquet after the business_rules phase + # And the latest audit record for the submission is marked with processing status error_report + # When I run the error report phase + # Then An error report is produced + # And The statistics entry for the submission shows the following information + # | parameter | value | + # | record_count | 4 | + # | number_record_rejections | 2 | + # | number_warnings | 0 | - Scenario: Validate complex nested XML data (duckdb) - Given I submit the books file nested_books.xml for processing - And A duckdb pipeline is configured with schema file 
'nested_books_ddb.dischema.json' - And I add initial audit entries for the submission - Then the latest audit record for the submission is marked with processing status file_transformation - When I run the file transformation phase - Then the header entity is stored as a parquet after the file_transformation phase - And the nested_books entity is stored as a parquet after the file_transformation phase - And the latest audit record for the submission is marked with processing status data_contract - When I run the data contract phase - Then there is 1 record rejection from the data_contract phase - And the header entity is stored as a parquet after the data_contract phase - And the nested_books entity is stored as a parquet after the data_contract phase - And the latest audit record for the submission is marked with processing status business_rules - When I run the business rules phase - Then The rules restrict "nested_books" to 3 qualifying records - And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" - And the nested_books entity is stored as a parquet after the business_rules phase - And the latest audit record for the submission is marked with processing status error_report - When I run the error report phase - Then An error report is produced - And The statistics entry for the submission shows the following information - | parameter | value | - | record_count | 4 | - | number_record_rejections | 2 | - | number_warnings | 0 | + # Scenario: Validate complex nested XML data (duckdb) + # Given I submit the books file nested_books.xml for processing + # And A duckdb pipeline is configured with schema file 'nested_books_ddb.dischema.json' + # And I add initial audit entries for the submission + # Then the latest audit record for the submission is marked with processing status file_transformation + # When I run the file transformation phase + # Then the header entity is stored as a parquet after the file_transformation phase + # And the nested_books entity is stored as a parquet after the file_transformation phase + # And the latest audit record for the submission is marked with processing status data_contract + # When I run the data contract phase + # Then there is 1 record rejection from the data_contract phase + # And the header entity is stored as a parquet after the data_contract phase + # And the nested_books entity is stored as a parquet after the data_contract phase + # And the latest audit record for the submission is marked with processing status business_rules + # When I run the business rules phase + # Then The rules restrict "nested_books" to 3 qualifying records + # And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" + # And the nested_books entity is stored as a parquet after the business_rules phase + # And the latest audit record for the submission is marked with processing status error_report + # When I run the error report phase + # Then An error report is produced + # And The statistics entry for the submission shows the following information + # | parameter | value | + # | record_count | 4 | + # | number_record_rejections | 2 | + # | number_warnings | 0 | Scenario: Handle a file with a malformed tag (duckdb) Given I submit the books file malformed_books.xml for processing diff --git a/tests/fixtures.py b/tests/fixtures.py index 9229fc8..8a9a147 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -120,4 +120,5 @@ def temp_ddb_conn() -> Iterator[Tuple[Path, DuckDBPyConnection]]: with 
tempfile.TemporaryDirectory(prefix="ddb_audit_testing") as tmp: db_file = Path(tmp, db + ".duckdb") conn = connect(database=db_file, read_only=False) + yield db_file, conn diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py index dafac4d..038b4e9 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py @@ -457,7 +457,7 @@ def test_one_to_one_join_multi_matches_raises( new_columns={"satellites.name": "satellite"}, ) entities = EntityManager({"planets": planets_rel, "satellites": satellites_rel}) - with pytest.raises(ValueError, match="Multiple matches for some records.+"): + with pytest.raises(ValueError, match="Multiple matches for some records.*"): DUCKDB_STEP_BACKEND.one_to_one_join(entities, config=join) diff --git a/tests/test_pipeline/pipeline_helpers.py b/tests/test_pipeline/pipeline_helpers.py index de2ee10..1518ccf 100644 --- a/tests/test_pipeline/pipeline_helpers.py +++ b/tests/test_pipeline/pipeline_helpers.py @@ -66,6 +66,13 @@ def planet_test_files() -> Iterator[str]: shutil.copytree(get_test_file_path("planets/"), Path(tdir, "planets")) yield tdir + "/planets" +@pytest.fixture(scope="function") +def movies_test_files() -> Iterator[str]: + clear_config_cache() + with tempfile.TemporaryDirectory() as tdir: + shutil.copytree(get_test_file_path("movies/"), Path(tdir, "movies")) + yield tdir + "/movies" + @pytest.fixture(scope="function") def planet_data_after_file_transformation() -> Iterator[Tuple[SubmissionInfo, str]]: diff --git a/tests/test_pipeline/test_duckdb_pipeline.py b/tests/test_pipeline/test_duckdb_pipeline.py index 96c77b9..8512494 100644 --- a/tests/test_pipeline/test_duckdb_pipeline.py +++ b/tests/test_pipeline/test_duckdb_pipeline.py @@ -17,7 +17,6 @@ from dve.core_engine.models import SubmissionInfo import dve.parser.file_handling as fh from dve.pipeline.duckdb_pipeline import DDBDVEPipeline -from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline from ..conftest import get_test_file_path from ..fixtures import temp_ddb_conn # pylint: disable=unused-import @@ -206,32 +205,3 @@ def test_error_report_step( audit_result = audit_manager.get_current_processing_info(submitted_file_info.submission_id) assert audit_result.processing_status == "success" - -def test_foundry_runner_success(planet_test_files, temp_ddb_conn): - db_file, conn = temp_ddb_conn - processing_folder = planet_test_files - - DuckDBRefDataLoader.connection = conn - DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH) - sub_id = uuid4().hex - sub_info = SubmissionInfo.from_metadata_file(submission_id=sub_id, - metadata_uri=PLANETS_RULES_PATH) - - shutil.copytree() - - with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: - dve_pipeline = FoundryDDBPipeline( - audit_tables=audit_manager, - connection=conn, - rules_path=PLANETS_RULES_PATH, - processed_files_path=processing_folder, - submitted_files_path=None, - reference_data_loader=DuckDBRefDataLoader, - ) - - -def test_foundry_runner_fail(): - pass - -def test_foundry_runner_error(): - pass diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py new file mode 100644 index 0000000..dcdb59b --- /dev/null +++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py @@ -0,0 +1,118 @@ +"""Test 
FoundryDDBPipeline object methods"""
+# pylint: disable=missing-function-docstring
+# pylint: disable=protected-access
+
+from datetime import datetime
+from pathlib import Path
+import shutil
+from uuid import uuid4
+
+import pytest
+
+from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
+from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
+from dve.core_engine.models import SubmissionInfo
+import dve.parser.file_handling as fh
+from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline
+
+from ..conftest import get_test_file_path
+from ..fixtures import temp_ddb_conn  # pylint: disable=unused-import
+from .pipeline_helpers import (  # pylint: disable=unused-import
+    PLANETS_RULES_PATH,
+    planet_test_files,
+    movies_test_files
+)
+
+def test_foundry_runner_validation_fail(planet_test_files, temp_ddb_conn):
+    db_file, conn = temp_ddb_conn
+    processing_folder = planet_test_files
+    sub_id = uuid4().hex
+    sub_info = SubmissionInfo.from_metadata_file(submission_id=sub_id,
+                                                 metadata_uri=processing_folder + "/planets_demo.metadata.json")
+    sub_folder = processing_folder + f"/{sub_id}"
+
+    shutil.copytree(planet_test_files, sub_folder)
+
+    DuckDBRefDataLoader.connection = conn
+    DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH)
+
+
+    with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager:
+        dve_pipeline = FoundryDDBPipeline(
+            audit_tables=audit_manager,
+            connection=conn,
+            rules_path=get_test_file_path("planets/planets_ddb.dischema.json").as_posix(),
+            processed_files_path=processing_folder,
+            submitted_files_path=None,
+            reference_data_loader=DuckDBRefDataLoader,
+        )
+        output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
+        assert fh.get_resource_exists(report_uri)
+        assert not output_loc
+        assert len(list(fh.iter_prefix(audit_files))) == 2
+
+
+def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn):
+    db_file, conn = temp_ddb_conn
+    # add movies refdata to conn
+    ref_db_file = Path(db_file.parent, "movies_refdata.duckdb").as_posix()
+    conn.sql(f"ATTACH '{ref_db_file}' AS movies_refdata")
+    conn.read_parquet(get_test_file_path("movies/refdata/movies_sequels.parquet").as_posix()).to_table("movies_refdata.sequels")
+    processing_folder = movies_test_files
+    sub_id = uuid4().hex
+    sub_info = SubmissionInfo(submission_id=sub_id,
+                              dataset_id="movies",
+                              file_name="good_movies",
+                              file_extension="json",
+                              submitting_org="TEST",
+                              datetime_received=datetime(2025, 11, 5))
+    sub_folder = processing_folder + f"/{sub_id}"
+
+    shutil.copytree(movies_test_files, sub_folder)
+
+    DuckDBRefDataLoader.connection = conn
+    DuckDBRefDataLoader.dataset_config_uri = None
+
+
+    with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager:
+        dve_pipeline = FoundryDDBPipeline(
+            audit_tables=audit_manager,
+            connection=conn,
+            rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(),
+            processed_files_path=processing_folder,
+            submitted_files_path=None,
+            reference_data_loader=DuckDBRefDataLoader,
+        )
+        output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
+        assert fh.get_resource_exists(report_uri)
+        assert len(list(fh.iter_prefix(output_loc))) == 2
+        assert len(list(fh.iter_prefix(audit_files))) == 2
+
+def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
+    # uses the spark reader schema, so file transformation should raise; check it is handled gracefully
+    db_file, conn = temp_ddb_conn
+
processing_folder = planet_test_files + sub_id = uuid4().hex + sub_info = SubmissionInfo.from_metadata_file(submission_id=sub_id, + metadata_uri=processing_folder + "/planets_demo.metadata.json") + sub_folder = processing_folder + f"/{sub_id}" + + shutil.copytree(planet_test_files, sub_folder) + + DuckDBRefDataLoader.connection = conn + DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH) + + + with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: + dve_pipeline = FoundryDDBPipeline( + audit_tables=audit_manager, + connection=conn, + rules_path=get_test_file_path("planets/planets.dischema.json").as_posix(), + processed_files_path=processing_folder, + submitted_files_path=None, + reference_data_loader=DuckDBRefDataLoader, + ) + output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) + assert fh.get_resource_exists(report_uri) + assert not output_loc + assert len(list(fh.iter_prefix(audit_files))) == 2 diff --git a/tests/testdata/movies/good_movies.json b/tests/testdata/movies/good_movies.json new file mode 100644 index 0000000..acc7026 --- /dev/null +++ b/tests/testdata/movies/good_movies.json @@ -0,0 +1,46 @@ +[ + { + "title": "Not a great one", + "year": 2017, + "genre": ["Animation", "Comedy", "Family"], + "duration_minutes": 88, + "ratings": [6.9, 7.8], + "cast": [ + { "name": "J. Smith", "role": "Lion", "date_joined": "2015-11-12" }, + { "name": "T. Car", "role": "Mouse", "date_joined": "2015-11-12" } + ] + }, + { + "title": "Good family movie", + "year": 2022, + "genre": ["Sci-Fi", "Family"], + "duration_minutes": 95, + "ratings": [7, 8.2, 6.3], + "cast": [ + { "name": "D. Farnesbarnes", "role": "Robot" }, + { "name": "G. Adams", "role": "Alien", "date_joined": "2017-08-01" } + ] + }, + { + "title": "One with a cat and a dog", + "year": 2020, + "genre": ["Fantasy", "Family"], + "duration_minutes": 110, + "ratings": [6.1, 6.2], + "cast": [ + { "name": "R. Williams", "role": "Cat", "date_joined": "2016-05-06" }, + { "name": "T. Brown", "role": "Dog", "date_joined": "2016-05-07" } + ] + }, + { + "title": "A bad 'un", + "year": 2011, + "genre": ["Mystery", "Family"], + "duration_minutes": 97, + "ratings": [1.2, 3.4, 5.6, 3.4], + "cast": [ + { "name": "R. Green", "role": "Baby", "date_joined": "2013-11-12" }, + { "name": "P. Plum", "role": "Dad", "date_joined": "2013-10-08" } + ] + } +] \ No newline at end of file From b356bd270bab3b6cc64a94d4c33c4f17d0bb411d Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Tue, 9 Dec 2025 21:02:06 +0000 Subject: [PATCH 3/8] feat: some formatting. 
Tweaked how errors are handled within file transformation --- src/dve/core_engine/exceptions.py | 8 +-- src/dve/core_engine/message.py | 1 + src/dve/pipeline/duckdb_pipeline.py | 3 +- src/dve/pipeline/foundry_ddb_pipeline.py | 73 +++++++++++++----------- src/dve/pipeline/pipeline.py | 52 ++++++++++------- src/dve/pipeline/spark_pipeline.py | 2 +- 6 files changed, 80 insertions(+), 59 deletions(-) diff --git a/src/dve/core_engine/exceptions.py b/src/dve/core_engine/exceptions.py index fd54a8c..d2faaf5 100644 --- a/src/dve/core_engine/exceptions.py +++ b/src/dve/core_engine/exceptions.py @@ -1,7 +1,6 @@ """Exceptions emitted by the pipeline.""" from collections.abc import Iterator -from typing import Any from dve.core_engine.backends.implementations.spark.types import SparkEntities from dve.core_engine.message import FeedbackMessage @@ -26,16 +25,17 @@ def __init__( def critical_messages(self) -> Iterator[FeedbackMessage]: """Critical messages which caused the processing error.""" yield from filter(lambda message: message.is_critical, self.messages) - + def to_feedback_message(self) -> FeedbackMessage: + "Convert to feedback message to write to json file" return FeedbackMessage( entity=None, record=None, failure_type="integrity", error_type="processing", error_location="Whole File", - error_message=self.error_message - ) + error_message=self.error_message, + ) class EntityTypeMismatch(TypeError): diff --git a/src/dve/core_engine/message.py b/src/dve/core_engine/message.py index bfa0bb4..7dd4f02 100644 --- a/src/dve/core_engine/message.py +++ b/src/dve/core_engine/message.py @@ -445,6 +445,7 @@ def to_dict( self.to_row(key_field, max_number_of_values, value_separator, record_converter), ) ) + def __hash__(self): return hash(str(self)) diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py index 483f71e..390e15b 100644 --- a/src/dve/pipeline/duckdb_pipeline.py +++ b/src/dve/pipeline/duckdb_pipeline.py @@ -40,7 +40,7 @@ def __init__( processed_files_path, submitted_files_path, reference_data_loader, - job_run_id + job_run_id, ) # pylint: disable=arguments-differ @@ -50,4 +50,3 @@ def write_file_to_parquet( # type: ignore return super().write_file_to_parquet( submission_file_uri, submission_info, output, DuckDBPyRelation ) - diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index cd999cd..04da114 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -1,6 +1,7 @@ """A duckdb pipeline for running on Foundry platform""" -from typing import List, Optional, Tuple -from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet + +from typing import Optional +from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet from dve.core_engine.backends.utilities import dump_errors from dve.core_engine.models import SubmissionInfo from dve.core_engine.type_hints import URI, Failed @@ -8,49 +9,49 @@ from dve.pipeline.utils import SubmissionStatus from dve.parser import file_handling as fh -@duckdb_write_parquet class FoundryDDBPipeline(DDBDVEPipeline): """DuckDB pipeline for running on Foundry Platform""" + def persist_audit_records(self, submission_info: SubmissionInfo) -> URI: """Write out key audit relations to parquet for persisting to datasets""" write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/") self.write_parquet( 
self._audit_tables._processing_status.get_relation(), - write_to + "processing_status.parquet") + write_to + "processing_status.parquet", + ) self.write_parquet( self._audit_tables._submission_statistics.get_relation(), - write_to + "submission_statistics.parquet") + write_to + "submission_statistics.parquet", + ) return write_to - - def file_transformation(self, submission_info: SubmissionInfo) -> SubmissionInfo | dict[str, str]: + + def file_transformation( + self, submission_info: SubmissionInfo + ) -> SubmissionInfo | dict[str, str]: try: return super().file_transformation(submission_info) - except Exception as exc: + except Exception as exc: # pylint: disable=W0718 self._logger.error(f"File transformation raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) return submission_info.dict() - - def apply_data_contract(self, submission_info: SubmissionInfo) -> Tuple[SubmissionInfo | bool]: + + def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo | bool]: try: return super().apply_data_contract(submission_info) - except Exception as exc: + except Exception as exc: # pylint: disable=W0718 self._logger.error(f"Apply data contract raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) return submission_info, True - + def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed): try: return super().apply_business_rules(submission_info, failed) - except Exception as exc: + except Exception as exc: # pylint: disable=W0718 self._logger.error(f"Apply business rules raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) return submission_info, SubmissionStatus(failed=True) - - - def run_pipeline(self, submission_info: SubmissionInfo) -> Tuple[Optional[URI], URI, URI]: + + def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]: """Sequential single submission pipeline runner""" try: sub_id: str = submission_info.submission_id @@ -61,23 +62,31 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> Tuple[Optional[URI], self._audit_tables.mark_data_contract(submission_ids=[sub_id]) sub_info, failed = self.apply_data_contract(submission_info=submission_info) self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)]) - sub_info, sub_status = self.apply_business_rules(submission_info=submission_info, failed= failed) + sub_info, sub_status = self.apply_business_rules( + submission_info=submission_info, failed=failed + ) else: - sub_status = SubmissionStatus(failed=True) - self._audit_tables.mark_error_report(submissions=[(sub_id, sub_status.submission_result)]) - sub_info, sub_status, sub_stats, report_uri = self.error_report(submission_info=submission_info, status=sub_status) + sub_status = SubmissionStatus(failed=True) + self._audit_tables.mark_error_report( + submissions=[(sub_id, sub_status.submission_result)] + ) + sub_info, sub_status, sub_stats, report_uri = self.error_report( + submission_info=submission_info, status=sub_status + ) self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) - except Exception as err: - self._logger.error(f"During processing of submission_id: {sub_id}, the following exception was raised: {err}") + except Exception as err: # pylint: disable=W0718 + self._logger.error( + f"During 
processing of submission_id: {sub_id}, the following exception was raised: {err}" + ) self._audit_tables.mark_failed(submissions=[sub_id]) finally: audit_files_uri = self.persist_audit_records(submission_info=submission_info) - return ( - None if sub_status.failed else fh.joinuri( - self.processed_files_path, - sub_id, - "business_rules"), + return ( + ( + None + if sub_status.failed + else fh.joinuri(self.processed_files_path, sub_id, "business_rules") + ), report_uri, - audit_files_uri + audit_files_uri, ) - \ No newline at end of file diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index d36796b..15008a9 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -21,7 +21,11 @@ from dve.core_engine.backends.base.core import EntityManager from dve.core_engine.backends.base.reference_data import BaseRefDataLoader from dve.core_engine.backends.base.rules import BaseStepImplementations -from dve.core_engine.backends.exceptions import BackendError, MessageBearingError, ReaderLacksEntityTypeSupport +from dve.core_engine.backends.exceptions import ( + BackendError, + MessageBearingError, + ReaderLacksEntityTypeSupport, +) from dve.core_engine.backends.readers import BaseFileReader from dve.core_engine.backends.types import EntityType from dve.core_engine.backends.utilities import dump_errors, stringify_model @@ -52,7 +56,7 @@ def __init__( processed_files_path: Optional[URI], submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, - job_run_id: Optional[int] = None + job_run_id: Optional[int] = None, ): self._submitted_files_path = submitted_files_path self._processed_files_path = processed_files_path @@ -267,33 +271,41 @@ def file_transformation( if not self.processed_files_path: raise AttributeError("processed files path not provided") + errors: list[FeedbackMessage] = [] submission_file_uri: URI = fh.joinuri( self.processed_files_path, submission_info.submission_id, submission_info.file_name_with_ext, ) try: - errors = self.write_file_to_parquet( + errors.extend(self.write_file_to_parquet( submission_file_uri, submission_info, self.processed_files_path - ) + )) + + except MessageBearingError as exc: + self._logger.error(f"Unexpected file transformation error: {exc}") + self._logger.exception(exc) + errors.extend(exc.messages) - except Exception as exc: # pylint: disable=broad-except + except BackendError as exc: # pylint: disable=broad-except self._logger.error(f"Unexpected file transformation error: {exc}") self._logger.exception(exc) - # TODO: should this go to processing_errors.json? 
- # TODO: shouldn't be seen by user and don't need to maintain feedback message structure - errors = [CriticalProcessingError(entities=None, - error_message=repr(exc), - messages=[]).to_feedback_message()] - finally: - if errors: - dump_errors( - fh.joinuri(self.processed_files_path, submission_info.submission_id), - "file_transformation", - errors, - ) - return submission_info.dict() - return submission_info + errors.extend([ + CriticalProcessingError( + entities=None, + error_message=repr(exc), + messages=[], + ).to_feedback_message() + ]) + + if errors: + dump_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "file_transformation", + errors, + ) + return submission_info.dict() + return submission_info def file_transformation_step( self, pool: Executor, submissions_to_process: list[SubmissionInfo] @@ -326,7 +338,7 @@ def file_transformation_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"File transformation raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) failed_processing.append(sub_info) continue diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py index db5cc0a..ae77cfd 100644 --- a/src/dve/pipeline/spark_pipeline.py +++ b/src/dve/pipeline/spark_pipeline.py @@ -42,7 +42,7 @@ def __init__( processed_files_path, submitted_files_path, reference_data_loader, - job_run_id + job_run_id, ) # pylint: disable=arguments-differ From 8b787cbadafd2fda743c85dd8f71ca13cd6f352d Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Wed, 10 Dec 2025 08:09:28 +0000 Subject: [PATCH 4/8] test: reenabled books behave tests --- tests/features/books.feature | 104 +++++++++++++++++------------------ 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/tests/features/books.feature b/tests/features/books.feature index 5b03e7d..6313a07 100644 --- a/tests/features/books.feature +++ b/tests/features/books.feature @@ -4,59 +4,59 @@ Feature: Pipeline tests using the books dataset This tests submissions using nested, complex JSON datasets with arrays, and introduces more complex transformations that require aggregation. 
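These phase-by-phase scenarios walk the same stages the new Foundry runner executes in a single call. As a usage sketch mirroring the signatures in the foundry tests earlier in this series (the paths, file names and audit database here are assumptions):

    import duckdb
    from pathlib import Path

    from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
    from dve.core_engine.models import SubmissionInfo
    from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline

    db_file = Path("audit.duckdb")  # assumed local audit database
    conn = duckdb.connect(str(db_file))
    sub_info = SubmissionInfo.from_metadata_file(
        submission_id="example-sub-id",
        metadata_uri="/data/books/nested_books.metadata.json",  # assumed path
    )
    with DDBAuditingManager(db_file.resolve().as_uri(), None, conn) as audit_manager:
        pipeline = FoundryDDBPipeline(
            audit_tables=audit_manager,
            connection=conn,
            rules_path="/data/books/nested_books_ddb.dischema.json",  # assumed path
            processed_files_path="/data/processing",  # assumed folder
            submitted_files_path=None,
        )
        # One call covers file transformation, data contract, business rules
        # and error reporting, returning output, report and audit locations.
        output_loc, report_uri, audit_files_uri = pipeline.run_pipeline(sub_info)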
- # Scenario: Validate complex nested XML data (spark) - # Given I submit the books file nested_books.xml for processing - # And A spark pipeline is configured with schema file 'nested_books.dischema.json' - # And I add initial audit entries for the submission - # Then the latest audit record for the submission is marked with processing status file_transformation - # When I run the file transformation phase - # Then the header entity is stored as a parquet after the file_transformation phase - # And the nested_books entity is stored as a parquet after the file_transformation phase - # And the latest audit record for the submission is marked with processing status data_contract - # When I run the data contract phase - # Then there is 1 record rejection from the data_contract phase - # And the header entity is stored as a parquet after the data_contract phase - # And the nested_books entity is stored as a parquet after the data_contract phase - # And the latest audit record for the submission is marked with processing status business_rules - # When I run the business rules phase - # Then The rules restrict "nested_books" to 3 qualifying records - # And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" - # And the nested_books entity is stored as a parquet after the business_rules phase - # And the latest audit record for the submission is marked with processing status error_report - # When I run the error report phase - # Then An error report is produced - # And The statistics entry for the submission shows the following information - # | parameter | value | - # | record_count | 4 | - # | number_record_rejections | 2 | - # | number_warnings | 0 | + Scenario: Validate complex nested XML data (spark) + Given I submit the books file nested_books.xml for processing + And A spark pipeline is configured with schema file 'nested_books.dischema.json' + And I add initial audit entries for the submission + Then the latest audit record for the submission is marked with processing status file_transformation + When I run the file transformation phase + Then the header entity is stored as a parquet after the file_transformation phase + And the nested_books entity is stored as a parquet after the file_transformation phase + And the latest audit record for the submission is marked with processing status data_contract + When I run the data contract phase + Then there is 1 record rejection from the data_contract phase + And the header entity is stored as a parquet after the data_contract phase + And the nested_books entity is stored as a parquet after the data_contract phase + And the latest audit record for the submission is marked with processing status business_rules + When I run the business rules phase + Then The rules restrict "nested_books" to 3 qualifying records + And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" + And the nested_books entity is stored as a parquet after the business_rules phase + And the latest audit record for the submission is marked with processing status error_report + When I run the error report phase + Then An error report is produced + And The statistics entry for the submission shows the following information + | parameter | value | + | record_count | 4 | + | number_record_rejections | 2 | + | number_warnings | 0 | - # Scenario: Validate complex nested XML data (duckdb) - # Given I submit the books file nested_books.xml for processing - # And A duckdb pipeline is configured with schema file 
'nested_books_ddb.dischema.json' - # And I add initial audit entries for the submission - # Then the latest audit record for the submission is marked with processing status file_transformation - # When I run the file transformation phase - # Then the header entity is stored as a parquet after the file_transformation phase - # And the nested_books entity is stored as a parquet after the file_transformation phase - # And the latest audit record for the submission is marked with processing status data_contract - # When I run the data contract phase - # Then there is 1 record rejection from the data_contract phase - # And the header entity is stored as a parquet after the data_contract phase - # And the nested_books entity is stored as a parquet after the data_contract phase - # And the latest audit record for the submission is marked with processing status business_rules - # When I run the business rules phase - # Then The rules restrict "nested_books" to 3 qualifying records - # And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" - # And the nested_books entity is stored as a parquet after the business_rules phase - # And the latest audit record for the submission is marked with processing status error_report - # When I run the error report phase - # Then An error report is produced - # And The statistics entry for the submission shows the following information - # | parameter | value | - # | record_count | 4 | - # | number_record_rejections | 2 | - # | number_warnings | 0 | + Scenario: Validate complex nested XML data (duckdb) + Given I submit the books file nested_books.xml for processing + And A duckdb pipeline is configured with schema file 'nested_books_ddb.dischema.json' + And I add initial audit entries for the submission + Then the latest audit record for the submission is marked with processing status file_transformation + When I run the file transformation phase + Then the header entity is stored as a parquet after the file_transformation phase + And the nested_books entity is stored as a parquet after the file_transformation phase + And the latest audit record for the submission is marked with processing status data_contract + When I run the data contract phase + Then there is 1 record rejection from the data_contract phase + And the header entity is stored as a parquet after the data_contract phase + And the nested_books entity is stored as a parquet after the data_contract phase + And the latest audit record for the submission is marked with processing status business_rules + When I run the business rules phase + Then The rules restrict "nested_books" to 3 qualifying records + And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" + And the nested_books entity is stored as a parquet after the business_rules phase + And the latest audit record for the submission is marked with processing status error_report + When I run the error report phase + Then An error report is produced + And The statistics entry for the submission shows the following information + | parameter | value | + | record_count | 4 | + | number_record_rejections | 2 | + | number_warnings | 0 | Scenario: Handle a file with a malformed tag (duckdb) Given I submit the books file malformed_books.xml for processing From f922f0a3101a966b6acc32e5e75071a73f353c99 Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Fri, 12 Dec 2025 20:50:59 +0000 Subject: [PATCH 5/8] feat: added pass through of submission 
status to provide context to services processing files
---
 src/dve/core_engine/backends/base/auditing.py |  16 ++
 src/dve/core_engine/backends/utilities.py     |  36 ---
 src/dve/core_engine/exceptions.py             |  18 +-
 src/dve/core_engine/type_hints.py             |   2 +-
 src/dve/pipeline/foundry_ddb_pipeline.py      |  87 +++++--
 src/dve/pipeline/pipeline.py                  | 213 ++++++++++--------
 src/dve/pipeline/spark_pipeline.py            |   4 +-
 src/dve/pipeline/utils.py                     |  29 +--
 src/dve/reporting/utils.py                    |  65 ++++++
 tests/features/books.feature                  |   5 +-
 tests/features/steps/steps_pipeline.py        |  33 ++-
 tests/test_pipeline/test_duckdb_pipeline.py   |   7 +-
 tests/test_pipeline/test_spark_pipeline.py    |  27 ++-
 13 files changed, 328 insertions(+), 214 deletions(-)
 create mode 100644 src/dve/reporting/utils.py

diff --git a/src/dve/core_engine/backends/base/auditing.py b/src/dve/core_engine/backends/base/auditing.py
index 37b1774..d9f6ca6 100644
--- a/src/dve/core_engine/backends/base/auditing.py
+++ b/src/dve/core_engine/backends/base/auditing.py
@@ -31,6 +31,7 @@
     QueueType,
     SubmissionResult,
 )
+from dve.pipeline.utils import SubmissionStatus

 AuditReturnType = TypeVar("AuditReturnType")  # pylint: disable=invalid-name

@@ -493,6 +494,21 @@ def get_submission_statistics(self, submission_id: str) -> Optional[SubmissionSt
         )
         except StopIteration:
             return None
+    def get_submission_status(self, submission_id: str) -> SubmissionStatus:
+        """Get the latest submission status for a submission."""
+        sub_status = SubmissionStatus()
+        processing_rec: ProcessingStatusRecord = next(self._processing_status.conv_to_records(
+            self._processing_status.get_most_recent_records(
+                order_criteria=[OrderCriteria("time_updated", True)],
+                pre_filter_criteria=[FilterCriteria("submission_id", submission_id)])))
+        sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics(submission_id)
+        if processing_rec.processing_status == "failed":
+            sub_status.processing_failed = True
+        if processing_rec.submission_result == "failed":
+            sub_status.validation_failed = True
+        if sub_stats_rec:
+            sub_status.number_of_records = sub_stats_rec.record_count
+        return sub_status

     def __enter__(self):
         """Use audit table as context manager"""

diff --git a/src/dve/core_engine/backends/utilities.py b/src/dve/core_engine/backends/utilities.py
index 1a993e9..64a9d11 100644
--- a/src/dve/core_engine/backends/utilities.py
+++ b/src/dve/core_engine/backends/utilities.py
@@ -15,7 +15,6 @@
 import dve.parser.file_handling as fh
 from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
 from dve.core_engine.type_hints import URI, Messages
-from dve.reporting.error_report import conditional_cast

 # We need to rely on a Python typing implementation detail in Python <= 3.7. 
 if sys.version_info[:2] <= (3, 7):
@@ -179,38 +178,3 @@ def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType:
     if polars_type:
         return polars_type
     raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")
-
-
-def dump_errors(
-    working_folder: URI,
-    step_name: str,
-    messages: Messages,
-    key_fields: Optional[dict[str, list[str]]] = None,
-):
-    """Write out to disk captured feedback error messages."""
-    if not working_folder:
-        raise AttributeError("processed files path not passed")
-
-    if not key_fields:
-        key_fields = {}
-
-    errors = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json")
-    processed = []
-
-    for message in messages:
-        primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", [])
-        error = message.to_dict(
-            key_field=primary_keys,
-            value_separator=" -- ",
-            max_number_of_values=10,
-            record_converter=None,
-        )
-        error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ")
-        processed.append(error)
-
-    with fh.open_stream(errors, "a") as f:
-        json.dump(
-            processed,
-            f,
-            default=str,
-        )
diff --git a/src/dve/core_engine/exceptions.py b/src/dve/core_engine/exceptions.py
index d2faaf5..1e68d79 100644
--- a/src/dve/core_engine/exceptions.py
+++ b/src/dve/core_engine/exceptions.py
@@ -25,18 +25,13 @@ def __init__(
     def critical_messages(self) -> Iterator[FeedbackMessage]:
         """Critical messages which caused the processing error."""
         yield from filter(lambda message: message.is_critical, self.messages)
-
-    def to_feedback_message(self) -> FeedbackMessage:
-        "Convert to feedback message to write to json file"
-        return FeedbackMessage(
-            entity=None,
-            record=None,
-            failure_type="integrity",
-            error_type="processing",
-            error_location="Whole File",
-            error_message=self.error_message,
-        )
-
+
+    @classmethod
+    def from_exception(cls, exc: Exception) -> "CriticalProcessingError":
+        """Build a critical processing error directly from a raised exception."""
+        return cls(error_message=repr(exc),
+                   entities=None,
+                   messages=[])
 
 class EntityTypeMismatch(TypeError):
     """An exception emitted if entity type outputs from two collaborative objects are different."""
diff --git a/src/dve/core_engine/type_hints.py b/src/dve/core_engine/type_hints.py
index 991f46f..62b7793 100644
--- a/src/dve/core_engine/type_hints.py
+++ b/src/dve/core_engine/type_hints.py
@@ -236,7 +236,7 @@
 PROCESSING_STATUSES: tuple[ProcessingStatus, ...] = tuple(list(get_args(ProcessingStatus)))
 """List of all possible DVE submission statuses"""
 
-SubmissionResult = Literal["success", "failed", "failed_xml_generation", "archived"]
+SubmissionResult = Literal["success", "failed", "archived", "processing_failed"]
 """Allowed DVE submission results"""
 
 SUBMISSION_RESULTS: tuple[SubmissionResult, ...] 
= tuple(list(get_args(SubmissionResult)))
diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py
index 04da114..3f6c80c 100644
--- a/src/dve/pipeline/foundry_ddb_pipeline.py
+++ b/src/dve/pipeline/foundry_ddb_pipeline.py
@@ -2,13 +2,16 @@
 from typing import Optional
 
 from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet
-from dve.core_engine.backends.utilities import dump_errors
+from dve.core_engine.exceptions import CriticalProcessingError
 from dve.core_engine.models import SubmissionInfo
 from dve.core_engine.type_hints import URI, Failed
 from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
 from dve.pipeline.utils import SubmissionStatus
 from dve.parser import file_handling as fh
+from dve.reporting.utils import dump_processing_errors
 
+@duckdb_get_entity_count
+@duckdb_write_parquet
 class FoundryDDBPipeline(DDBDVEPipeline):
     """DuckDB pipeline for running on Foundry Platform"""
 
@@ -27,29 +30,59 @@ def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
 
     def file_transformation(
         self, submission_info: SubmissionInfo
-    ) -> SubmissionInfo | dict[str, str]:
+    ) -> tuple[SubmissionInfo, SubmissionStatus]:
         try:
             return super().file_transformation(submission_info)
         except Exception as exc: # pylint: disable=W0718
             self._logger.error(f"File transformation raised exception: {exc}")
             self._logger.exception(exc)
-            return submission_info.dict()
+            dump_processing_errors(
+                fh.joinuri(self.processed_files_path, submission_info.submission_id),
+                "file_transformation",
+                [CriticalProcessingError.from_exception(exc)]
+            )
+            return submission_info, SubmissionStatus(processing_failed=True)
 
-    def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo | bool]:
+    def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus) -> tuple[SubmissionInfo, SubmissionStatus]:
         try:
-            return super().apply_data_contract(submission_info)
+            return super().apply_data_contract(submission_info, submission_status)
         except Exception as exc: # pylint: disable=W0718
             self._logger.error(f"Apply data contract raised exception: {exc}")
             self._logger.exception(exc)
-            return submission_info, True
+            dump_processing_errors(
+                fh.joinuri(self.processed_files_path, submission_info.submission_id),
+                "contract",
+                [CriticalProcessingError.from_exception(exc)]
+            )
+            return submission_info, SubmissionStatus(processing_failed=True)
 
-    def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed):
+    def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus):
         try:
-            return super().apply_business_rules(submission_info, failed)
+            return super().apply_business_rules(submission_info, submission_status)
         except Exception as exc: # pylint: disable=W0718
             self._logger.error(f"Apply business rules raised exception: {exc}")
             self._logger.exception(exc)
-            return submission_info, SubmissionStatus(failed=True)
+            dump_processing_errors(
+                fh.joinuri(self.processed_files_path, submission_info.submission_id),
+                "business_rules",
+                [CriticalProcessingError.from_exception(exc)]
+            )
+            return submission_info, SubmissionStatus(processing_failed=True)
+
+    def error_report(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus):
+        try:
+            return super().error_report(submission_info, submission_status)
+        except Exception as exc: # pylint: disable=W0718
+            self._logger.error(f"Error reports raised exception: 
{exc}") + self._logger.exception(exc) + sub_stats = None + report_uri = None + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "error_report", + [CriticalProcessingError.from_exception(exc)] + ) + return submission_info, submission_status, sub_stats, report_uri def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]: """Sequential single submission pipeline runner""" @@ -57,16 +90,15 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], sub_id: str = submission_info.submission_id self._audit_tables.add_new_submissions(submissions=[submission_info]) self._audit_tables.mark_transform(submission_ids=[sub_id]) - sub_info = self.file_transformation(submission_info=submission_info) - if isinstance(sub_info, SubmissionInfo): + sub_info, sub_status = self.file_transformation(submission_info=submission_info) + if not (sub_status.validation_failed or sub_status.processing_failed): self._audit_tables.mark_data_contract(submission_ids=[sub_id]) - sub_info, failed = self.apply_data_contract(submission_info=submission_info) - self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)]) + sub_info, sub_status = self.apply_data_contract(submission_info=sub_info, submission_status=sub_status) + self._audit_tables.mark_business_rules(submissions=[(sub_id, sub_status.validation_failed)]) sub_info, sub_status = self.apply_business_rules( - submission_info=submission_info, failed=failed + submission_info=submission_info, submission_status=sub_status ) - else: - sub_status = SubmissionStatus(failed=True) + self._audit_tables.mark_error_report( submissions=[(sub_id, sub_status.submission_result)] ) @@ -78,15 +110,20 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], self._logger.error( f"During processing of submission_id: {sub_id}, the following exception was raised: {err}" ) + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "run_pipeline", + [CriticalProcessingError.from_exception(err)] + ) self._audit_tables.mark_failed(submissions=[sub_id]) finally: audit_files_uri = self.persist_audit_records(submission_info=submission_info) - return ( - ( - None - if sub_status.failed - else fh.joinuri(self.processed_files_path, sub_id, "business_rules") - ), - report_uri, - audit_files_uri, - ) + return ( + ( + None + if (sub_status.validation_failed or sub_status.processing_failed) + else fh.joinuri(self.processed_files_path, sub_id, "business_rules") + ), + report_uri, + audit_files_uri, + ) diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 15008a9..be846b3 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -1,5 +1,6 @@ # pylint: disable=protected-access,too-many-instance-attributes,too-many-arguments,line-too-long """Generic Pipeline object to define how DVE should be interacted with.""" +from itertools import starmap import json import re from collections import defaultdict @@ -28,13 +29,14 @@ ) from dve.core_engine.backends.readers import BaseFileReader from dve.core_engine.backends.types import EntityType -from dve.core_engine.backends.utilities import dump_errors, stringify_model +from dve.core_engine.backends.utilities import stringify_model from dve.core_engine.loggers import get_logger from dve.core_engine.models import SubmissionInfo, SubmissionStatisticsRecord from dve.core_engine.type_hints import URI, Failed, FileURI, InfoURI from dve.parser import 
file_handling as fh from dve.pipeline.utils import SubmissionStatus, deadletter_file, load_config, load_reader from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates +from dve.reporting.utils import dump_feedback_errors, dump_processing_errors PERMISSIBLE_EXCEPTIONS: tuple[type[Exception]] = ( FileNotFoundError, # type: ignore @@ -236,6 +238,11 @@ def audit_received_file_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"audit_received_file raised exception: {exc}") self._logger.exception(exc) + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "audit_received", + [CriticalProcessingError.from_exception(exc)] + ) # sub_info should at least # be populated with file_name and file_extension failed.append( @@ -266,12 +273,13 @@ def audit_received_file_step( def file_transformation( self, submission_info: SubmissionInfo - ) -> Union[SubmissionInfo, dict[str, str]]: + ) -> tuple[SubmissionInfo, SubmissionStatus]: """Transform a file from its original format into a 'stringified' parquet file""" if not self.processed_files_path: raise AttributeError("processed files path not provided") errors: list[FeedbackMessage] = [] + submission_status: SubmissionStatus = SubmissionStatus() submission_file_uri: URI = fh.joinuri( self.processed_files_path, submission_info.submission_id, @@ -287,29 +295,19 @@ def file_transformation( self._logger.exception(exc) errors.extend(exc.messages) - except BackendError as exc: # pylint: disable=broad-except - self._logger.error(f"Unexpected file transformation error: {exc}") - self._logger.exception(exc) - errors.extend([ - CriticalProcessingError( - entities=None, - error_message=repr(exc), - messages=[], - ).to_feedback_message() - ]) - if errors: - dump_errors( + dump_feedback_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "file_transformation", errors, ) - return submission_info.dict() - return submission_info + submission_status.validation_failed = True + return submission_info, submission_status + return submission_info, submission_status def file_transformation_step( self, pool: Executor, submissions_to_process: list[SubmissionInfo] - ) -> tuple[list[SubmissionInfo], list[SubmissionInfo]]: + ) -> tuple[list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]]: """Step to transform files from their original format into parquet files""" file_transform_futures: list[tuple[SubmissionInfo, Future]] = [] @@ -318,15 +316,21 @@ def file_transformation_step( future = pool.submit(self.file_transformation, submission_info) file_transform_futures.append((submission_info, future)) - success: list[SubmissionInfo] = [] - failed: list[SubmissionInfo] = [] - failed_processing: list[SubmissionInfo] = [] + success: list[tuple[SubmissionInfo, SubmissionStatus]] = [] + failed: list[tuple[SubmissionInfo, SubmissionStatus]] = [] + failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] for sub_info, future in file_transform_futures: try: # sub_info passed here either return SubInfo or dict. If SubInfo, not actually # modified in anyway during this step. 
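# A sketch of the (SubmissionInfo, SubmissionStatus) tuple protocol that each
# phase returns from this patch onwards, so callers can tell validation
# failures apart from processing failures; `pipeline` is assumed to be any
# configured BaseDVEPipeline subclass.
info, sub_status = pipeline.file_transformation(submission_info)
if sub_status.processing_failed:
    pass  # internal failure: persisted via dump_processing_errors, not user-facing
elif sub_status.validation_failed:
    pass  # user-facing failure: persisted via dump_feedback_errors for reporting
else:
    info, sub_status = pipeline.apply_data_contract(info, sub_status)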
- result = future.result() + submission_info: SubmissionInfo + submission_status: SubmissionStatus + submission_info, submission_status = future.result() + if submission_status.validation_failed: + failed.append((submission_info, submission_status)) + else: + success.append((submission_info, submission_status)) except AttributeError as exc: self._logger.error(f"File transformation raised exception: {exc}") raise exc @@ -338,25 +342,25 @@ def file_transformation_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"File transformation raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "file_transformation", + [CriticalProcessingError.from_exception(exc)] + ) + submission_status = SubmissionStatus(processing_failed=True) + failed_processing.append((sub_info, submission_status)) continue - if isinstance(result, SubmissionInfo): - success.append(sub_info) - else: - failed.append(sub_info) - if len(success) > 0: self._audit_tables.mark_data_contract( - list(map(lambda x: x.submission_id, success)), job_run_id=self.job_run_id + list(starmap(lambda x, _: x.submission_id, success)), job_run_id=self.job_run_id ) if len(failed) > 0: self._audit_tables.mark_error_report( list( - map( - lambda x: ( + starmap( + lambda x, _: ( x.submission_id, "failed", ), @@ -368,12 +372,12 @@ def file_transformation_step( if len(failed_processing) > 0: self._audit_tables.mark_failed( - [si.submission_id for si in failed_processing], job_run_id=self.job_run_id + [si.submission_id for si, _ in failed_processing], job_run_id=self.job_run_id ) return success, failed - def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo, Failed]: + def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus) -> tuple[SubmissionInfo, SubmissionStatus]: """Method for applying the data contract given a submission_info""" if not self.processed_files_path: @@ -403,33 +407,36 @@ def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[Submissi key_fields = {model: conf.reporting_fields for model, conf in model_config.items()} if messages: - dump_errors( + dump_feedback_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "contract", messages, key_fields=key_fields, ) - failed = any(not rule_message.is_informational for rule_message in messages) + validation_failed = any(not rule_message.is_informational for rule_message in messages) + + if validation_failed: + submission_status.validation_failed = True - return submission_info, failed + return submission_info, submission_status def data_contract_step( - self, pool: Executor, file_transform_results: list[SubmissionInfo] - ) -> tuple[list[tuple[SubmissionInfo, Failed]], list[SubmissionInfo]]: + self, pool: Executor, file_transform_results: list[tuple[SubmissionInfo, SubmissionStatus]] + ) -> tuple[list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]]: """Step to validate the types of an untyped (stringly typed) parquet file""" - processed_files: list[tuple[SubmissionInfo, Failed]] = [] - failed_processing: list[SubmissionInfo] = [] - dc_futures: list[tuple[SubmissionInfo, Future]] = [] + processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = [] + failed_processing: list[tuple[SubmissionInfo, 
SubmissionStatus]] = [] + dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] - for info in file_transform_results: - dc_futures.append((info, pool.submit(self.apply_data_contract, info))) + for info, sub_status in file_transform_results: + dc_futures.append((info, sub_status, pool.submit(self.apply_data_contract, info, sub_status))) - for sub_info, future in dc_futures: + for sub_info, sub_status, future in dc_futures: try: submission_info: SubmissionInfo - failed: Failed - submission_info, failed = future.result() + submission_status: SubmissionStatus + submission_info, submission_status = future.result() except AttributeError as exc: self._logger.error(f"Data Contract raised exception: {exc}") raise exc @@ -441,30 +448,35 @@ def data_contract_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Data Contract raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "contract", + [CriticalProcessingError.from_exception(exc)] + ) + sub_status.processing_failed=True + failed_processing.append((sub_info, sub_status)) continue - processed_files.append((submission_info, failed)) + processed_files.append((submission_info, submission_status)) if len(processed_files) > 0: self._audit_tables.mark_business_rules( [ - (sub_info.submission_id, failed) # type: ignore - for sub_info, failed in processed_files + (sub_info.submission_id, submission_status.validation_failed) # type: ignore + for sub_info, submission_status in processed_files ], job_run_id=self.job_run_id, ) if len(failed_processing) > 0: self._audit_tables.mark_failed( - [sub_info.submission_id for sub_info in failed_processing], + [sub_info.submission_id for sub_info, _ in failed_processing], job_run_id=self.job_run_id, ) return processed_files, failed_processing - def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool): + def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus): """Apply the business rules to a given submission, the submission may have failed at the data_contract step so this should be passed in as a bool """ @@ -506,14 +518,17 @@ def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool): key_fields = {model: conf.reporting_fields for model, conf in model_config.items()} if rule_messages: - dump_errors( + dump_feedback_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), "business_rules", rule_messages, key_fields, ) - failed = any(not rule_message.is_informational for rule_message in rule_messages) or failed + submission_status.validation_failed = ( + any(not rule_message.is_informational for rule_message in rule_messages) + or submission_status.validation_failed + ) for entity_name, entity in entity_manager.entities.items(): projected = self._step_implementations.write_parquet( # type: ignore @@ -529,47 +544,50 @@ def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool): projected ) - status = SubmissionStatus( - failed=failed, - number_of_records=self.get_entity_count( + submission_status.number_of_records = self.get_entity_count( entity=entity_manager.entities[ f"""Original{rules.global_variables.get( 'entity', submission_info.dataset_id)}""" ] - ), - ) + ) - return submission_info, status + return submission_info, 
submission_status def business_rule_step( self, pool: Executor, - files: list[tuple[SubmissionInfo, Failed]], + files: list[tuple[SubmissionInfo, SubmissionStatus]], ) -> tuple[ list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]], - list[SubmissionInfo], + list[tuple[SubmissionInfo, SubmissionStatus]], ]: """Step to apply business rules (Step impl) to a typed parquet file""" - future_files: list[tuple[SubmissionInfo, Future]] = [] + future_files: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] - for submission_info, submission_failed in files: + for submission_info, submission_status in files: future_files.append( ( submission_info, - pool.submit(self.apply_business_rules, submission_info, submission_failed), + submission_status, + pool.submit(self.apply_business_rules, submission_info, submission_status), ) ) - failed_processing: list[SubmissionInfo] = [] + failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] unsucessful_files: list[tuple[SubmissionInfo, SubmissionStatus]] = [] successful_files: list[tuple[SubmissionInfo, SubmissionStatus]] = [] - for sub_info, future in future_files: - status: SubmissionStatus + for sub_info, sub_status, future in future_files: try: - submission_info, status = future.result() + submission_info: SubmissionInfo + submission_status: SubmissionStatus + submission_info, submission_status = future.result() + if submission_status.validation_failed: + unsucessful_files.append((submission_info, submission_status)) + else: + successful_files.append((submission_info, submission_status)) except AttributeError as exc: self._logger.error(f"Business Rules raised exception: {exc}") raise exc @@ -581,14 +599,16 @@ def business_rule_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Business Rules raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "business_rules", + [CriticalProcessingError.from_exception(exc)] + ) + sub_status.processing_failed = True + failed_processing.append((sub_info, sub_status)) continue - if status.failed: - unsucessful_files.append((submission_info, status)) - else: - successful_files.append((submission_info, status)) + if len(unsucessful_files + successful_files) > 0: self._audit_tables.mark_error_report( @@ -601,7 +621,7 @@ def business_rule_step( if len(failed_processing) > 0: self._audit_tables.mark_failed( - [si.submission_id for si in failed_processing], job_run_id=self.job_run_id + [si.submission_id for si, _ in failed_processing], job_run_id=self.job_run_id ) return successful_files, unsucessful_files, failed_processing @@ -657,14 +677,14 @@ def _get_error_dataframes(self, submission_id: str): return errors_df, aggregates - def error_report(self, submission_info: SubmissionInfo, status: SubmissionStatus): + def error_report(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus) -> tuple[SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]]: """Creates the error reports given a submission info and submission status""" if not self.processed_files_path: raise AttributeError("processed files path not provided") errors_df, aggregates = self._get_error_dataframes(submission_info.submission_id) - if not status.number_of_records: + if not submission_status.number_of_records: 
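# Worked example of the statistics record built below, using the nested books
# scenario from earlier in this series (4 records, 2 rejections, 0 warnings);
# `sub_id` is an assumed placeholder:
#     SubmissionStatisticsRecord(
#         submission_id=sub_id,
#         record_count=4,
#         number_record_rejections=err_types.get("Submission Failure", 0),  # -> 2
#         number_warnings=err_types.get("Warning", 0),                      # -> 0
#     )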
sub_stats = None else: err_types = { @@ -675,7 +695,7 @@ def error_report(self, submission_info: SubmissionInfo, status: SubmissionStatus } sub_stats = SubmissionStatisticsRecord( submission_id=submission_info.submission_id, - record_count=status.number_of_records, + record_count=submission_status.number_of_records, number_record_rejections=err_types.get("Submission Failure", 0), number_warnings=err_types.get("Warning", 0), ) @@ -703,35 +723,36 @@ def error_report(self, submission_info: SubmissionInfo, status: SubmissionStatus with fh.open_stream(report_uri, "wb") as stream: stream.write(er.ExcelFormat.convert_to_bytes(workbook)) - return submission_info, status, sub_stats, report_uri + return submission_info, submission_status, sub_stats, report_uri def error_report_step( self, pool: Executor, processed: Iterable[tuple[SubmissionInfo, SubmissionStatus]] = tuple(), - failed_file_transformation: Iterable[SubmissionInfo] = tuple(), + failed_file_transformation: Iterable[tuple[SubmissionInfo, SubmissionStatus]] = tuple(), ) -> list[ tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI] ]: """Step to produce error reports takes processed files and files that failed file transformation """ - futures: list[tuple[SubmissionInfo, Future]] = [] + futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] reports: list[ tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI] ] = [] - failed_processing: list[SubmissionInfo] = [] + failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] for info, status in processed: - futures.append((info, pool.submit(self.error_report, info, status))) + futures.append((info, status, pool.submit(self.error_report, info, status))) - for info_dict in failed_file_transformation: - status = SubmissionStatus(True, 0) - futures.append((info_dict, pool.submit(self.error_report, info_dict, status))) + for info_dict, status in failed_file_transformation: + status.number_of_records = 0 + futures.append((info_dict, status, pool.submit(self.error_report, info_dict, status))) - for sub_info, future in futures: + for sub_info, status, future in futures: try: - submission_info, status, stats, feedback_uri = future.result() + submission_info, submission_status, submission_stats, feedback_uri = future.result() + reports.append((submission_info, submission_status, submission_stats, feedback_uri)) except AttributeError as exc: self._logger.error(f"Error reports raised exception: {exc}") raise exc @@ -743,9 +764,15 @@ def error_report_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Error reports raised exception: {exc}") self._logger.exception(exc) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "error_report", + [CriticalProcessingError.from_exception(exc)] + ) + status.processing_failed = True + failed_processing.append((sub_info, status)) continue - reports.append((submission_info, status, stats, feedback_uri)) + if reports: self._audit_tables.mark_finished( @@ -761,7 +788,7 @@ def error_report_step( if failed_processing: self._audit_tables.mark_failed( - [submission_info.submission_id for submission_info in failed_processing], + [submission_info.submission_id for submission_info, _ in failed_processing], job_run_id=self.job_run_id, ) diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py index ae77cfd..57f112c 100644 --- a/src/dve/pipeline/spark_pipeline.py +++ 
b/src/dve/pipeline/spark_pipeline.py @@ -13,7 +13,7 @@ from dve.core_engine.models import SubmissionInfo from dve.core_engine.type_hints import URI, Failed from dve.pipeline.pipeline import BaseDVEPipeline -from dve.pipeline.utils import unpersist_all_rdds +from dve.pipeline.utils import SubmissionStatus, unpersist_all_rdds # pylint: disable=abstract-method @@ -56,7 +56,7 @@ def write_file_to_parquet( # type: ignore def business_rule_step( self, pool: Executor, - files: list[tuple[SubmissionInfo, Failed]], + files: list[tuple[SubmissionInfo, SubmissionStatus]], ): successful_files, unsucessful_files, failed_processing = super().business_rule_step( pool, files diff --git a/src/dve/pipeline/utils.py b/src/dve/pipeline/utils.py index 4fa9a02..bfffc53 100644 --- a/src/dve/pipeline/utils.py +++ b/src/dve/pipeline/utils.py @@ -73,23 +73,18 @@ class SubmissionStatus: # _logger = get_logger("submission_status") - def __init__(self, failed: bool, number_of_records: Optional[int] = None): - self._failed = failed - self._number_of_records = number_of_records - - @property - def failed(self): - """Whether the submission was successfully processed.""" - return self._failed - - @property - def number_of_records(self) -> Union[int, None]: - """Number of records within a submission.""" - return self._number_of_records - + def __init__(self, + validation_failed: bool = False, + number_of_records: Optional[int] = None, + processing_failed: bool = False): + self.validation_failed = validation_failed + self.number_of_records = number_of_records + self.processing_failed = processing_failed + @property def submission_result(self) -> SubmissionResult: - """The result of the submission, either succes, failed or failed_xml_generation""" - if self.failed: + if self.processing_failed: + return "processing_failed" + if self.validation_failed: return "failed" - return "success" + return "success" \ No newline at end of file diff --git a/src/dve/reporting/utils.py b/src/dve/reporting/utils.py new file mode 100644 index 0000000..df2b28a --- /dev/null +++ b/src/dve/reporting/utils.py @@ -0,0 +1,65 @@ +import json +from typing import Optional +from dve.core_engine.exceptions import CriticalProcessingError +from dve.core_engine.type_hints import URI, Messages +import dve.parser.file_handling as fh +from dve.reporting.error_report import conditional_cast + +def dump_feedback_errors( + working_folder: URI, + step_name: str, + messages: Messages, + key_fields: Optional[dict[str, list[str]]] = None, +): + """Write out captured feedback error messages.""" + if not working_folder: + raise AttributeError("processed files path not passed") + + if not key_fields: + key_fields = {} + + errors = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json") + processed = [] + + for message in messages: + primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", []) + error = message.to_dict( + key_field=primary_keys, + value_separator=" -- ", + max_number_of_values=10, + record_converter=None, + ) + error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ") + processed.append(error) + + with fh.open_stream(errors, "a") as f: + json.dump( + processed, + f, + default=str, + ) + +def dump_processing_errors( + working_folder: URI, + step_name: str, + errors: list[CriticalProcessingError] +): + """Write out critical processing errors""" + if not working_folder: + raise AttributeError("processed files path not passed") + + errors = fh.joinuri(working_folder, "errors", 
f"processing_errors.json") + processed = [] + + for error in errors: + processed.append({"step_name": step_name, + "error_location": "processing", + "error_level": "integrity", + "error_message": repr(error)}) + + with fh.open_stream(errors, "a") as f: + json.dump( + processed, + f, + default=str, + ) \ No newline at end of file diff --git a/tests/features/books.feature b/tests/features/books.feature index 6313a07..9bc0611 100644 --- a/tests/features/books.feature +++ b/tests/features/books.feature @@ -64,9 +64,8 @@ Feature: Pipeline tests using the books dataset And I add initial audit entries for the submission Then the latest audit record for the submission is marked with processing status file_transformation When I run the file transformation phase - Then the latest audit record for the submission is marked with processing status error_report - When I run the error report phase - Then An error report is produced + Then the latest audit record for the submission is marked with processing status failed + # TODO - handle above within the stream xml reader - specific Scenario: Handle a file that fails XSD validation (duckdb) Given I submit the books file books_xsd_fail.xml for processing diff --git a/tests/features/steps/steps_pipeline.py b/tests/features/steps/steps_pipeline.py index 3dbc2b2..fda37bc 100644 --- a/tests/features/steps/steps_pipeline.py +++ b/tests/features/steps/steps_pipeline.py @@ -108,15 +108,18 @@ def run_file_transformation_step(context: Context): pool=ThreadPoolExecutor(1), submissions_to_process=[ctxt.get_submission_info(context)] ) if failed: - ctxt.set_failed_file_transformation(context, failed[0]) + ctxt.set_failed_file_transformation(context, failed[0][0]) + @when("I run the data contract phase") def apply_data_contract_with_error(context: Context): """Apply the data contract stage""" pipeline = ctxt.get_pipeline(context) + sub_info = ctxt.get_submission_info(context) + sub_status = pipeline._audit_tables.get_submission_status(sub_info.submission_id) pipeline.data_contract_step( - pool=ThreadPoolExecutor(1), file_transform_results=[ctxt.get_submission_info(context)] + pool=ThreadPoolExecutor(1), file_transform_results=[(sub_info, sub_status)] ) @@ -126,10 +129,9 @@ def apply_business_rules(context: Context): pipeline = ctxt.get_pipeline(context) sub_info = ctxt.get_submission_info(context) - proc_rec = pipeline._audit_tables.get_current_processing_info(sub_info.submission_id) - # add row count here so sub stats calculated + sub_status = pipeline._audit_tables.get_submission_status(sub_info.submission_id) success, failed, _ = pipeline.business_rule_step( - pool=ThreadPoolExecutor(1), files=[(sub_info, proc_rec.submission_result == "failed")] + pool=ThreadPoolExecutor(1), files=[(sub_info, sub_status)] ) assert len(success + failed) == 1 sub_status = (success + failed)[0][1] @@ -142,19 +144,28 @@ def create_error_report(context: Context): pipeline = ctxt.get_pipeline(context) try: - failed_file_transformation = [ctxt.get_failed_file_transformation(context)] - processed = [] + failed_ft = ctxt.get_failed_file_transformation(context) + sub_status = pipeline._audit_tables.get_submission_status(failed_ft.submission_id) + + pipeline.error_report_step( + pool=ThreadPoolExecutor(1), + processed=[], + failed_file_transformation=[(failed_ft, sub_status)] + + ) + except AttributeError: sub_info = ctxt.get_submission_info(context) processed = [(sub_info, ctxt.get_submission_status(context))] - failed_file_transformation = [] - - pipeline.error_report_step( + 
pipeline.error_report_step( pool=ThreadPoolExecutor(1), processed=processed, - failed_file_transformation=failed_file_transformation, + failed_file_transformation=[] + ) + + @then("there are {expected_num_errors:d} record rejections from the {service} phase") @then("there is {expected_num_errors:d} record rejection from the {service} phase") diff --git a/tests/test_pipeline/test_duckdb_pipeline.py b/tests/test_pipeline/test_duckdb_pipeline.py index 8512494..81660dd 100644 --- a/tests/test_pipeline/test_duckdb_pipeline.py +++ b/tests/test_pipeline/test_duckdb_pipeline.py @@ -17,6 +17,7 @@ from dve.core_engine.models import SubmissionInfo import dve.parser.file_handling as fh from dve.pipeline.duckdb_pipeline import DDBDVEPipeline +from dve.pipeline.utils import SubmissionStatus from ..conftest import get_test_file_path from ..fixtures import temp_ddb_conn # pylint: disable=unused-import @@ -122,10 +123,11 @@ def test_data_contract_step( ) success, failed = dve_pipeline.data_contract_step( - pool=ThreadPoolExecutor(2), file_transform_results=[sub_info] + pool=ThreadPoolExecutor(2), file_transform_results=[(sub_info, SubmissionStatus())] ) assert len(success) == 1 + assert not success[0][1].validation_failed assert len(failed) == 0 assert Path(processed_file_path, sub_info.submission_id, "contract", "planets").exists() @@ -159,10 +161,11 @@ def test_business_rule_step( audit_manager.add_new_submissions([sub_info], job_run_id=1) successful_files, unsuccessful_files, failed_processing = dve_pipeline.business_rule_step( - pool=ThreadPoolExecutor(2), files=[(sub_info, None)] + pool=ThreadPoolExecutor(2), files=[(sub_info, SubmissionStatus())] ) assert len(successful_files) == 1 + assert not successful_files[0][1].validation_failed assert len(unsuccessful_files) == 0 assert len(failed_processing) == 0 diff --git a/tests/test_pipeline/test_spark_pipeline.py b/tests/test_pipeline/test_spark_pipeline.py index c3a7fb2..79d5815 100644 --- a/tests/test_pipeline/test_spark_pipeline.py +++ b/tests/test_pipeline/test_spark_pipeline.py @@ -126,10 +126,10 @@ def test_apply_data_contract_success( reference_data_loader=None, spark=spark, ) + sub_status = SubmissionStatus() + sub_info, sub_status = dve_pipeline.apply_data_contract(sub_info, sub_status) - _, failed = dve_pipeline.apply_data_contract(sub_info) - - assert not failed + assert not sub_status.validation_failed assert Path(Path(processed_file_path), sub_info.submission_id, "contract", "planets").exists() @@ -147,9 +147,10 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name reference_data_loader=None, spark=spark, ) + sub_status = SubmissionStatus() - _, failed = dve_pipeline.apply_data_contract(sub_info) - assert failed + sub_info, sub_status = dve_pipeline.apply_data_contract(sub_info, sub_status) + assert sub_status.validation_failed output_path = Path(processed_file_path) / sub_info.submission_id assert Path(output_path, "contract", "planets").exists() @@ -210,6 +211,7 @@ def test_data_contract_step( spark_test_database, ): # pylint: disable=redefined-outer-name sub_info, processed_file_path = planet_data_after_file_transformation + sub_status = SubmissionStatus() with SparkAuditingManager(spark_test_database, ThreadPoolExecutor(1), spark) as audit_manager: dve_pipeline = SparkDVEPipeline( audit_tables=audit_manager, @@ -221,10 +223,11 @@ def test_data_contract_step( ) success, failed = dve_pipeline.data_contract_step( - pool=ThreadPoolExecutor(2), file_transform_results=[sub_info] + pool=ThreadPoolExecutor(2), 
file_transform_results=[(sub_info, sub_status)] ) assert len(success) == 1 + assert not success[0][1].validation_failed assert len(failed) == 0 assert Path(processed_file_path, sub_info.submission_id, "contract", "planets").exists() @@ -254,9 +257,9 @@ def test_apply_business_rules_success( spark=spark, ) - _, status = dve_pipeline.apply_business_rules(sub_info, False) + _, status = dve_pipeline.apply_business_rules(sub_info, SubmissionStatus()) - assert not status.failed + assert not status.validation_failed assert status.number_of_records == 1 planets_entity_path = Path( @@ -299,9 +302,9 @@ def test_apply_business_rules_with_data_errors( # pylint: disable=redefined-out spark=spark, ) - _, status = dve_pipeline.apply_business_rules(sub_info, False) + _, status = dve_pipeline.apply_business_rules(sub_info, SubmissionStatus()) - assert status.failed + assert status.validation_failed assert status.number_of_records == 1 br_path = Path( @@ -382,7 +385,7 @@ def test_business_rule_step( audit_manager.add_new_submissions([sub_info], job_run_id=1) successful_files, unsuccessful_files, failed_processing = dve_pipeline.business_rule_step( - pool=ThreadPoolExecutor(2), files=[(sub_info, None)] + pool=ThreadPoolExecutor(2), files=[(sub_info, SubmissionStatus())] ) assert len(successful_files) == 1 @@ -418,7 +421,7 @@ def test_error_report_where_report_is_expected( # pylint: disable=redefined-out sub_info, SubmissionStatus(True, 9) ) - assert status.failed + assert status.validation_failed expected = { "submission_id": submission_info.submission_id, From 08ac20b2b8eb84c20785e64b8eb16edd53c95cf2 Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Mon, 15 Dec 2025 18:09:13 +0000 Subject: [PATCH 6/8] refactor: tweaks around submission status - now optional to pass as can determine from audit (in service based approach). 
Tests fixed and added to --- src/dve/core_engine/backends/base/auditing.py | 25 +++-- src/dve/core_engine/type_hints.py | 2 +- src/dve/pipeline/foundry_ddb_pipeline.py | 24 +++-- src/dve/pipeline/pipeline.py | 36 +++++-- src/dve/pipeline/utils.py | 2 +- src/dve/reporting/utils.py | 6 +- tests/features/planets.feature | 4 +- .../test_duckdb/test_audit_ddb.py | 98 ++++++++++++++++++- .../test_spark/test_audit_spark.py | 96 +++++++++++++++++- .../test_foundry_ddb_pipeline.py | 2 +- tests/test_pipeline/test_pipeline.py | 5 +- 11 files changed, 257 insertions(+), 43 deletions(-) diff --git a/src/dve/core_engine/backends/base/auditing.py b/src/dve/core_engine/backends/base/auditing.py index d9f6ca6..596a22a 100644 --- a/src/dve/core_engine/backends/base/auditing.py +++ b/src/dve/core_engine/backends/base/auditing.py @@ -330,7 +330,7 @@ def mark_business_rules(self, submissions: list[tuple[str, bool]], **kwargs): ProcessingStatusRecord( submission_id=submission_id, processing_status="business_rules", - submission_result="failed" if failed else None, + submission_result="validation_failed" if failed else None, **kwargs, ) for submission_id, failed in submissions @@ -380,7 +380,10 @@ def mark_failed(self, submissions: list[str], **kwargs): """Update submission processing_status to failed.""" recs = [ ProcessingStatusRecord( - submission_id=submission_id, processing_status="failed", **kwargs + submission_id=submission_id, + processing_status="failed", + submission_result="processing_failed", + **kwargs ) for submission_id in submissions ] @@ -494,16 +497,22 @@ def get_submission_statistics(self, submission_id: str) -> Optional[SubmissionSt ) except StopIteration: return None - def get_submission_status(self, submission_id: str) -> SubmissionStatus: + def get_submission_status(self, submission_id: str) -> Optional[SubmissionStatus]: """Get the latest submission status for a submission""" + + try: + processing_rec: ProcessingStatusRecord = next(self._processing_status.conv_to_records( + self._processing_status.get_most_recent_records( + order_criteria=[OrderCriteria("time_updated", True)], + pre_filter_criteria=[FilterCriteria("submission_id", submission_id)] + ))) + except StopIteration: + return None sub_status = SubmissionStatus() - processing_rec: ProcessingStatusRecord = next(self._processing_status.conv_to_records(self._processing_status.get_most_recent_records(order_criteria=[OrderCriteria("time_updated", True)], - pre_filter_criteria=[FilterCriteria("submission_id", - submission_id)]))) sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics(submission_id) - if processing_rec.processing_status == "failed": + if processing_rec.submission_result == "processing_failed": sub_status.processing_failed = True - if processing_rec.submission_result == "failed": + if processing_rec.submission_result == "validation_failed": sub_status.validation_failed = True if sub_stats_rec: sub_status.number_of_records = sub_stats_rec.record_count diff --git a/src/dve/core_engine/type_hints.py b/src/dve/core_engine/type_hints.py index 62b7793..0be3763 100644 --- a/src/dve/core_engine/type_hints.py +++ b/src/dve/core_engine/type_hints.py @@ -236,7 +236,7 @@ PROCESSING_STATUSES: tuple[ProcessingStatus, ...] 
= tuple(list(get_args(ProcessingStatus))) """List of all possible DVE submission statuses""" -SubmissionResult = Literal["success", "failed", "archived", "processing_failed"] +SubmissionResult = Literal["success", "validation_failed", "archived", "processing_failed"] """Allowed DVE submission results""" SUBMISSION_RESULTS: tuple[SubmissionResult, ...] = tuple(list(get_args(SubmissionResult))) diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index 3f6c80c..d59402b 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -41,6 +41,7 @@ def file_transformation( "file_transformation", [CriticalProcessingError.from_exception(exc)] ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) return submission_info, SubmissionStatus(processing_failed=True) def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus) -> tuple[SubmissionInfo | SubmissionStatus]: @@ -54,6 +55,7 @@ def apply_data_contract(self, submission_info: SubmissionInfo, submission_status "contract", [CriticalProcessingError.from_exception(exc)] ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) return submission_info, SubmissionStatus(processing_failed=True) def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus): @@ -67,6 +69,7 @@ def apply_business_rules(self, submission_info: SubmissionInfo, submission_statu "business_rules", [CriticalProcessingError.from_exception(exc)] ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) return submission_info, SubmissionStatus(processing_failed=True) def error_report(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus): @@ -82,12 +85,14 @@ def error_report(self, submission_info: SubmissionInfo, submission_status: Submi "error_report", [CriticalProcessingError.from_exception(exc)] ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) return submission_info, submission_status, sub_stats, report_uri - def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]: + def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], Optional[URI], URI]: """Sequential single submission pipeline runner""" try: sub_id: str = submission_info.submission_id + report_uri = None self._audit_tables.add_new_submissions(submissions=[submission_info]) self._audit_tables.mark_transform(submission_ids=[sub_id]) sub_info, sub_status = self.file_transformation(submission_info=submission_info) @@ -99,13 +104,14 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], submission_info=submission_info, submission_status=sub_status ) - self._audit_tables.mark_error_report( - submissions=[(sub_id, sub_status.submission_result)] - ) - sub_info, sub_status, sub_stats, report_uri = self.error_report( - submission_info=submission_info, status=sub_status - ) - self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) + if not sub_status.processing_failed: + self._audit_tables.mark_error_report( + submissions=[(sub_id, sub_status.submission_result)] + ) + sub_info, sub_status, sub_stats, report_uri = self.error_report( + submission_info=submission_info, submission_status=sub_status + ) + self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) except Exception as err: # pylint: disable=W0718 self._logger.error( f"During 
processing of submission_id: {sub_id}, the following exception was raised: {err}" @@ -124,6 +130,6 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], if (sub_status.validation_failed or sub_status.processing_failed) else fh.joinuri(self.processed_files_path, sub_id, "business_rules") ), - report_uri, + report_uri if report_uri else None, audit_files_uri, ) diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index be846b3..6270bdd 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -362,7 +362,7 @@ def file_transformation_step( starmap( lambda x, _: ( x.submission_id, - "failed", + "validation_failed", ), failed, ) @@ -377,9 +377,10 @@ def file_transformation_step( return success, failed - def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus) -> tuple[SubmissionInfo, SubmissionStatus]: + def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None) -> tuple[SubmissionInfo, SubmissionStatus]: """Method for applying the data contract given a submission_info""" - + if not submission_status: + submission_status = self._audit_tables.get_submission_status(submission_info.submission_id) if not self.processed_files_path: raise AttributeError("processed files path not provided") @@ -422,7 +423,7 @@ def apply_data_contract(self, submission_info: SubmissionInfo, submission_status return submission_info, submission_status def data_contract_step( - self, pool: Executor, file_transform_results: list[tuple[SubmissionInfo, SubmissionStatus]] + self, pool: Executor, file_transform_results: list[tuple[SubmissionInfo, Optional[SubmissionStatus]]] ) -> tuple[list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]]: """Step to validate the types of an untyped (stringly typed) parquet file""" processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = [] @@ -430,6 +431,9 @@ def data_contract_step( dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] for info, sub_status in file_transform_results: + sub_status = ( + sub_status if sub_status + else self._audit_tables.get_submission_status(info.submission_id)) dc_futures.append((info, sub_status, pool.submit(self.apply_data_contract, info, sub_status))) for sub_info, sub_status, future in dc_futures: @@ -476,10 +480,13 @@ def data_contract_step( return processed_files, failed_processing - def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus): + def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None): """Apply the business rules to a given submission, the submission may have failed at the data_contract step so this should be passed in as a bool """ + if not submission_status: + submission_status = self._audit_tables.get_submission_status(submission_info.submission_id) + if not self.rules_path: raise AttributeError("business rules path not provided.") @@ -557,7 +564,7 @@ def apply_business_rules(self, submission_info: SubmissionInfo, submission_statu def business_rule_step( self, pool: Executor, - files: list[tuple[SubmissionInfo, SubmissionStatus]], + files: list[tuple[SubmissionInfo, Optional[SubmissionStatus]]], ) -> tuple[ list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]], @@ -567,6 +574,9 @@ def business_rule_step( future_files: list[tuple[SubmissionInfo, SubmissionStatus, 
Future]] = [] for submission_info, submission_status in files: + submission_status = ( + submission_status if submission_status + else self._audit_tables.get_submission_status(submission_info.submission_id)) future_files.append( ( submission_info, @@ -677,8 +687,14 @@ def _get_error_dataframes(self, submission_id: str): return errors_df, aggregates - def error_report(self, submission_info: SubmissionInfo, submission_status: SubmissionStatus) -> tuple[SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]]: + def error_report(self, + submission_info: SubmissionInfo, + submission_status: Optional[SubmissionStatus] = None) -> tuple[SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]]: """Creates the error reports given a submission info and submission status""" + + if not submission_status: + submission_status = self._audit_tables.get_submission_status(submission_info.submission_id) + if not self.processed_files_path: raise AttributeError("processed files path not provided") @@ -728,7 +744,7 @@ def error_report(self, submission_info: SubmissionInfo, submission_status: Submi def error_report_step( self, pool: Executor, - processed: Iterable[tuple[SubmissionInfo, SubmissionStatus]] = tuple(), + processed: Iterable[tuple[SubmissionInfo, Optional[SubmissionStatus]]] = tuple(), failed_file_transformation: Iterable[tuple[SubmissionInfo, SubmissionStatus]] = tuple(), ) -> list[ tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI] @@ -743,6 +759,10 @@ def error_report_step( failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] for info, status in processed: + status = ( + status if status + else self._audit_tables.get_submission_status(info.submission_id) + ) futures.append((info, status, pool.submit(self.error_report, info, status))) for info_dict, status in failed_file_transformation: diff --git a/src/dve/pipeline/utils.py b/src/dve/pipeline/utils.py index bfffc53..e7d0d64 100644 --- a/src/dve/pipeline/utils.py +++ b/src/dve/pipeline/utils.py @@ -86,5 +86,5 @@ def submission_result(self) -> SubmissionResult: if self.processing_failed: return "processing_failed" if self.validation_failed: - return "failed" + return "validation_failed" return "success" \ No newline at end of file diff --git a/src/dve/reporting/utils.py b/src/dve/reporting/utils.py index df2b28a..9cd34cc 100644 --- a/src/dve/reporting/utils.py +++ b/src/dve/reporting/utils.py @@ -48,16 +48,16 @@ def dump_processing_errors( if not working_folder: raise AttributeError("processed files path not passed") - errors = fh.joinuri(working_folder, "errors", f"processing_errors.json") + error_file: URI = fh.joinuri(working_folder, "errors", f"processing_errors.json") processed = [] for error in errors: processed.append({"step_name": step_name, "error_location": "processing", "error_level": "integrity", - "error_message": repr(error)}) + "error_message": error.error_message}) - with fh.open_stream(errors, "a") as f: + with fh.open_stream(error_file, "a") as f: json.dump( processed, f, diff --git a/tests/features/planets.feature b/tests/features/planets.feature index 321f047..c469a92 100644 --- a/tests/features/planets.feature +++ b/tests/features/planets.feature @@ -42,9 +42,7 @@ Feature: Pipeline tests using the planets dataset And I add initial audit entries for the submission Then the latest audit record for the submission is marked with processing status file_transformation When I run the file transformation phase - Then the 
latest audit record for the submission is marked with processing status error_report - When I run the error report phase - Then An error report is produced + Then the latest audit record for the submission is marked with processing status failed Scenario: Handle a file with duplicated extension provided (spark) Given I submit the planets file planets.csv.csv for processing diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py index 989a928..b0598eb 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py @@ -10,7 +10,8 @@ from duckdb import ColumnExpression, ConstantExpression, DuckDBPyConnection from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager -from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo +from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord +from dve.pipeline.utils import SubmissionStatus from .....fixtures import temp_ddb_conn # pylint: disable=unused-import @@ -115,14 +116,14 @@ def test_audit_table_update_status(ddb_audit_manager: DDBAuditingManager): == "business_rules" ) - ddb_audit_manager.mark_error_report([(_sub_info.submission_id, "failed")]) + ddb_audit_manager.mark_error_report([(_sub_info.submission_id, "validation_failed")]) assert ( ddb_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status == "error_report" ) - ddb_audit_manager.mark_finished([(_sub_info.submission_id, "failed")]) + ddb_audit_manager.mark_finished([(_sub_info.submission_id, "validation_failed")]) assert ( ddb_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status @@ -350,3 +351,94 @@ def test_get_error_report_submissions(ddb_audit_manager_threaded: DDBAuditingMan assert len(processed) == 2 assert len(dodgy) == 0 assert processed == expected + +def test_get_submission_status(ddb_audit_manager_threaded: DDBAuditingManager): + with ddb_audit_manager_threaded as aud: + # add four submissions + sub_1 = SubmissionInfo( + submission_id="1", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + sub_2 = SubmissionInfo( + submission_id="2", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + sub_3 = SubmissionInfo( + submission_id="3", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + sub_4 = SubmissionInfo( + submission_id="4", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + # mark 1 as failed validation, 2 as failed processing, 3 as null and 4 as successful + aud.add_new_submissions([sub_1, sub_2, sub_3, sub_4]) + aud.add_processing_records( + [ + ProcessingStatusRecord( + submission_id=sub_1.submission_id, processing_status="error_report", 
submission_result="validation_failed" + ), + ProcessingStatusRecord( + submission_id=sub_2.submission_id, processing_status="failed", submission_result="processing_failed" + ), + ProcessingStatusRecord( + submission_id=sub_3.submission_id, processing_status="business_rules" + ), + ProcessingStatusRecord( + submission_id=sub_4.submission_id, processing_status="error_report", submission_result="success" + ), + ] + ) + aud.add_submission_statistics_records([ + SubmissionStatisticsRecord(submission_id=sub_1.submission_id, record_count=5, number_record_rejections=2, number_warnings=3), + SubmissionStatisticsRecord(submission_id=sub_4.submission_id, record_count=20, number_record_rejections=0, number_warnings=1) + ]) + + while not aud.queue.empty(): + time.sleep(0.05) + + sub_stats_1 = aud.get_submission_status(sub_1.submission_id) + assert sub_stats_1.submission_result == "validation_failed" + assert sub_stats_1.validation_failed + assert not sub_stats_1.processing_failed + assert sub_stats_1.number_of_records == 5 + sub_stats_2 = aud.get_submission_status(sub_2.submission_id) + assert sub_stats_2.submission_result == "processing_failed" + assert not sub_stats_2.validation_failed + assert sub_stats_2.processing_failed + sub_stats_3 = aud.get_submission_status(sub_3.submission_id) + assert not sub_stats_3.validation_failed + assert not sub_stats_3.processing_failed + sub_stats_4 = aud.get_submission_status(sub_4.submission_id) + assert sub_stats_4.submission_result == "success" + assert not sub_stats_4.validation_failed + assert not sub_stats_4.processing_failed + assert sub_stats_4.number_of_records == 20 + assert not aud.get_submission_status("5") \ No newline at end of file diff --git a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py index 4045a90..a5146f9 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py @@ -10,8 +10,8 @@ from pyspark.sql.functions import col, lit from dve.core_engine.backends.implementations.spark.auditing import SparkAuditingManager -from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo -from dve.pipeline.utils import unpersist_all_rdds +from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord +from dve.pipeline.utils import SubmissionStatus, unpersist_all_rdds from dve.core_engine.backends.implementations.spark.spark_helpers import PYTHON_TYPE_TO_SPARK_TYPE from .....conftest import get_test_file_path @@ -123,14 +123,14 @@ def test_audit_table_update_status(spark_audit_manager: SparkAuditingManager): == "business_rules" ) - spark_audit_manager.mark_error_report([(_sub_info.submission_id, "failed")]) + spark_audit_manager.mark_error_report([(_sub_info.submission_id, "validation_failed")]) assert ( spark_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status == "error_report" ) - spark_audit_manager.mark_finished([(_sub_info.submission_id, "failed")]) + spark_audit_manager.mark_finished([(_sub_info.submission_id, "validation_failed")]) assert ( spark_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status @@ -367,3 +367,91 @@ def test_get_error_report_submissions(spark_audit_manager: SparkAuditingManager) assert len(processed) == 2 assert len(dodgy) == 0 assert processed == 
diff --git a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py
index 4045a90..a5146f9 100644
--- a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py
+++ b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py
@@ -10,8 +10,8 @@
 from pyspark.sql.functions import col, lit
 
 from dve.core_engine.backends.implementations.spark.auditing import SparkAuditingManager
-from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo
-from dve.pipeline.utils import unpersist_all_rdds
+from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord
+from dve.pipeline.utils import SubmissionStatus, unpersist_all_rdds
 from dve.core_engine.backends.implementations.spark.spark_helpers import PYTHON_TYPE_TO_SPARK_TYPE
 
 from .....conftest import get_test_file_path
@@ -123,14 +123,14 @@ def test_audit_table_update_status(spark_audit_manager: SparkAuditingManager):
         == "business_rules"
     )
 
-    spark_audit_manager.mark_error_report([(_sub_info.submission_id, "failed")])
+    spark_audit_manager.mark_error_report([(_sub_info.submission_id, "validation_failed")])
 
     assert (
         spark_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status
         == "error_report"
     )
 
-    spark_audit_manager.mark_finished([(_sub_info.submission_id, "failed")])
+    spark_audit_manager.mark_finished([(_sub_info.submission_id, "validation_failed")])
 
     assert (
         spark_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status
@@ -367,3 +367,91 @@ def test_get_error_report_submissions(spark_audit_manager: SparkAuditingManager)
     assert len(processed) == 2
     assert len(dodgy) == 0
     assert processed == expected
+
+def test_get_submission_status(spark_audit_manager: SparkAuditingManager):
+    with spark_audit_manager as aud:
+        # add four submissions
+        sub_1 = SubmissionInfo(
+            submission_id="1",
+            submitting_org="TEST",
+            dataset_id="TEST_DATASET",
+            file_name="TEST_FILE",
+            submission_method="sftp",
+            file_extension="xml",
+            file_size=12345,
+            datetime_received=datetime(2023, 9, 1, 12, 0, 0),
+        )
+        sub_2 = SubmissionInfo(
+            submission_id="2",
+            submitting_org="TEST",
+            dataset_id="TEST_DATASET",
+            file_name="TEST_FILE",
+            submission_method="sftp",
+            file_extension="xml",
+            file_size=12345,
+            datetime_received=datetime(2023, 9, 1, 12, 0, 0),
+        )
+
+        sub_3 = SubmissionInfo(
+            submission_id="3",
+            submitting_org="TEST",
+            dataset_id="TEST_DATASET",
+            file_name="TEST_FILE",
+            submission_method="sftp",
+            file_extension="xml",
+            file_size=12345,
+            datetime_received=datetime(2023, 9, 1, 12, 0, 0),
+        )
+
+        sub_4 = SubmissionInfo(
+            submission_id="4",
+            submitting_org="TEST",
+            dataset_id="TEST_DATASET",
+            file_name="TEST_FILE",
+            submission_method="sftp",
+            file_extension="xml",
+            file_size=12345,
+            datetime_received=datetime(2023, 9, 1, 12, 0, 0),
+        )
+
+        # mark 1 as failed validation, 2 as failed processing, 3 as null and 4 as successful
+        aud.add_new_submissions([sub_1, sub_2, sub_3, sub_4])
+        aud.add_processing_records(
+            [
+                ProcessingStatusRecord(
+                    submission_id=sub_1.submission_id, processing_status="error_report", submission_result="validation_failed"
+                ),
+                ProcessingStatusRecord(
+                    submission_id=sub_2.submission_id, processing_status="failed", submission_result="processing_failed"
+                ),
+                ProcessingStatusRecord(
+                    submission_id=sub_3.submission_id, processing_status="business_rules"
+                ),
+                ProcessingStatusRecord(
+                    submission_id=sub_4.submission_id, processing_status="error_report", submission_result="success"
+                ),
+            ]
+        )
+        aud.add_submission_statistics_records([
+            SubmissionStatisticsRecord(submission_id=sub_1.submission_id, record_count=5, number_record_rejections=2, number_warnings=3),
+            SubmissionStatisticsRecord(submission_id=sub_4.submission_id, record_count=20, number_record_rejections=0, number_warnings=1)
+        ])
+
+        sub_stats_1 = aud.get_submission_status(sub_1.submission_id)
+        assert sub_stats_1.submission_result == "validation_failed"
+        assert sub_stats_1.validation_failed
+        assert not sub_stats_1.processing_failed
+        assert sub_stats_1.number_of_records == 5
+        sub_stats_2 = aud.get_submission_status(sub_2.submission_id)
+        assert sub_stats_2.submission_result == "processing_failed"
+        assert not sub_stats_2.validation_failed
+        assert sub_stats_2.processing_failed
+        sub_stats_3 = aud.get_submission_status(sub_3.submission_id)
+        assert not sub_stats_3.validation_failed
+        assert not sub_stats_3.processing_failed
+        sub_stats_4 = aud.get_submission_status(sub_4.submission_id)
+        assert sub_stats_4.submission_result == "success"
+        assert not sub_stats_4.validation_failed
+        assert not sub_stats_4.processing_failed
+        assert sub_stats_4.number_of_records == 20
+        assert not aud.get_submission_status("5")
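
Editor's note (commentary, not part of the patch): unlike the DuckDB variant above, this Spark test
queries get_submission_status immediately after writing the records — the threaded DuckDB manager
needed an explicit drain of its write queue first. Should the Spark manager ever gain the same
asynchronous behaviour, a shared helper along these lines (hypothetical; it adds the timeout the
original busy-wait lacks) would keep both tests honest:

    import time

    def drain_audit_queue(aud, timeout: float = 10.0, poll: float = 0.05) -> None:
        """Block until the auditing manager's background write queue is empty.

        Hypothetical helper mirroring the `while not aud.queue.empty():
        time.sleep(0.05)` loop in the DuckDB test, but failing loudly if the
        queue never drains instead of spinning forever.
        """
        deadline = time.monotonic() + timeout
        while not aud.queue.empty():
            if time.monotonic() > deadline:
                raise TimeoutError("audit queue did not drain within timeout")
            time.sleep(poll)
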
diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py
index dcdb59b..df58b59 100644
--- a/tests/test_pipeline/test_foundry_ddb_pipeline.py
+++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py
@@ -113,6 +113,6 @@ def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
         reference_data_loader=DuckDBRefDataLoader,
     )
     output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
-    assert fh.get_resource_exists(report_uri)
+    assert not fh.get_resource_exists(report_uri)
     assert not output_loc
     assert len(list(fh.iter_prefix(audit_files))) == 2
diff --git a/tests/test_pipeline/test_pipeline.py b/tests/test_pipeline/test_pipeline.py
index 429e947..38418d6 100644
--- a/tests/test_pipeline/test_pipeline.py
+++ b/tests/test_pipeline/test_pipeline.py
@@ -90,7 +90,8 @@ def test_file_transformation(planet_test_files):  # pylint: disable=redefined-ou
 
     output_path = Path(tdir, submitted_file_info.submission_id, "transform")
 
-    result = dve_pipeline.file_transformation(submitted_file_info)
+    sub_info, sub_status = dve_pipeline.file_transformation(submitted_file_info)
 
-    assert isinstance(result, SubmissionInfo)
+    assert isinstance(sub_info, SubmissionInfo)
+    assert not sub_status.processing_failed
     assert output_path.joinpath("planets").exists()

From 947efddefa79ed0e590feb1fe8b6e5052a5f5dea Mon Sep 17 00:00:00 2001
From: stevenhsd <56357022+stevenhsd@users.noreply.github.com>
Date: Mon, 15 Dec 2025 21:36:28 +0000
Subject: [PATCH 7/8] feat: Added logic to persist error aggregates data to
 audit subfolder in processing location

---
 src/dve/pipeline/foundry_ddb_pipeline.py    |  6 ++++++
 src/dve/pipeline/pipeline.py                | 19 ++++++++++++++++++-
 tests/features/movies.feature               |  2 ++
 tests/features/steps/steps_post_pipeline.py |  6 ++++++
 4 files changed, 32 insertions(+), 1 deletion(-)
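
Editor's note (commentary, not part of the patch): the persistence code below follows one pattern in
two places — build a target URI under <processed_files_path>/<submission_id>/audit/, and when the
URI resolves to the local filesystem, drop down to a pathlib.Path so the parent directory can be
created before writing parquet. A standalone sketch of that pattern; urlparse stands in for DVE's
_get_implementation dispatch, so treat the scheme check as an assumption:

    from pathlib import Path
    from urllib.parse import urlparse

    import polars as pl

    def write_parquet_uri(df: pl.DataFrame, uri: str) -> str:
        """Write df to uri, creating parent directories for local targets (sketch)."""
        parsed = urlparse(uri)
        if parsed.scheme in ("", "file"):  # local filesystem target
            local = Path(parsed.path) if parsed.scheme == "file" else Path(uri)
            local.parent.mkdir(parents=True, exist_ok=True)
            uri = local.as_posix()
        df.write_parquet(uri)  # local paths work out of the box; remote URIs may need extra handling
        return uri
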
diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py
index d59402b..90dcee5 100644
--- a/src/dve/pipeline/foundry_ddb_pipeline.py
+++ b/src/dve/pipeline/foundry_ddb_pipeline.py
@@ -5,6 +5,8 @@
 from dve.core_engine.exceptions import CriticalProcessingError
 from dve.core_engine.models import SubmissionInfo
 from dve.core_engine.type_hints import URI, Failed
+from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
+from dve.parser.file_handling.service import _get_implementation
 from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
 from dve.pipeline.utils import SubmissionStatus
 from dve.parser import file_handling as fh
@@ -18,6 +20,10 @@ class FoundryDDBPipeline(DDBDVEPipeline):
     def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
         """Write out key audit relations to parquet for persisting to datasets"""
         write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
+        if isinstance(_get_implementation(output_uri), LocalFilesystemImplementation):
+            output_uri = fh.file_uri_to_local_path(output_uri)
+            output_uri.parent.mkdir(parents=True, exist_ok=True)
+            output_uri = output_uri.as_posix()
         self.write_parquet(
             self._audit_tables._processing_status.get_relation(),
             write_to + "processing_status.parquet",
diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py
index 6270bdd..20d0738 100644
--- a/src/dve/pipeline/pipeline.py
+++ b/src/dve/pipeline/pipeline.py
@@ -2,6 +2,7 @@
 """Generic Pipeline object to define how DVE should be interacted with."""
 from itertools import starmap
 import json
+from pathlib import Path
 import re
 from collections import defaultdict
 from collections.abc import Generator, Iterable, Iterator
@@ -16,6 +17,8 @@
 
 from dve.core_engine.exceptions import CriticalProcessingError
 from dve.core_engine.message import FeedbackMessage
+from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
+from dve.parser.file_handling.service import _get_implementation
 import dve.reporting.excel_report as er
 from dve.core_engine.backends.base.auditing import BaseAuditingManager
 from dve.core_engine.backends.base.contract import BaseDataContract
@@ -635,7 +638,18 @@ def business_rule_step(
         )
 
         return successful_files, unsucessful_files, failed_processing
-
+
+    def _publish_error_aggregates(self, submission_id:str, aggregates_df: pl.DataFrame) -> URI:
+        """Store error aggregates as parquet for auditing"""
+        output_uri = fh.joinuri(self.processed_files_path, submission_id, "audit", "error_aggregates.parquet")
+        if isinstance(_get_implementation(output_uri), LocalFilesystemImplementation):
+            output_uri = fh.file_uri_to_local_path(output_uri)
+            output_uri.parent.mkdir(parents=True, exist_ok=True)
+            output_uri = output_uri.as_posix()
+        aggregates_df = aggregates_df.with_columns(pl.lit(submission_id).alias("submission_id"))
+        aggregates_df.write_parquet(output_uri)
+        return output_uri
+
     @lru_cache()  # noqa: B019
     def _get_error_dataframes(self, submission_id: str):
         if not self.processed_files_path:
@@ -738,6 +752,8 @@ def error_report(self,
         )
         with fh.open_stream(report_uri, "wb") as stream:
             stream.write(er.ExcelFormat.convert_to_bytes(workbook))
+
+        self._publish_error_aggregates(submission_info.submission_id, aggregates)
 
         return submission_info, submission_status, sub_stats, report_uri
 
@@ -842,3 +858,4 @@ def cluster_pipeline_run(
         )
 
         yield from report_results  # type: ignore
+
diff --git a/tests/features/movies.feature b/tests/features/movies.feature
index e55de5c..b148547 100644
--- a/tests/features/movies.feature
+++ b/tests/features/movies.feature
@@ -41,6 +41,7 @@ Feature: Pipeline tests using the movies dataset
       | record_count             | 5 |
       | number_record_rejections | 4 |
       | number_warnings          | 1 |
+    And the error aggregates are persisted
 
   Scenario: Validate and filter movies (duckdb)
     Given I submit the movies file movies.json for processing
@@ -76,4 +77,5 @@ Feature: Pipeline tests using the movies dataset
       | record_count             | 5 |
       | number_record_rejections | 4 |
       | number_warnings          | 1 |
+    And the error aggregates are persisted
 
diff --git a/tests/features/steps/steps_post_pipeline.py b/tests/features/steps/steps_post_pipeline.py
index 23ea97d..be679ba 100644
--- a/tests/features/steps/steps_post_pipeline.py
+++ b/tests/features/steps/steps_post_pipeline.py
@@ -111,3 +111,9 @@ def check_stats_record(context):
         get_pipeline(context)._audit_tables.get_submission_statistics(sub_info.submission_id).dict()
     )
     assert all([val == stats.get(fld) for fld, val in expected.items()])
+
+@then("the error aggregates are persisted")
+def check_error_aggregates_persisted(context):
+    processing_location = get_processing_location(context)
+    agg_file = Path(processing_location, "audit", "error_aggregates.parquet")
+    assert agg_file.exists() and agg_file.is_file()
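
Editor's note (commentary, not part of the patch): the behave step above only asserts that
error_aggregates.parquet exists. Reading it back would also catch schema drift —
_publish_error_aggregates stamps every row with the submission id, so a stronger check could look
like this (hypothetical, assuming polars is available to the test suite):

    from pathlib import Path

    import polars as pl

    def check_error_aggregates_content(agg_file: Path, submission_id: str) -> None:
        """Hypothetical tightening of the 'error aggregates are persisted' step."""
        aggs = pl.read_parquet(agg_file)
        # the submission_id column is added by _publish_error_aggregates
        assert "submission_id" in aggs.columns
        assert aggs.get_column("submission_id").unique().to_list() == [submission_id]
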
persisting to datasets""" write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/") - if isinstance(_get_implementation(output_uri), LocalFilesystemImplementation): - output_uri = fh.file_uri_to_local_path(output_uri) - output_uri.parent.mkdir(parents=True, exist_ok=True) - output_uri = output_uri.as_posix() + if isinstance(_get_implementation(write_to), LocalFilesystemImplementation): + write_to = fh.file_uri_to_local_path(write_to) + write_to.parent.mkdir(parents=True, exist_ok=True) + write_to = write_to.as_posix() self.write_parquet( self._audit_tables._processing_status.get_relation(), - write_to + "processing_status.parquet", + fh.joinuri(write_to, "processing_status.parquet"), ) self.write_parquet( self._audit_tables._submission_statistics.get_relation(), - write_to + "submission_statistics.parquet", + fh.joinuri(write_to, "submission_statistics.parquet"), ) return write_to diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py index df58b59..431941e 100644 --- a/tests/test_pipeline/test_foundry_ddb_pipeline.py +++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py @@ -49,7 +49,7 @@ def test_foundry_runner_validation_fail(planet_test_files, temp_ddb_conn): output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) assert fh.get_resource_exists(report_uri) assert not output_loc - assert len(list(fh.iter_prefix(audit_files))) == 2 + assert len(list(fh.iter_prefix(audit_files))) == 3 def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn): @@ -86,7 +86,7 @@ def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn): output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) assert fh.get_resource_exists(report_uri) assert len(list(fh.iter_prefix(output_loc))) == 2 - assert len(list(fh.iter_prefix(audit_files))) == 2 + assert len(list(fh.iter_prefix(audit_files))) == 3 def test_foundry_runner_error(planet_test_files, temp_ddb_conn): # using spark reader config - should error in file transformation - check gracefully handled