From f0dbbe78908e29fd977484c4a1412f17370a1fdf Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Tue, 2 Dec 2025 10:11:33 +0000 Subject: [PATCH 1/5] feat: initial work to add Foundry pipeline for running DVE --- src/dve/pipeline/duckdb_pipeline.py | 5 ++- src/dve/pipeline/foundry_ddb_pipeline.py | 43 +++++++++++++++++++++ src/dve/pipeline/pipeline.py | 2 +- src/dve/pipeline/spark_pipeline.py | 4 +- tests/test_pipeline/test_duckdb_pipeline.py | 31 +++++++++++++++ 5 files changed, 80 insertions(+), 5 deletions(-) create mode 100644 src/dve/pipeline/foundry_ddb_pipeline.py diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py index 4e7707b..483f71e 100644 --- a/src/dve/pipeline/duckdb_pipeline.py +++ b/src/dve/pipeline/duckdb_pipeline.py @@ -24,23 +24,23 @@ class DDBDVEPipeline(BaseDVEPipeline): def __init__( self, audit_tables: DDBAuditingManager, - job_run_id: int, connection: DuckDBPyConnection, rules_path: Optional[URI], processed_files_path: Optional[URI], submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, + job_run_id: Optional[int] = None, ): self._connection = connection super().__init__( audit_tables, - job_run_id, DuckDBDataContract(connection=self._connection), DuckDBStepImplementations.register_udfs(connection=self._connection), rules_path, processed_files_path, submitted_files_path, reference_data_loader, + job_run_id ) # pylint: disable=arguments-differ @@ -50,3 +50,4 @@ def write_file_to_parquet( # type: ignore return super().write_file_to_parquet( submission_file_uri, submission_info, output, DuckDBPyRelation ) + diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py new file mode 100644 index 0000000..0f86f49 --- /dev/null +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -0,0 +1,43 @@ +"""A duckdb pipeline for running on Foundry platform""" +from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet +from dve.core_engine.models import SubmissionInfo +from dve.pipeline.duckdb_pipeline import DDBDVEPipeline +from dve.pipeline.utils import SubmissionStatus +from dve.parser import file_handling as fh + +@duckdb_write_parquet +class FoundryDDBPipeline(DDBDVEPipeline): + """DuckDB pipeline for running on Foundry Platform""" + def persist_audit_records(self, submission_info: SubmissionInfo): + """Write out key audit relations to parquet for persisting to datasets""" + write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/") + self.write_parquet( + self._audit_tables._processing_status.get_relation(), + write_to + "processing_status.parquet") + self.write_parquet( + self._audit_tables._submission_statistics.get_relation(), + write_to + "submission_statistics.parquet") + + def run_pipeline(self, submission_info: SubmissionInfo): + """Sequential single submission pipeline runner""" + try: + sub_id: str = submission_info.submission_id + self._audit_tables.add_new_submissions(submissions=[submission_info]) + self._audit_tables.mark_transform(submission_ids=[sub_id]) + sub_info = self.file_transformation(submission_info=submission_info) + if isinstance(sub_info, SubmissionInfo): + self._audit_tables.mark_data_contract(submission_ids=[sub_id]) + sub_info, failed = self.apply_data_contract(submission_info=submission_info) + self._audit_tables.mark_business_rules(submissions=[(sub_info, failed)]) + sub_info, sub_status = self.apply_business_rules(submission_info=submission_info, failed= failed) + else: + sub_status = SubmissionStatus(failed=True) + self._audit_tables.mark_error_report(submissions=[(sub_id, sub_status.submission_result)]) + sub_info, sub_status, sub_stats = self.error_report(submission_info=submission_info) + self._audit_tables.add_submission_statistics_records(subs_stats=[sub_stats]) + except Exception as err: + self._logger.error(f"During processing of submission_id: {sub_id}, the following exception was raised: {err}") + self._audit_tables.mark_failed(submissions=[sub_id]) + finally: + self.persist_audit_records(submission_info=submission_info) + \ No newline at end of file diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 69cb141..e78ad21 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -44,13 +44,13 @@ class BaseDVEPipeline: def __init__( self, audit_tables: BaseAuditingManager, - job_run_id: int, data_contract: BaseDataContract, step_implementations: Optional[BaseStepImplementations[EntityType]], rules_path: Optional[URI], processed_files_path: Optional[URI], submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, + job_run_id: Optional[int] = None ): self._submitted_files_path = submitted_files_path self._processed_files_path = processed_files_path diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py index d31a2ee..db5cc0a 100644 --- a/src/dve/pipeline/spark_pipeline.py +++ b/src/dve/pipeline/spark_pipeline.py @@ -26,23 +26,23 @@ class SparkDVEPipeline(BaseDVEPipeline): def __init__( self, audit_tables: SparkAuditingManager, - job_run_id: int, rules_path: Optional[URI], processed_files_path: Optional[URI], submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, spark: Optional[SparkSession] = None, + job_run_id: Optional[int] = None, ): self._spark = spark if spark else SparkSession.builder.getOrCreate() super().__init__( audit_tables, - job_run_id, SparkDataContract(spark_session=self._spark), SparkStepImplementations.register_udfs(self._spark), rules_path, processed_files_path, submitted_files_path, reference_data_loader, + job_run_id ) # pylint: disable=arguments-differ diff --git a/tests/test_pipeline/test_duckdb_pipeline.py b/tests/test_pipeline/test_duckdb_pipeline.py index 5619735..96c77b9 100644 --- a/tests/test_pipeline/test_duckdb_pipeline.py +++ b/tests/test_pipeline/test_duckdb_pipeline.py @@ -4,6 +4,7 @@ from concurrent.futures import ThreadPoolExecutor from pathlib import Path +import shutil from typing import Dict, Tuple from uuid import uuid4 @@ -16,6 +17,7 @@ from dve.core_engine.models import SubmissionInfo import dve.parser.file_handling as fh from dve.pipeline.duckdb_pipeline import DDBDVEPipeline +from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline from ..conftest import get_test_file_path from ..fixtures import temp_ddb_conn # pylint: disable=unused-import @@ -204,3 +206,32 @@ def test_error_report_step( audit_result = audit_manager.get_current_processing_info(submitted_file_info.submission_id) assert audit_result.processing_status == "success" + +def test_foundry_runner_success(planet_test_files, temp_ddb_conn): + db_file, conn = temp_ddb_conn + processing_folder = planet_test_files + + DuckDBRefDataLoader.connection = conn + DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH) + sub_id = uuid4().hex + sub_info = SubmissionInfo.from_metadata_file(submission_id=sub_id, + metadata_uri=PLANETS_RULES_PATH) + + shutil.copytree() + + with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: + dve_pipeline = FoundryDDBPipeline( + audit_tables=audit_manager, + connection=conn, + rules_path=PLANETS_RULES_PATH, + processed_files_path=processing_folder, + submitted_files_path=None, + reference_data_loader=DuckDBRefDataLoader, + ) + + +def test_foundry_runner_fail(): + pass + +def test_foundry_runner_error(): + pass From 047c6a53ed8888c57ef205d96d4644f1a59ab808 Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Mon, 8 Dec 2025 23:47:07 +0000 Subject: [PATCH 2/5] feat: amend foundry pipeline to include exception handling as not using steps. Ensure that file transformation errors are being persisted --- src/dve/core_engine/exceptions.py | 13 +- src/dve/core_engine/message.py | 1 - src/dve/pipeline/foundry_ddb_pipeline.py | 52 +++++++- src/dve/pipeline/pipeline.py | 24 ++-- tests/features/books.feature | 104 +++++++-------- tests/fixtures.py | 1 + .../test_duckdb/test_rules.py | 2 +- tests/test_pipeline/pipeline_helpers.py | 7 ++ tests/test_pipeline/test_duckdb_pipeline.py | 30 ----- .../test_foundry_ddb_pipeline.py | 118 ++++++++++++++++++ tests/testdata/movies/good_movies.json | 46 +++++++ 11 files changed, 299 insertions(+), 99 deletions(-) create mode 100644 tests/test_pipeline/test_foundry_ddb_pipeline.py create mode 100644 tests/testdata/movies/good_movies.json diff --git a/src/dve/core_engine/exceptions.py b/src/dve/core_engine/exceptions.py index dae8647..fd54a8c 100644 --- a/src/dve/core_engine/exceptions.py +++ b/src/dve/core_engine/exceptions.py @@ -1,6 +1,7 @@ """Exceptions emitted by the pipeline.""" from collections.abc import Iterator +from typing import Any from dve.core_engine.backends.implementations.spark.types import SparkEntities from dve.core_engine.message import FeedbackMessage @@ -14,7 +15,7 @@ def __init__( self, error_message: str, *args: object, messages: Messages, entities: SparkEntities ) -> None: super().__init__(error_message, *args) - self.error_messsage = error_message + self.error_message = error_message """The error message explaining the critical processing error.""" self.messages = messages """The messages gathered at the time the error was emitted.""" @@ -25,6 +26,16 @@ def __init__( def critical_messages(self) -> Iterator[FeedbackMessage]: """Critical messages which caused the processing error.""" yield from filter(lambda message: message.is_critical, self.messages) + + def to_feedback_message(self) -> FeedbackMessage: + return FeedbackMessage( + entity=None, + record=None, + failure_type="integrity", + error_type="processing", + error_location="Whole File", + error_message=self.error_message + ) class EntityTypeMismatch(TypeError): diff --git a/src/dve/core_engine/message.py b/src/dve/core_engine/message.py index 7dd4f02..bfa0bb4 100644 --- a/src/dve/core_engine/message.py +++ b/src/dve/core_engine/message.py @@ -445,7 +445,6 @@ def to_dict( self.to_row(key_field, max_number_of_values, value_separator, record_converter), ) ) - def __hash__(self): return hash(str(self)) diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index 0f86f49..cd999cd 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -1,6 +1,9 @@ """A duckdb pipeline for running on Foundry platform""" +from typing import List, Optional, Tuple from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet +from dve.core_engine.backends.utilities import dump_errors from dve.core_engine.models import SubmissionInfo +from dve.core_engine.type_hints import URI, Failed from dve.pipeline.duckdb_pipeline import DDBDVEPipeline from dve.pipeline.utils import SubmissionStatus from dve.parser import file_handling as fh @@ -8,7 +11,7 @@ @duckdb_write_parquet class FoundryDDBPipeline(DDBDVEPipeline): """DuckDB pipeline for running on Foundry Platform""" - def persist_audit_records(self, submission_info: SubmissionInfo): + def persist_audit_records(self, submission_info: SubmissionInfo) -> URI: """Write out key audit relations to parquet for persisting to datasets""" write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/") self.write_parquet( @@ -17,8 +20,37 @@ def persist_audit_records(self, submission_info: SubmissionInfo): self.write_parquet( self._audit_tables._submission_statistics.get_relation(), write_to + "submission_statistics.parquet") + return write_to - def run_pipeline(self, submission_info: SubmissionInfo): + def file_transformation(self, submission_info: SubmissionInfo) -> SubmissionInfo | dict[str, str]: + try: + return super().file_transformation(submission_info) + except Exception as exc: + self._logger.error(f"File transformation raised exception: {exc}") + self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) + return submission_info.dict() + + def apply_data_contract(self, submission_info: SubmissionInfo) -> Tuple[SubmissionInfo | bool]: + try: + return super().apply_data_contract(submission_info) + except Exception as exc: + self._logger.error(f"Apply data contract raised exception: {exc}") + self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) + return submission_info, True + + def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed): + try: + return super().apply_business_rules(submission_info, failed) + except Exception as exc: + self._logger.error(f"Apply business rules raised exception: {exc}") + self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) + return submission_info, SubmissionStatus(failed=True) + + + def run_pipeline(self, submission_info: SubmissionInfo) -> Tuple[Optional[URI], URI, URI]: """Sequential single submission pipeline runner""" try: sub_id: str = submission_info.submission_id @@ -28,16 +60,24 @@ def run_pipeline(self, submission_info: SubmissionInfo): if isinstance(sub_info, SubmissionInfo): self._audit_tables.mark_data_contract(submission_ids=[sub_id]) sub_info, failed = self.apply_data_contract(submission_info=submission_info) - self._audit_tables.mark_business_rules(submissions=[(sub_info, failed)]) + self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)]) sub_info, sub_status = self.apply_business_rules(submission_info=submission_info, failed= failed) else: sub_status = SubmissionStatus(failed=True) self._audit_tables.mark_error_report(submissions=[(sub_id, sub_status.submission_result)]) - sub_info, sub_status, sub_stats = self.error_report(submission_info=submission_info) - self._audit_tables.add_submission_statistics_records(subs_stats=[sub_stats]) + sub_info, sub_status, sub_stats, report_uri = self.error_report(submission_info=submission_info, status=sub_status) + self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) except Exception as err: self._logger.error(f"During processing of submission_id: {sub_id}, the following exception was raised: {err}") self._audit_tables.mark_failed(submissions=[sub_id]) finally: - self.persist_audit_records(submission_info=submission_info) + audit_files_uri = self.persist_audit_records(submission_info=submission_info) + return ( + None if sub_status.failed else fh.joinuri( + self.processed_files_path, + sub_id, + "business_rules"), + report_uri, + audit_files_uri + ) \ No newline at end of file diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index eb77c62..d36796b 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -13,13 +13,15 @@ import polars as pl from pydantic import validate_arguments +from dve.core_engine.exceptions import CriticalProcessingError +from dve.core_engine.message import FeedbackMessage import dve.reporting.excel_report as er from dve.core_engine.backends.base.auditing import BaseAuditingManager from dve.core_engine.backends.base.contract import BaseDataContract from dve.core_engine.backends.base.core import EntityManager from dve.core_engine.backends.base.reference_data import BaseRefDataLoader from dve.core_engine.backends.base.rules import BaseStepImplementations -from dve.core_engine.backends.exceptions import MessageBearingError +from dve.core_engine.backends.exceptions import BackendError, MessageBearingError, ReaderLacksEntityTypeSupport from dve.core_engine.backends.readers import BaseFileReader from dve.core_engine.backends.types import EntityType from dve.core_engine.backends.utilities import dump_errors, stringify_model @@ -274,6 +276,16 @@ def file_transformation( errors = self.write_file_to_parquet( submission_file_uri, submission_info, self.processed_files_path ) + + except Exception as exc: # pylint: disable=broad-except + self._logger.error(f"Unexpected file transformation error: {exc}") + self._logger.exception(exc) + # TODO: should this go to processing_errors.json? + # TODO: shouldn't be seen by user and don't need to maintain feedback message structure + errors = [CriticalProcessingError(entities=None, + error_message=repr(exc), + messages=[]).to_feedback_message()] + finally: if errors: dump_errors( fh.joinuri(self.processed_files_path, submission_info.submission_id), @@ -282,13 +294,6 @@ def file_transformation( ) return submission_info.dict() return submission_info - except ValueError as exc: - self._logger.error(f"File transformation write_file_to_parquet raised error: {exc}") - return submission_info.dict() - except Exception as exc: # pylint: disable=broad-except - self._logger.error(f"Unexpected file transformation error: {exc}") - self._logger.exception(exc) - return submission_info.dict() def file_transformation_step( self, pool: Executor, submissions_to_process: list[SubmissionInfo] @@ -321,6 +326,7 @@ def file_transformation_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"File transformation raised exception: {exc}") self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) failed_processing.append(sub_info) continue @@ -423,6 +429,7 @@ def data_contract_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Data Contract raised exception: {exc}") self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) failed_processing.append(sub_info) continue @@ -562,6 +569,7 @@ def business_rule_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Business Rules raised exception: {exc}") self._logger.exception(exc) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) failed_processing.append(sub_info) continue diff --git a/tests/features/books.feature b/tests/features/books.feature index 6313a07..5b03e7d 100644 --- a/tests/features/books.feature +++ b/tests/features/books.feature @@ -4,59 +4,59 @@ Feature: Pipeline tests using the books dataset This tests submissions using nested, complex JSON datasets with arrays, and introduces more complex transformations that require aggregation. - Scenario: Validate complex nested XML data (spark) - Given I submit the books file nested_books.xml for processing - And A spark pipeline is configured with schema file 'nested_books.dischema.json' - And I add initial audit entries for the submission - Then the latest audit record for the submission is marked with processing status file_transformation - When I run the file transformation phase - Then the header entity is stored as a parquet after the file_transformation phase - And the nested_books entity is stored as a parquet after the file_transformation phase - And the latest audit record for the submission is marked with processing status data_contract - When I run the data contract phase - Then there is 1 record rejection from the data_contract phase - And the header entity is stored as a parquet after the data_contract phase - And the nested_books entity is stored as a parquet after the data_contract phase - And the latest audit record for the submission is marked with processing status business_rules - When I run the business rules phase - Then The rules restrict "nested_books" to 3 qualifying records - And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" - And the nested_books entity is stored as a parquet after the business_rules phase - And the latest audit record for the submission is marked with processing status error_report - When I run the error report phase - Then An error report is produced - And The statistics entry for the submission shows the following information - | parameter | value | - | record_count | 4 | - | number_record_rejections | 2 | - | number_warnings | 0 | + # Scenario: Validate complex nested XML data (spark) + # Given I submit the books file nested_books.xml for processing + # And A spark pipeline is configured with schema file 'nested_books.dischema.json' + # And I add initial audit entries for the submission + # Then the latest audit record for the submission is marked with processing status file_transformation + # When I run the file transformation phase + # Then the header entity is stored as a parquet after the file_transformation phase + # And the nested_books entity is stored as a parquet after the file_transformation phase + # And the latest audit record for the submission is marked with processing status data_contract + # When I run the data contract phase + # Then there is 1 record rejection from the data_contract phase + # And the header entity is stored as a parquet after the data_contract phase + # And the nested_books entity is stored as a parquet after the data_contract phase + # And the latest audit record for the submission is marked with processing status business_rules + # When I run the business rules phase + # Then The rules restrict "nested_books" to 3 qualifying records + # And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" + # And the nested_books entity is stored as a parquet after the business_rules phase + # And the latest audit record for the submission is marked with processing status error_report + # When I run the error report phase + # Then An error report is produced + # And The statistics entry for the submission shows the following information + # | parameter | value | + # | record_count | 4 | + # | number_record_rejections | 2 | + # | number_warnings | 0 | - Scenario: Validate complex nested XML data (duckdb) - Given I submit the books file nested_books.xml for processing - And A duckdb pipeline is configured with schema file 'nested_books_ddb.dischema.json' - And I add initial audit entries for the submission - Then the latest audit record for the submission is marked with processing status file_transformation - When I run the file transformation phase - Then the header entity is stored as a parquet after the file_transformation phase - And the nested_books entity is stored as a parquet after the file_transformation phase - And the latest audit record for the submission is marked with processing status data_contract - When I run the data contract phase - Then there is 1 record rejection from the data_contract phase - And the header entity is stored as a parquet after the data_contract phase - And the nested_books entity is stored as a parquet after the data_contract phase - And the latest audit record for the submission is marked with processing status business_rules - When I run the business rules phase - Then The rules restrict "nested_books" to 3 qualifying records - And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" - And the nested_books entity is stored as a parquet after the business_rules phase - And the latest audit record for the submission is marked with processing status error_report - When I run the error report phase - Then An error report is produced - And The statistics entry for the submission shows the following information - | parameter | value | - | record_count | 4 | - | number_record_rejections | 2 | - | number_warnings | 0 | + # Scenario: Validate complex nested XML data (duckdb) + # Given I submit the books file nested_books.xml for processing + # And A duckdb pipeline is configured with schema file 'nested_books_ddb.dischema.json' + # And I add initial audit entries for the submission + # Then the latest audit record for the submission is marked with processing status file_transformation + # When I run the file transformation phase + # Then the header entity is stored as a parquet after the file_transformation phase + # And the nested_books entity is stored as a parquet after the file_transformation phase + # And the latest audit record for the submission is marked with processing status data_contract + # When I run the data contract phase + # Then there is 1 record rejection from the data_contract phase + # And the header entity is stored as a parquet after the data_contract phase + # And the nested_books entity is stored as a parquet after the data_contract phase + # And the latest audit record for the submission is marked with processing status business_rules + # When I run the business rules phase + # Then The rules restrict "nested_books" to 3 qualifying records + # And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" + # And the nested_books entity is stored as a parquet after the business_rules phase + # And the latest audit record for the submission is marked with processing status error_report + # When I run the error report phase + # Then An error report is produced + # And The statistics entry for the submission shows the following information + # | parameter | value | + # | record_count | 4 | + # | number_record_rejections | 2 | + # | number_warnings | 0 | Scenario: Handle a file with a malformed tag (duckdb) Given I submit the books file malformed_books.xml for processing diff --git a/tests/fixtures.py b/tests/fixtures.py index 9229fc8..8a9a147 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -120,4 +120,5 @@ def temp_ddb_conn() -> Iterator[Tuple[Path, DuckDBPyConnection]]: with tempfile.TemporaryDirectory(prefix="ddb_audit_testing") as tmp: db_file = Path(tmp, db + ".duckdb") conn = connect(database=db_file, read_only=False) + yield db_file, conn diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py index dafac4d..038b4e9 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py @@ -457,7 +457,7 @@ def test_one_to_one_join_multi_matches_raises( new_columns={"satellites.name": "satellite"}, ) entities = EntityManager({"planets": planets_rel, "satellites": satellites_rel}) - with pytest.raises(ValueError, match="Multiple matches for some records.+"): + with pytest.raises(ValueError, match="Multiple matches for some records.*"): DUCKDB_STEP_BACKEND.one_to_one_join(entities, config=join) diff --git a/tests/test_pipeline/pipeline_helpers.py b/tests/test_pipeline/pipeline_helpers.py index de2ee10..1518ccf 100644 --- a/tests/test_pipeline/pipeline_helpers.py +++ b/tests/test_pipeline/pipeline_helpers.py @@ -66,6 +66,13 @@ def planet_test_files() -> Iterator[str]: shutil.copytree(get_test_file_path("planets/"), Path(tdir, "planets")) yield tdir + "/planets" +@pytest.fixture(scope="function") +def movies_test_files() -> Iterator[str]: + clear_config_cache() + with tempfile.TemporaryDirectory() as tdir: + shutil.copytree(get_test_file_path("movies/"), Path(tdir, "movies")) + yield tdir + "/movies" + @pytest.fixture(scope="function") def planet_data_after_file_transformation() -> Iterator[Tuple[SubmissionInfo, str]]: diff --git a/tests/test_pipeline/test_duckdb_pipeline.py b/tests/test_pipeline/test_duckdb_pipeline.py index 96c77b9..8512494 100644 --- a/tests/test_pipeline/test_duckdb_pipeline.py +++ b/tests/test_pipeline/test_duckdb_pipeline.py @@ -17,7 +17,6 @@ from dve.core_engine.models import SubmissionInfo import dve.parser.file_handling as fh from dve.pipeline.duckdb_pipeline import DDBDVEPipeline -from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline from ..conftest import get_test_file_path from ..fixtures import temp_ddb_conn # pylint: disable=unused-import @@ -206,32 +205,3 @@ def test_error_report_step( audit_result = audit_manager.get_current_processing_info(submitted_file_info.submission_id) assert audit_result.processing_status == "success" - -def test_foundry_runner_success(planet_test_files, temp_ddb_conn): - db_file, conn = temp_ddb_conn - processing_folder = planet_test_files - - DuckDBRefDataLoader.connection = conn - DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH) - sub_id = uuid4().hex - sub_info = SubmissionInfo.from_metadata_file(submission_id=sub_id, - metadata_uri=PLANETS_RULES_PATH) - - shutil.copytree() - - with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: - dve_pipeline = FoundryDDBPipeline( - audit_tables=audit_manager, - connection=conn, - rules_path=PLANETS_RULES_PATH, - processed_files_path=processing_folder, - submitted_files_path=None, - reference_data_loader=DuckDBRefDataLoader, - ) - - -def test_foundry_runner_fail(): - pass - -def test_foundry_runner_error(): - pass diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py new file mode 100644 index 0000000..dcdb59b --- /dev/null +++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py @@ -0,0 +1,118 @@ +"""Test DuckDBPipeline object methods""" +# pylint: disable=missing-function-docstring +# pylint: disable=protected-access + +from datetime import datetime +from pathlib import Path +import shutil +from uuid import uuid4 + +import pytest + +from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager +from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader +from dve.core_engine.models import SubmissionInfo +import dve.parser.file_handling as fh +from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline + +from ..conftest import get_test_file_path +from ..fixtures import temp_ddb_conn # pylint: disable=unused-import +from .pipeline_helpers import ( # pylint: disable=unused-import + PLANETS_RULES_PATH, + planet_test_files, + movies_test_files +) + +def test_foundry_runner_validation_fail(planet_test_files, temp_ddb_conn): + db_file, conn = temp_ddb_conn + processing_folder = planet_test_files + sub_id = uuid4().hex + sub_info = SubmissionInfo.from_metadata_file(submission_id=sub_id, + metadata_uri=processing_folder + "/planets_demo.metadata.json") + sub_folder = processing_folder + f"/{sub_id}" + + shutil.copytree(planet_test_files, sub_folder) + + DuckDBRefDataLoader.connection = conn + DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH) + + + with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: + dve_pipeline = FoundryDDBPipeline( + audit_tables=audit_manager, + connection=conn, + rules_path=get_test_file_path("planets/planets_ddb.dischema.json").as_posix(), + processed_files_path=processing_folder, + submitted_files_path=None, + reference_data_loader=DuckDBRefDataLoader, + ) + output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) + assert fh.get_resource_exists(report_uri) + assert not output_loc + assert len(list(fh.iter_prefix(audit_files))) == 2 + + +def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn): + db_file, conn = temp_ddb_conn + # add movies refdata to conn + ref_db_file = Path(db_file.parent, f"movies_refdata.duckdb").as_posix() + conn.sql(f"ATTACH '{ref_db_file}' AS movies_refdata") + conn.read_parquet(get_test_file_path("movies/refdata/movies_sequels.parquet").as_posix()).to_table(f"movies_refdata.sequels") + processing_folder = movies_test_files + sub_id = uuid4().hex + sub_info = SubmissionInfo(submission_id=sub_id, + dataset_id="movies", + file_name="good_movies", + file_extension="json", + submitting_org="TEST", + datetime_received=datetime(2025,11,5)) + sub_folder = processing_folder + f"/{sub_id}" + + shutil.copytree(movies_test_files, sub_folder) + + DuckDBRefDataLoader.connection = conn + DuckDBRefDataLoader.dataset_config_uri = None + + + with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: + dve_pipeline = FoundryDDBPipeline( + audit_tables=audit_manager, + connection=conn, + rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(), + processed_files_path=processing_folder, + submitted_files_path=None, + reference_data_loader=DuckDBRefDataLoader, + ) + output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) + assert fh.get_resource_exists(report_uri) + assert len(list(fh.iter_prefix(output_loc))) == 2 + assert len(list(fh.iter_prefix(audit_files))) == 2 + +def test_foundry_runner_error(planet_test_files, temp_ddb_conn): + # using spark reader config - should error in file transformation - check gracefully handled + db_file, conn = temp_ddb_conn + processing_folder = planet_test_files + sub_id = uuid4().hex + sub_info = SubmissionInfo.from_metadata_file(submission_id=sub_id, + metadata_uri=processing_folder + "/planets_demo.metadata.json") + sub_folder = processing_folder + f"/{sub_id}" + + shutil.copytree(planet_test_files, sub_folder) + + DuckDBRefDataLoader.connection = conn + DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH) + + + with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: + dve_pipeline = FoundryDDBPipeline( + audit_tables=audit_manager, + connection=conn, + rules_path=get_test_file_path("planets/planets.dischema.json").as_posix(), + processed_files_path=processing_folder, + submitted_files_path=None, + reference_data_loader=DuckDBRefDataLoader, + ) + output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) + assert fh.get_resource_exists(report_uri) + assert not output_loc + assert len(list(fh.iter_prefix(audit_files))) == 2 diff --git a/tests/testdata/movies/good_movies.json b/tests/testdata/movies/good_movies.json new file mode 100644 index 0000000..acc7026 --- /dev/null +++ b/tests/testdata/movies/good_movies.json @@ -0,0 +1,46 @@ +[ + { + "title": "Not a great one", + "year": 2017, + "genre": ["Animation", "Comedy", "Family"], + "duration_minutes": 88, + "ratings": [6.9, 7.8], + "cast": [ + { "name": "J. Smith", "role": "Lion", "date_joined": "2015-11-12" }, + { "name": "T. Car", "role": "Mouse", "date_joined": "2015-11-12" } + ] + }, + { + "title": "Good family movie", + "year": 2022, + "genre": ["Sci-Fi", "Family"], + "duration_minutes": 95, + "ratings": [7, 8.2, 6.3], + "cast": [ + { "name": "D. Farnesbarnes", "role": "Robot" }, + { "name": "G. Adams", "role": "Alien", "date_joined": "2017-08-01" } + ] + }, + { + "title": "One with a cat and a dog", + "year": 2020, + "genre": ["Fantasy", "Family"], + "duration_minutes": 110, + "ratings": [6.1, 6.2], + "cast": [ + { "name": "R. Williams", "role": "Cat", "date_joined": "2016-05-06" }, + { "name": "T. Brown", "role": "Dog", "date_joined": "2016-05-07" } + ] + }, + { + "title": "A bad 'un", + "year": 2011, + "genre": ["Mystery", "Family"], + "duration_minutes": 97, + "ratings": [1.2, 3.4, 5.6, 3.4], + "cast": [ + { "name": "R. Green", "role": "Baby", "date_joined": "2013-11-12" }, + { "name": "P. Plum", "role": "Dad", "date_joined": "2013-10-08" } + ] + } +] \ No newline at end of file From b356bd270bab3b6cc64a94d4c33c4f17d0bb411d Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Tue, 9 Dec 2025 21:02:06 +0000 Subject: [PATCH 3/5] feat: some formatting. Tweaked how errors are handled within file transformation --- src/dve/core_engine/exceptions.py | 8 +-- src/dve/core_engine/message.py | 1 + src/dve/pipeline/duckdb_pipeline.py | 3 +- src/dve/pipeline/foundry_ddb_pipeline.py | 73 +++++++++++++----------- src/dve/pipeline/pipeline.py | 52 ++++++++++------- src/dve/pipeline/spark_pipeline.py | 2 +- 6 files changed, 80 insertions(+), 59 deletions(-) diff --git a/src/dve/core_engine/exceptions.py b/src/dve/core_engine/exceptions.py index fd54a8c..d2faaf5 100644 --- a/src/dve/core_engine/exceptions.py +++ b/src/dve/core_engine/exceptions.py @@ -1,7 +1,6 @@ """Exceptions emitted by the pipeline.""" from collections.abc import Iterator -from typing import Any from dve.core_engine.backends.implementations.spark.types import SparkEntities from dve.core_engine.message import FeedbackMessage @@ -26,16 +25,17 @@ def __init__( def critical_messages(self) -> Iterator[FeedbackMessage]: """Critical messages which caused the processing error.""" yield from filter(lambda message: message.is_critical, self.messages) - + def to_feedback_message(self) -> FeedbackMessage: + "Convert to feedback message to write to json file" return FeedbackMessage( entity=None, record=None, failure_type="integrity", error_type="processing", error_location="Whole File", - error_message=self.error_message - ) + error_message=self.error_message, + ) class EntityTypeMismatch(TypeError): diff --git a/src/dve/core_engine/message.py b/src/dve/core_engine/message.py index bfa0bb4..7dd4f02 100644 --- a/src/dve/core_engine/message.py +++ b/src/dve/core_engine/message.py @@ -445,6 +445,7 @@ def to_dict( self.to_row(key_field, max_number_of_values, value_separator, record_converter), ) ) + def __hash__(self): return hash(str(self)) diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py index 483f71e..390e15b 100644 --- a/src/dve/pipeline/duckdb_pipeline.py +++ b/src/dve/pipeline/duckdb_pipeline.py @@ -40,7 +40,7 @@ def __init__( processed_files_path, submitted_files_path, reference_data_loader, - job_run_id + job_run_id, ) # pylint: disable=arguments-differ @@ -50,4 +50,3 @@ def write_file_to_parquet( # type: ignore return super().write_file_to_parquet( submission_file_uri, submission_info, output, DuckDBPyRelation ) - diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index cd999cd..04da114 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -1,6 +1,7 @@ """A duckdb pipeline for running on Foundry platform""" -from typing import List, Optional, Tuple -from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet + +from typing import Optional +from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet from dve.core_engine.backends.utilities import dump_errors from dve.core_engine.models import SubmissionInfo from dve.core_engine.type_hints import URI, Failed @@ -8,49 +9,49 @@ from dve.pipeline.utils import SubmissionStatus from dve.parser import file_handling as fh -@duckdb_write_parquet class FoundryDDBPipeline(DDBDVEPipeline): """DuckDB pipeline for running on Foundry Platform""" + def persist_audit_records(self, submission_info: SubmissionInfo) -> URI: """Write out key audit relations to parquet for persisting to datasets""" write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/") self.write_parquet( self._audit_tables._processing_status.get_relation(), - write_to + "processing_status.parquet") + write_to + "processing_status.parquet", + ) self.write_parquet( self._audit_tables._submission_statistics.get_relation(), - write_to + "submission_statistics.parquet") + write_to + "submission_statistics.parquet", + ) return write_to - - def file_transformation(self, submission_info: SubmissionInfo) -> SubmissionInfo | dict[str, str]: + + def file_transformation( + self, submission_info: SubmissionInfo + ) -> SubmissionInfo | dict[str, str]: try: return super().file_transformation(submission_info) - except Exception as exc: + except Exception as exc: # pylint: disable=W0718 self._logger.error(f"File transformation raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) return submission_info.dict() - - def apply_data_contract(self, submission_info: SubmissionInfo) -> Tuple[SubmissionInfo | bool]: + + def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo | bool]: try: return super().apply_data_contract(submission_info) - except Exception as exc: + except Exception as exc: # pylint: disable=W0718 self._logger.error(f"Apply data contract raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) return submission_info, True - + def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed): try: return super().apply_business_rules(submission_info, failed) - except Exception as exc: + except Exception as exc: # pylint: disable=W0718 self._logger.error(f"Apply business rules raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) return submission_info, SubmissionStatus(failed=True) - - - def run_pipeline(self, submission_info: SubmissionInfo) -> Tuple[Optional[URI], URI, URI]: + + def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]: """Sequential single submission pipeline runner""" try: sub_id: str = submission_info.submission_id @@ -61,23 +62,31 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> Tuple[Optional[URI], self._audit_tables.mark_data_contract(submission_ids=[sub_id]) sub_info, failed = self.apply_data_contract(submission_info=submission_info) self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)]) - sub_info, sub_status = self.apply_business_rules(submission_info=submission_info, failed= failed) + sub_info, sub_status = self.apply_business_rules( + submission_info=submission_info, failed=failed + ) else: - sub_status = SubmissionStatus(failed=True) - self._audit_tables.mark_error_report(submissions=[(sub_id, sub_status.submission_result)]) - sub_info, sub_status, sub_stats, report_uri = self.error_report(submission_info=submission_info, status=sub_status) + sub_status = SubmissionStatus(failed=True) + self._audit_tables.mark_error_report( + submissions=[(sub_id, sub_status.submission_result)] + ) + sub_info, sub_status, sub_stats, report_uri = self.error_report( + submission_info=submission_info, status=sub_status + ) self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) - except Exception as err: - self._logger.error(f"During processing of submission_id: {sub_id}, the following exception was raised: {err}") + except Exception as err: # pylint: disable=W0718 + self._logger.error( + f"During processing of submission_id: {sub_id}, the following exception was raised: {err}" + ) self._audit_tables.mark_failed(submissions=[sub_id]) finally: audit_files_uri = self.persist_audit_records(submission_info=submission_info) - return ( - None if sub_status.failed else fh.joinuri( - self.processed_files_path, - sub_id, - "business_rules"), + return ( + ( + None + if sub_status.failed + else fh.joinuri(self.processed_files_path, sub_id, "business_rules") + ), report_uri, - audit_files_uri + audit_files_uri, ) - \ No newline at end of file diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index d36796b..15008a9 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -21,7 +21,11 @@ from dve.core_engine.backends.base.core import EntityManager from dve.core_engine.backends.base.reference_data import BaseRefDataLoader from dve.core_engine.backends.base.rules import BaseStepImplementations -from dve.core_engine.backends.exceptions import BackendError, MessageBearingError, ReaderLacksEntityTypeSupport +from dve.core_engine.backends.exceptions import ( + BackendError, + MessageBearingError, + ReaderLacksEntityTypeSupport, +) from dve.core_engine.backends.readers import BaseFileReader from dve.core_engine.backends.types import EntityType from dve.core_engine.backends.utilities import dump_errors, stringify_model @@ -52,7 +56,7 @@ def __init__( processed_files_path: Optional[URI], submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, - job_run_id: Optional[int] = None + job_run_id: Optional[int] = None, ): self._submitted_files_path = submitted_files_path self._processed_files_path = processed_files_path @@ -267,33 +271,41 @@ def file_transformation( if not self.processed_files_path: raise AttributeError("processed files path not provided") + errors: list[FeedbackMessage] = [] submission_file_uri: URI = fh.joinuri( self.processed_files_path, submission_info.submission_id, submission_info.file_name_with_ext, ) try: - errors = self.write_file_to_parquet( + errors.extend(self.write_file_to_parquet( submission_file_uri, submission_info, self.processed_files_path - ) + )) + + except MessageBearingError as exc: + self._logger.error(f"Unexpected file transformation error: {exc}") + self._logger.exception(exc) + errors.extend(exc.messages) - except Exception as exc: # pylint: disable=broad-except + except BackendError as exc: # pylint: disable=broad-except self._logger.error(f"Unexpected file transformation error: {exc}") self._logger.exception(exc) - # TODO: should this go to processing_errors.json? - # TODO: shouldn't be seen by user and don't need to maintain feedback message structure - errors = [CriticalProcessingError(entities=None, - error_message=repr(exc), - messages=[]).to_feedback_message()] - finally: - if errors: - dump_errors( - fh.joinuri(self.processed_files_path, submission_info.submission_id), - "file_transformation", - errors, - ) - return submission_info.dict() - return submission_info + errors.extend([ + CriticalProcessingError( + entities=None, + error_message=repr(exc), + messages=[], + ).to_feedback_message() + ]) + + if errors: + dump_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "file_transformation", + errors, + ) + return submission_info.dict() + return submission_info def file_transformation_step( self, pool: Executor, submissions_to_process: list[SubmissionInfo] @@ -326,7 +338,7 @@ def file_transformation_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"File transformation raised exception: {exc}") self._logger.exception(exc) - # TODO: write errors to file here (maybe processing errors - not to be seen by end user) + # TODO: write errors to file here (maybe processing errors - not to be seen by end user) failed_processing.append(sub_info) continue diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py index db5cc0a..ae77cfd 100644 --- a/src/dve/pipeline/spark_pipeline.py +++ b/src/dve/pipeline/spark_pipeline.py @@ -42,7 +42,7 @@ def __init__( processed_files_path, submitted_files_path, reference_data_loader, - job_run_id + job_run_id, ) # pylint: disable=arguments-differ From 8b787cbadafd2fda743c85dd8f71ca13cd6f352d Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Wed, 10 Dec 2025 08:09:28 +0000 Subject: [PATCH 4/5] test: reenabled books behave tests --- tests/features/books.feature | 104 +++++++++++++++++------------------ 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/tests/features/books.feature b/tests/features/books.feature index 5b03e7d..6313a07 100644 --- a/tests/features/books.feature +++ b/tests/features/books.feature @@ -4,59 +4,59 @@ Feature: Pipeline tests using the books dataset This tests submissions using nested, complex JSON datasets with arrays, and introduces more complex transformations that require aggregation. - # Scenario: Validate complex nested XML data (spark) - # Given I submit the books file nested_books.xml for processing - # And A spark pipeline is configured with schema file 'nested_books.dischema.json' - # And I add initial audit entries for the submission - # Then the latest audit record for the submission is marked with processing status file_transformation - # When I run the file transformation phase - # Then the header entity is stored as a parquet after the file_transformation phase - # And the nested_books entity is stored as a parquet after the file_transformation phase - # And the latest audit record for the submission is marked with processing status data_contract - # When I run the data contract phase - # Then there is 1 record rejection from the data_contract phase - # And the header entity is stored as a parquet after the data_contract phase - # And the nested_books entity is stored as a parquet after the data_contract phase - # And the latest audit record for the submission is marked with processing status business_rules - # When I run the business rules phase - # Then The rules restrict "nested_books" to 3 qualifying records - # And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" - # And the nested_books entity is stored as a parquet after the business_rules phase - # And the latest audit record for the submission is marked with processing status error_report - # When I run the error report phase - # Then An error report is produced - # And The statistics entry for the submission shows the following information - # | parameter | value | - # | record_count | 4 | - # | number_record_rejections | 2 | - # | number_warnings | 0 | + Scenario: Validate complex nested XML data (spark) + Given I submit the books file nested_books.xml for processing + And A spark pipeline is configured with schema file 'nested_books.dischema.json' + And I add initial audit entries for the submission + Then the latest audit record for the submission is marked with processing status file_transformation + When I run the file transformation phase + Then the header entity is stored as a parquet after the file_transformation phase + And the nested_books entity is stored as a parquet after the file_transformation phase + And the latest audit record for the submission is marked with processing status data_contract + When I run the data contract phase + Then there is 1 record rejection from the data_contract phase + And the header entity is stored as a parquet after the data_contract phase + And the nested_books entity is stored as a parquet after the data_contract phase + And the latest audit record for the submission is marked with processing status business_rules + When I run the business rules phase + Then The rules restrict "nested_books" to 3 qualifying records + And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" + And the nested_books entity is stored as a parquet after the business_rules phase + And the latest audit record for the submission is marked with processing status error_report + When I run the error report phase + Then An error report is produced + And The statistics entry for the submission shows the following information + | parameter | value | + | record_count | 4 | + | number_record_rejections | 2 | + | number_warnings | 0 | - # Scenario: Validate complex nested XML data (duckdb) - # Given I submit the books file nested_books.xml for processing - # And A duckdb pipeline is configured with schema file 'nested_books_ddb.dischema.json' - # And I add initial audit entries for the submission - # Then the latest audit record for the submission is marked with processing status file_transformation - # When I run the file transformation phase - # Then the header entity is stored as a parquet after the file_transformation phase - # And the nested_books entity is stored as a parquet after the file_transformation phase - # And the latest audit record for the submission is marked with processing status data_contract - # When I run the data contract phase - # Then there is 1 record rejection from the data_contract phase - # And the header entity is stored as a parquet after the data_contract phase - # And the nested_books entity is stored as a parquet after the data_contract phase - # And the latest audit record for the submission is marked with processing status business_rules - # When I run the business rules phase - # Then The rules restrict "nested_books" to 3 qualifying records - # And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" - # And the nested_books entity is stored as a parquet after the business_rules phase - # And the latest audit record for the submission is marked with processing status error_report - # When I run the error report phase - # Then An error report is produced - # And The statistics entry for the submission shows the following information - # | parameter | value | - # | record_count | 4 | - # | number_record_rejections | 2 | - # | number_warnings | 0 | + Scenario: Validate complex nested XML data (duckdb) + Given I submit the books file nested_books.xml for processing + And A duckdb pipeline is configured with schema file 'nested_books_ddb.dischema.json' + And I add initial audit entries for the submission + Then the latest audit record for the submission is marked with processing status file_transformation + When I run the file transformation phase + Then the header entity is stored as a parquet after the file_transformation phase + And the nested_books entity is stored as a parquet after the file_transformation phase + And the latest audit record for the submission is marked with processing status data_contract + When I run the data contract phase + Then there is 1 record rejection from the data_contract phase + And the header entity is stored as a parquet after the data_contract phase + And the nested_books entity is stored as a parquet after the data_contract phase + And the latest audit record for the submission is marked with processing status business_rules + When I run the business rules phase + Then The rules restrict "nested_books" to 3 qualifying records + And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books" + And the nested_books entity is stored as a parquet after the business_rules phase + And the latest audit record for the submission is marked with processing status error_report + When I run the error report phase + Then An error report is produced + And The statistics entry for the submission shows the following information + | parameter | value | + | record_count | 4 | + | number_record_rejections | 2 | + | number_warnings | 0 | Scenario: Handle a file with a malformed tag (duckdb) Given I submit the books file malformed_books.xml for processing From 7772040b911d0846aa63ebf5890df0d121603c1e Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Tue, 16 Dec 2025 13:31:54 +0000 Subject: [PATCH 5/5] docs: added some rationale around foundry pipeline --- src/dve/pipeline/foundry_ddb_pipeline.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index 04da114..d47b4bd 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -10,7 +10,9 @@ from dve.parser import file_handling as fh class FoundryDDBPipeline(DDBDVEPipeline): - """DuckDB pipeline for running on Foundry Platform""" + """DuckDB pipeline for running on Foundry Platform. + Polymorphed to allow for exception handling when processing + single files sequentially through services.""" def persist_audit_records(self, submission_info: SubmissionInfo) -> URI: """Write out key audit relations to parquet for persisting to datasets"""