diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py
index 06f2727..ff0219c 100644
--- a/src/dve/pipeline/foundry_ddb_pipeline.py
+++ b/src/dve/pipeline/foundry_ddb_pipeline.py
@@ -5,6 +5,8 @@
 from dve.core_engine.exceptions import CriticalProcessingError
 from dve.core_engine.models import SubmissionInfo
 from dve.core_engine.type_hints import URI, Failed
+from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
+from dve.parser.file_handling.service import _get_implementation
 from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
 from dve.pipeline.utils import SubmissionStatus
 from dve.parser import file_handling as fh
@@ -18,13 +20,17 @@ class FoundryDDBPipeline(DDBDVEPipeline):

     def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
         """Write out key audit relations to parquet for persisting to datasets"""
         write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
+        if isinstance(_get_implementation(write_to), LocalFilesystemImplementation):
+            write_to = fh.file_uri_to_local_path(write_to)
+            write_to.parent.mkdir(parents=True, exist_ok=True)
+            write_to = write_to.as_posix()
         self.write_parquet(
             self._audit_tables._processing_status.get_relation(),
-            write_to + "processing_status.parquet",
+            fh.joinuri(write_to, "processing_status.parquet"),
         )
         self.write_parquet(
             self._audit_tables._submission_statistics.get_relation(),
-            write_to + "submission_statistics.parquet",
+            fh.joinuri(write_to, "submission_statistics.parquet"),
         )
         return write_to
diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py
index 6270bdd..20d0738 100644
--- a/src/dve/pipeline/pipeline.py
+++ b/src/dve/pipeline/pipeline.py
@@ -2,6 +2,7 @@
 """Generic Pipeline object to define how DVE should be interacted with."""
 from itertools import starmap
 import json
+from pathlib import Path
 import re
 from collections import defaultdict
 from collections.abc import Generator, Iterable, Iterator
@@ -16,6 +17,8 @@
 from dve.core_engine.exceptions import CriticalProcessingError
 from dve.core_engine.message import FeedbackMessage
+from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
+from dve.parser.file_handling.service import _get_implementation
 import dve.reporting.excel_report as er
 from dve.core_engine.backends.base.auditing import BaseAuditingManager
 from dve.core_engine.backends.base.contract import BaseDataContract
@@ -635,7 +638,18 @@ def business_rule_step(
         )
         return successful_files, unsucessful_files, failed_processing
-
+
+    def _publish_error_aggregates(self, submission_id: str, aggregates_df: pl.DataFrame) -> URI:
+        """Store error aggregates as parquet for auditing"""
+        output_uri = fh.joinuri(self.processed_files_path, submission_id, "audit", "error_aggregates.parquet")
+        if isinstance(_get_implementation(output_uri), LocalFilesystemImplementation):
+            output_uri = fh.file_uri_to_local_path(output_uri)
+            output_uri.parent.mkdir(parents=True, exist_ok=True)
+            output_uri = output_uri.as_posix()
+        aggregates_df = aggregates_df.with_columns(pl.lit(submission_id).alias("submission_id"))
+        aggregates_df.write_parquet(output_uri)
+        return output_uri
+
     @lru_cache()  # noqa: B019
     def _get_error_dataframes(self, submission_id: str):
         if not self.processed_files_path:
@@ -738,6 +752,8 @@ def error_report(self,
         )
         with fh.open_stream(report_uri, "wb") as stream:
             stream.write(er.ExcelFormat.convert_to_bytes(workbook))
+
+        self._publish_error_aggregates(submission_info.submission_id, aggregates)

         return submission_info, submission_status, sub_stats, report_uri
@@ -842,3 +858,4 @@ def cluster_pipeline_run(
     )
     yield from report_results  # type: ignore
+
diff --git a/tests/features/movies.feature b/tests/features/movies.feature
index e55de5c..b148547 100644
--- a/tests/features/movies.feature
+++ b/tests/features/movies.feature
@@ -41,6 +41,7 @@ Feature: Pipeline tests using the movies dataset
             | record_count             | 5 |
             | number_record_rejections | 4 |
             | number_warnings          | 1 |
+        And the error aggregates are persisted

     Scenario: Validate and filter movies (duckdb)
         Given I submit the movies file movies.json for processing
@@ -76,4 +77,5 @@ Feature: Pipeline tests using the movies dataset
             | record_count             | 5 |
             | number_record_rejections | 4 |
             | number_warnings          | 1 |
+        And the error aggregates are persisted
diff --git a/tests/features/steps/steps_post_pipeline.py b/tests/features/steps/steps_post_pipeline.py
index 23ea97d..be679ba 100644
--- a/tests/features/steps/steps_post_pipeline.py
+++ b/tests/features/steps/steps_post_pipeline.py
@@ -111,3 +111,9 @@ def check_stats_record(context):
         get_pipeline(context)._audit_tables.get_submission_statistics(sub_info.submission_id).dict()
     )
     assert all([val == stats.get(fld) for fld, val in expected.items()])
+
+@then("the error aggregates are persisted")
+def check_error_aggregates_persisted(context):
+    processing_location = get_processing_location(context)
+    agg_file = Path(processing_location, "audit", "error_aggregates.parquet")
+    assert agg_file.exists() and agg_file.is_file()
diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py
index df58b59..431941e 100644
--- a/tests/test_pipeline/test_foundry_ddb_pipeline.py
+++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py
@@ -49,7 +49,7 @@ def test_foundry_runner_validation_fail(planet_test_files, temp_ddb_conn):
     output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
     assert fh.get_resource_exists(report_uri)
     assert not output_loc
-    assert len(list(fh.iter_prefix(audit_files))) == 2
+    assert len(list(fh.iter_prefix(audit_files))) == 3


 def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn):
@@ -86,7 +86,7 @@ def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn):
     output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
     assert fh.get_resource_exists(report_uri)
     assert len(list(fh.iter_prefix(output_loc))) == 2
-    assert len(list(fh.iter_prefix(audit_files))) == 2
+    assert len(list(fh.iter_prefix(audit_files))) == 3


 def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
     # using spark reader config - should error in file transformation - check gracefully handled
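Illustrative only, not part of the patch: a minimal sketch of reading back the file written by _publish_error_aggregates, assuming a local processed-files root and the audit/error_aggregates.parquet layout used above; the root path and submission id below are hypothetical placeholders.

    from pathlib import Path

    import polars as pl

    # Hypothetical local root; in practice this comes from the pipeline's processed_files_path.
    processed_root = Path("/tmp/dve/processed")
    submission_id = "example-submission"

    agg_path = processed_root / submission_id / "audit" / "error_aggregates.parquet"
    if agg_path.is_file():
        # The frame includes the submission_id column stamped on before writing.
        print(pl.read_parquet(agg_path))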