Merged
29 changes: 27 additions & 2 deletions src/dve/core_engine/backends/base/auditing.py
@@ -31,6 +31,7 @@
QueueType,
SubmissionResult,
)
from dve.pipeline.utils import SubmissionStatus

AuditReturnType = TypeVar("AuditReturnType") # pylint: disable=invalid-name

@@ -329,7 +330,7 @@ def mark_business_rules(self, submissions: list[tuple[str, bool]], **kwargs):
ProcessingStatusRecord(
submission_id=submission_id,
processing_status="business_rules",
submission_result="failed" if failed else None,
submission_result="validation_failed" if failed else None,
**kwargs,
)
for submission_id, failed in submissions
@@ -379,7 +380,10 @@ def mark_failed(self, submissions: list[str], **kwargs):
"""Update submission processing_status to failed."""
recs = [
ProcessingStatusRecord(
submission_id=submission_id, processing_status="failed", **kwargs
submission_id=submission_id,
processing_status="failed",
submission_result="processing_failed",
**kwargs
)
for submission_id in submissions
]
@@ -493,6 +497,27 @@ def get_submission_statistics(self, submission_id: str) -> Optional[SubmissionStatisticsRecord]:
)
except StopIteration:
return None

def get_submission_status(self, submission_id: str) -> Optional[SubmissionStatus]:
"""Get the latest submission status for a submission"""

try:
processing_rec: ProcessingStatusRecord = next(
    self._processing_status.conv_to_records(
        self._processing_status.get_most_recent_records(
            order_criteria=[OrderCriteria("time_updated", True)],
            pre_filter_criteria=[FilterCriteria("submission_id", submission_id)],
        )
    )
)
except StopIteration:
return None
sub_status = SubmissionStatus()
sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics(submission_id)
if processing_rec.submission_result == "processing_failed":
sub_status.processing_failed = True
if processing_rec.submission_result == "validation_failed":
sub_status.validation_failed = True
if sub_stats_rec:
sub_status.number_of_records = sub_stats_rec.record_count

return sub_status

def __enter__(self):
"""Use audit table as context manager"""
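For context, the new `get_submission_status` method leans on the `SubmissionStatus` helper imported from `dve.pipeline.utils`, whose definition is not part of this diff. A minimal sketch consistent with how this PR uses it (keyword construction, the `validation_failed`/`processing_failed` flags, `number_of_records`, and the derived `submission_result`); the field defaults are assumptions:

from dataclasses import dataclass
from typing import Optional


@dataclass
class SubmissionStatus:
    """Hypothetical sketch -- the real class lives in dve.pipeline.utils."""

    validation_failed: bool = False
    processing_failed: bool = False
    number_of_records: Optional[int] = None

    @property
    def submission_result(self) -> str:
        # Mirrors the SubmissionResult literals updated in type_hints.py below.
        if self.processing_failed:
            return "processing_failed"
        if self.validation_failed:
            return "validation_failed"
        return "success"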
36 changes: 0 additions & 36 deletions src/dve/core_engine/backends/utilities.py
@@ -15,7 +15,6 @@
import dve.parser.file_handling as fh
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
from dve.core_engine.type_hints import URI, Messages
from dve.reporting.error_report import conditional_cast

# We need to rely on a Python typing implementation detail in Python <= 3.7.
if sys.version_info[:2] <= (3, 7):
@@ -179,38 +178,3 @@ def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType:
if polars_type:
return polars_type
raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")


def dump_errors(
working_folder: URI,
step_name: str,
messages: Messages,
key_fields: Optional[dict[str, list[str]]] = None,
):
"""Write out to disk captured feedback error messages."""
if not working_folder:
raise AttributeError("processed files path not passed")

if not key_fields:
key_fields = {}

errors = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json")
processed = []

for message in messages:
primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", [])
error = message.to_dict(
key_field=primary_keys,
value_separator=" -- ",
max_number_of_values=10,
record_converter=None,
)
error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ")
processed.append(error)

with fh.open_stream(errors, "a") as f:
json.dump(
processed,
f,
default=str,
)
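The deleted `dump_errors` helper is superseded by `dump_processing_errors` from `dve.reporting.utils`, which the Foundry pipeline below imports. Its implementation is not shown in this PR; going only by the call sites (a working-folder URI, a step name, and a list of `CriticalProcessingError`s), a plausible sketch under those assumptions:

import json
from typing import Iterable

import dve.parser.file_handling as fh
from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.type_hints import URI


def dump_processing_errors(
    working_folder: URI,
    step_name: str,
    errors: Iterable[CriticalProcessingError],
) -> None:
    """Hypothetical sketch: persist step-level processing errors as JSON."""
    target = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json")
    payload = [{"step": step_name, "error_message": err.error_message} for err in errors]
    with fh.open_stream(target, "a") as stream:
        json.dump(payload, stream, default=str)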
18 changes: 6 additions & 12 deletions src/dve/core_engine/exceptions.py
@@ -25,18 +25,12 @@ def __init__(
def critical_messages(self) -> Iterator[FeedbackMessage]:
"""Critical messages which caused the processing error."""
yield from filter(lambda message: message.is_critical, self.messages)

def to_feedback_message(self) -> FeedbackMessage:
"Convert to feedback message to write to json file"
return FeedbackMessage(
entity=None,
record=None,
failure_type="integrity",
error_type="processing",
error_location="Whole File",
error_message=self.error_message,
)


@classmethod
def from_exception(cls, exc: Exception):
    """Build a CriticalProcessingError wrapping an arbitrary exception."""
    return cls(error_message=repr(exc), entities=None, messages=[])

class EntityTypeMismatch(TypeError):
"""An exception emitted if entity type outputs from two collaborative objects are different."""
2 changes: 1 addition & 1 deletion src/dve/core_engine/type_hints.py
@@ -236,7 +236,7 @@
PROCESSING_STATUSES: tuple[ProcessingStatus, ...] = tuple(list(get_args(ProcessingStatus)))
"""List of all possible DVE submission statuses"""

SubmissionResult = Literal["success", "failed", "failed_xml_generation", "archived"]
SubmissionResult = Literal["success", "validation_failed", "archived", "processing_failed"]
"""Allowed DVE submission results"""

SUBMISSION_RESULTS: tuple[SubmissionResult, ...] = tuple(list(get_args(SubmissionResult)))
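Because `SUBMISSION_RESULTS` is derived from the `Literal` with `get_args`, callers can validate a result string without re-hardcoding the allowed values. A small sketch of that pattern (the `check_result` helper is illustrative, not part of the codebase):

from typing import Literal, get_args

SubmissionResult = Literal["success", "validation_failed", "archived", "processing_failed"]
SUBMISSION_RESULTS: tuple[SubmissionResult, ...] = tuple(get_args(SubmissionResult))


def check_result(value: str) -> SubmissionResult:
    # Fail fast rather than letting an unknown result reach the audit tables.
    if value not in SUBMISSION_RESULTS:
        raise ValueError(f"unknown submission result: {value!r}")
    return value  # type: ignore[return-value]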
113 changes: 77 additions & 36 deletions src/dve/pipeline/foundry_ddb_pipeline.py
@@ -2,17 +2,18 @@

from typing import Optional
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet
from dve.core_engine.backends.utilities import dump_errors
from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.models import SubmissionInfo
from dve.core_engine.type_hints import URI, Failed
from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
from dve.pipeline.utils import SubmissionStatus
from dve.parser import file_handling as fh
from dve.reporting.utils import dump_processing_errors

@duckdb_get_entity_count
@duckdb_write_parquet
class FoundryDDBPipeline(DDBDVEPipeline):
"""DuckDB pipeline for running on Foundry Platform.
Polymorphed to allow for exception handling when processing
single files sequentially through services."""
"""DuckDB pipeline for running on Foundry Platform"""

def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
"""Write out key audit relations to parquet for persisting to datasets"""
@@ -29,66 +30,106 @@ def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:

def file_transformation(
self, submission_info: SubmissionInfo
) -> SubmissionInfo | dict[str, str]:
) -> tuple[SubmissionInfo, SubmissionStatus]:
try:
return super().file_transformation(submission_info)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"File transformation raised exception: {exc}")
self._logger.exception(exc)
return submission_info.dict()
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"file_transformation",
[CriticalProcessingError.from_exception(exc)]
)
self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
return submission_info, SubmissionStatus(processing_failed=True)

def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo | bool]:
def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None) -> tuple[SubmissionInfo, SubmissionStatus]:
try:
return super().apply_data_contract(submission_info)
return super().apply_data_contract(submission_info, submission_status)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"Apply data contract raised exception: {exc}")
self._logger.exception(exc)
return submission_info, True
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"contract",
[CriticalProcessingError.from_exception(exc)]
)
self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
return submission_info, SubmissionStatus(processing_failed=True)

def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed):
def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None):
try:
return super().apply_business_rules(submission_info, failed)
return super().apply_business_rules(submission_info, submission_status)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"Apply business rules raised exception: {exc}")
self._logger.exception(exc)
return submission_info, SubmissionStatus(failed=True)
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"business_rules",
[CriticalProcessingError.from_exception(exc)]
)
self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
return submission_info, SubmissionStatus(processing_failed=True)

def error_report(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None):
try:
return super().error_report(submission_info, submission_status)
except Exception as exc: # pylint: disable=W0718
self._logger.error(f"Error reports raised exception: {exc}")
self._logger.exception(exc)
sub_stats = None
report_uri = None
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"error_report",
[CriticalProcessingError.from_exception(exc)]
)
self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
return submission_info, submission_status, sub_stats, report_uri

def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]:
def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], Optional[URI], URI]:
"""Sequential single submission pipeline runner"""
try:
sub_id: str = submission_info.submission_id
report_uri = None
# Safe default so the `finally` block below cannot hit an unbound
# `sub_status` if an early audit call raises.
sub_status = SubmissionStatus(processing_failed=True)
self._audit_tables.add_new_submissions(submissions=[submission_info])
self._audit_tables.mark_transform(submission_ids=[sub_id])
sub_info = self.file_transformation(submission_info=submission_info)
if isinstance(sub_info, SubmissionInfo):
sub_info, sub_status = self.file_transformation(submission_info=submission_info)
if not (sub_status.validation_failed or sub_status.processing_failed):
self._audit_tables.mark_data_contract(submission_ids=[sub_id])
sub_info, failed = self.apply_data_contract(submission_info=submission_info)
self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)])
sub_info, sub_status = self.apply_data_contract(submission_info=sub_info, submission_status=sub_status)
self._audit_tables.mark_business_rules(submissions=[(sub_id, sub_status.validation_failed)])
sub_info, sub_status = self.apply_business_rules(
submission_info=submission_info, failed=failed
submission_info=submission_info, submission_status=sub_status
)
else:
sub_status = SubmissionStatus(failed=True)
self._audit_tables.mark_error_report(
submissions=[(sub_id, sub_status.submission_result)]
)
sub_info, sub_status, sub_stats, report_uri = self.error_report(
submission_info=submission_info, status=sub_status
)
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])

if not sub_status.processing_failed:
self._audit_tables.mark_error_report(
submissions=[(sub_id, sub_status.submission_result)]
)
sub_info, sub_status, sub_stats, report_uri = self.error_report(
submission_info=submission_info, submission_status=sub_status
)
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
except Exception as err: # pylint: disable=W0718
self._logger.error(
f"During processing of submission_id: {sub_id}, the following exception was raised: {err}"
)
dump_processing_errors(
fh.joinuri(self.processed_files_path, submission_info.submission_id),
"run_pipeline",
[CriticalProcessingError.from_exception(err)]
)
self._audit_tables.mark_failed(submissions=[sub_id])
finally:
audit_files_uri = self.persist_audit_records(submission_info=submission_info)
return (
(
None
if sub_status.failed
else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
),
report_uri,
audit_files_uri,
)
return (
(
None
if (sub_status.validation_failed or sub_status.processing_failed)
else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
),
report_uri if report_uri else None,
audit_files_uri,
)
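With every stage override returning `(submission_info, SubmissionStatus)` instead of raising, `run_pipeline` always yields a three-tuple, even when a stage fails. A hedged driver sketch; `FoundryDDBPipeline`'s constructor arguments, `submission_info`, and `handle_failure` are illustrative assumptions:

# Hypothetical driver, not part of this PR.
pipeline = FoundryDDBPipeline(...)  # constructor arguments elided

business_rules_uri, report_uri, audit_files_uri = pipeline.run_pipeline(submission_info)

if business_rules_uri is None:
    # Validation or processing failed; the error report (if one was produced)
    # and the persisted audit parquet files describe what went wrong.
    handle_failure(report_uri, audit_files_uri)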