diff --git a/src/dve/core_engine/exceptions.py b/src/dve/core_engine/exceptions.py
index dae8647..d2faaf5 100644
--- a/src/dve/core_engine/exceptions.py
+++ b/src/dve/core_engine/exceptions.py
@@ -14,7 +14,7 @@ def __init__(
         self, error_message: str, *args: object, messages: Messages, entities: SparkEntities
     ) -> None:
         super().__init__(error_message, *args)
-        self.error_messsage = error_message
+        self.error_message = error_message
         """The error message explaining the critical processing error."""
         self.messages = messages
         """The messages gathered at the time the error was emitted."""
@@ -26,6 +26,17 @@ def critical_messages(self) -> Iterator[FeedbackMessage]:
         """Critical messages which caused the processing error."""
         yield from filter(lambda message: message.is_critical, self.messages)
 
+    def to_feedback_message(self) -> FeedbackMessage:
+        """Convert to a feedback message for writing to a JSON file."""
+        return FeedbackMessage(
+            entity=None,
+            record=None,
+            failure_type="integrity",
+            error_type="processing",
+            error_location="Whole File",
+            error_message=self.error_message,
+        )
+
 
 class EntityTypeMismatch(TypeError):
     """An exception emitted if entity type outputs from two collaborative objects are different."""
diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py
index 4e7707b..390e15b 100644
--- a/src/dve/pipeline/duckdb_pipeline.py
+++ b/src/dve/pipeline/duckdb_pipeline.py
@@ -24,23 +24,23 @@ class DDBDVEPipeline(BaseDVEPipeline):
     def __init__(
         self,
         audit_tables: DDBAuditingManager,
-        job_run_id: int,
         connection: DuckDBPyConnection,
         rules_path: Optional[URI],
         processed_files_path: Optional[URI],
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
+        job_run_id: Optional[int] = None,
     ):
         self._connection = connection
         super().__init__(
             audit_tables,
-            job_run_id,
             DuckDBDataContract(connection=self._connection),
             DuckDBStepImplementations.register_udfs(connection=self._connection),
             rules_path,
             processed_files_path,
             submitted_files_path,
             reference_data_loader,
+            job_run_id,
         )
 
     # pylint: disable=arguments-differ
diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py
new file mode 100644
index 0000000..d47b4bd
--- /dev/null
+++ b/src/dve/pipeline/foundry_ddb_pipeline.py
@@ -0,0 +1,94 @@
+"""A DuckDB pipeline for running on the Foundry platform."""
+
+from typing import Optional
+from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet
+from dve.core_engine.backends.utilities import dump_errors
+from dve.core_engine.models import SubmissionInfo
+from dve.core_engine.type_hints import URI, Failed
+from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
+from dve.pipeline.utils import SubmissionStatus
+from dve.parser import file_handling as fh
+
+class FoundryDDBPipeline(DDBDVEPipeline):
+    """DuckDB pipeline for running on the Foundry platform.
+    Subclassed to allow for exception handling when processing
+    single files sequentially through services."""
+
+    def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
+        """Write out key audit relations to parquet for persisting to datasets."""
+        write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
+        self.write_parquet(
+            self._audit_tables._processing_status.get_relation(),
+            write_to + "processing_status.parquet",
+        )
+        self.write_parquet(
+            self._audit_tables._submission_statistics.get_relation(),
+            write_to + "submission_statistics.parquet",
+        )
+        return write_to
+
+    def file_transformation(
+        self, submission_info: SubmissionInfo
+    ) -> SubmissionInfo | dict[str, str]:
+        try:
+            return super().file_transformation(submission_info)
+        except Exception as exc:  # pylint: disable=W0718
+            self._logger.error(f"File transformation raised exception: {exc}")
+            self._logger.exception(exc)
+            return submission_info.dict()
+
+    def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo, bool]:
+        try:
+            return super().apply_data_contract(submission_info)
+        except Exception as exc:  # pylint: disable=W0718
+            self._logger.error(f"Apply data contract raised exception: {exc}")
+            self._logger.exception(exc)
+            return submission_info, True
+
+    def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed):
+        try:
+            return super().apply_business_rules(submission_info, failed)
+        except Exception as exc:  # pylint: disable=W0718
+            self._logger.error(f"Apply business rules raised exception: {exc}")
+            self._logger.exception(exc)
+            return submission_info, SubmissionStatus(failed=True)
+
+    def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], Optional[URI], URI]:
+        """Sequential single-submission pipeline runner."""
+        sub_id: str = submission_info.submission_id
+        sub_status = SubmissionStatus(failed=True)
+        report_uri: Optional[URI] = None
+        try:
+            self._audit_tables.add_new_submissions(submissions=[submission_info])
+            self._audit_tables.mark_transform(submission_ids=[sub_id])
+            sub_info = self.file_transformation(submission_info=submission_info)
+            if isinstance(sub_info, SubmissionInfo):
+                self._audit_tables.mark_data_contract(submission_ids=[sub_id])
+                sub_info, failed = self.apply_data_contract(submission_info=submission_info)
+                self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)])
+                sub_info, sub_status = self.apply_business_rules(
+                    submission_info=submission_info, failed=failed
+                )
+            self._audit_tables.mark_error_report(
+                submissions=[(sub_id, sub_status.submission_result)]
+            )
+            sub_info, sub_status, sub_stats, report_uri = self.error_report(
+                submission_info=submission_info, status=sub_status
+            )
+            self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
+        except Exception as err:  # pylint: disable=W0718
+            self._logger.error(
+                f"During processing of submission_id: {sub_id}, the following exception was raised: {err}"
+            )
+            self._audit_tables.mark_failed(submissions=[sub_id])
+        finally:
+            audit_files_uri = self.persist_audit_records(submission_info=submission_info)
+        return (
+            (
+                None
+                if sub_status.failed
+                else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
+            ),
+            report_uri,
+            audit_files_uri,
+        )
diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py
index 4a6a9f1..15008a9 100644
--- a/src/dve/pipeline/pipeline.py
+++ b/src/dve/pipeline/pipeline.py
@@ -13,13 +13,19 @@
 import polars as pl
 from pydantic import validate_arguments
 
+from dve.core_engine.exceptions import CriticalProcessingError
+from dve.core_engine.message import FeedbackMessage
 import dve.reporting.excel_report as er
 from dve.core_engine.backends.base.auditing import BaseAuditingManager
 from dve.core_engine.backends.base.contract import BaseDataContract
 from dve.core_engine.backends.base.core import EntityManager
 from dve.core_engine.backends.base.reference_data import BaseRefDataLoader
 from dve.core_engine.backends.base.rules import BaseStepImplementations
-from dve.core_engine.backends.exceptions import MessageBearingError
+from dve.core_engine.backends.exceptions import (
+    BackendError,
+    MessageBearingError,
+    ReaderLacksEntityTypeSupport,
+)
 from dve.core_engine.backends.readers import BaseFileReader
 from dve.core_engine.backends.types import EntityType
 from dve.core_engine.backends.utilities import dump_errors, stringify_model
@@ -44,13 +50,13 @@ class BaseDVEPipeline:
     def __init__(
         self,
         audit_tables: BaseAuditingManager,
-        job_run_id: int,
         data_contract: BaseDataContract,
         step_implementations: Optional[BaseStepImplementations[EntityType]],
         rules_path: Optional[URI],
         processed_files_path: Optional[URI],
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
+        job_run_id: Optional[int] = None,
     ):
         self._submitted_files_path = submitted_files_path
         self._processed_files_path = processed_files_path
