diff --git a/.github/workflows/ci_testing.yml b/.github/workflows/ci_testing.yml index f54eb67..5f9bc95 100644 --- a/.github/workflows/ci_testing.yml +++ b/.github/workflows/ci_testing.yml @@ -17,7 +17,7 @@ jobs: - name: Install extra dependencies for a python install run: | sudo apt-get update - sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev + sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev libxml2-utils - name: Install asdf cli uses: asdf-vm/actions/setup@v4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 28e687b..1098297 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,18 @@ +## v0.4.0 (2025-12-17) + +### Feat + +- add persistance of error aggregates to pipeline +- add Foundry pipeline + +### Fix + +- issue where templated error messages would not correctly format when passing in parameter values + +### Refactor + +- include submission status for services passthrough + ## v0.3.0 (2025-11-19) ### Feat diff --git a/docs/README.md b/docs/README.md index a7feef1..7ab5d92 100644 --- a/docs/README.md +++ b/docs/README.md @@ -234,10 +234,10 @@ audit_manager = SparkAuditingManager( # Setting up the Pipeline (in this case the Spark implemented one) pipeline = SparkDVEPipeline( + processed_files_path="path/where/my/processed_files/should_go/", audit_tables=audit_manager, job_run_id=1, rules_path="path/to/my_dischema", - processed_files_path="path/where/my/processed_files/should_go/", submitted_files_path="path/to/my/cwt_files/", reference_data_loader=SparkParquetRefDataLoader, spark=spark diff --git a/pyproject.toml b/pyproject.toml index 01da3c4..228b0d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "nhs_dve" -version = "0.3.0" +version = "0.4.0" description = "`nhs data validation engine` is a framework used to validate data" authors = ["NHS England "] readme = "README.md" @@ -39,7 +39,7 @@ requests = "2.32.4" # Mitigates security vuln in < 2.31.0 schedula = "1.2.19" sqlalchemy = "2.0.19" typing_extensions = "4.6.2" -urllib3 = "2.5.0" # Mitigates security vuln in < 1.26.19 +urllib3 = "2.6.0" # Mitigates security vuln in < 2.5.0 xmltodict = "0.13.0" [tool.poetry.group.dev] diff --git a/src/dve/core_engine/backends/base/auditing.py b/src/dve/core_engine/backends/base/auditing.py index 37b1774..d120fcb 100644 --- a/src/dve/core_engine/backends/base/auditing.py +++ b/src/dve/core_engine/backends/base/auditing.py @@ -31,6 +31,7 @@ QueueType, SubmissionResult, ) +from dve.pipeline.utils import SubmissionStatus AuditReturnType = TypeVar("AuditReturnType") # pylint: disable=invalid-name @@ -329,7 +330,7 @@ def mark_business_rules(self, submissions: list[tuple[str, bool]], **kwargs): ProcessingStatusRecord( submission_id=submission_id, processing_status="business_rules", - submission_result="failed" if failed else None, + submission_result="validation_failed" if failed else None, **kwargs, ) for submission_id, failed in submissions @@ -379,7 +380,10 @@ def mark_failed(self, submissions: list[str], **kwargs): """Update submission processing_status to failed.""" recs = [ ProcessingStatusRecord( - submission_id=submission_id, processing_status="failed", **kwargs + submission_id=submission_id, + processing_status="failed", + submission_result="processing_failed", + **kwargs, ) for submission_id in submissions ] @@ -494,6 +498,33 @@ def get_submission_statistics(self, submission_id: str) -> Optional[SubmissionSt except StopIteration: return None + def get_submission_status(self, submission_id: 
str) -> Optional[SubmissionStatus]: + """Get the latest submission status for a submission""" + + try: + processing_rec: ProcessingStatusRecord = next( # type: ignore + self._processing_status.conv_to_records( + self._processing_status.get_most_recent_records( + order_criteria=[OrderCriteria("time_updated", True)], + pre_filter_criteria=[FilterCriteria("submission_id", submission_id)], + ) + ) + ) + except StopIteration: + return None + sub_status = SubmissionStatus() + sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics( + submission_id + ) + if processing_rec.submission_result == "processing_failed": + sub_status.processing_failed = True + if processing_rec.submission_result == "validation_failed": + sub_status.validation_failed = True + if sub_stats_rec: + sub_status.number_of_records = sub_stats_rec.record_count + + return sub_status + def __enter__(self): """Use audit table as context manager""" if self.pool and self.pool_result.done(): diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py index df43348..3998bf5 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py @@ -30,10 +30,12 @@ class DuckDBCSVReader(BaseFileReader): # TODO - stringify or not def __init__( self, + *, header: bool = True, delim: str = ",", quotechar: str = '"', connection: Optional[DuckDBPyConnection] = None, + **_, ): self.header = header self.delim = delim diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/json.py b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py index a706f11..b1a3ad4 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/json.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py @@ -20,7 +20,12 @@ class DuckDBJSONReader(BaseFileReader): """A reader for JSON files""" - def __init__(self, json_format: Optional[str] = "array"): + def __init__( + self, + *, + json_format: Optional[str] = "array", + **_, + ): self._json_format = json_format super().__init__() diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py index 32b5d1d..a955946 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py @@ -8,6 +8,7 @@ from pydantic import BaseModel from dve.core_engine.backends.base.reader import read_function +from dve.core_engine.backends.exceptions import MessageBearingError from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet from dve.core_engine.backends.readers.xml import XMLStreamReader from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model @@ -18,13 +19,21 @@ class DuckDBXMLStreamReader(XMLStreamReader): """A reader for XML files""" - def __init__(self, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs): + def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs): self.ddb_connection = ddb_connection if ddb_connection else default_connection super().__init__(**kwargs) @read_function(DuckDBPyRelation) def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseModel]): """Returns a relation object from the source xml""" + if self.xsd_location: + msg = 
self._run_xmllint(file_uri=resource) + if msg: + raise MessageBearingError( + "Submitted file failed XSD validation.", + messages=[msg], + ) + polars_schema: dict[str, pl.DataType] = { # type: ignore fld.name: get_polars_type_from_annotation(fld.annotation) for fld in stringify_model(schema).__fields__.values() diff --git a/src/dve/core_engine/backends/implementations/spark/readers/csv.py b/src/dve/core_engine/backends/implementations/spark/readers/csv.py index a95cad2..95db464 100644 --- a/src/dve/core_engine/backends/implementations/spark/readers/csv.py +++ b/src/dve/core_engine/backends/implementations/spark/readers/csv.py @@ -31,6 +31,7 @@ def __init__( multi_line: bool = False, encoding: str = "utf-8-sig", spark_session: Optional[SparkSession] = None, + **_, ) -> None: self.delimiter = delimiter diff --git a/src/dve/core_engine/backends/implementations/spark/readers/json.py b/src/dve/core_engine/backends/implementations/spark/readers/json.py index 56f394e..c336ee0 100644 --- a/src/dve/core_engine/backends/implementations/spark/readers/json.py +++ b/src/dve/core_engine/backends/implementations/spark/readers/json.py @@ -27,6 +27,7 @@ def __init__( encoding: Optional[str] = "utf-8", multi_line: Optional[bool] = True, spark_session: Optional[SparkSession] = None, + **_, ) -> None: self.encoding = encoding diff --git a/src/dve/core_engine/backends/implementations/spark/readers/xml.py b/src/dve/core_engine/backends/implementations/spark/readers/xml.py index 8d29f24..30d6756 100644 --- a/src/dve/core_engine/backends/implementations/spark/readers/xml.py +++ b/src/dve/core_engine/backends/implementations/spark/readers/xml.py @@ -5,21 +5,21 @@ from typing import Any, Optional from pydantic import BaseModel +from pyspark.errors.exceptions.base import AnalysisException from pyspark.sql import DataFrame, SparkSession from pyspark.sql import functions as sf from pyspark.sql.column import Column from pyspark.sql.types import StringType, StructField, StructType -from pyspark.sql.utils import AnalysisException from typing_extensions import Literal -from dve.core_engine.backends.base.reader import BaseFileReader, read_function -from dve.core_engine.backends.exceptions import EmptyFileError +from dve.core_engine.backends.base.reader import read_function +from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError from dve.core_engine.backends.implementations.spark.spark_helpers import ( df_is_empty, get_type_from_annotation, spark_write_parquet, ) -from dve.core_engine.backends.readers.xml import XMLStreamReader +from dve.core_engine.backends.readers.xml import BasicXMLFileReader, XMLStreamReader from dve.core_engine.type_hints import URI, EntityName from dve.parser.file_handling import get_content_length from dve.parser.file_handling.service import open_stream @@ -43,7 +43,7 @@ def read_to_dataframe( ) -> DataFrame: """Stream an XML file into a Spark data frame""" if not self.spark: - self.spark = SparkSession.builder.getOrCreate() + self.spark = SparkSession.builder.getOrCreate() # type: ignore spark_schema = get_type_from_annotation(schema) return self.spark.createDataFrame( # type: ignore list(self.read_to_py_iterator(resource, entity_name, schema)), @@ -52,7 +52,7 @@ def read_to_dataframe( @spark_write_parquet -class SparkXMLReader(BaseFileReader): # pylint: disable=too-many-instance-attributes +class SparkXMLReader(BasicXMLFileReader): # pylint: disable=too-many-instance-attributes """A reader for XML files built atop Spark-XML.""" def __init__( @@ -70,21 +70,33 @@ def 
__init__( sanitise_multiline: bool = True, namespace=None, trim_cells=True, + xsd_location: Optional[URI] = None, + xsd_error_code: Optional[str] = None, + xsd_error_message: Optional[str] = None, + rules_location: Optional[URI] = None, **_, ) -> None: - self.record_tag = record_tag - self.spark_session = spark_session or SparkSession.builder.getOrCreate() + + super().__init__( + record_tag=record_tag, + root_tag=root_tag, + trim_cells=trim_cells, + null_values=null_values, + sanitise_multiline=sanitise_multiline, + xsd_location=xsd_location, + xsd_error_code=xsd_error_code, + xsd_error_message=xsd_error_message, + rules_location=rules_location, + ) + + self.spark_session = spark_session or SparkSession.builder.getOrCreate() # type: ignore self.sampling_ratio = sampling_ratio self.exclude_attribute = exclude_attribute self.mode = mode self.infer_schema = infer_schema self.ignore_namespace = ignore_namespace - self.root_tag = root_tag self.sanitise_multiline = sanitise_multiline - self.null_values = null_values self.namespace = namespace - self.trim_cells = trim_cells - super().__init__() def read_to_py_iterator( self, resource: URI, entity_name: EntityName, schema: type[BaseModel] @@ -106,6 +118,14 @@ def read_to_dataframe( if get_content_length(resource) == 0: raise EmptyFileError(f"File at {resource} is empty.") + if self.xsd_location: + msg = self._run_xmllint(file_uri=resource) + if msg: + raise MessageBearingError( + "Submitted file failed XSD validation.", + messages=[msg], + ) + spark_schema: StructType = get_type_from_annotation(schema) kwargs = { "rowTag": self.record_tag, @@ -143,7 +163,7 @@ def read_to_dataframe( kwargs["rowTag"] = f"{namespace}:{self.record_tag}" df = ( self.spark_session.read.format("xml") - .options(**kwargs) + .options(**kwargs) # type: ignore .load(resource, schema=read_schema) ) if self.root_tag and df.columns: diff --git a/src/dve/core_engine/backends/metadata/reporting.py b/src/dve/core_engine/backends/metadata/reporting.py index 0f2079a..cc0aed4 100644 --- a/src/dve/core_engine/backends/metadata/reporting.py +++ b/src/dve/core_engine/backends/metadata/reporting.py @@ -28,7 +28,7 @@ class BaseReportingConfig(BaseModel): """ - UNTEMPLATED_FIELDS: ClassVar[set[str]] = {"message"} + UNTEMPLATED_FIELDS: ClassVar[set[str]] = set() """Fields that should not be templated.""" emit: Optional[str] = None diff --git a/src/dve/core_engine/backends/readers/csv.py b/src/dve/core_engine/backends/readers/csv.py index c0b6479..bc05b58 100644 --- a/src/dve/core_engine/backends/readers/csv.py +++ b/src/dve/core_engine/backends/readers/csv.py @@ -36,6 +36,7 @@ def __init__( trim_cells: bool = True, null_values: Collection[str] = frozenset({"NULL", "null", ""}), encoding: str = "utf-8-sig", + **_, ): """Init function for the base CSV reader. 
diff --git a/src/dve/core_engine/backends/readers/xml.py b/src/dve/core_engine/backends/readers/xml.py
index 5de23c4..e7480f1 100644
--- a/src/dve/core_engine/backends/readers/xml.py
+++ b/src/dve/core_engine/backends/readers/xml.py
@@ -12,8 +12,10 @@
 from dve.core_engine.backends.base.reader import BaseFileReader
 from dve.core_engine.backends.exceptions import EmptyFileError
+from dve.core_engine.backends.readers.xml_linting import run_xmllint
 from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
 from dve.core_engine.loggers import get_logger
+from dve.core_engine.message import FeedbackMessage
 from dve.core_engine.type_hints import URI, EntityName
 from dve.parser.file_handling import NonClosingTextIOWrapper, get_content_length, open_stream
 from dve.parser.file_handling.implementations.file import (
@@ -101,7 +103,7 @@ def clear(self) -> None:
     def __iter__(self) -> Iterator["XMLElement"]: ...


-class BasicXMLFileReader(BaseFileReader):
+class BasicXMLFileReader(BaseFileReader):  # pylint: disable=R0902
     """A reader for XML files built atop LXML."""

     def __init__(
@@ -114,6 +116,10 @@ def __init__(
         sanitise_multiline: bool = True,
         encoding: str = "utf-8-sig",
         n_records_to_read: Optional[int] = None,
+        xsd_location: Optional[URI] = None,
+        xsd_error_code: Optional[str] = None,
+        xsd_error_message: Optional[str] = None,
+        rules_location: Optional[URI] = None,
         **_,
     ):
         """Init function for the base XML reader.
@@ -148,6 +154,15 @@ def __init__(
         """Encoding of the XML file."""
         self.n_records_to_read = n_records_to_read
         """The maximum number of records to read from a document."""
+        if rules_location is not None and xsd_location is not None:
+            self.xsd_location = rules_location + xsd_location
+        else:
+            self.xsd_location = xsd_location  # type: ignore
+        """The URI of the xsd file if wishing to perform xsd validation."""
+        self.xsd_error_code = xsd_error_code
+        """The error code to be reported if xsd validation fails (if xsd)"""
+        self.xsd_error_message = xsd_error_message
+        """The error message to be reported if xsd validation fails"""
         super().__init__()
         self._logger = get_logger(__name__)
@@ -259,6 +274,22 @@ def _parse_xml(
         for element in elements:
             yield self._parse_element(element, template_row)

+    def _run_xmllint(self, file_uri: URI) -> FeedbackMessage | None:
+        """Run the xmllint package to validate against a given xsd. Requires xmllint to be
+        installed on the system to run successfully."""
+        if self.xsd_location is None:
+            raise AttributeError("Trying to run XML lint with no `xsd_location` provided.")
+        if self.xsd_error_code is None:
+            raise AttributeError("Trying to run XML lint with no `xsd_error_code` provided.")
+        if self.xsd_error_message is None:
+            raise AttributeError("Trying to run XML lint with no `xsd_error_message` provided.")
+        return run_xmllint(
+            file_uri=file_uri,
+            schema_uri=self.xsd_location,
+            error_code=self.xsd_error_code,
+            error_message=self.xsd_error_message,
+        )
+
     def read_to_py_iterator(
         self,
         resource: URI,
diff --git a/src/dve/core_engine/backends/readers/xml_linting.py b/src/dve/core_engine/backends/readers/xml_linting.py
new file mode 100644
index 0000000..529d8ee
--- /dev/null
+++ b/src/dve/core_engine/backends/readers/xml_linting.py
@@ -0,0 +1,142 @@
+"""Implement XML linting for files. Please note that XML linting requires xmllint to be installed
+on your system."""
+
+import shutil
+import tempfile
+from collections.abc import Sequence
+from contextlib import ExitStack
+from pathlib import Path
+from subprocess import PIPE, STDOUT, Popen
+from typing import Union
+from uuid import uuid4
+
+from dve.core_engine.message import FeedbackMessage
+from dve.parser.file_handling import copy_resource, get_file_name, get_resource_exists, open_stream
+from dve.parser.file_handling.implementations.file import file_uri_to_local_path
+from dve.parser.type_hints import URI
+
+ErrorMessage = str
+"""Error message for xml issues"""
+ErrorCode = str
+"""Error code for xml feedback errors"""
+
+FIVE_MEBIBYTES = 5 * (1024**2)
+"""The size of 5 binary megabytes, in bytes."""
+
+
+def _ensure_schema_and_resources(
+    schema_uri: URI, schema_resources: Sequence[URI], temp_dir: Path
+) -> Path:
+    """Given the schema and schema resource URIs and a temp dir, if the resources
+    are remote or exist in different directories, copy them to the temp dir.
+
+    Return the local schema path.
+
+    """
+    if not get_resource_exists(schema_uri):
+        raise IOError(f"No resource accessible at schema URI {schema_uri!r}")
+
+    missing_resources = list(
+        filter(lambda resource: not get_resource_exists(resource), schema_resources)
+    )
+    if missing_resources:
+        raise IOError(f"Some schema resources missing: {missing_resources!r}")
+
+    all_resources = [schema_uri, *schema_resources]
+
+    schemas_are_files = all(map(lambda resource: resource.startswith("file:"), all_resources))
+    if schemas_are_files:
+        paths = list(map(file_uri_to_local_path, all_resources))
+        all_paths_have_same_parent = len({path.parent for path in paths}) == 1
+
+        if all_paths_have_same_parent:
+            schema_path = paths[0]
+            return schema_path
+
+    for resource_uri in all_resources:
+        local_path = temp_dir.joinpath(get_file_name(resource_uri))
+        copy_resource(resource_uri, local_path.as_uri())
+
+    schema_path = temp_dir.joinpath(get_file_name(schema_uri))
+    return schema_path
+
+
+def run_xmllint(
+    file_uri: URI,
+    schema_uri: URI,
+    *schema_resources: URI,
+    error_code: ErrorCode,
+    error_message: ErrorMessage,
+) -> Union[None, FeedbackMessage]:
+    """Run `xmllint`, given a file and information about the schemas to apply.
+
+    The schema and associated resources will be copied to a temporary directory
+    for validation, unless they are all already in the same local folder.
+
+    Args:
+    - `file_uri`: the URI of the file to be streamed into `xmllint`
+    - `schema_uri`: the URI of the XSD schema for the file.
+    - `*schema_resources`: URIs for additional XSD files required by the schema.
+    - `error_code`: The error_code to use in FeedbackMessage if the linting fails.
+    - `error_message`: The error_message to use in FeedbackMessage if the linting fails.
+
+    Returns a `FeedbackMessage` if the linting fails, otherwise `None`.
+
+    """
+    if not shutil.which("xmllint"):
+        raise OSError("Unable to find `xmllint` binary. Please install to use this functionality.")
+
+    if not get_resource_exists(file_uri):
+        raise IOError(f"No resource accessible at file URI {file_uri!r}")
+
+    # Ensure the schema and resources are local file paths so they can be
+    # read by xmllint.
+    # Lots of resources to manage here.
+ with tempfile.TemporaryDirectory() as temp_dir_str: + temp_dir = Path(temp_dir_str) + schema_path = _ensure_schema_and_resources(schema_uri, schema_resources, temp_dir) + message_file_path = temp_dir.joinpath(uuid4().hex) + + with ExitStack() as linting_context: + # Need to write lint output to a file to avoid deadlock. Kinder to mem this way anyway. + message_file_bytes = linting_context.enter_context(message_file_path.open("wb")) + + # Open an `xmllint` process to pipe into. + command = ["xmllint", "--stream", "--schema", str(schema_path), "-"] + process = linting_context.enter_context( + Popen(command, stdin=PIPE, stdout=message_file_bytes, stderr=STDOUT) + ) + # This should never trigger, bad typing in stdlib. + if process.stdin is None: + raise ValueError("Unable to pipe file into subprocess") + + # Pipe the XML file contents into xmllint. + block = b"" + try: + with open_stream(file_uri, "rb") as byte_stream: + while True: + block = byte_stream.read(FIVE_MEBIBYTES) + if not block: + break + process.stdin.write(block) + except BrokenPipeError: + pass + finally: + # Close the input stream and await the response code. + # Output will be written to the message file. + process.stdin.close() + return_code = process.wait(10) + + if return_code == 0: + return None + + return FeedbackMessage( + entity="xsd_validation", + record={}, + failure_type="submission", + is_informational=False, + error_type="xsd check", + error_location="Whole File", + error_message=error_message, + error_code=error_code, + ) diff --git a/src/dve/core_engine/configuration/v1/__init__.py b/src/dve/core_engine/configuration/v1/__init__.py index 057abe3..89174be 100644 --- a/src/dve/core_engine/configuration/v1/__init__.py +++ b/src/dve/core_engine/configuration/v1/__init__.py @@ -24,7 +24,7 @@ from dve.core_engine.message import DataContractErrorDetail from dve.core_engine.type_hints import EntityName, ErrorCategory, ErrorType, TemplateVariables from dve.core_engine.validation import RowValidator -from dve.parser.file_handling import joinuri, open_stream +from dve.parser.file_handling import joinuri, open_stream, resolve_location from dve.parser.type_hints import URI, Extension TypeName = str @@ -202,6 +202,13 @@ def __init__(self, *args, **kwargs): uri = joinuri(uri_prefix, rule_store_config.filename) self._load_rule_store(uri) + for _, model_config in self.contract.datasets.items(): + for reader_config in model_config.reader_config.values(): + reader_config.kwargs_ = { + **reader_config.kwargs_, + "rules_location": f'{"/".join(self.location.split("/")[:-1])}/', + } + def _resolve_business_filter( self, config: BusinessFilterConfig ) -> tuple[ConcreteFilterConfig, TemplateVariables]: diff --git a/src/dve/core_engine/exceptions.py b/src/dve/core_engine/exceptions.py index dae8647..cba7508 100644 --- a/src/dve/core_engine/exceptions.py +++ b/src/dve/core_engine/exceptions.py @@ -1,6 +1,7 @@ """Exceptions emitted by the pipeline.""" from collections.abc import Iterator +from typing import Optional from dve.core_engine.backends.implementations.spark.types import SparkEntities from dve.core_engine.message import FeedbackMessage @@ -11,10 +12,14 @@ class CriticalProcessingError(ValueError): """An exception emitted if critical errors are received.""" def __init__( - self, error_message: str, *args: object, messages: Messages, entities: SparkEntities + self, + error_message: str, + *args: object, + messages: Optional[Messages], + entities: Optional[SparkEntities] = None ) -> None: super().__init__(error_message, *args) - 
self.error_messsage = error_message + self.error_message = error_message """The error message explaining the critical processing error.""" self.messages = messages """The messages gathered at the time the error was emitted.""" @@ -24,7 +29,12 @@ def __init__( @property def critical_messages(self) -> Iterator[FeedbackMessage]: """Critical messages which caused the processing error.""" - yield from filter(lambda message: message.is_critical, self.messages) + yield from filter(lambda message: message.is_critical, self.messages) # type: ignore + + @classmethod + def from_exception(cls, exc: Exception): + """Create from broader exception, for recording in processing errors""" + return cls(error_message=repr(exc), entities=None, messages=[]) class EntityTypeMismatch(TypeError): diff --git a/src/dve/core_engine/templating.py b/src/dve/core_engine/templating.py index 4fc39f7..5dd4c4a 100644 --- a/src/dve/core_engine/templating.py +++ b/src/dve/core_engine/templating.py @@ -11,6 +11,17 @@ from dve.core_engine.type_hints import JSONable, TemplateVariables +class PreserveTemplateUndefined(jinja2.Undefined): # pylint: disable=too-few-public-methods + """ + Preserve the original template in instances where the value cannot be populated. Whilst this + may result in templates coming back in the FeedbackMessage object, it's more useful to know + exactly what should have been populated rather than just returning blank values. + """ + + def __str__(self): + return "{{" + self._undefined_name + "}}" + + class RuleTemplateError(ValueError): """A rule template error.""" @@ -21,7 +32,10 @@ def _raise_rule_templating_error(message: str) -> NoReturn: T = TypeVar("T", bound=JSONable) -ENVIRONMENT = jinja2.Environment(autoescape=False) +ENVIRONMENT = jinja2.Environment( + autoescape=jinja2.select_autoescape(default_for_string=False), + undefined=PreserveTemplateUndefined, +) ENVIRONMENT.globals["repr"] = repr ENVIRONMENT.globals["str"] = str ENVIRONMENT.globals["raise"] = _raise_rule_templating_error diff --git a/src/dve/core_engine/type_hints.py b/src/dve/core_engine/type_hints.py index 991f46f..0be3763 100644 --- a/src/dve/core_engine/type_hints.py +++ b/src/dve/core_engine/type_hints.py @@ -236,7 +236,7 @@ PROCESSING_STATUSES: tuple[ProcessingStatus, ...] = tuple(list(get_args(ProcessingStatus))) """List of all possible DVE submission statuses""" -SubmissionResult = Literal["success", "failed", "failed_xml_generation", "archived"] +SubmissionResult = Literal["success", "validation_failed", "archived", "processing_failed"] """Allowed DVE submission results""" SUBMISSION_RESULTS: tuple[SubmissionResult, ...] = tuple(list(get_args(SubmissionResult))) diff --git a/src/dve/parser/file_handling/service.py b/src/dve/parser/file_handling/service.py index b638eb6..0422b4c 100644 --- a/src/dve/parser/file_handling/service.py +++ b/src/dve/parser/file_handling/service.py @@ -57,7 +57,7 @@ ) """Supported URI schemes.""" -ALL_FILE_MODES: set[FileOpenMode] = {"r", "a", "w", "ab", "rb", "wb", "ba", "br", "bw"} +ALL_FILE_MODES: set[FileOpenMode] = {"r", "a", "a+", "w", "ab", "rb", "wb", "ba", "br", "bw"} """All supported file modes.""" TEXT_MODES: set[TextFileOpenMode] = {"r", "a", "w"} """Text file modes.""" diff --git a/src/dve/parser/type_hints.py b/src/dve/parser/type_hints.py index c8b2fb7..f18cc4f 100644 --- a/src/dve/parser/type_hints.py +++ b/src/dve/parser/type_hints.py @@ -19,7 +19,7 @@ """The path attribute of the URI.""" Extension = str """A file extension (e.g. 
'.csv').""" -TextFileOpenMode = Literal["r", "a", "w"] +TextFileOpenMode = Literal["r", "a", "w", "a+"] """An opening mode for a file in text mode.""" BinaryFileOpenMode = Literal["ab", "rb", "wb", "ba", "br", "bw"] """An opening mode for a file in binary mode.""" diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py index 4e7707b..96156a9 100644 --- a/src/dve/pipeline/duckdb_pipeline.py +++ b/src/dve/pipeline/duckdb_pipeline.py @@ -23,24 +23,24 @@ class DDBDVEPipeline(BaseDVEPipeline): def __init__( self, + processed_files_path: URI, audit_tables: DDBAuditingManager, - job_run_id: int, connection: DuckDBPyConnection, rules_path: Optional[URI], - processed_files_path: Optional[URI], submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, + job_run_id: Optional[int] = None, ): self._connection = connection super().__init__( + processed_files_path, audit_tables, - job_run_id, DuckDBDataContract(connection=self._connection), DuckDBStepImplementations.register_udfs(connection=self._connection), rules_path, - processed_files_path, submitted_files_path, reference_data_loader, + job_run_id, ) # pylint: disable=arguments-differ diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py new file mode 100644 index 0000000..5e0b757 --- /dev/null +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -0,0 +1,159 @@ +# pylint: disable=W0223 +"""A duckdb pipeline for running on Foundry platform""" + +from typing import Optional + +from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( + duckdb_get_entity_count, + duckdb_write_parquet, +) +from dve.core_engine.exceptions import CriticalProcessingError +from dve.core_engine.models import SubmissionInfo +from dve.core_engine.type_hints import URI +from dve.parser import file_handling as fh +from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation +from dve.parser.file_handling.service import _get_implementation +from dve.pipeline.duckdb_pipeline import DDBDVEPipeline +from dve.pipeline.utils import SubmissionStatus +from dve.reporting.utils import dump_processing_errors + + +@duckdb_get_entity_count +@duckdb_write_parquet +class FoundryDDBPipeline(DDBDVEPipeline): + """DuckDB pipeline for running on Foundry Platform""" + + def persist_audit_records(self, submission_info: SubmissionInfo) -> URI: + """Write out key audit relations to parquet for persisting to datasets""" + write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/") + if isinstance(_get_implementation(write_to), LocalFilesystemImplementation): + write_to = fh.file_uri_to_local_path(write_to) + write_to.parent.mkdir(parents=True, exist_ok=True) + write_to = write_to.as_posix() + self.write_parquet( # type: ignore # pylint: disable=E1101 + self._audit_tables._processing_status.get_relation(), # pylint: disable=W0212 + fh.joinuri(write_to, "processing_status.parquet"), + ) + self.write_parquet( # type: ignore # pylint: disable=E1101 + self._audit_tables._submission_statistics.get_relation(), # pylint: disable=W0212 + fh.joinuri(write_to, "submission_statistics.parquet"), + ) + return write_to + + def file_transformation( + self, submission_info: SubmissionInfo + ) -> tuple[SubmissionInfo, SubmissionStatus]: + try: + return super().file_transformation(submission_info) + except Exception as exc: # pylint: disable=W0718 + self._logger.error(f"File transformation raised exception: {exc}") + 
self._logger.exception(exc) + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "file_transformation", + [CriticalProcessingError.from_exception(exc)], + ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) + return submission_info, SubmissionStatus(processing_failed=True) + + def apply_data_contract( + self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None + ) -> tuple[SubmissionInfo, SubmissionStatus]: + try: + return super().apply_data_contract(submission_info, submission_status) + except Exception as exc: # pylint: disable=W0718 + self._logger.error(f"Apply data contract raised exception: {exc}") + self._logger.exception(exc) + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "contract", + [CriticalProcessingError.from_exception(exc)], + ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) + return submission_info, SubmissionStatus(processing_failed=True) + + def apply_business_rules( + self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None + ): + try: + return super().apply_business_rules(submission_info, submission_status) + except Exception as exc: # pylint: disable=W0718 + self._logger.error(f"Apply business rules raised exception: {exc}") + self._logger.exception(exc) + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "business_rules", + [CriticalProcessingError.from_exception(exc)], + ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) + return submission_info, SubmissionStatus(processing_failed=True) + + def error_report( + self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None + ): + try: + return super().error_report(submission_info, submission_status) + except Exception as exc: # pylint: disable=W0718 + self._logger.error(f"Error reports raised exception: {exc}") + self._logger.exception(exc) + sub_stats = None + report_uri = None + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "error_report", + [CriticalProcessingError.from_exception(exc)], + ) + self._audit_tables.mark_failed(submissions=[submission_info.submission_id]) + return submission_info, submission_status, sub_stats, report_uri + + def run_pipeline( + self, submission_info: SubmissionInfo + ) -> tuple[Optional[URI], Optional[URI], URI]: + """Sequential single submission pipeline runner""" + try: + sub_id: str = submission_info.submission_id + report_uri = None + self._audit_tables.add_new_submissions(submissions=[submission_info]) + self._audit_tables.mark_transform(submission_ids=[sub_id]) + sub_info, sub_status = self.file_transformation(submission_info=submission_info) + if not (sub_status.validation_failed or sub_status.processing_failed): + self._audit_tables.mark_data_contract(submission_ids=[sub_id]) + sub_info, sub_status = self.apply_data_contract( + submission_info=sub_info, submission_status=sub_status + ) + self._audit_tables.mark_business_rules( + submissions=[(sub_id, sub_status.validation_failed)] + ) + sub_info, sub_status = self.apply_business_rules( + submission_info=submission_info, submission_status=sub_status + ) + + if not sub_status.processing_failed: + self._audit_tables.mark_error_report( + submissions=[(sub_id, sub_status.submission_result)] + ) + sub_info, sub_status, sub_stats, report_uri = 
self.error_report( + submission_info=submission_info, submission_status=sub_status + ) + self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) + except Exception as err: # pylint: disable=W0718 + self._logger.error( + f"During processing of submission_id: {sub_id}, this exception was raised: {err}" + ) + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "run_pipeline", + [CriticalProcessingError.from_exception(err)], + ) + self._audit_tables.mark_failed(submissions=[sub_id]) + finally: + audit_files_uri = self.persist_audit_records(submission_info=submission_info) + return ( + ( + None + if (sub_status.validation_failed or sub_status.processing_failed) + else fh.joinuri(self.processed_files_path, sub_id, "business_rules") + ), + report_uri if report_uri else None, + audit_files_uri, + ) diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 69cb141..819656a 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -6,6 +6,7 @@ from collections.abc import Generator, Iterable, Iterator from concurrent.futures import Executor, Future, ThreadPoolExecutor from functools import lru_cache +from itertools import starmap from threading import Lock from typing import Optional, Union from uuid import uuid4 @@ -23,12 +24,17 @@ from dve.core_engine.backends.readers import BaseFileReader from dve.core_engine.backends.types import EntityType from dve.core_engine.backends.utilities import stringify_model +from dve.core_engine.exceptions import CriticalProcessingError from dve.core_engine.loggers import get_logger +from dve.core_engine.message import FeedbackMessage from dve.core_engine.models import SubmissionInfo, SubmissionStatisticsRecord -from dve.core_engine.type_hints import URI, Failed, FileURI, InfoURI, Messages +from dve.core_engine.type_hints import URI, FileURI, InfoURI from dve.parser import file_handling as fh +from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation +from dve.parser.file_handling.service import _get_implementation from dve.pipeline.utils import SubmissionStatus, deadletter_file, load_config, load_reader -from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates, conditional_cast +from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates +from dve.reporting.utils import dump_feedback_errors, dump_processing_errors PERMISSIBLE_EXCEPTIONS: tuple[type[Exception]] = ( FileNotFoundError, # type: ignore @@ -43,14 +49,14 @@ class BaseDVEPipeline: def __init__( self, + processed_files_path: URI, audit_tables: BaseAuditingManager, - job_run_id: int, data_contract: BaseDataContract, step_implementations: Optional[BaseStepImplementations[EntityType]], rules_path: Optional[URI], - processed_files_path: Optional[URI], submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, + job_run_id: Optional[int] = None, ): self._submitted_files_path = submitted_files_path self._processed_files_path = processed_files_path @@ -66,12 +72,12 @@ def __init__( self._aggregates_lock = Lock() @property - def job_run_id(self) -> int: + def job_run_id(self) -> Optional[int]: """Unique Identifier for the job/process that is running this Pipeline.""" return self._job_run_id @property - def processed_files_path(self) -> Optional[URI]: + def processed_files_path(self) -> URI: """URI Location for where the files are being processed.""" return self._processed_files_path @@ -95,41 +101,15 @@ def 
get_entity_count(entity: EntityType) -> int: """Get a row count of an entity stored as parquet""" raise NotImplementedError() - def _dump_errors( - self, - submission_id: str, - step_name: str, - messages: Messages, - key_fields: Optional[dict[str, list[str]]] = None, - ): - if not self.processed_files_path: - raise AttributeError("processed files path not passed") - - if not key_fields: - key_fields = {} - - errors = fh.joinuri( - self.processed_files_path, submission_id, "errors", f"{step_name}_errors.json" - ) - processed = [] - - for message in messages: - primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", []) - error = message.to_dict( - key_field=primary_keys, - value_separator=" -- ", - max_number_of_values=10, - record_converter=None, - ) - error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ") - processed.append(error) - - with fh.open_stream(errors, "w") as f: - json.dump( - processed, - f, - default=str, + def get_submission_status(self, step_name: str, submission_id: str) -> SubmissionStatus: + """Determine submission status of a submission if not explicitly given""" + if not (submission_status := self._audit_tables.get_submission_status(submission_id)): + self._logger.warning( + f"Unable to determine status of submission_id: {submission_id}" + + f" in service {step_name} - assuming no issues." ) + return SubmissionStatus() + return submission_status @validate_arguments def _move_submission_to_working_location( @@ -266,6 +246,11 @@ def audit_received_file_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"audit_received_file raised exception: {exc}") self._logger.exception(exc) + dump_processing_errors( + fh.joinuri(self.processed_files_path, submission_id), + "audit_received", + [CriticalProcessingError.from_exception(exc)], + ) # sub_info should at least # be populated with file_name and file_extension failed.append( @@ -296,35 +281,45 @@ def audit_received_file_step( def file_transformation( self, submission_info: SubmissionInfo - ) -> Union[SubmissionInfo, dict[str, str]]: + ) -> tuple[SubmissionInfo, SubmissionStatus]: """Transform a file from its original format into a 'stringified' parquet file""" if not self.processed_files_path: raise AttributeError("processed files path not provided") + errors: list[FeedbackMessage] = [] + submission_status: SubmissionStatus = SubmissionStatus() submission_file_uri: URI = fh.joinuri( self.processed_files_path, submission_info.submission_id, submission_info.file_name_with_ext, ) try: - errors = self.write_file_to_parquet( - submission_file_uri, submission_info, self.processed_files_path + errors.extend( + self.write_file_to_parquet( + submission_file_uri, submission_info, self.processed_files_path + ) ) - if errors: - self._dump_errors(submission_info.submission_id, "file_transformation", errors) - return submission_info.dict() - return submission_info - except ValueError as exc: - self._logger.error(f"File transformation write_file_to_parquet raised error: {exc}") - return submission_info.dict() - except Exception as exc: # pylint: disable=broad-except + + except MessageBearingError as exc: self._logger.error(f"Unexpected file transformation error: {exc}") self._logger.exception(exc) - return submission_info.dict() + errors.extend(exc.messages) + + if errors: + dump_feedback_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "file_transformation", + errors, + ) + submission_status.validation_failed = True + return 
submission_info, submission_status + return submission_info, submission_status def file_transformation_step( self, pool: Executor, submissions_to_process: list[SubmissionInfo] - ) -> tuple[list[SubmissionInfo], list[SubmissionInfo]]: + ) -> tuple[ + list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]] + ]: """Step to transform files from their original format into parquet files""" file_transform_futures: list[tuple[SubmissionInfo, Future]] = [] @@ -333,15 +328,21 @@ def file_transformation_step( future = pool.submit(self.file_transformation, submission_info) file_transform_futures.append((submission_info, future)) - success: list[SubmissionInfo] = [] - failed: list[SubmissionInfo] = [] - failed_processing: list[SubmissionInfo] = [] + success: list[tuple[SubmissionInfo, SubmissionStatus]] = [] + failed: list[tuple[SubmissionInfo, SubmissionStatus]] = [] + failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] for sub_info, future in file_transform_futures: try: # sub_info passed here either return SubInfo or dict. If SubInfo, not actually # modified in anyway during this step. - result = future.result() + submission_info: SubmissionInfo # type: ignore + submission_status: SubmissionStatus + submission_info, submission_status = future.result() + if submission_status.validation_failed: + failed.append((submission_info, submission_status)) + else: + success.append((submission_info, submission_status)) except AttributeError as exc: self._logger.error(f"File transformation raised exception: {exc}") raise exc @@ -353,26 +354,27 @@ def file_transformation_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"File transformation raised exception: {exc}") self._logger.exception(exc) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "file_transformation", + [CriticalProcessingError.from_exception(exc)], + ) + submission_status = SubmissionStatus(processing_failed=True) + failed_processing.append((sub_info, submission_status)) continue - if isinstance(result, SubmissionInfo): - success.append(sub_info) - else: - failed.append(sub_info) - if len(success) > 0: self._audit_tables.mark_data_contract( - list(map(lambda x: x.submission_id, success)), job_run_id=self.job_run_id + list(starmap(lambda x, _: x.submission_id, success)), job_run_id=self.job_run_id ) if len(failed) > 0: self._audit_tables.mark_error_report( list( - map( - lambda x: ( + starmap( + lambda x, _: ( x.submission_id, - "failed", + "validation_failed", ), failed, ) @@ -382,14 +384,19 @@ def file_transformation_step( if len(failed_processing) > 0: self._audit_tables.mark_failed( - [si.submission_id for si in failed_processing], job_run_id=self.job_run_id + [si.submission_id for si, _ in failed_processing], job_run_id=self.job_run_id ) return success, failed - def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo, Failed]: + def apply_data_contract( + self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None + ) -> tuple[SubmissionInfo, SubmissionStatus]: """Method for applying the data contract given a submission_info""" - + if not submission_status: + submission_status = self.get_submission_status( + "contract", submission_info.submission_id + ) if not self.processed_files_path: raise AttributeError("processed files path not provided") @@ -417,30 +424,51 @@ def apply_data_contract(self, submission_info: 
SubmissionInfo) -> tuple[Submissi key_fields = {model: conf.reporting_fields for model, conf in model_config.items()} if messages: - self._dump_errors( - submission_info.submission_id, "contract", messages, key_fields=key_fields + dump_feedback_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "contract", + messages, + key_fields=key_fields, ) - failed = any(not rule_message.is_informational for rule_message in messages) + validation_failed = any(not rule_message.is_informational for rule_message in messages) + + if validation_failed: + submission_status.validation_failed = True - return submission_info, failed + return submission_info, submission_status def data_contract_step( - self, pool: Executor, file_transform_results: list[SubmissionInfo] - ) -> tuple[list[tuple[SubmissionInfo, Failed]], list[SubmissionInfo]]: + self, + pool: Executor, + file_transform_results: list[tuple[SubmissionInfo, Optional[SubmissionStatus]]], + ) -> tuple[ + list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]] + ]: """Step to validate the types of an untyped (stringly typed) parquet file""" - processed_files: list[tuple[SubmissionInfo, Failed]] = [] - failed_processing: list[SubmissionInfo] = [] - dc_futures: list[tuple[SubmissionInfo, Future]] = [] - - for info in file_transform_results: - dc_futures.append((info, pool.submit(self.apply_data_contract, info))) + processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = [] + failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] + dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] + + for info, sub_status in file_transform_results: + sub_status = ( + sub_status + if sub_status + else self.get_submission_status("contract", info.submission_id) + ) + dc_futures.append( + ( + info, + sub_status, # type: ignore + pool.submit(self.apply_data_contract, info, sub_status), + ) + ) - for sub_info, future in dc_futures: + for sub_info, sub_status, future in dc_futures: try: submission_info: SubmissionInfo - failed: Failed - submission_info, failed = future.result() + submission_status: SubmissionStatus + submission_info, submission_status = future.result() except AttributeError as exc: self._logger.error(f"Data Contract raised exception: {exc}") raise exc @@ -452,32 +480,45 @@ def data_contract_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Data Contract raised exception: {exc}") self._logger.exception(exc) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "contract", + [CriticalProcessingError.from_exception(exc)], + ) + sub_status.processing_failed = True + failed_processing.append((sub_info, sub_status)) continue - processed_files.append((submission_info, failed)) + processed_files.append((submission_info, submission_status)) if len(processed_files) > 0: self._audit_tables.mark_business_rules( [ - (sub_info.submission_id, failed) # type: ignore - for sub_info, failed in processed_files + (sub_info.submission_id, submission_status.validation_failed) # type: ignore + for sub_info, submission_status in processed_files ], job_run_id=self.job_run_id, ) if len(failed_processing) > 0: self._audit_tables.mark_failed( - [sub_info.submission_id for sub_info in failed_processing], + [sub_info.submission_id for sub_info, _ in failed_processing], job_run_id=self.job_run_id, ) return processed_files, failed_processing - def apply_business_rules(self, 
submission_info: SubmissionInfo, failed: bool): + def apply_business_rules( + self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None + ): """Apply the business rules to a given submission, the submission may have failed at the data_contract step so this should be passed in as a bool """ + if not submission_status: + submission_status = self.get_submission_status( + "business_rules", submission_info.submission_id + ) + if not self.rules_path: raise AttributeError("business rules path not provided.") @@ -516,11 +557,17 @@ def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool): key_fields = {model: conf.reporting_fields for model, conf in model_config.items()} if rule_messages: - self._dump_errors( - submission_info.submission_id, "business_rules", rule_messages, key_fields + dump_feedback_errors( + fh.joinuri(self.processed_files_path, submission_info.submission_id), + "business_rules", + rule_messages, + key_fields, ) - failed = any(not rule_message.is_informational for rule_message in rule_messages) or failed + submission_status.validation_failed = ( + any(not rule_message.is_informational for rule_message in rule_messages) + or submission_status.validation_failed + ) for entity_name, entity in entity_manager.entities.items(): projected = self._step_implementations.write_parquet( # type: ignore @@ -536,47 +583,58 @@ def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool): projected ) - status = SubmissionStatus( - failed=failed, - number_of_records=self.get_entity_count( - entity=entity_manager.entities[ - f"""Original{rules.global_variables.get( + submission_status.number_of_records = self.get_entity_count( + entity=entity_manager.entities[ + f"""Original{rules.global_variables.get( 'entity', submission_info.dataset_id)}""" - ] - ), + ] ) - return submission_info, status + return submission_info, submission_status def business_rule_step( self, pool: Executor, - files: list[tuple[SubmissionInfo, Failed]], + files: list[tuple[SubmissionInfo, Optional[SubmissionStatus]]], ) -> tuple[ list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]], - list[SubmissionInfo], + list[tuple[SubmissionInfo, SubmissionStatus]], ]: """Step to apply business rules (Step impl) to a typed parquet file""" - future_files: list[tuple[SubmissionInfo, Future]] = [] - - for submission_info, submission_failed in files: + future_files: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] + + for submission_info, submission_status in files: + submission_status = ( + submission_status + if submission_status + else self.get_submission_status( + step_name="business_rules", + submission_id=submission_info.submission_id, + ) + ) future_files.append( ( submission_info, - pool.submit(self.apply_business_rules, submission_info, submission_failed), + submission_status, + pool.submit(self.apply_business_rules, submission_info, submission_status), ) ) - failed_processing: list[SubmissionInfo] = [] + failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] unsucessful_files: list[tuple[SubmissionInfo, SubmissionStatus]] = [] successful_files: list[tuple[SubmissionInfo, SubmissionStatus]] = [] - for sub_info, future in future_files: - status: SubmissionStatus + for sub_info, sub_status, future in future_files: try: - submission_info, status = future.result() + submission_info: SubmissionInfo # type: ignore + submission_status: SubmissionStatus # type: ignore + submission_info, submission_status = 
future.result() + if submission_status.validation_failed: # type: ignore + unsucessful_files.append((submission_info, submission_status)) # type: ignore + else: + successful_files.append((submission_info, submission_status)) # type: ignore except AttributeError as exc: self._logger.error(f"Business Rules raised exception: {exc}") raise exc @@ -588,14 +646,15 @@ def business_rule_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Business Rules raised exception: {exc}") self._logger.exception(exc) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "business_rules", + [CriticalProcessingError.from_exception(exc)], + ) + sub_status.processing_failed = True + failed_processing.append((sub_info, sub_status)) continue - if status.failed: - unsucessful_files.append((submission_info, status)) - else: - successful_files.append((submission_info, status)) - if len(unsucessful_files + successful_files) > 0: self._audit_tables.mark_error_report( [ @@ -607,11 +666,29 @@ def business_rule_step( if len(failed_processing) > 0: self._audit_tables.mark_failed( - [si.submission_id for si in failed_processing], job_run_id=self.job_run_id + [si.submission_id for si, _ in failed_processing], job_run_id=self.job_run_id ) return successful_files, unsucessful_files, failed_processing + def _publish_error_aggregates(self, submission_id: str, aggregates_df: pl.DataFrame) -> URI: # type: ignore + """Store error aggregates as parquet for auditing""" + output_uri = fh.joinuri( + self.processed_files_path, + submission_id, + "audit", + "error_aggregates.parquet", + ) + if isinstance(_get_implementation(output_uri), LocalFilesystemImplementation): + output_uri = fh.file_uri_to_local_path(output_uri) + output_uri.parent.mkdir(parents=True, exist_ok=True) + output_uri = output_uri.as_posix() + aggregates_df = aggregates_df.with_columns( + pl.lit(submission_id).alias("submission_id") # type: ignore + ) + aggregates_df.write_parquet(output_uri) + return output_uri + @lru_cache() # noqa: B019 def _get_error_dataframes(self, submission_id: str): if not self.processed_files_path: @@ -663,14 +740,24 @@ def _get_error_dataframes(self, submission_id: str): return errors_df, aggregates - def error_report(self, submission_info: SubmissionInfo, status: SubmissionStatus): + def error_report( + self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None + ) -> tuple[ + SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI] + ]: """Creates the error reports given a submission info and submission status""" + + if not submission_status: + submission_status = self.get_submission_status( + "error_report", submission_info.submission_id + ) + if not self.processed_files_path: raise AttributeError("processed files path not provided") errors_df, aggregates = self._get_error_dataframes(submission_info.submission_id) - if not status.number_of_records: + if not submission_status.number_of_records: sub_stats = None else: err_types = { @@ -681,7 +768,7 @@ def error_report(self, submission_info: SubmissionInfo, status: SubmissionStatus } sub_stats = SubmissionStatisticsRecord( submission_id=submission_info.submission_id, - record_count=status.number_of_records, + record_count=submission_status.number_of_records, number_record_rejections=err_types.get("Submission Failure", 0), number_warnings=err_types.get("Warning", 0), ) @@ -709,35 +796,46 @@ def error_report(self, submission_info: 
SubmissionInfo, status: SubmissionStatus with fh.open_stream(report_uri, "wb") as stream: stream.write(er.ExcelFormat.convert_to_bytes(workbook)) - return submission_info, status, sub_stats, report_uri + self._publish_error_aggregates(submission_info.submission_id, aggregates) + + return submission_info, submission_status, sub_stats, report_uri def error_report_step( self, pool: Executor, - processed: Iterable[tuple[SubmissionInfo, SubmissionStatus]] = tuple(), - failed_file_transformation: Iterable[SubmissionInfo] = tuple(), + processed: Iterable[tuple[SubmissionInfo, Optional[SubmissionStatus]]] = tuple(), + failed_file_transformation: Iterable[tuple[SubmissionInfo, SubmissionStatus]] = tuple(), ) -> list[ tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI] ]: """Step to produce error reports takes processed files and files that failed file transformation """ - futures: list[tuple[SubmissionInfo, Future]] = [] + futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = [] reports: list[ tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI] ] = [] - failed_processing: list[SubmissionInfo] = [] + failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = [] for info, status in processed: - futures.append((info, pool.submit(self.error_report, info, status))) + status = ( + status + if status + else self.get_submission_status( + step_name="error_report", + submission_id=info.submission_id, + ) + ) + futures.append((info, status, pool.submit(self.error_report, info, status))) - for info_dict in failed_file_transformation: - status = SubmissionStatus(True, 0) - futures.append((info_dict, pool.submit(self.error_report, info_dict, status))) + for info_dict, status in failed_file_transformation: + status.number_of_records = 0 + futures.append((info_dict, status, pool.submit(self.error_report, info_dict, status))) - for sub_info, future in futures: + for sub_info, status, future in futures: try: - submission_info, status, stats, feedback_uri = future.result() + submission_info, submission_status, submission_stats, feedback_uri = future.result() + reports.append((submission_info, submission_status, submission_stats, feedback_uri)) except AttributeError as exc: self._logger.error(f"Error reports raised exception: {exc}") raise exc @@ -749,9 +847,14 @@ def error_report_step( except Exception as exc: # pylint: disable=W0703 self._logger.error(f"Error reports raised exception: {exc}") self._logger.exception(exc) - failed_processing.append(sub_info) + dump_processing_errors( + fh.joinuri(self.processed_files_path, sub_info.submission_id), + "error_report", + [CriticalProcessingError.from_exception(exc)], + ) + status.processing_failed = True + failed_processing.append((sub_info, status)) continue - reports.append((submission_info, status, stats, feedback_uri)) if reports: self._audit_tables.mark_finished( @@ -767,7 +870,7 @@ def error_report_step( if failed_processing: self._audit_tables.mark_failed( - [submission_info.submission_id for submission_info in failed_processing], + [submission_info.submission_id for submission_info, _ in failed_processing], job_run_id=self.job_run_id, ) @@ -786,9 +889,9 @@ def cluster_pipeline_run( # what should we do with files that fail auditing - likely to be an internal matter - # no error report required? 
transformed, failed_transformation = self.file_transformation_step(pool, audited) - passed_contract, _failed_contract = self.data_contract_step(pool, transformed) + passed_contract, _failed_contract = self.data_contract_step(pool, transformed) # type: ignore passed_br, failed_br, _failed_br_other_reason = self.business_rule_step( - pool, passed_contract + pool, passed_contract # type: ignore ) report_results = self.error_report_step( diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py index d31a2ee..4111cf3 100644 --- a/src/dve/pipeline/spark_pipeline.py +++ b/src/dve/pipeline/spark_pipeline.py @@ -11,9 +11,9 @@ from dve.core_engine.backends.implementations.spark.rules import SparkStepImplementations from dve.core_engine.backends.implementations.spark.spark_helpers import spark_get_entity_count from dve.core_engine.models import SubmissionInfo -from dve.core_engine.type_hints import URI, Failed +from dve.core_engine.type_hints import URI from dve.pipeline.pipeline import BaseDVEPipeline -from dve.pipeline.utils import unpersist_all_rdds +from dve.pipeline.utils import SubmissionStatus, unpersist_all_rdds # pylint: disable=abstract-method @@ -25,24 +25,24 @@ class SparkDVEPipeline(BaseDVEPipeline): def __init__( self, + processed_files_path: URI, audit_tables: SparkAuditingManager, - job_run_id: int, rules_path: Optional[URI], - processed_files_path: Optional[URI], submitted_files_path: Optional[URI], reference_data_loader: Optional[type[BaseRefDataLoader]] = None, spark: Optional[SparkSession] = None, + job_run_id: Optional[int] = None, ): self._spark = spark if spark else SparkSession.builder.getOrCreate() super().__init__( + processed_files_path, audit_tables, - job_run_id, SparkDataContract(spark_session=self._spark), SparkStepImplementations.register_udfs(self._spark), rules_path, - processed_files_path, submitted_files_path, reference_data_loader, + job_run_id, ) # pylint: disable=arguments-differ @@ -56,7 +56,7 @@ def write_file_to_parquet( # type: ignore def business_rule_step( self, pool: Executor, - files: list[tuple[SubmissionInfo, Failed]], + files: list[tuple[SubmissionInfo, Optional[SubmissionStatus]]], ): successful_files, unsucessful_files, failed_processing = super().business_rule_step( pool, files diff --git a/src/dve/pipeline/utils.py b/src/dve/pipeline/utils.py index 4fa9a02..a7e88aa 100644 --- a/src/dve/pipeline/utils.py +++ b/src/dve/pipeline/utils.py @@ -1,8 +1,9 @@ +# pylint: disable=R0903 """Utilities to be used with services to abstract away some of the config loading and threading""" import json from threading import Lock -from typing import Optional, Union +from typing import Optional from pydantic.main import ModelMetaclass from pyspark.sql import SparkSession @@ -12,6 +13,7 @@ import dve.parser.file_handling as fh from dve.core_engine.backends.readers import _READER_REGISTRY from dve.core_engine.configuration.v1 import SchemaName, V1EngineConfig, _ModelConfig +from dve.core_engine.loggers import get_logger from dve.core_engine.type_hints import URI, SubmissionResult from dve.metadata_parser.model_generator import JSONtoPyd @@ -19,6 +21,8 @@ _configs: dict[str, tuple[dict[str, ModelMetaclass], V1EngineConfig, Dataset]] = {} locks = Lock() +logger = get_logger(__name__) + def load_config( dataset_id: str, @@ -71,25 +75,22 @@ def deadletter_file(source_uri: URI) -> None: class SubmissionStatus: """Submission status for a given submission.""" - # _logger = get_logger("submission_status") - - def __init__(self, failed: bool, 
number_of_records: Optional[int] = None): - self._failed = failed - self._number_of_records = number_of_records - - @property - def failed(self): - """Whether the submission was successfully processed.""" - return self._failed - - @property - def number_of_records(self) -> Union[int, None]: - """Number of records within a submission.""" - return self._number_of_records + def __init__( + self, + validation_failed: bool = False, + number_of_records: Optional[int] = None, + processing_failed: bool = False, + ): + self.validation_failed = validation_failed + self.number_of_records = number_of_records + self.processing_failed = processing_failed @property def submission_result(self) -> SubmissionResult: - """The result of the submission, either succes, failed or failed_xml_generation""" - if self.failed: - return "failed" + """The current submission result - assumes success if + neither validation nor processing has failed.""" + if self.processing_failed: + return "processing_failed" + if self.validation_failed: + return "validation_failed" return "success" diff --git a/src/dve/pipeline/xml_linting.py b/src/dve/pipeline/xml_linting.py deleted file mode 100644 index fbf53a6..0000000 --- a/src/dve/pipeline/xml_linting.py +++ /dev/null @@ -1,363 +0,0 @@ -"""Implement XML linting for files.""" - -import argparse -import re -import shutil -import sys -import tempfile -from collections.abc import Iterable, Sequence -from contextlib import ExitStack -from pathlib import Path -from subprocess import PIPE, STDOUT, Popen -from typing import Optional -from uuid import uuid4 - -from typing_extensions import Literal - -from dve.core_engine.message import FeedbackMessage -from dve.core_engine.type_hints import Messages -from dve.parser.file_handling import copy_resource, get_file_name, get_resource_exists, open_stream -from dve.parser.file_handling.implementations.file import file_uri_to_local_path -from dve.parser.type_hints import URI - -Replacement = str -"""A replacement for a regex pattern.""" -Stage = Literal["Pre-validation", "Post-validation"] -"""The stage at which the XML linting is being performed.""" -ErrorMessage = str -"""Error message for xml issues""" -ErrorCode = str -"""Error code for xml feedback errors""" - -FIVE_MEBIBYTES = 5 * (1024**3) -"""The size of 5 binary megabytes, in bytes.""" - -# Patterns/strings for xmllint message sanitisation. -IGNORED_PATTERNS: list[re.Pattern] = [re.compile(r"^Unimplemented block at")] -"""Regex patterns for messages that should result in their omission.""" -ERRONEOUS_PATTERNS: list[tuple[re.Pattern, Replacement]] = [ - ( - re.compile(r"^XSD schema .+/(?P.+) failed to compile$"), - r"Missing required component of XSD schema '\g'", - ) -] -""" -Patterns for messages that should trigger errors. Replacement messages will be -raised as `ValueError`s. - -""" -INCORRECT_NAMESPACE_PATTERN = re.compile( - r"Element .*?{(?P.*?)}.*? " - r"No matching global declaration available for the validation root." -) -"""# 1 capture -Pattern to match incorrect namespace found in the xmllint errors. Captures incorrect namespace -""" -# match literal Element followed by anything up to opening curly brace. capture the contents -# of the braces if the literal "No matching global declaration available..." is present in the -# message. This is raised when the namespace doesn't match between the schema and the file - -MISSING_OR_OUT_OF_ORDER_PATTERN = re.compile( - r"Element .*?{.*?}(?P.*?): This element is not expected\. 
" - r"Expected is .*?}(?P.*?)\)" -) -"""# 2 captures -Pattern to match out of order elements found in the xmllint errors. Captures incorrect found -and expected fields (in that order) -""" -# match literal Element followed by anything up to opening curly brace. capture the field that -# comes after the braces if the literal "this element is not expected" is present in the -# message. capture the expected value too -# This is raised when the xml fields are out of order or a field is missing. - -MISSING_CHILD_ELEMENTS_PATTERN = re.compile( - r"Element .*?{.*?}.*?: Missing child element\(s\). Expected is.*?}(?P.*?)\)" -) -"""# 1 capture -Pattern to match when a tag has missing child elements. Captures the expected element -""" -FAILS_TO_VALIDATE_PATTERN = re.compile(r"fails to validate") -"""# 0 captures -Pattern to match the fails to validate xmllint message, doesn't capture anything""" - -UNEXPECTED_FIELD_PATTERN = re.compile( - r"Element .*?{.*?}(?P.*?): This element is not expected\.$" -) -"""# 1 capture -Pattern to match unexpected fields rather than out of order fields. Captures incorrect field -""" - -REMOVED_PATTERNS: list[re.Pattern] = [ - re.compile(r"\{.+?\}"), - re.compile(r"[\. ]+$"), -] -"""Regex patterns to remove from the xmllint output.""" -REPLACED_PATTERNS: list[tuple[re.Pattern, Replacement]] = [ - (re.compile(r":(?P\d+):"), r" on line \g:"), -] -"""Regex patterns to replace in the xmllint output.""" -REPLACED_STRINGS: list[tuple[str, Replacement]] = [ - ( - "No matching global declaration available for the validation root", - "Incorrect namespace version, please ensure you have the most recent namespace", - ), - ( - "fails to validate", - "Whole file has failed schema validation - please ensure your data matches the expected " - + "structure", - ), -] -"""Strings to replace in the xmllint output.""" - - -def _sanitise_lint_issue(issue: str, file_name: str) -> Optional[str]: - """Sanitise an xmllint lint message. If the message should be ignored, - this function will return `None` instead of a string. - - If messages are considered erroneous, a `ValueError` will be raised. - - """ - for pattern in IGNORED_PATTERNS: - if pattern.match(issue): - return None - for pattern, error_message in ERRONEOUS_PATTERNS: - if pattern.match(issue): - raise ValueError(pattern.sub(error_message, issue)) - - # Line will start with "-" because that's the file name for streamed XML files. 
- if issue.startswith("-"): - issue = "".join((file_name, issue[1:])) - - for pattern in REMOVED_PATTERNS: - issue = pattern.sub("", issue) - for pattern, replacement in REPLACED_PATTERNS: - issue = pattern.sub(replacement, issue) - for string, replacement in REPLACED_STRINGS: - issue = issue.replace(string, replacement) - - return issue - - -def _parse_lint_messages( - lint_messages: Iterable[str], - error_mapping: dict[re.Pattern, tuple[ErrorMessage, ErrorCode]], - stage: Stage = "Pre-validation", - file_name: Optional[str] = None, -) -> Messages: - """Parse a sequence of messages from `xmllint` into a deque of feedback messages.""" - messages: Messages = [] - for issue in lint_messages: - for regex, (error_message, error_code) in error_mapping.items(): - match_ = regex.search(issue) - if match_: - groups = {key: group.strip(",' ") for key, group in match_.groupdict().items()} - if not groups: - reporting_field = "Whole file" - groups[reporting_field] = file_name - elif len(groups) > 1: - reporting_field = list(groups) # type: ignore - else: - reporting_field = list(groups)[0] - - messages.append( - FeedbackMessage( - entity=stage, - record={"Wholefile": groups}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_code=error_code, - error_message=error_message.format(*groups.values()), - reporting_field=reporting_field, - category="Bad file", - ) - ) - return list(dict.fromkeys(messages)) # remove duplicate errors but preserve order - - -def _ensure_schema_and_resources( - schema_uri: URI, schema_resources: Sequence[URI], temp_dir: Path -) -> Path: - """Given the schema and schema resource URIs and a temp dir, if the resources - are remote or exist in different directories, copy them to the temp dir. - - Return the local schema path. - - """ - if not get_resource_exists(schema_uri): - raise IOError(f"No resource accessible at schema URI {schema_uri!r}") - - missing_resources = list( - filter(lambda resource: not get_resource_exists(resource), schema_resources) - ) - if missing_resources: - raise IOError(f"Some schema resources missing: {missing_resources!r}") - - all_resources = [schema_uri, *schema_resources] - - schemas_are_files = all(map(lambda resource: resource.startswith("file:"), all_resources)) - if schemas_are_files: - paths = list(map(file_uri_to_local_path, all_resources)) - all_paths_have_same_parent = len({path.parent for path in paths}) == 1 - - if all_paths_have_same_parent: - schema_path = paths[0] - return schema_path - - for resource_uri in all_resources: - local_path = temp_dir.joinpath(get_file_name(resource_uri)) - copy_resource(resource_uri, local_path.as_uri()) - - schema_path = temp_dir.joinpath(get_file_name(schema_uri)) - return schema_path - - -def run_xmllint( - file_uri: URI, - schema_uri: URI, - *schema_resources: URI, - error_mapping: dict[re.Pattern, tuple[ErrorMessage, ErrorCode]], - stage: Stage = "Pre-validation", -) -> Messages: - """Run `xmllint`, given a file and information about the schemas to apply. - - The schema and associated resources will be copied to a temporary directory - for validation, unless they are all already in the same local folder. - - Args: - - `file_uri`: the URI of the file to be streamed into `xmllint` - - `schema_uri`: the URI of the XSD schema for the file. - - `*schema_resources`: URIs for additional XSD files required by the schema. - - `stage` (keyword only): One of `{'Pre-validation', 'Post-validation'}` - - Returns a deque of messages produced by the linting. 
- - """ - if not shutil.which("xmllint"): - raise OSError("Unable to find `xmllint` binary") - - if not get_resource_exists(file_uri): - raise IOError(f"No resource accessible at file URI {file_uri!r}") - - # Ensure the schema and resources are local file paths so they can be - # read by xmllint. - # Lots of resources to manage here. - with tempfile.TemporaryDirectory() as temp_dir_str: - temp_dir = Path(temp_dir_str) - schema_path = _ensure_schema_and_resources(schema_uri, schema_resources, temp_dir) - message_file_path = temp_dir.joinpath(uuid4().hex) - - with ExitStack() as linting_context: - # Need to write lint output to a file to avoid deadlock. Kinder to mem this way anyway. - message_file_bytes = linting_context.enter_context(message_file_path.open("wb")) - - # Open an `xmllint` process to pipe into. - command = ["xmllint", "--stream", "--schema", str(schema_path), "-"] - process = linting_context.enter_context( - Popen(command, stdin=PIPE, stdout=message_file_bytes, stderr=STDOUT) - ) - # This should never trigger, bad typing in stdlib. - if process.stdin is None: - raise ValueError("Unable to pipe file into subprocess") - - # Pipe the XML file contents into xmllint. - block = b"" - try: - with open_stream(file_uri, "rb") as byte_stream: - while True: - block = byte_stream.read(FIVE_MEBIBYTES) - if not block: - break - process.stdin.write(block) - except BrokenPipeError: - file_name = get_file_name(file_uri) - return [ - FeedbackMessage( - entity=stage, - record={"Whole file": file_name}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_message="failed schema check", - category="Bad file", - ), - FeedbackMessage( - entity=stage, - record={"Whole file": block[:50].decode(errors="replace")}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_message="Failed xml validation", - reporting_field="Whole file", - category="Bad file", - ), - ] - - # Close the input stream and await the response code. - # Output will be written to the message file. - process.stdin.close() - # TODO: Identify an appropriate timeout. - return_code = process.wait() - - if return_code == 0: - return [] - with message_file_path.open("r", encoding="utf-8") as message_file: - lint_messages = (line for line in map(str.rstrip, message_file) if line) - file_name = get_file_name(file_uri) - messages = _parse_lint_messages(lint_messages, error_mapping, stage, file_name) - - # Nonzero exit code without messages _shouldn't_ happen, but it's possible - # if `xmllint` is killed (and presumably possible if it runs out of mem) - # so we should handle that possibility. - if not messages: - messages.append( - FeedbackMessage( - entity=stage, - record={"Whole file": file_name}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_message="failed schema check", - category="Bad file", - ) - ) - - return messages - - -def _main(cli_args: list[str]): - """Command line interface for XML linting. 
Useful for testing.""" - parser = argparse.ArgumentParser() - parser.add_argument("xml_file_path", help="The path to the XML file to be validated") - parser.add_argument( - "xsd_file_paths", help="The path to the XSD schemas (primary schema first)", nargs="+" - ) - args = parser.parse_args(cli_args) - - xml_path = Path(args.xml_file_path).resolve() - xsd_uris = list(map(lambda path_str: Path(path_str).resolve().as_uri(), args.xsd_file_paths)) - try: - messages = run_xmllint(xml_path.as_uri(), *xsd_uris, error_mapping={}) - except Exception as err: # pylint: disable=broad-except - print(f"Exception ({type(err).__name__}) raised in XML linting.", file=sys.stderr) - print(err, file=sys.stderr) - sys.exit(2) - - if not messages: - print(f"File {xml_path.name!r} validated successfully\n", file=sys.stderr) - sys.exit(0) - else: - print(f"File {xml_path.name!r} validation failed\n", file=sys.stderr) - - print("|".join(FeedbackMessage.HEADER)) - for message in messages: - print("|".join(map(lambda col: str(col) if col else "", message.to_row()))) - sys.exit(1) - - -if __name__ == "__main__": - _main(sys.argv[1:]) diff --git a/src/dve/reporting/utils.py b/src/dve/reporting/utils.py new file mode 100644 index 0000000..3dac919 --- /dev/null +++ b/src/dve/reporting/utils.py @@ -0,0 +1,76 @@ +"""Utilities to support reporting""" + +import json +from typing import Optional + +import dve.parser.file_handling as fh +from dve.core_engine.exceptions import CriticalProcessingError +from dve.core_engine.type_hints import URI, Messages +from dve.reporting.error_report import conditional_cast + + +def dump_feedback_errors( + working_folder: URI, + step_name: str, + messages: Messages, + key_fields: Optional[dict[str, list[str]]] = None, +): + """Write out captured feedback error messages.""" + if not working_folder: + raise AttributeError("processed files path not passed") + + if not key_fields: + key_fields = {} + + errors = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json") + processed = [] + + for message in messages: + primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", []) + error = message.to_dict( + key_field=primary_keys, + value_separator=" -- ", + max_number_of_values=10, + record_converter=None, + ) + error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ") + processed.append(error) + + with fh.open_stream(errors, "a") as f: + json.dump( + processed, + f, + default=str, + ) + + +def dump_processing_errors( + working_folder: URI, step_name: str, errors: list[CriticalProcessingError] +): + """Write out critical processing errors""" + if not working_folder: + raise AttributeError("processed files path not passed") + if not step_name: + raise AttributeError("step name not passed") + if not errors: + raise AttributeError("errors list not passed") + + error_file: URI = fh.joinuri(working_folder, "errors", "processing_errors.json") + processed = [] + + for error in errors: + processed.append( + { + "step_name": step_name, + "error_location": "processing", + "error_level": "integrity", + "error_message": error.error_message, + } + ) + + with fh.open_stream(error_file, "a") as f: + json.dump( + processed, + f, + default=str, + ) diff --git a/tests/features/books.feature b/tests/features/books.feature index 49cf2b3..9bc0611 100644 --- a/tests/features/books.feature +++ b/tests/features/books.feature @@ -64,6 +64,15 @@ Feature: Pipeline tests using the books dataset And I add initial audit entries for the submission Then the latest audit 
record for the submission is marked with processing status file_transformation When I run the file transformation phase + Then the latest audit record for the submission is marked with processing status failed + # TODO - handle above within the stream xml reader - specific + + Scenario: Handle a file that fails XSD validation (duckdb) + Given I submit the books file books_xsd_fail.xml for processing + And A duckdb pipeline is configured with schema file 'nested_books_ddb.dischema.json' + And I add initial audit entries for the submission + Then the latest audit record for the submission is marked with processing status file_transformation + When I run the file transformation phase Then the latest audit record for the submission is marked with processing status error_report When I run the error report phase Then An error report is produced diff --git a/tests/features/movies.feature b/tests/features/movies.feature index e55de5c..b148547 100644 --- a/tests/features/movies.feature +++ b/tests/features/movies.feature @@ -41,6 +41,7 @@ Feature: Pipeline tests using the movies dataset | record_count | 5 | | number_record_rejections | 4 | | number_warnings | 1 | + And the error aggregates are persisted Scenario: Validate and filter movies (duckdb) Given I submit the movies file movies.json for processing @@ -76,4 +77,5 @@ Feature: Pipeline tests using the movies dataset | record_count | 5 | | number_record_rejections | 4 | | number_warnings | 1 | + And the error aggregates are persisted diff --git a/tests/features/planets.feature b/tests/features/planets.feature index 321f047..c469a92 100644 --- a/tests/features/planets.feature +++ b/tests/features/planets.feature @@ -42,9 +42,7 @@ Feature: Pipeline tests using the planets dataset And I add initial audit entries for the submission Then the latest audit record for the submission is marked with processing status file_transformation When I run the file transformation phase - Then the latest audit record for the submission is marked with processing status error_report - When I run the error report phase - Then An error report is produced + Then the latest audit record for the submission is marked with processing status failed Scenario: Handle a file with duplicated extension provided (spark) Given I submit the planets file planets.csv.csv for processing diff --git a/tests/features/steps/steps_pipeline.py b/tests/features/steps/steps_pipeline.py index 3dbc2b2..fb85cfb 100644 --- a/tests/features/steps/steps_pipeline.py +++ b/tests/features/steps/steps_pipeline.py @@ -57,13 +57,13 @@ def setup_spark_pipeline( SparkRefDataLoader.dataset_config_uri = fh.get_parent(rules_path) return SparkDVEPipeline( + processed_files_path=processing_path.as_uri(), audit_tables=SparkAuditingManager( database="dve", spark=spark, ), job_run_id=12345, rules_path=rules_path, - processed_files_path=processing_path.as_uri(), submitted_files_path=processing_path.as_uri(), reference_data_loader=SparkRefDataLoader, spark=spark, @@ -86,6 +86,7 @@ def setup_duckdb_pipeline( DuckDBRefDataLoader.connection = connection DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(rules_path) return DDBDVEPipeline( + processed_files_path=processing_path.as_posix(), audit_tables=DDBAuditingManager( database_uri=db_file.as_posix(), # pool=ThreadPoolExecutor(1), @@ -94,7 +95,6 @@ def setup_duckdb_pipeline( job_run_id=12345, connection=connection, rules_path=rules_path, - processed_files_path=processing_path.as_posix(), submitted_files_path=processing_path.as_posix(), 
reference_data_loader=DuckDBRefDataLoader, ) @@ -108,15 +108,18 @@ def run_file_transformation_step(context: Context): pool=ThreadPoolExecutor(1), submissions_to_process=[ctxt.get_submission_info(context)] ) if failed: - ctxt.set_failed_file_transformation(context, failed[0]) + ctxt.set_failed_file_transformation(context, failed[0][0]) + @when("I run the data contract phase") def apply_data_contract_with_error(context: Context): """Apply the data contract stage""" pipeline = ctxt.get_pipeline(context) + sub_info = ctxt.get_submission_info(context) + sub_status = pipeline._audit_tables.get_submission_status(sub_info.submission_id) pipeline.data_contract_step( - pool=ThreadPoolExecutor(1), file_transform_results=[ctxt.get_submission_info(context)] + pool=ThreadPoolExecutor(1), file_transform_results=[(sub_info, sub_status)] ) @@ -126,10 +129,9 @@ def apply_business_rules(context: Context): pipeline = ctxt.get_pipeline(context) sub_info = ctxt.get_submission_info(context) - proc_rec = pipeline._audit_tables.get_current_processing_info(sub_info.submission_id) - # add row count here so sub stats calculated + sub_status = pipeline._audit_tables.get_submission_status(sub_info.submission_id) success, failed, _ = pipeline.business_rule_step( - pool=ThreadPoolExecutor(1), files=[(sub_info, proc_rec.submission_result == "failed")] + pool=ThreadPoolExecutor(1), files=[(sub_info, sub_status)] ) assert len(success + failed) == 1 sub_status = (success + failed)[0][1] @@ -142,19 +144,28 @@ def create_error_report(context: Context): pipeline = ctxt.get_pipeline(context) try: - failed_file_transformation = [ctxt.get_failed_file_transformation(context)] - processed = [] + failed_ft = ctxt.get_failed_file_transformation(context) + sub_status = pipeline._audit_tables.get_submission_status(failed_ft.submission_id) + + pipeline.error_report_step( + pool=ThreadPoolExecutor(1), + processed=[], + failed_file_transformation=[(failed_ft, sub_status)] + + ) + except AttributeError: sub_info = ctxt.get_submission_info(context) processed = [(sub_info, ctxt.get_submission_status(context))] - failed_file_transformation = [] - - pipeline.error_report_step( + pipeline.error_report_step( pool=ThreadPoolExecutor(1), processed=processed, - failed_file_transformation=failed_file_transformation, + failed_file_transformation=[] + ) + + @then("there are {expected_num_errors:d} record rejections from the {service} phase") @then("there is {expected_num_errors:d} record rejection from the {service} phase") diff --git a/tests/features/steps/steps_post_pipeline.py b/tests/features/steps/steps_post_pipeline.py index 23ea97d..be679ba 100644 --- a/tests/features/steps/steps_post_pipeline.py +++ b/tests/features/steps/steps_post_pipeline.py @@ -111,3 +111,9 @@ def check_stats_record(context): get_pipeline(context)._audit_tables.get_submission_statistics(sub_info.submission_id).dict() ) assert all([val == stats.get(fld) for fld, val in expected.items()]) + +@then("the error aggregates are persisted") +def check_error_aggregates_persisted(context): + processing_location = get_processing_location(context) + agg_file = Path(processing_location, "audit", "error_aggregates.parquet") + assert agg_file.exists() and agg_file.is_file() diff --git a/tests/fixtures.py b/tests/fixtures.py index 9229fc8..8a9a147 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -120,4 +120,5 @@ def temp_ddb_conn() -> Iterator[Tuple[Path, DuckDBPyConnection]]: with tempfile.TemporaryDirectory(prefix="ddb_audit_testing") as tmp: db_file = Path(tmp, db + ".duckdb") 
conn = connect(database=db_file, read_only=False) + yield db_file, conn diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py index 989a928..b0598eb 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py @@ -10,7 +10,8 @@ from duckdb import ColumnExpression, ConstantExpression, DuckDBPyConnection from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager -from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo +from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord +from dve.pipeline.utils import SubmissionStatus from .....fixtures import temp_ddb_conn # pylint: disable=unused-import @@ -115,14 +116,14 @@ def test_audit_table_update_status(ddb_audit_manager: DDBAuditingManager): == "business_rules" ) - ddb_audit_manager.mark_error_report([(_sub_info.submission_id, "failed")]) + ddb_audit_manager.mark_error_report([(_sub_info.submission_id, "validation_failed")]) assert ( ddb_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status == "error_report" ) - ddb_audit_manager.mark_finished([(_sub_info.submission_id, "failed")]) + ddb_audit_manager.mark_finished([(_sub_info.submission_id, "validation_failed")]) assert ( ddb_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status @@ -350,3 +351,94 @@ def test_get_error_report_submissions(ddb_audit_manager_threaded: DDBAuditingMan assert len(processed) == 2 assert len(dodgy) == 0 assert processed == expected + +def test_get_submission_status(ddb_audit_manager_threaded: DDBAuditingManager): + with ddb_audit_manager_threaded as aud: + # add four submissions + sub_1 = SubmissionInfo( + submission_id="1", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + sub_2 = SubmissionInfo( + submission_id="2", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + sub_3 = SubmissionInfo( + submission_id="3", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + sub_4 = SubmissionInfo( + submission_id="4", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + # mark 1 as failed validation, 2 as failed processing, 3 as null and 4 as successful + aud.add_new_submissions([sub_1, sub_2, sub_3, sub_4]) + aud.add_processing_records( + [ + ProcessingStatusRecord( + submission_id=sub_1.submission_id, processing_status="error_report", submission_result="validation_failed" + ), + ProcessingStatusRecord( + submission_id=sub_2.submission_id, processing_status="failed", submission_result="processing_failed" + ), + ProcessingStatusRecord( + submission_id=sub_3.submission_id, processing_status="business_rules" + ), + ProcessingStatusRecord( + 
submission_id=sub_4.submission_id, processing_status="error_report", submission_result="success" + ), + ] + ) + aud.add_submission_statistics_records([ + SubmissionStatisticsRecord(submission_id=sub_1.submission_id, record_count=5, number_record_rejections=2, number_warnings=3), + SubmissionStatisticsRecord(submission_id=sub_4.submission_id, record_count=20, number_record_rejections=0, number_warnings=1) + ]) + + while not aud.queue.empty(): + time.sleep(0.05) + + sub_stats_1 = aud.get_submission_status(sub_1.submission_id) + assert sub_stats_1.submission_result == "validation_failed" + assert sub_stats_1.validation_failed + assert not sub_stats_1.processing_failed + assert sub_stats_1.number_of_records == 5 + sub_stats_2 = aud.get_submission_status(sub_2.submission_id) + assert sub_stats_2.submission_result == "processing_failed" + assert not sub_stats_2.validation_failed + assert sub_stats_2.processing_failed + sub_stats_3 = aud.get_submission_status(sub_3.submission_id) + assert not sub_stats_3.validation_failed + assert not sub_stats_3.processing_failed + sub_stats_4 = aud.get_submission_status(sub_4.submission_id) + assert sub_stats_4.submission_result == "success" + assert not sub_stats_4.validation_failed + assert not sub_stats_4.processing_failed + assert sub_stats_4.number_of_records == 20 + assert not aud.get_submission_status("5") \ No newline at end of file diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py index 5093150..e4c08ad 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py @@ -144,10 +144,10 @@ def test_duckdb_data_contract_xml(temp_xml_file): contract_dict = json.loads(contract_meta).get("contract") entities: Dict[str, DuckDBPyRelation] = { "test_header": DuckDBXMLStreamReader( - connection, root_tag="root", record_tag="Header" + ddb_connection=connection, root_tag="root", record_tag="Header" ).read_to_relation(str(uri), "header", header_model), "test_class_info": DuckDBXMLStreamReader( - connection, root_tag="root", record_tag="ClassData" + ddb_connection=connection, root_tag="root", record_tag="ClassData" ).read_to_relation(str(uri), "class_info", class_model), } diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py index dafac4d..038b4e9 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py @@ -457,7 +457,7 @@ def test_one_to_one_join_multi_matches_raises( new_columns={"satellites.name": "satellite"}, ) entities = EntityManager({"planets": planets_rel, "satellites": satellites_rel}) - with pytest.raises(ValueError, match="Multiple matches for some records.+"): + with pytest.raises(ValueError, match="Multiple matches for some records.*"): DUCKDB_STEP_BACKEND.one_to_one_join(entities, config=join) diff --git a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py index 4045a90..a5146f9 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py +++ 
b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py @@ -10,8 +10,8 @@ from pyspark.sql.functions import col, lit from dve.core_engine.backends.implementations.spark.auditing import SparkAuditingManager -from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo -from dve.pipeline.utils import unpersist_all_rdds +from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord +from dve.pipeline.utils import SubmissionStatus, unpersist_all_rdds from dve.core_engine.backends.implementations.spark.spark_helpers import PYTHON_TYPE_TO_SPARK_TYPE from .....conftest import get_test_file_path @@ -123,14 +123,14 @@ def test_audit_table_update_status(spark_audit_manager: SparkAuditingManager): == "business_rules" ) - spark_audit_manager.mark_error_report([(_sub_info.submission_id, "failed")]) + spark_audit_manager.mark_error_report([(_sub_info.submission_id, "validation_failed")]) assert ( spark_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status == "error_report" ) - spark_audit_manager.mark_finished([(_sub_info.submission_id, "failed")]) + spark_audit_manager.mark_finished([(_sub_info.submission_id, "validation_failed")]) assert ( spark_audit_manager.get_current_processing_info(_sub_info.submission_id).processing_status @@ -367,3 +367,91 @@ def test_get_error_report_submissions(spark_audit_manager: SparkAuditingManager) assert len(processed) == 2 assert len(dodgy) == 0 assert processed == expected + +def test_get_submission_status(spark_audit_manager: SparkAuditingManager): + with spark_audit_manager as aud: + # add four submissions + sub_1 = SubmissionInfo( + submission_id="1", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + sub_2 = SubmissionInfo( + submission_id="2", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + sub_3 = SubmissionInfo( + submission_id="3", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + sub_4 = SubmissionInfo( + submission_id="4", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + # mark 1 as failed validation, 2 as failed processing, 3 as null and 4 as successful + aud.add_new_submissions([sub_1, sub_2, sub_3, sub_4]) + aud.add_processing_records( + [ + ProcessingStatusRecord( + submission_id=sub_1.submission_id, processing_status="error_report", submission_result="validation_failed" + ), + ProcessingStatusRecord( + submission_id=sub_2.submission_id, processing_status="failed", submission_result="processing_failed" + ), + ProcessingStatusRecord( + submission_id=sub_3.submission_id, processing_status="business_rules" + ), + ProcessingStatusRecord( + submission_id=sub_4.submission_id, processing_status="error_report", submission_result="success" + ), + ] + ) + aud.add_submission_statistics_records([ + SubmissionStatisticsRecord(submission_id=sub_1.submission_id, record_count=5, number_record_rejections=2,
number_warnings=3), + SubmissionStatisticsRecord(submission_id=sub_4.submission_id, record_count=20, number_record_rejections=0, number_warnings=1) + ]) + + sub_stats_1 = aud.get_submission_status(sub_1.submission_id) + assert sub_stats_1.submission_result == "validation_failed" + assert sub_stats_1.validation_failed + assert not sub_stats_1.processing_failed + assert sub_stats_1.number_of_records == 5 + sub_stats_2 = aud.get_submission_status(sub_2.submission_id) + assert sub_stats_2.submission_result == "processing_failed" + assert not sub_stats_2.validation_failed + assert sub_stats_2.processing_failed + sub_stats_3 = aud.get_submission_status(sub_3.submission_id) + assert not sub_stats_3.validation_failed + assert not sub_stats_3.processing_failed + sub_stats_4 = aud.get_submission_status(sub_4.submission_id) + assert sub_stats_4.submission_result == "success" + assert not sub_stats_4.validation_failed + assert not sub_stats_4.processing_failed + assert sub_stats_4.number_of_records == 20 + assert not aud.get_submission_status("5") diff --git a/tests/test_pipeline/pipeline_helpers.py b/tests/test_pipeline/pipeline_helpers.py index de2ee10..1518ccf 100644 --- a/tests/test_pipeline/pipeline_helpers.py +++ b/tests/test_pipeline/pipeline_helpers.py @@ -66,6 +66,13 @@ def planet_test_files() -> Iterator[str]: shutil.copytree(get_test_file_path("planets/"), Path(tdir, "planets")) yield tdir + "/planets" +@pytest.fixture(scope="function") +def movies_test_files() -> Iterator[str]: + clear_config_cache() + with tempfile.TemporaryDirectory() as tdir: + shutil.copytree(get_test_file_path("movies/"), Path(tdir, "movies")) + yield tdir + "/movies" + @pytest.fixture(scope="function") def planet_data_after_file_transformation() -> Iterator[Tuple[SubmissionInfo, str]]: diff --git a/tests/test_pipeline/test_duckdb_pipeline.py b/tests/test_pipeline/test_duckdb_pipeline.py index 5619735..58eb4ac 100644 --- a/tests/test_pipeline/test_duckdb_pipeline.py +++ b/tests/test_pipeline/test_duckdb_pipeline.py @@ -3,9 +3,13 @@ # pylint: disable=protected-access from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +import logging from pathlib import Path +import shutil from typing import Dict, Tuple from uuid import uuid4 +from unittest.mock import Mock import pytest from duckdb import DuckDBPyConnection @@ -13,9 +17,10 @@ from dve.core_engine.backends.base.auditing import FilterCriteria from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader -from dve.core_engine.models import SubmissionInfo +from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord import dve.parser.file_handling as fh from dve.pipeline.duckdb_pipeline import DDBDVEPipeline +from dve.pipeline.utils import SubmissionStatus from ..conftest import get_test_file_path from ..fixtures import temp_ddb_conn # pylint: disable=unused-import @@ -35,11 +40,11 @@ def test_audit_received_step( db_file, conn = temp_ddb_conn with DDBAuditingManager(db_file.as_uri(), ThreadPoolExecutor(1), conn) as audit_manager: dve_pipeline = DDBDVEPipeline( + processed_files_path=planet_test_files, audit_tables=audit_manager, job_run_id=1, connection=conn, rules_path=None, - processed_files_path=planet_test_files, submitted_files_path=planet_test_files, ) @@ -75,11 +80,11 @@ def test_file_transformation_step( db_file, conn = temp_ddb_conn with DDBAuditingManager(db_file.as_uri(), 
ThreadPoolExecutor(1), conn) as audit_manager: dve_pipeline = DDBDVEPipeline( + processed_files_path=planet_test_files, audit_tables=audit_manager, job_run_id=1, connection=conn, rules_path=get_test_file_path("planets/planets_ddb.dischema.json").as_posix(), - processed_files_path=planet_test_files, submitted_files_path=planet_test_files, ) @@ -112,19 +117,20 @@ def test_data_contract_step( sub_info, processed_file_path = planet_data_after_file_transformation with DDBAuditingManager(db_file.as_uri(), ThreadPoolExecutor(1), conn) as audit_manager: dve_pipeline = DDBDVEPipeline( + processed_files_path=processed_file_path, audit_tables=audit_manager, job_run_id=1, connection=conn, rules_path=PLANETS_RULES_PATH, - processed_files_path=processed_file_path, submitted_files_path=None, ) success, failed = dve_pipeline.data_contract_step( - pool=ThreadPoolExecutor(2), file_transform_results=[sub_info] + pool=ThreadPoolExecutor(2), file_transform_results=[(sub_info, SubmissionStatus())] ) assert len(success) == 1 + assert not success[0][1].validation_failed assert len(failed) == 0 assert Path(processed_file_path, sub_info.submission_id, "contract", "planets").exists() @@ -147,21 +153,22 @@ def test_business_rule_step( with DDBAuditingManager(db_file.as_uri(), ThreadPoolExecutor(1), conn) as audit_manager: dve_pipeline = DDBDVEPipeline( + processed_files_path=processed_files_path, audit_tables=audit_manager, job_run_id=1, connection=conn, rules_path=PLANETS_RULES_PATH, - processed_files_path=processed_files_path, submitted_files_path=None, reference_data_loader=DuckDBRefDataLoader, ) audit_manager.add_new_submissions([sub_info], job_run_id=1) successful_files, unsuccessful_files, failed_processing = dve_pipeline.business_rule_step( - pool=ThreadPoolExecutor(2), files=[(sub_info, None)] + pool=ThreadPoolExecutor(2), files=[(sub_info, SubmissionStatus())] ) assert len(successful_files) == 1 + assert not successful_files[0][1].validation_failed assert len(unsuccessful_files) == 0 assert len(failed_processing) == 0 @@ -185,11 +192,11 @@ def test_error_report_step( with DDBAuditingManager(db_file.as_uri(), ThreadPoolExecutor(1), conn) as audit_manager: dve_pipeline = DDBDVEPipeline( + processed_files_path=processed_files_path, audit_tables=audit_manager, job_run_id=1, connection=conn, rules_path=None, - processed_files_path=processed_files_path, submitted_files_path=None, reference_data_loader=DuckDBRefDataLoader, ) @@ -204,3 +211,68 @@ def test_error_report_step( audit_result = audit_manager.get_current_processing_info(submitted_file_info.submission_id) assert audit_result.processing_status == "success" + +def test_get_submission_status(temp_ddb_conn): + db_file, conn = temp_ddb_conn + with DDBAuditingManager(db_file.as_uri(), connection = conn) as aud: + dve_pipeline = DDBDVEPipeline( + processed_files_path="fake_path", + audit_tables=aud, + job_run_id=1, + connection=conn, + rules_path=None, + submitted_files_path=None, + reference_data_loader=DuckDBRefDataLoader, + ) + dve_pipeline._logger = Mock(spec=logging.Logger) + # add four submissions + sub_one = SubmissionInfo( + submission_id="1", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + sub_two = SubmissionInfo( + submission_id="2", + submitting_org="TEST", + dataset_id="TEST_DATASET", + file_name="TEST_FILE", + submission_method="sftp", + file_extension="xml", + file_size=12345, + 
datetime_received=datetime(2023, 9, 1, 12, 0, 0), + ) + + aud.add_new_submissions([sub_one, sub_two]) + aud.add_processing_records( + [ + ProcessingStatusRecord( + submission_id=sub_one.submission_id, processing_status="error_report", submission_result="validation_failed" + ), + ProcessingStatusRecord( + submission_id=sub_two.submission_id, processing_status="failed", submission_result="processing_failed" + ), + ] + ) + aud.add_submission_statistics_records([ + SubmissionStatisticsRecord(submission_id=sub_one.submission_id, record_count=5, number_record_rejections=2, number_warnings=3), + ]) + + sub_stats_one = dve_pipeline.get_submission_status("test", sub_one.submission_id) + assert sub_stats_one.submission_result == "validation_failed" + assert sub_stats_one.validation_failed + assert not sub_stats_one.processing_failed + assert sub_stats_one.number_of_records == 5 + sub_stats_two = dve_pipeline.get_submission_status("test", sub_two.submission_id) + assert sub_stats_two.submission_result == "processing_failed" + assert not sub_stats_two.validation_failed + assert sub_stats_two.processing_failed + sub_stats_3 = dve_pipeline.get_submission_status("test", "3") + dve_pipeline._logger.warning.assert_called_once_with( + "Unable to determine status of submission_id: 3 in service test - assuming no issues." + ) + assert sub_stats_3 \ No newline at end of file diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py new file mode 100644 index 0000000..49440fc --- /dev/null +++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py @@ -0,0 +1,118 @@ +"""Test DuckDBPipeline object methods""" +# pylint: disable=missing-function-docstring +# pylint: disable=protected-access + +from datetime import datetime +from pathlib import Path +import shutil +from uuid import uuid4 + +import pytest + +from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager +from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader +from dve.core_engine.models import SubmissionInfo +import dve.parser.file_handling as fh +from dve.pipeline.foundry_ddb_pipeline import FoundryDDBPipeline + +from ..conftest import get_test_file_path +from ..fixtures import temp_ddb_conn # pylint: disable=unused-import +from .pipeline_helpers import ( # pylint: disable=unused-import + PLANETS_RULES_PATH, + planet_test_files, + movies_test_files +) + +def test_foundry_runner_validation_fail(planet_test_files, temp_ddb_conn): + db_file, conn = temp_ddb_conn + processing_folder = planet_test_files + sub_id = uuid4().hex + sub_info = SubmissionInfo.from_metadata_file(submission_id=sub_id, + metadata_uri=processing_folder + "/planets_demo.metadata.json") + sub_folder = processing_folder + f"/{sub_id}" + + shutil.copytree(planet_test_files, sub_folder) + + DuckDBRefDataLoader.connection = conn + DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH) + + + with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: + dve_pipeline = FoundryDDBPipeline( + processed_files_path=processing_folder, + audit_tables=audit_manager, + connection=conn, + rules_path=get_test_file_path("planets/planets_ddb.dischema.json").as_posix(), + submitted_files_path=None, + reference_data_loader=DuckDBRefDataLoader, + ) + output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) + assert fh.get_resource_exists(report_uri) + assert not output_loc + assert len(list(fh.iter_prefix(audit_files))) == 3 + + +def 
test_foundry_runner_validation_success(movies_test_files, temp_ddb_conn): + db_file, conn = temp_ddb_conn + # add movies refdata to conn + ref_db_file = Path(db_file.parent, f"movies_refdata.duckdb").as_posix() + conn.sql(f"ATTACH '{ref_db_file}' AS movies_refdata") + conn.read_parquet(get_test_file_path("movies/refdata/movies_sequels.parquet").as_posix()).to_table(f"movies_refdata.sequels") + processing_folder = movies_test_files + sub_id = uuid4().hex + sub_info = SubmissionInfo(submission_id=sub_id, + dataset_id="movies", + file_name="good_movies", + file_extension="json", + submitting_org="TEST", + datetime_received=datetime(2025,11,5)) + sub_folder = processing_folder + f"/{sub_id}" + + shutil.copytree(movies_test_files, sub_folder) + + DuckDBRefDataLoader.connection = conn + DuckDBRefDataLoader.dataset_config_uri = None + + + with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: + dve_pipeline = FoundryDDBPipeline( + processed_files_path=processing_folder, + audit_tables=audit_manager, + connection=conn, + rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(), + submitted_files_path=None, + reference_data_loader=DuckDBRefDataLoader, + ) + output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) + assert fh.get_resource_exists(report_uri) + assert len(list(fh.iter_prefix(output_loc))) == 2 + assert len(list(fh.iter_prefix(audit_files))) == 3 + +def test_foundry_runner_error(planet_test_files, temp_ddb_conn): + # using spark reader config - should error in file transformation - check gracefully handled + db_file, conn = temp_ddb_conn + processing_folder = planet_test_files + sub_id = uuid4().hex + sub_info = SubmissionInfo.from_metadata_file(submission_id=sub_id, + metadata_uri=processing_folder + "/planets_demo.metadata.json") + sub_folder = processing_folder + f"/{sub_id}" + + shutil.copytree(planet_test_files, sub_folder) + + DuckDBRefDataLoader.connection = conn + DuckDBRefDataLoader.dataset_config_uri = fh.get_parent(PLANETS_RULES_PATH) + + + with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: + dve_pipeline = FoundryDDBPipeline( + processed_files_path=processing_folder, + audit_tables=audit_manager, + connection=conn, + rules_path=get_test_file_path("planets/planets.dischema.json").as_posix(), + submitted_files_path=None, + reference_data_loader=DuckDBRefDataLoader, + ) + output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) + assert not fh.get_resource_exists(report_uri) + assert not output_loc + assert len(list(fh.iter_prefix(audit_files))) == 2 diff --git a/tests/test_pipeline/test_pipeline.py b/tests/test_pipeline/test_pipeline.py index 429e947..38418d6 100644 --- a/tests/test_pipeline/test_pipeline.py +++ b/tests/test_pipeline/test_pipeline.py @@ -90,7 +90,8 @@ def test_file_transformation(planet_test_files): # pylint: disable=redefined-ou output_path = Path(tdir, submitted_file_info.submission_id, "transform") - result = dve_pipeline.file_transformation(submitted_file_info) + sub_info, sub_status = dve_pipeline.file_transformation(submitted_file_info) - assert isinstance(result, SubmissionInfo) + assert isinstance(sub_info, SubmissionInfo) + assert not sub_status.processing_failed assert output_path.joinpath("planets").exists() diff --git a/tests/test_pipeline/test_spark_pipeline.py b/tests/test_pipeline/test_spark_pipeline.py index c3a7fb2..4e33a28 100644 --- a/tests/test_pipeline/test_spark_pipeline.py +++ b/tests/test_pipeline/test_spark_pipeline.py @@ -2,11 +2,15 @@ # 
pylint: disable=missing-function-docstring # pylint: disable=protected-access +from datetime import datetime import json from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor +import logging from pathlib import Path +import time from typing import Dict +from unittest.mock import Mock from uuid import uuid4 import polars as pl @@ -16,7 +20,7 @@ from dve.core_engine.backends.implementations.spark.auditing import SparkAuditingManager from dve.core_engine.backends.implementations.spark.reference_data import SparkRefDataLoader from dve.core_engine.backends.implementations.spark.rules import SparkStepImplementations -from dve.core_engine.models import SubmissionInfo, SubmissionStatisticsRecord +from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord import dve.parser.file_handling as fh from dve.pipeline.spark_pipeline import SparkDVEPipeline from dve.pipeline.utils import SubmissionStatus @@ -38,10 +42,10 @@ def test_audit_received_step(planet_test_files, spark, spark_test_database): with SparkAuditingManager(spark_test_database, ThreadPoolExecutor(1), spark) as audit_tables: dve_pipeline = SparkDVEPipeline( + processed_files_path=planet_test_files, audit_tables=audit_tables, job_run_id=1, rules_path=None, - processed_files_path=planet_test_files, submitted_files_path=planet_test_files, reference_data_loader=None, ) @@ -80,10 +84,10 @@ def test_file_transformation_step( ): # pylint: disable=redefined-outer-name with SparkAuditingManager(spark_test_database, ThreadPoolExecutor(1), spark) as audit_manager: dve_pipeline = SparkDVEPipeline( + processed_files_path=planet_test_files, audit_tables=audit_manager, job_run_id=1, rules_path=PLANETS_RULES_PATH, - processed_files_path=planet_test_files, submitted_files_path=planet_test_files, reference_data_loader=None, spark=spark, @@ -118,18 +122,18 @@ def test_apply_data_contract_success( ): # pylint: disable=redefined-outer-name sub_info, processed_file_path = planet_data_after_file_transformation dve_pipeline = SparkDVEPipeline( + processed_files_path=processed_file_path, audit_tables=None, job_run_id=1, rules_path=PLANETS_RULES_PATH, - processed_files_path=processed_file_path, submitted_files_path=None, reference_data_loader=None, spark=spark, ) + sub_status = SubmissionStatus() + sub_info, sub_status = dve_pipeline.apply_data_contract(sub_info, sub_status) - _, failed = dve_pipeline.apply_data_contract(sub_info) - - assert not failed + assert not sub_status.validation_failed assert Path(Path(processed_file_path), sub_info.submission_id, "contract", "planets").exists() @@ -139,17 +143,18 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name ): sub_info, processed_file_path = dodgy_planet_data_after_file_transformation dve_pipeline = SparkDVEPipeline( + processed_files_path=processed_file_path, audit_tables=None, job_run_id=1, rules_path=PLANETS_RULES_PATH, - processed_files_path=processed_file_path, submitted_files_path=None, reference_data_loader=None, spark=spark, ) + sub_status = SubmissionStatus() - _, failed = dve_pipeline.apply_data_contract(sub_info) - assert failed + sub_info, sub_status = dve_pipeline.apply_data_contract(sub_info, sub_status) + assert sub_status.validation_failed output_path = Path(processed_file_path) / sub_info.submission_id assert Path(output_path, "contract", "planets").exists() @@ -210,21 +215,23 @@ def test_data_contract_step( spark_test_database, ): # pylint: disable=redefined-outer-name sub_info, processed_file_path 
= planet_data_after_file_transformation + sub_status = SubmissionStatus() with SparkAuditingManager(spark_test_database, ThreadPoolExecutor(1), spark) as audit_manager: dve_pipeline = SparkDVEPipeline( + processed_files_path=processed_file_path, audit_tables=audit_manager, job_run_id=1, rules_path=PLANETS_RULES_PATH, - processed_files_path=processed_file_path, submitted_files_path=None, reference_data_loader=None, ) success, failed = dve_pipeline.data_contract_step( - pool=ThreadPoolExecutor(2), file_transform_results=[sub_info] + pool=ThreadPoolExecutor(2), file_transform_results=[(sub_info, sub_status)] ) assert len(success) == 1 + assert not success[0][1].validation_failed assert len(failed) == 0 assert Path(processed_file_path, sub_info.submission_id, "contract", "planets").exists() @@ -245,18 +252,18 @@ def test_apply_business_rules_success( with SparkAuditingManager(spark_test_database, ThreadPoolExecutor(1), spark) as audit_manager: dve_pipeline = SparkDVEPipeline( + processed_files_path=processed_file_path, audit_tables=audit_manager, job_run_id=1, rules_path=PLANETS_RULES_PATH, - processed_files_path=processed_file_path, submitted_files_path=None, reference_data_loader=SparkRefDataLoader, spark=spark, ) - _, status = dve_pipeline.apply_business_rules(sub_info, False) + _, status = dve_pipeline.apply_business_rules(sub_info, SubmissionStatus()) - assert not status.failed + assert not status.validation_failed assert status.number_of_records == 1 planets_entity_path = Path( @@ -290,18 +297,18 @@ def test_apply_business_rules_with_data_errors( # pylint: disable=redefined-out with SparkAuditingManager(spark_test_database, ThreadPoolExecutor(1), spark) as audit_manager: dve_pipeline = SparkDVEPipeline( + processed_files_path=processed_file_path, audit_tables=audit_manager, job_run_id=1, rules_path=PLANETS_RULES_PATH, - processed_files_path=processed_file_path, submitted_files_path=None, reference_data_loader=SparkRefDataLoader, spark=spark, ) - _, status = dve_pipeline.apply_business_rules(sub_info, False) + _, status = dve_pipeline.apply_business_rules(sub_info, SubmissionStatus()) - assert status.failed + assert status.validation_failed assert status.number_of_records == 1 br_path = Path( @@ -371,10 +378,10 @@ def test_business_rule_step( with SparkAuditingManager(spark_test_database, ThreadPoolExecutor(1), spark) as audit_manager: dve_pipeline = SparkDVEPipeline( + processed_files_path=processed_files_path, audit_tables=audit_manager, job_run_id=1, rules_path=PLANETS_RULES_PATH, - processed_files_path=processed_files_path, submitted_files_path=None, reference_data_loader=SparkRefDataLoader, spark=spark, @@ -382,7 +389,7 @@ def test_business_rule_step( audit_manager.add_new_submissions([sub_info], job_run_id=1) successful_files, unsuccessful_files, failed_processing = dve_pipeline.business_rule_step( - pool=ThreadPoolExecutor(2), files=[(sub_info, None)] + pool=ThreadPoolExecutor(2), files=[(sub_info, SubmissionStatus())] ) assert len(successful_files) == 1 @@ -405,10 +412,10 @@ def test_error_report_where_report_is_expected( # pylint: disable=redefined-out SparkRefDataLoader.spark = spark dve_pipeline = SparkDVEPipeline( + processed_files_path=processed_file_path, audit_tables=None, job_run_id=1, rules_path=PLANETS_RULES_PATH, - processed_files_path=processed_file_path, submitted_files_path=None, reference_data_loader=SparkRefDataLoader, spark=spark, @@ -418,7 +425,7 @@ def test_error_report_where_report_is_expected( # pylint: disable=redefined-out sub_info, SubmissionStatus(True, 9) 
     )

-    assert status.failed
+    assert status.validation_failed

     expected = {
         "submission_id": submission_info.submission_id,
@@ -524,10 +531,10 @@ def test_error_report_step(

     with SparkAuditingManager(spark_test_database, ThreadPoolExecutor(1), spark) as audit_manager:
         dve_pipeline = SparkDVEPipeline(
+            processed_files_path=processed_files_path,
             audit_tables=audit_manager,
             job_run_id=1,
             rules_path=None,
-            processed_files_path=processed_files_path,
             submitted_files_path=None,
             reference_data_loader=None,
             spark=spark,
@@ -553,10 +560,10 @@ def test_cluster_pipeline_run(
     audit_manager = SparkAuditingManager(spark_test_database, ThreadPoolExecutor(1), spark)

     dve_pipeline = SparkDVEPipeline(
+        processed_files_path=planet_test_files,
         audit_tables=audit_manager,
         job_run_id=1,
         rules_path=PLANETS_RULES_PATH,
-        processed_files_path=planet_test_files,
         submitted_files_path=planet_test_files,
         reference_data_loader=SparkRefDataLoader,
         spark=spark,
@@ -570,3 +577,67 @@
     assert report_processing_result.processing_status == "success"
     assert Path(report_uri).exists()

+
+def test_get_submission_status(spark, spark_test_database):
+    with SparkAuditingManager(spark_test_database, ThreadPoolExecutor(1), spark=spark) as audit_manager:
+        dve_pipeline = SparkDVEPipeline(
+            processed_files_path="a_path",
+            audit_tables=audit_manager,
+            job_run_id=1,
+            rules_path=None,
+            submitted_files_path=None,
+            reference_data_loader=None,
+            spark=spark,
+        )
+        dve_pipeline._logger = Mock(spec=logging.Logger)
+        # add two submissions
+        sub_one = SubmissionInfo(
+            submission_id="1",
+            submitting_org="TEST",
+            dataset_id="TEST_DATASET",
+            file_name="TEST_FILE",
+            submission_method="sftp",
+            file_extension="xml",
+            file_size=12345,
+            datetime_received=datetime(2023, 9, 1, 12, 0, 0),
+        )
+        sub_two = SubmissionInfo(
+            submission_id="2",
+            submitting_org="TEST",
+            dataset_id="TEST_DATASET",
+            file_name="TEST_FILE",
+            submission_method="sftp",
+            file_extension="xml",
+            file_size=12345,
+            datetime_received=datetime(2023, 9, 1, 12, 0, 0),
+        )
+
+        audit_manager.add_new_submissions([sub_one, sub_two])
+        audit_manager.add_processing_records(
+            [
+                ProcessingStatusRecord(
+                    submission_id=sub_one.submission_id, processing_status="error_report", submission_result="validation_failed"
+                ),
+                ProcessingStatusRecord(
+                    submission_id=sub_two.submission_id, processing_status="failed", submission_result="processing_failed"
+                ),
+            ]
+        )
+        audit_manager.add_submission_statistics_records([
+            SubmissionStatisticsRecord(submission_id=sub_one.submission_id, record_count=5, number_record_rejections=2, number_warnings=3),
+        ])
+
+        sub_stats_one = dve_pipeline.get_submission_status("test", sub_one.submission_id)
+        assert sub_stats_one.submission_result == "validation_failed"
+        assert sub_stats_one.validation_failed
+        assert not sub_stats_one.processing_failed
+        assert sub_stats_one.number_of_records == 5
+        sub_stats_two = dve_pipeline.get_submission_status("test", sub_two.submission_id)
+        assert sub_stats_two.submission_result == "processing_failed"
+        assert not sub_stats_two.validation_failed
+        assert sub_stats_two.processing_failed
+        sub_stats_3 = dve_pipeline.get_submission_status("test", "3")
+        dve_pipeline._logger.warning.assert_called_once_with(
+            "Unable to determine status of submission_id: 3 in service test - assuming no issues."
+        )
+        assert sub_stats_3
diff --git a/tests/testdata/books/books_xsd_fail.xml b/tests/testdata/books/books_xsd_fail.xml
new file mode 100644
index 0000000..bc818da
--- /dev/null
+++ b/tests/testdata/books/books_xsd_fail.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+ Gambardella, Matthew & Kent, Roy
+ XML Developer's Guide
+ Computer
+ 44.95
+ 2000-10-01
+ An in-depth look at creating applications
+ with XML.
+
+
+ Ralls, Kim
+ Midnight Rain
+ Fantasy
+ 5.95
+ 2000-12-16
+ A former architect battles corporate zombies,
+ an evil sorceress, and her own childhood to become queen
+ of the world.
+
+
\ No newline at end of file
diff --git a/tests/testdata/books/nested_books.dischema.json b/tests/testdata/books/nested_books.dischema.json
index ee83cbf..5e53fce 100644
--- a/tests/testdata/books/nested_books.dischema.json
+++ b/tests/testdata/books/nested_books.dischema.json
@@ -24,7 +24,10 @@
             "reader": "SparkXMLReader",
             "kwargs": {
                 "record_tag": "bookstore",
-                "n_records_to_read": 1
+                "n_records_to_read": 1,
+                "xsd_location": "nested_books.xsd",
+                "xsd_error_code": "TESTXSDERROR",
+                "xsd_error_message": "the xml is poorly structured"
             }
         }
     }
diff --git a/tests/testdata/books/nested_books.xsd b/tests/testdata/books/nested_books.xsd
new file mode 100644
index 0000000..b2d498e
--- /dev/null
+++ b/tests/testdata/books/nested_books.xsd
@@ -0,0 +1,37 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/testdata/books/nested_books_ddb.dischema.json b/tests/testdata/books/nested_books_ddb.dischema.json
index aa523aa..b9c3314 100644
--- a/tests/testdata/books/nested_books_ddb.dischema.json
+++ b/tests/testdata/books/nested_books_ddb.dischema.json
@@ -23,7 +23,11 @@
         ".xml": {
             "reader": "DuckDBXMLStreamReader",
             "kwargs": {
-                "record_tag": "bookstore"
+                "record_tag": "bookstore",
+                "n_records_to_read": 1,
+                "xsd_location": "nested_books.xsd",
+                "xsd_error_code": "TESTXSDERROR",
+                "xsd_error_message": "the xml is poorly structured"
             }
         }
     }
diff --git a/tests/testdata/movies/good_movies.json b/tests/testdata/movies/good_movies.json
new file mode 100644
index 0000000..acc7026
--- /dev/null
+++ b/tests/testdata/movies/good_movies.json
@@ -0,0 +1,46 @@
+[
+    {
+        "title": "Not a great one",
+        "year": 2017,
+        "genre": ["Animation", "Comedy", "Family"],
+        "duration_minutes": 88,
+        "ratings": [6.9, 7.8],
+        "cast": [
+            { "name": "J. Smith", "role": "Lion", "date_joined": "2015-11-12" },
+            { "name": "T. Car", "role": "Mouse", "date_joined": "2015-11-12" }
+        ]
+    },
+    {
+        "title": "Good family movie",
+        "year": 2022,
+        "genre": ["Sci-Fi", "Family"],
+        "duration_minutes": 95,
+        "ratings": [7, 8.2, 6.3],
+        "cast": [
+            { "name": "D. Farnesbarnes", "role": "Robot" },
+            { "name": "G. Adams", "role": "Alien", "date_joined": "2017-08-01" }
+        ]
+    },
+    {
+        "title": "One with a cat and a dog",
+        "year": 2020,
+        "genre": ["Fantasy", "Family"],
+        "duration_minutes": 110,
+        "ratings": [6.1, 6.2],
+        "cast": [
+            { "name": "R. Williams", "role": "Cat", "date_joined": "2016-05-06" },
+            { "name": "T. Brown", "role": "Dog", "date_joined": "2016-05-07" }
+        ]
+    },
+    {
+        "title": "A bad 'un",
+        "year": 2011,
+        "genre": ["Mystery", "Family"],
+        "duration_minutes": 97,
+        "ratings": [1.2, 3.4, 5.6, 3.4],
+        "cast": [
+            { "name": "R. Green", "role": "Baby", "date_joined": "2013-11-12" },
+            { "name": "P. Plum", "role": "Dad", "date_joined": "2013-10-08" }
+        ]
+    }
+]
\ No newline at end of file