src/dve/pipeline/foundry_ddb_pipeline.py (10 changes: 8 additions & 2 deletions)
@@ -5,6 +5,8 @@
from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.models import SubmissionInfo
from dve.core_engine.type_hints import URI, Failed
+from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
+from dve.parser.file_handling.service import _get_implementation
from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
from dve.pipeline.utils import SubmissionStatus
from dve.parser import file_handling as fh
@@ -18,13 +20,17 @@ class FoundryDDBPipeline(DDBDVEPipeline):
    def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
        """Write out key audit relations to parquet for persisting to datasets"""
        write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
+        if isinstance(_get_implementation(write_to), LocalFilesystemImplementation):
+            write_to = fh.file_uri_to_local_path(write_to)
+            write_to.parent.mkdir(parents=True, exist_ok=True)
+            write_to = write_to.as_posix()
        self.write_parquet(
            self._audit_tables._processing_status.get_relation(),
-            write_to + "processing_status.parquet",
+            fh.joinuri(write_to, "processing_status.parquet"),
        )
        self.write_parquet(
            self._audit_tables._submission_statistics.get_relation(),
-            write_to + "submission_statistics.parquet",
+            fh.joinuri(write_to, "submission_statistics.parquet"),
        )
        return write_to

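For context, a minimal standalone sketch of the pattern this change follows, converting a file:// URI to a local path and creating any missing directories before a parquet write. It assumes only the standard library; the helper name ensure_local_parent is hypothetical and not part of the DVE API:

import posixpath
from pathlib import Path
from urllib.parse import urlparse

def ensure_local_parent(uri: str) -> str:
    """Return a writable local path for file:// URIs, creating parent dirs; pass other schemes through."""
    parsed = urlparse(uri)
    if parsed.scheme in ("", "file"):
        path = Path(parsed.path)
        # Local parquet writers will not create missing directories, so make them first.
        path.parent.mkdir(parents=True, exist_ok=True)
        return path.as_posix()
    # Remote object stores (s3://, etc.) have no directories to create.
    return uri

print(ensure_local_parent("file:///tmp/dve/audit/processing_status.parquet"))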
src/dve/pipeline/pipeline.py (19 changes: 18 additions & 1 deletion)
@@ -2,6 +2,7 @@
"""Generic Pipeline object to define how DVE should be interacted with."""
from itertools import starmap
import json
+from pathlib import Path
import re
from collections import defaultdict
from collections.abc import Generator, Iterable, Iterator
@@ -16,6 +17,8 @@

from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.message import FeedbackMessage
+from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
+from dve.parser.file_handling.service import _get_implementation
import dve.reporting.excel_report as er
from dve.core_engine.backends.base.auditing import BaseAuditingManager
from dve.core_engine.backends.base.contract import BaseDataContract
@@ -635,7 +638,18 @@ def business_rule_step(
        )

        return successful_files, unsucessful_files, failed_processing

+    def _publish_error_aggregates(self, submission_id: str, aggregates_df: pl.DataFrame) -> URI:
+        """Store error aggregates as parquet for auditing"""
+        output_uri = fh.joinuri(self.processed_files_path, submission_id, "audit", "error_aggregates.parquet")
+        if isinstance(_get_implementation(output_uri), LocalFilesystemImplementation):
+            output_uri = fh.file_uri_to_local_path(output_uri)
+            output_uri.parent.mkdir(parents=True, exist_ok=True)
+            output_uri = output_uri.as_posix()
+        aggregates_df = aggregates_df.with_columns(pl.lit(submission_id).alias("submission_id"))
+        aggregates_df.write_parquet(output_uri)
+        return output_uri
+
    @lru_cache() # noqa: B019
    def _get_error_dataframes(self, submission_id: str):
        if not self.processed_files_path:
@@ -738,6 +752,8 @@ def error_report(self,
        )
        with fh.open_stream(report_uri, "wb") as stream:
            stream.write(er.ExcelFormat.convert_to_bytes(workbook))

+        self._publish_error_aggregates(submission_info.submission_id, aggregates)

        return submission_info, submission_status, sub_stats, report_uri

@@ -842,3 +858,4 @@ def cluster_pipeline_run(
)

yield from report_results # type: ignore

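As a usage illustration only (not part of the PR), the persisted aggregates can be read back with polars for ad-hoc auditing. The base directory below is a hypothetical example; the path layout and the submission_id column come from _publish_error_aggregates above:

import polars as pl

# Layout per _publish_error_aggregates: <processed_files_path>/<submission_id>/audit/error_aggregates.parquet
aggregates = pl.read_parquet("processed_files/sub-001/audit/error_aggregates.parquet")
print(aggregates.filter(pl.col("submission_id") == "sub-001"))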
tests/features/movies.feature (2 changes: 2 additions & 0 deletions)
@@ -41,6 +41,7 @@ Feature: Pipeline tests using the movies dataset
| record_count | 5 |
| number_record_rejections | 4 |
| number_warnings | 1 |
+And the error aggregates are persisted

Scenario: Validate and filter movies (duckdb)
Given I submit the movies file movies.json for processing
@@ -76,4 +77,5 @@
| record_count | 5 |
| number_record_rejections | 4 |
| number_warnings | 1 |
+And the error aggregates are persisted

tests/features/steps/steps_post_pipeline.py (6 changes: 6 additions & 0 deletions)
@@ -111,3 +111,9 @@ def check_stats_record(context):
        get_pipeline(context)._audit_tables.get_submission_statistics(sub_info.submission_id).dict()
    )
    assert all([val == stats.get(fld) for fld, val in expected.items()])

+@then("the error aggregates are persisted")
+def check_error_aggregates_persisted(context):
+    processing_location = get_processing_location(context)
+    agg_file = Path(processing_location, "audit", "error_aggregates.parquet")
+    assert agg_file.exists() and agg_file.is_file()
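The step above only checks that the file exists; a stricter variant could also verify its contents, assuming polars is available in the test environment. Sketch only, not part of the PR, and the function name is hypothetical:

from pathlib import Path
import polars as pl

def check_error_aggregates_contents(processing_location: str, submission_id: str) -> None:
    agg_file = Path(processing_location, "audit", "error_aggregates.parquet")
    df = pl.read_parquet(agg_file)
    # The pipeline tags every aggregate row with the originating submission id.
    assert "submission_id" in df.columns
    assert df["submission_id"].unique().to_list() == [submission_id]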
tests/test_pipeline/test_foundry_ddb_pipeline.py (4 changes: 2 additions & 2 deletions)
@@ -49,7 +49,7 @@ def test_foundry_runner_validation_fail(planet_test_files, temp_ddb_conn):
    output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
    assert fh.get_resource_exists(report_uri)
    assert not output_loc
-    assert len(list(fh.iter_prefix(audit_files))) == 2
+    assert len(list(fh.iter_prefix(audit_files))) == 3


def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn):
@@ -86,7 +86,7 @@ def test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn):
    output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info)
    assert fh.get_resource_exists(report_uri)
    assert len(list(fh.iter_prefix(output_loc))) == 2
-    assert len(list(fh.iter_prefix(audit_files))) == 2
+    assert len(list(fh.iter_prefix(audit_files))) == 3

def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
# using spark reader config - should error in file transformation - check gracefully handled