@@ -265,30 +271,41 @@ def file_transformation(
         if not self.processed_files_path:
             raise AttributeError("processed files path not provided")
 
+        errors: list[FeedbackMessage] = []
         submission_file_uri: URI = fh.joinuri(
             self.processed_files_path,
             submission_info.submission_id,
             submission_info.file_name_with_ext,
         )
         try:
-            errors = self.write_file_to_parquet(
+            errors.extend(self.write_file_to_parquet(
                 submission_file_uri, submission_info, self.processed_files_path
-            )
-            if errors:
-                dump_errors(
-                    fh.joinuri(self.processed_files_path, submission_info.submission_id),
-                    "file_transformation",
-                    errors,
-                )
-                return submission_info.dict()
-            return submission_info
-        except ValueError as exc:
-            self._logger.error(f"File transformation write_file_to_parquet raised error: {exc}")
-            return submission_info.dict()
-        except Exception as exc:  # pylint: disable=broad-except
+            ))
+
+        except MessageBearingError as exc:
+            self._logger.error(f"Unexpected file transformation error: {exc}")
+            self._logger.exception(exc)
+            errors.extend(exc.messages)
+
+        except BackendError as exc:
             self._logger.error(f"Unexpected file transformation error: {exc}")
             self._logger.exception(exc)
+            errors.append(
+                CriticalProcessingError(
+                    entities=None,
+                    error_message=repr(exc),
+                    messages=[],
+                ).to_feedback_message()
+            )
+
+        if errors:
+            dump_errors(
+                fh.joinuri(self.processed_files_path, submission_info.submission_id),
+                "file_transformation",
+                errors,
+            )
             return submission_info.dict()
+        return submission_info
 
     def file_transformation_step(
         self, pool: Executor, submissions_to_process: list[SubmissionInfo]
@@ -321,6 +338,7 @@ def file_transformation_step(
             except Exception as exc:  # pylint: disable=W0703
                 self._logger.error(f"File transformation raised exception: {exc}")
                 self._logger.exception(exc)
+                # TODO: write errors to file here (maybe processing errors - not to be seen by end user)
                 failed_processing.append(sub_info)
                 continue
 
@@ -423,6 +441,7 @@ def data_contract_step(
             except Exception as exc:  # pylint: disable=W0703
                 self._logger.error(f"Data Contract raised exception: {exc}")
                 self._logger.exception(exc)
+                # TODO: write errors to file here (maybe processing errors - not to be seen by end user)
                 failed_processing.append(sub_info)
                 continue
 
@@ -562,6 +581,7 @@ def business_rule_step(
             except Exception as exc:  # pylint: disable=W0703
                 self._logger.error(f"Business Rules raised exception: {exc}")
                 self._logger.exception(exc)
+                # TODO: write errors to file here (maybe processing errors - not to be seen by end user)
                 failed_processing.append(sub_info)
                 continue
 
diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py
index d31a2ee..ae77cfd 100644
--- a/src/dve/pipeline/spark_pipeline.py
+++ b/src/dve/pipeline/spark_pipeline.py
@@ -26,23 +26,23 @@ class SparkDVEPipeline(BaseDVEPipeline):
     def __init__(
         self,
         audit_tables: SparkAuditingManager,
-        job_run_id: int,
         rules_path: Optional[URI],
         processed_files_path: Optional[URI],
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         spark: Optional[SparkSession] = None,
+        job_run_id: Optional[int] = None,
     ):
         self._spark = spark if spark else SparkSession.builder.getOrCreate()
         super().__init__(
             audit_tables,
-            job_run_id,
             SparkDataContract(spark_session=self._spark),
             SparkStepImplementations.register_udfs(self._spark),
             rules_path,
             processed_files_path,
             submitted_files_path,
             reference_data_loader,
+            job_run_id,
         )
 
     # pylint: disable=arguments-differ
diff --git a/tests/fixtures.py b/tests/fixtures.py
index 9229fc8..8a9a147 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -120,4 +120,5 @@ def temp_ddb_conn() -> Iterator[Tuple[Path, DuckDBPyConnection]]:
     with tempfile.TemporaryDirectory(prefix="ddb_audit_testing") as tmp:
         db_file = Path(tmp, db + ".duckdb")
         conn = connect(database=db_file, read_only=False)
+
         yield db_file, conn
diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py
index dafac4d..038b4e9 100644
--- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py
+++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py
@@ -457,7 +457,7 @@ def test_one_to_one_join_multi_matches_raises(
         new_columns={"satellites.name": "satellite"},
     )
     entities = EntityManager({"planets": planets_rel, "satellites": satellites_rel})
-    with pytest.raises(ValueError, match="Multiple matches for some records.+"):
+    with pytest.raises(ValueError, match="Multiple matches for some records.*"):
         DUCKDB_STEP_BACKEND.one_to_one_join(entities, config=join)
 
 
diff --git a/tests/test_pipeline/pipeline_helpers.py b/tests/test_pipeline/pipeline_helpers.py
index de2ee10..1518ccf 100644
--- a/tests/test_pipeline/pipeline_helpers.py
+++ b/tests/test_pipeline/pipeline_helpers.py
@@ -66,6 +66,13 @@ def planet_test_files() -> Iterator[str]:
         shutil.copytree(get_test_file_path("planets/"), Path(tdir, "planets"))
         yield tdir + "/planets"
 
+@pytest.fixture(scope="function")
+def movies_test_files() -> Iterator[str]:
+    clear_config_cache()
+    with tempfile.TemporaryDirectory() as tdir:
+        shutil.copytree(get_test_file_path("movies/"), Path(tdir, "movies"))
+        yield tdir + "/movies"
+
 
 @pytest.fixture(scope="function")
 def planet_data_after_file_transformation() -> Iterator[Tuple[SubmissionInfo, str]]:
diff --git a/tests/test_pipeline/test_duckdb_pipeline.py b/tests/test_pipeline/test_duckdb_pipeline.py
index 5619735..8512494 100644
--- a/tests/test_pipeline/test_duckdb_pipeline.py
+++ b/tests/test_pipeline/test_duckdb_pipeline.py
@@ -4,6 +4,7 @@
 
 from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
+import shutil
 from typing import Dict, Tuple
 from uuid import uuid4
 
diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py
new file mode 100644
index 0000000..dcdb59b
--- /dev/null
+++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py
@@ -0,0 +1,118 @@
+"""Test FoundryDDBPipeline object methods"""
+# pylint: disable=missing-function-docstring
+# pylint: disable=protected-access
+
+from datetime import datetime
+from pathlib import Path
+import shutil
+from uuid import uuid4
+
+import pytest
+
+from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
+from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
+from dve.core_engine.models import SubmissionInfo
+import dve.parser.file_handling as fh
+from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline
+
+from ..conftest import get_test_file_path
+from ..fixtures import temp_ddb_conn  # pylint: disable=unused-import
+from .pipeline_helpers import (  # pylint: disable=unused-import
+    PLANETS_RULES_PATH,
+    planet_test_files,
+    movies_test_files,
+)
+
+def test_foundry_runner_validation_fail(planet_test_files, temp_ddb_conn):
+    db_file, conn = temp_ddb_conn
+    processing_folder = planet_test_files
+    sub_id = uuid4().hex
+    sub_info = SubmissionInfo.from_metadata_file(
+        submission_id=sub_id, metadata_uri=processing_folder + "/planets_demo.metadata.json")
+    sub_folder = processing_folder + f"/{sub_id}"
+
+    shutil.copytree(planet_test_files, sub_folder)
+
+    DuckDBRefDataLoader.connection = conn
+    DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH)
+
+
+    with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager:
+        dve_pipeline = FoundryDDBPipeline(
+            audit_tables=audit_manager,
+            connection=conn,
+            rules_path=get_test_file_path("planets/planets_ddb.dischema.json").as_posix(),
+            processed_files_path=processing_folder,
+            submitted_files_path=None,
+            reference_data_loader=DuckDBRefDataLoader,
+        )
+        output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
+        assert fh.get_resource_exists(report_uri)
+        assert not output_loc
+        assert len(list(fh.iter_prefix(audit_files))) == 2
+
+
+def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn):
+    db_file, conn = temp_ddb_conn
+    # add movies refdata to conn
+    ref_db_file = Path(db_file.parent, "movies_refdata.duckdb").as_posix()
+    conn.sql(f"ATTACH '{ref_db_file}' AS movies_refdata")
+    conn.read_parquet(get_test_file_path("movies/refdata/movies_sequels.parquet").as_posix()).to_table("movies_refdata.sequels")
+    processing_folder = movies_test_files
+    sub_id = uuid4().hex
+    sub_info = SubmissionInfo(submission_id=sub_id,
+                              dataset_id="movies",
+                              file_name="good_movies",
+                              file_extension="json",
+                              submitting_org="TEST",
+                              datetime_received=datetime(2025, 11, 5))
+    sub_folder = processing_folder + f"/{sub_id}"
+
+    shutil.copytree(movies_test_files, sub_folder)
+
+    DuckDBRefDataLoader.connection = conn
+    DuckDBRefDataLoader.dataset_config_uri = None
+
+
+    with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager:
+        dve_pipeline = FoundryDDBPipeline(
+            audit_tables=audit_manager,
+            connection=conn,
+            rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(),
+            processed_files_path=processing_folder,
+            submitted_files_path=None,
+            reference_data_loader=DuckDBRefDataLoader,
+        )
+        output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
+        assert fh.get_resource_exists(report_uri)
+        assert len(list(fh.iter_prefix(output_loc))) == 2
+        assert len(list(fh.iter_prefix(audit_files))) == 2
+
+def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
+    # Uses the Spark reader config, so file transformation should fail and be handled gracefully.
+    db_file, conn = temp_ddb_conn
+    processing_folder = planet_test_files
+    sub_id = uuid4().hex
+    sub_info = SubmissionInfo.from_metadata_file(
+        submission_id=sub_id, metadata_uri=processing_folder + "/planets_demo.metadata.json")
+    sub_folder = processing_folder + f"/{sub_id}"
+
+    shutil.copytree(planet_test_files, sub_folder)
+
+    DuckDBRefDataLoader.connection = conn
+    DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH)
+
+
+    with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager:
+        dve_pipeline = FoundryDDBPipeline(
+            audit_tables=audit_manager,
+            connection=conn,
+            rules_path=get_test_file_path("planets/planets.dischema.json").as_posix(),
+            processed_files_path=processing_folder,
+            submitted_files_path=None,
+            reference_data_loader=DuckDBRefDataLoader,
+        )
+        output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
+        assert fh.get_resource_exists(report_uri)
+        assert not output_loc
+        assert len(list(fh.iter_prefix(audit_files))) == 2
diff --git a/tests/testdata/movies/good_movies.json b/tests/testdata/movies/good_movies.json
new file mode 100644
index 0000000..acc7026
--- /dev/null
+++ b/tests/testdata/movies/good_movies.json
@@ -0,0 +1,46 @@
+[
+    {
+        "title": "Not a great one",
+        "year": 2017,
+        "genre": ["Animation", "Comedy", "Family"],
+        "duration_minutes": 88,
+        "ratings": [6.9, 7.8],
+        "cast": [
+            { "name": "J. Smith", "role": "Lion", "date_joined": "2015-11-12" },
+            { "name": "T. Car", "role": "Mouse", "date_joined": "2015-11-12" }
+        ]
+    },
+    {
+        "title": "Good family movie",
+        "year": 2022,
+        "genre": ["Sci-Fi", "Family"],
+        "duration_minutes": 95,
+        "ratings": [7, 8.2, 6.3],
+        "cast": [
+            { "name": "D. Farnesbarnes", "role": "Robot" },
+            { "name": "G. Adams", "role": "Alien", "date_joined": "2017-08-01" }
+        ]
+    },
+    {
+        "title": "One with a cat and a dog",
+        "year": 2020,
+        "genre": ["Fantasy", "Family"],
+        "duration_minutes": 110,
+        "ratings": [6.1, 6.2],
+        "cast": [
+            { "name": "R. Williams", "role": "Cat", "date_joined": "2016-05-06" },
+            { "name": "T. Brown", "role": "Dog", "date_joined": "2016-05-07" }
+        ]
+    },
+    {
+        "title": "A bad 'un",
+        "year": 2011,
+        "genre": ["Mystery", "Family"],
+        "duration_minutes": 97,
+        "ratings": [1.2, 3.4, 5.6, 3.4],
+        "cast": [
+            { "name": "R. Green", "role": "Baby", "date_joined": "2013-11-12" },
+            { "name": "P. Plum", "role": "Dad", "date_joined": "2013-10-08" }
+        ]
+    }
+]
\ No newline at end of file