From e13b9c58b2f7cf818e1920f4b294e44e8322165e Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Tue, 11 Nov 2025 13:14:04 +0000 Subject: [PATCH 1/5] feat: initial work to add xsd validation in xml readers --- .../implementations/duckdb/readers/xml.py | 16 +- .../implementations/spark/readers/xml.py | 38 +- src/dve/core_engine/backends/readers/xml.py | 17 + .../backends/readers/xml_linting.py | 146 +++++++ src/dve/pipeline/pipeline.py | 66 ++-- src/dve/pipeline/utils.py | 39 +- src/dve/pipeline/xml_linting.py | 362 ------------------ .../testdata/books/nested_books.dischema.json | 5 +- tests/testdata/books/nested_books.xsd | 37 ++ 9 files changed, 310 insertions(+), 416 deletions(-) create mode 100644 src/dve/core_engine/backends/readers/xml_linting.py delete mode 100644 src/dve/pipeline/xml_linting.py create mode 100644 tests/testdata/books/nested_books.xsd diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py index af1147f..9f8a6ee 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py @@ -12,19 +12,33 @@ from dve.core_engine.backends.readers.xml import XMLStreamReader from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model from dve.core_engine.type_hints import URI +from dve.parser.file_handling.service import get_parent +from dve.pipeline.utils import dump_errors @duckdb_write_parquet class DuckDBXMLStreamReader(XMLStreamReader): """A reader for XML files""" - def __init__(self, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs): + def __init__(self, + ddb_connection: Optional[DuckDBPyConnection] = None, + **kwargs): self.ddb_connection = ddb_connection if ddb_connection else default_connection super().__init__(**kwargs) @read_function(DuckDBPyRelation) def read_to_relation(self, 
resource: URI, entity_name: str, schema: Type[BaseModel]): """Returns a relation object from the source xml""" + if self.xsd_location: + msg = self._run_xmllint(file_uri=resource) + if msg: + working_folder = get_parent(resource) + dump_errors( + working_folder=working_folder, + step_name="file_transformation", + messages=[msg] + ) + polars_schema: Dict[str, pl.DataType] = { # type: ignore fld.name: get_polars_type_from_annotation(fld.annotation) for fld in stringify_model(schema).__fields__.values() diff --git a/src/dve/core_engine/backends/implementations/spark/readers/xml.py b/src/dve/core_engine/backends/implementations/spark/readers/xml.py index a2ae2c5..e6cf354 100644 --- a/src/dve/core_engine/backends/implementations/spark/readers/xml.py +++ b/src/dve/core_engine/backends/implementations/spark/readers/xml.py @@ -18,10 +18,12 @@ get_type_from_annotation, spark_write_parquet, ) -from dve.core_engine.backends.readers.xml import XMLStreamReader +from dve.core_engine.backends.readers.xml import BasicXMLFileReader, XMLStreamReader +from dve.core_engine.backends.readers.xml_linting import run_xmllint from dve.core_engine.type_hints import URI, EntityName -from dve.parser.file_handling import get_content_length +from dve.parser.file_handling import get_content_length, get_parent from dve.parser.file_handling.service import open_stream +from dve.pipeline.utils import dump_errors SparkXMLMode = Literal["PERMISSIVE", "FAILFAST", "DROPMALFORMED"] """The mode to use when parsing XML files with Spark.""" @@ -51,7 +53,7 @@ def read_to_dataframe( @spark_write_parquet -class SparkXMLReader(BaseFileReader): # pylint: disable=too-many-instance-attributes +class SparkXMLReader(BasicXMLFileReader): # pylint: disable=too-many-instance-attributes """A reader for XML files built atop Spark-XML.""" def __init__( @@ -69,21 +71,31 @@ def __init__( sanitise_multiline: bool = True, namespace=None, trim_cells=True, + xsd_location: Optional[URI] = None, + xsd_error_code: Optional[str] = 
None, + xsd_error_message: Optional[str] = None **_, ) -> None: - self.record_tag = record_tag + + super().__init__( + record_tag=record_tag, + root_tag=root_tag, + trim_cells=trim_cells, + null_values=null_values, + sanitise_multiline=sanitise_multiline, + xsd_location=xsd_location, + xsd_error_code=xsd_error_code, + xsd_error_message=xsd_error_message + ) + self.spark_session = spark_session or SparkSession.builder.getOrCreate() self.sampling_ratio = sampling_ratio self.exclude_attribute = exclude_attribute self.mode = mode self.infer_schema = infer_schema self.ignore_namespace = ignore_namespace - self.root_tag = root_tag self.sanitise_multiline = sanitise_multiline - self.null_values = null_values self.namespace = namespace - self.trim_cells = trim_cells - super().__init__() def read_to_py_iterator( self, resource: URI, entity_name: EntityName, schema: Type[BaseModel] @@ -104,6 +116,16 @@ def read_to_dataframe( """ if get_content_length(resource) == 0: raise EmptyFileError(f"File at {resource} is empty.") + + if self.xsd_location: + msg = self._run_xmllint(file_uri=resource) + if msg: + working_folder = get_parent(resource) + dump_errors( + working_folder=working_folder, + step_name="file_transformation", + messages=[msg] + ) spark_schema: StructType = get_type_from_annotation(schema) kwargs = { diff --git a/src/dve/core_engine/backends/readers/xml.py b/src/dve/core_engine/backends/readers/xml.py index ca55eb9..c918fb8 100644 --- a/src/dve/core_engine/backends/readers/xml.py +++ b/src/dve/core_engine/backends/readers/xml.py @@ -11,8 +11,10 @@ from dve.core_engine.backends.base.reader import BaseFileReader from dve.core_engine.backends.exceptions import EmptyFileError +from dve.core_engine.backends.readers.xml_linting import run_xmllint from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model from dve.core_engine.loggers import get_logger +from dve.core_engine.message import FeedbackMessage from dve.core_engine.type_hints 
import URI, EntityName from dve.parser.file_handling import NonClosingTextIOWrapper, get_content_length, open_stream from dve.parser.file_handling.implementations.file import ( @@ -114,6 +116,9 @@ def __init__( sanitise_multiline: bool = True, encoding: str = "utf-8-sig", n_records_to_read: Optional[int] = None, + xsd_location: Optional[URI] = None, + xsd_error_code: Optional[str] = None, + xsd_error_message: Optional[str] = None, **_, ): """Init function for the base XML reader. @@ -148,6 +153,12 @@ def __init__( """Encoding of the XML file.""" self.n_records_to_read = n_records_to_read """The maximum number of records to read from a document.""" + self.xsd_location = xsd_location + """The relative URI of the xsd file if wishing to perform xsd validation""" + self.xsd_error_code = xsd_error_code + """The error code to be reported if xsd validation fails (if xsd)""" + self.xsd_error_message = xsd_error_message + """The error message to be reported if xsd validation fails""" super().__init__() self._logger = get_logger(__name__) @@ -260,6 +271,12 @@ def _parse_xml( for element in elements: yield self._parse_element(element, template_row) + + def _run_xmllint(self, file_uri: URI) -> FeedbackMessage: + return run_xmllint(file_uri=file_uri, + schema_uri=self.xsd_location, + error_code=self.xsd_error_code, + error_message=self.xsd_error_message) def read_to_py_iterator( self, diff --git a/src/dve/core_engine/backends/readers/xml_linting.py b/src/dve/core_engine/backends/readers/xml_linting.py new file mode 100644 index 0000000..3109a88 --- /dev/null +++ b/src/dve/core_engine/backends/readers/xml_linting.py @@ -0,0 +1,146 @@ +"""Implement XML linting for files.""" + +import shutil +import tempfile +from contextlib import ExitStack +from pathlib import Path +from subprocess import PIPE, STDOUT, Popen +from typing import Sequence, Union +from uuid import uuid4 + +from dve.core_engine.message import FeedbackMessage +from dve.parser.file_handling import ( + copy_resource, + 
get_file_name, + get_resource_exists, + open_stream, +) +from dve.parser.file_handling.implementations.file import file_uri_to_local_path +from dve.parser.type_hints import URI + +ErrorMessage = str +"""Error message for xml issues""" +ErrorCode = str +"""Error code for xml feedback errors""" + +FIVE_MEBIBYTES = 5 * (1024**2) +"""The size of 5 binary megabytes, in bytes.""" + + +def _ensure_schema_and_resources( + schema_uri: URI, schema_resources: Sequence[URI], temp_dir: Path +) -> Path: + """Given the schema and schema resource URIs and a temp dir, if the resources + are remote or exist in different directories, copy them to the temp dir. + + Return the local schema path. + + """ + if not get_resource_exists(schema_uri): + raise IOError(f"No resource accessible at schema URI {schema_uri!r}") + + missing_resources = list( + filter(lambda resource: not get_resource_exists(resource), schema_resources) + ) + if missing_resources: + raise IOError(f"Some schema resources missing: {missing_resources!r}") + + all_resources = [schema_uri, *schema_resources] + + schemas_are_files = all(map(lambda resource: resource.startswith("file:"), all_resources)) + if schemas_are_files: + paths = list(map(file_uri_to_local_path, all_resources)) + all_paths_have_same_parent = len({path.parent for path in paths}) == 1 + + if all_paths_have_same_parent: + schema_path = paths[0] + return schema_path + + for resource_uri in all_resources: + local_path = temp_dir.joinpath(get_file_name(resource_uri)) + copy_resource(resource_uri, local_path.as_uri()) + + schema_path = temp_dir.joinpath(get_file_name(schema_uri)) + return schema_path + + +def run_xmllint( + file_uri: URI, + schema_uri: URI, + *schema_resources: URI, + error_code: ErrorCode, + error_message: ErrorMessage, +) -> Union[None, FeedbackMessage]: + """Run `xmllint`, given a file and information about the schemas to apply. 
+ + The schema and associated resources will be copied to a temporary directory + for validation, unless they are all already in the same local folder. + + Args: + - `file_uri`: the URI of the file to be streamed into `xmllint` + - `schema_uri`: the URI of the XSD schema for the file. + - `*schema_resources`: URIs for additional XSD files required by the schema. + - `error_code`: The error_code to use in FeedbackMessage if the linting fails. + - `error_message`: The error_message to use in FeedbackMessage if the linting fails. + + Returns a deque of messages produced by the linting. + + """ + if not shutil.which("xmllint"): + raise OSError("Unable to find `xmllint` binary") + + if not get_resource_exists(file_uri): + raise IOError(f"No resource accessible at file URI {file_uri!r}") + + # Ensure the schema and resources are local file paths so they can be + # read by xmllint. + # Lots of resources to manage here. + with tempfile.TemporaryDirectory() as temp_dir_str: + temp_dir = Path(temp_dir_str) + schema_path = _ensure_schema_and_resources(schema_uri, schema_resources, temp_dir) + message_file_path = temp_dir.joinpath(uuid4().hex) + + with ExitStack() as linting_context: + # Need to write lint output to a file to avoid deadlock. Kinder to mem this way anyway. + message_file_bytes = linting_context.enter_context(message_file_path.open("wb")) + + # Open an `xmllint` process to pipe into. + command = ["xmllint", "--stream", "--schema", str(schema_path), "-"] + process = linting_context.enter_context( + Popen(command, stdin=PIPE, stdout=message_file_bytes, stderr=STDOUT) + ) + # This should never trigger, bad typing in stdlib. + if process.stdin is None: + raise ValueError("Unable to pipe file into subprocess") + + # Pipe the XML file contents into xmllint. 
+ block = b"" + try: + with open_stream(file_uri, "rb") as byte_stream: + while True: + block = byte_stream.read(FIVE_MEBIBYTES) + if not block: + break + process.stdin.write(block) + except BrokenPipeError: + pass + finally: + # Close the input stream and await the response code. + # Output will be written to the message file. + process.stdin.close() + # TODO: Identify an appropriate timeout. + return_code = process.wait() + + if return_code == 0: + return None + + return FeedbackMessage( + entity="xsd_validation", + record={}, + failure_type="submission", + is_informational=False, + error_type="xsd check", + error_location="Whole File", + error_message=error_message, + error_code=error_code, + ) diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 6f3b1bb..d9b54a0 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -26,7 +26,7 @@ from dve.core_engine.models import SubmissionInfo, SubmissionStatisticsRecord from dve.core_engine.type_hints import URI, Failed, FileURI, InfoURI, Messages from dve.parser import file_handling as fh -from dve.pipeline.utils import SubmissionStatus, deadletter_file, load_config, load_reader +from dve.pipeline.utils import SubmissionStatus, deadletter_file, dump_errors, load_config, load_reader from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates, conditional_cast PERMISSIBLE_EXCEPTIONS: Tuple[Type[Exception]] = ( @@ -94,42 +94,6 @@ def get_entity_count(entity: EntityType) -> int: """Get a row count of an entity stored as parquet""" raise NotImplementedError() - def _dump_errors( - self, - submission_id: str, - step_name: str, - messages: Messages, - key_fields: Optional[Dict[str, List[str]]] = None, - ): - if not self.processed_files_path: - raise AttributeError("processed files path not passed") - - if not key_fields: - key_fields = {} - - errors = fh.joinuri( - self.processed_files_path, submission_id, "errors", f"{step_name}_errors.json" - ) - processed = [] - 
- for message in messages: - primary_keys: List[str] = key_fields.get(message.entity if message.entity else "", []) - error = message.to_dict( - key_field=primary_keys, - value_separator=" -- ", - max_number_of_values=10, - record_converter=None, - ) - error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ") - processed.append(error) - - with fh.open_stream(errors, "w") as f: - json.dump( - processed, - f, - default=str, - ) - @validate_arguments def _move_submission_to_working_location( self, @@ -310,7 +274,13 @@ def file_transformation( submission_file_uri, submission_info, self.processed_files_path ) if errors: - self._dump_errors(submission_info.submission_id, "file_transformation", errors) + dump_errors( + fh.joinuri( + self.processed_files_path, + submission_info.submission_id + ), + "file_transformation", + errors) return submission_info.dict() return submission_info except ValueError as exc: @@ -416,8 +386,14 @@ def apply_data_contract(self, submission_info: SubmissionInfo) -> Tuple[Submissi key_fields = {model: conf.reporting_fields for model, conf in model_config.items()} if messages: - self._dump_errors( - submission_info.submission_id, "contract", messages, key_fields=key_fields + dump_errors( + fh.joinuri( + self.processed_files_path, + submission_info.submission_id + ), + "contract", + messages, + key_fields=key_fields ) failed = any(not rule_message.is_informational for rule_message in messages) @@ -515,8 +491,14 @@ def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool): key_fields = {model: conf.reporting_fields for model, conf in model_config.items()} if rule_messages: - self._dump_errors( - submission_info.submission_id, "business_rules", rule_messages, key_fields + dump_errors( + fh.joinuri( + self.processed_files_path, + submission_info.submission_id + ), + "business_rules", + rule_messages, + key_fields ) failed = any(not rule_message.is_informational for rule_message in rule_messages) or failed 
diff --git a/src/dve/pipeline/utils.py b/src/dve/pipeline/utils.py index f4b6620..6c8c054 100644 --- a/src/dve/pipeline/utils.py +++ b/src/dve/pipeline/utils.py @@ -1,7 +1,7 @@ """Utilities to be used with services to abstract away some of the config loading and threading""" import json from threading import Lock -from typing import Dict, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union from pydantic.main import ModelMetaclass from pyspark.sql import SparkSession @@ -11,8 +11,9 @@ import dve.parser.file_handling as fh from dve.core_engine.backends.readers import _READER_REGISTRY from dve.core_engine.configuration.v1 import SchemaName, V1EngineConfig, _ModelConfig -from dve.core_engine.type_hints import URI, SubmissionResult +from dve.core_engine.type_hints import URI, Messages, SubmissionResult from dve.metadata_parser.model_generator import JSONtoPyd +from dve.reporting.error_report import conditional_cast Dataset = Dict[SchemaName, _ModelConfig] _configs: Dict[str, Tuple[Dict[str, ModelMetaclass], V1EngineConfig, Dataset]] = {} @@ -66,6 +67,40 @@ def deadletter_file(source_uri: URI) -> None: except TypeError: return None +def dump_errors( + working_folder: URI, + step_name: str, + messages: Messages, + key_fields: Optional[Dict[str, List[str]]] = None, +): + if not working_folder: + raise AttributeError("processed files path not passed") + + if not key_fields: + key_fields = {} + + errors = fh.joinuri( + working_folder, "errors", f"{step_name}_errors.json" + ) + processed = [] + + for message in messages: + primary_keys: List[str] = key_fields.get(message.entity if message.entity else "", []) + error = message.to_dict( + key_field=primary_keys, + value_separator=" -- ", + max_number_of_values=10, + record_converter=None, + ) + error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ") + processed.append(error) + + with fh.open_stream(errors, "a+") as f: + json.dump( + processed, + f, + default=str, + ) class 
SubmissionStatus: """Submission status for a given submission.""" diff --git a/src/dve/pipeline/xml_linting.py b/src/dve/pipeline/xml_linting.py deleted file mode 100644 index 097d8da..0000000 --- a/src/dve/pipeline/xml_linting.py +++ /dev/null @@ -1,362 +0,0 @@ -"""Implement XML linting for files.""" - -import argparse -import re -import shutil -import sys -import tempfile -from contextlib import ExitStack -from pathlib import Path -from subprocess import PIPE, STDOUT, Popen -from typing import Dict, Iterable, List, Optional, Sequence, Tuple -from uuid import uuid4 - -from typing_extensions import Literal - -from dve.core_engine.message import FeedbackMessage -from dve.core_engine.type_hints import Messages -from dve.parser.file_handling import copy_resource, get_file_name, get_resource_exists, open_stream -from dve.parser.file_handling.implementations.file import file_uri_to_local_path -from dve.parser.type_hints import URI - -Replacement = str -"""A replacement for a regex pattern.""" -Stage = Literal["Pre-validation", "Post-validation"] -"""The stage at which the XML linting is being performed.""" -ErrorMessage = str -"""Error message for xml issues""" -ErrorCode = str -"""Error code for xml feedback errors""" - -FIVE_MEBIBYTES = 5 * (1024**3) -"""The size of 5 binary megabytes, in bytes.""" - -# Patterns/strings for xmllint message sanitisation. -IGNORED_PATTERNS: List[re.Pattern] = [re.compile(r"^Unimplemented block at")] -"""Regex patterns for messages that should result in their omission.""" -ERRONEOUS_PATTERNS: List[Tuple[re.Pattern, Replacement]] = [ - ( - re.compile(r"^XSD schema .+/(?P.+) failed to compile$"), - r"Missing required component of XSD schema '\g'", - ) -] -""" -Patterns for messages that should trigger errors. Replacement messages will be -raised as `ValueError`s. - -""" -INCORRECT_NAMESPACE_PATTERN = re.compile( - r"Element .*?{(?P.*?)}.*? " - r"No matching global declaration available for the validation root." 
-) -"""# 1 capture -Pattern to match incorrect namespace found in the xmllint errors. Captures incorrect namespace -""" -# match literal Element followed by anything up to opening curly brace. capture the contents -# of the braces if the literal "No matching global declaration available..." is present in the -# message. This is raised when the namespace doesn't match between the schema and the file - -MISSING_OR_OUT_OF_ORDER_PATTERN = re.compile( - r"Element .*?{.*?}(?P.*?): This element is not expected\. " - r"Expected is .*?}(?P.*?)\)" -) -"""# 2 captures -Pattern to match out of order elements found in the xmllint errors. Captures incorrect found -and expected fields (in that order) -""" -# match literal Element followed by anything up to opening curly brace. capture the field that -# comes after the braces if the literal "this element is not expected" is present in the -# message. capture the expected value too -# This is raised when the xml fields are out of order or a field is missing. - -MISSING_CHILD_ELEMENTS_PATTERN = re.compile( - r"Element .*?{.*?}.*?: Missing child element\(s\). Expected is.*?}(?P.*?)\)" -) -"""# 1 capture -Pattern to match when a tag has missing child elements. Captures the expected element -""" -FAILS_TO_VALIDATE_PATTERN = re.compile(r"fails to validate") -"""# 0 captures -Pattern to match the fails to validate xmllint message, doesn't capture anything""" - -UNEXPECTED_FIELD_PATTERN = re.compile( - r"Element .*?{.*?}(?P.*?): This element is not expected\.$" -) -"""# 1 capture -Pattern to match unexpected fields rather than out of order fields. Captures incorrect field -""" - -REMOVED_PATTERNS: List[re.Pattern] = [ - re.compile(r"\{.+?\}"), - re.compile(r"[\. 
]+$"), -] -"""Regex patterns to remove from the xmllint output.""" -REPLACED_PATTERNS: List[Tuple[re.Pattern, Replacement]] = [ - (re.compile(r":(?P\d+):"), r" on line \g:"), -] -"""Regex patterns to replace in the xmllint output.""" -REPLACED_STRINGS: List[Tuple[str, Replacement]] = [ - ( - "No matching global declaration available for the validation root", - "Incorrect namespace version, please ensure you have the most recent namespace", - ), - ( - "fails to validate", - "Whole file has failed schema validation - please ensure your data matches the expected " - + "structure", - ), -] -"""Strings to replace in the xmllint output.""" - - -def _sanitise_lint_issue(issue: str, file_name: str) -> Optional[str]: - """Sanitise an xmllint lint message. If the message should be ignored, - this function will return `None` instead of a string. - - If messages are considered erroneous, a `ValueError` will be raised. - - """ - for pattern in IGNORED_PATTERNS: - if pattern.match(issue): - return None - for pattern, error_message in ERRONEOUS_PATTERNS: - if pattern.match(issue): - raise ValueError(pattern.sub(error_message, issue)) - - # Line will start with "-" because that's the file name for streamed XML files. 
- if issue.startswith("-"): - issue = "".join((file_name, issue[1:])) - - for pattern in REMOVED_PATTERNS: - issue = pattern.sub("", issue) - for pattern, replacement in REPLACED_PATTERNS: - issue = pattern.sub(replacement, issue) - for string, replacement in REPLACED_STRINGS: - issue = issue.replace(string, replacement) - - return issue - - -def _parse_lint_messages( - lint_messages: Iterable[str], - error_mapping: Dict[re.Pattern, Tuple[ErrorMessage, ErrorCode]], - stage: Stage = "Pre-validation", - file_name: Optional[str] = None, -) -> Messages: - """Parse a sequence of messages from `xmllint` into a deque of feedback messages.""" - messages: Messages = [] - for issue in lint_messages: - for regex, (error_message, error_code) in error_mapping.items(): - match_ = regex.search(issue) - if match_: - groups = {key: group.strip(",' ") for key, group in match_.groupdict().items()} - if not groups: - reporting_field = "Whole file" - groups[reporting_field] = file_name - elif len(groups) > 1: - reporting_field = list(groups) # type: ignore - else: - reporting_field = list(groups)[0] - - messages.append( - FeedbackMessage( - entity=stage, - record={"Wholefile": groups}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_code=error_code, - error_message=error_message.format(*groups.values()), - reporting_field=reporting_field, - category="Bad file", - ) - ) - return list(dict.fromkeys(messages)) # remove duplicate errors but preserve order - - -def _ensure_schema_and_resources( - schema_uri: URI, schema_resources: Sequence[URI], temp_dir: Path -) -> Path: - """Given the schema and schema resource URIs and a temp dir, if the resources - are remote or exist in different directories, copy them to the temp dir. - - Return the local schema path. 
- - """ - if not get_resource_exists(schema_uri): - raise IOError(f"No resource accessible at schema URI {schema_uri!r}") - - missing_resources = list( - filter(lambda resource: not get_resource_exists(resource), schema_resources) - ) - if missing_resources: - raise IOError(f"Some schema resources missing: {missing_resources!r}") - - all_resources = [schema_uri, *schema_resources] - - schemas_are_files = all(map(lambda resource: resource.startswith("file:"), all_resources)) - if schemas_are_files: - paths = list(map(file_uri_to_local_path, all_resources)) - all_paths_have_same_parent = len({path.parent for path in paths}) == 1 - - if all_paths_have_same_parent: - schema_path = paths[0] - return schema_path - - for resource_uri in all_resources: - local_path = temp_dir.joinpath(get_file_name(resource_uri)) - copy_resource(resource_uri, local_path.as_uri()) - - schema_path = temp_dir.joinpath(get_file_name(schema_uri)) - return schema_path - - -def run_xmllint( - file_uri: URI, - schema_uri: URI, - *schema_resources: URI, - error_mapping: Dict[re.Pattern, Tuple[ErrorMessage, ErrorCode]], - stage: Stage = "Pre-validation", -) -> Messages: - """Run `xmllint`, given a file and information about the schemas to apply. - - The schema and associated resources will be copied to a temporary directory - for validation, unless they are all already in the same local folder. - - Args: - - `file_uri`: the URI of the file to be streamed into `xmllint` - - `schema_uri`: the URI of the XSD schema for the file. - - `*schema_resources`: URIs for additional XSD files required by the schema. - - `stage` (keyword only): One of `{'Pre-validation', 'Post-validation'}` - - Returns a deque of messages produced by the linting. 
- - """ - if not shutil.which("xmllint"): - raise OSError("Unable to find `xmllint` binary") - - if not get_resource_exists(file_uri): - raise IOError(f"No resource accessible at file URI {file_uri!r}") - - # Ensure the schema and resources are local file paths so they can be - # read by xmllint. - # Lots of resources to manage here. - with tempfile.TemporaryDirectory() as temp_dir_str: - temp_dir = Path(temp_dir_str) - schema_path = _ensure_schema_and_resources(schema_uri, schema_resources, temp_dir) - message_file_path = temp_dir.joinpath(uuid4().hex) - - with ExitStack() as linting_context: - # Need to write lint output to a file to avoid deadlock. Kinder to mem this way anyway. - message_file_bytes = linting_context.enter_context(message_file_path.open("wb")) - - # Open an `xmllint` process to pipe into. - command = ["xmllint", "--stream", "--schema", str(schema_path), "-"] - process = linting_context.enter_context( - Popen(command, stdin=PIPE, stdout=message_file_bytes, stderr=STDOUT) - ) - # This should never trigger, bad typing in stdlib. - if process.stdin is None: - raise ValueError("Unable to pipe file into subprocess") - - # Pipe the XML file contents into xmllint. 
- block = b"" - try: - with open_stream(file_uri, "rb") as byte_stream: - while True: - block = byte_stream.read(FIVE_MEBIBYTES) - if not block: - break - process.stdin.write(block) - except BrokenPipeError: - file_name = get_file_name(file_uri) - return [ - FeedbackMessage( - entity=stage, - record={"Whole file": file_name}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_message="failed schema check", - category="Bad file", - ), - FeedbackMessage( - entity=stage, - record={"Whole file": block[:50].decode(errors="replace")}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_message="Failed xml validation", - reporting_field="Whole file", - category="Bad file", - ), - ] - - # Close the input stream and await the response code. - # Output will be written to the message file. - process.stdin.close() - # TODO: Identify an appropriate timeout. - return_code = process.wait() - - if return_code == 0: - return [] - with message_file_path.open("r", encoding="utf-8") as message_file: - lint_messages = (line for line in map(str.rstrip, message_file) if line) - file_name = get_file_name(file_uri) - messages = _parse_lint_messages(lint_messages, error_mapping, stage, file_name) - - # Nonzero exit code without messages _shouldn't_ happen, but it's possible - # if `xmllint` is killed (and presumably possible if it runs out of mem) - # so we should handle that possibility. - if not messages: - messages.append( - FeedbackMessage( - entity=stage, - record={"Whole file": file_name}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_message="failed schema check", - category="Bad file", - ) - ) - - return messages - - -def _main(cli_args: List[str]): - """Command line interface for XML linting. 
Useful for testing.""" - parser = argparse.ArgumentParser() - parser.add_argument("xml_file_path", help="The path to the XML file to be validated") - parser.add_argument( - "xsd_file_paths", help="The path to the XSD schemas (primary schema first)", nargs="+" - ) - args = parser.parse_args(cli_args) - - xml_path = Path(args.xml_file_path).resolve() - xsd_uris = list(map(lambda path_str: Path(path_str).resolve().as_uri(), args.xsd_file_paths)) - try: - messages = run_xmllint(xml_path.as_uri(), *xsd_uris, error_mapping={}) - except Exception as err: # pylint: disable=broad-except - print(f"Exception ({type(err).__name__}) raised in XML linting.", file=sys.stderr) - print(err, file=sys.stderr) - sys.exit(2) - - if not messages: - print(f"File {xml_path.name!r} validated successfully\n", file=sys.stderr) - sys.exit(0) - else: - print(f"File {xml_path.name!r} validation failed\n", file=sys.stderr) - - print("|".join(FeedbackMessage.HEADER)) - for message in messages: - print("|".join(map(lambda col: str(col) if col else "", message.to_row()))) - sys.exit(1) - - -if __name__ == "__main__": - _main(sys.argv[1:]) diff --git a/tests/testdata/books/nested_books.dischema.json b/tests/testdata/books/nested_books.dischema.json index ee83cbf..86ca0bb 100644 --- a/tests/testdata/books/nested_books.dischema.json +++ b/tests/testdata/books/nested_books.dischema.json @@ -24,7 +24,10 @@ "reader": "SparkXMLReader", "kwargs": { "record_tag": "bookstore", - "n_records_to_read": 1 + "n_records_to_read": 1, + "xsd_location": "./nested_books.dischema.json", + "xsd_error_code": "TESTXSDERROR", + "xsd_error_message": "the xml is poorly structured" } } } diff --git a/tests/testdata/books/nested_books.xsd b/tests/testdata/books/nested_books.xsd new file mode 100644 index 0000000..b2d498e --- /dev/null +++ b/tests/testdata/books/nested_books.xsd @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + From fc10691c9cc3bdb3582b37a1f69043d42cf4100a Mon Sep 17 
00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Mon, 1 Dec 2025 15:48:30 +0000 Subject: [PATCH 2/5] fix: fix issues and implementation logic for xsd validation --- .../implementations/duckdb/readers/csv.py | 2 + .../implementations/duckdb/readers/json.py | 7 +- .../implementations/duckdb/readers/xml.py | 21 +- .../implementations/spark/readers/csv.py | 1 + .../implementations/spark/readers/json.py | 1 + .../implementations/spark/readers/xml.py | 23 +- src/dve/core_engine/backends/readers/csv.py | 1 + src/dve/core_engine/backends/readers/xml.py | 32 +- .../backends/readers/xml_linting.py | 11 +- src/dve/core_engine/backends/utilities.py | 42 +- .../core_engine/configuration/v1/__init__.py | 9 +- src/dve/parser/file_handling/service.py | 2 +- src/dve/parser/type_hints.py | 2 +- src/dve/pipeline/pipeline.py | 30 +- src/dve/pipeline/utils.py | 39 +- src/dve/pipeline/xml_linting.py | 363 ------------------ tests/features/books.feature | 10 + .../test_duckdb/test_data_contract.py | 4 +- tests/testdata/books/books_xsd_fail.xml | 24 ++ .../testdata/books/nested_books.dischema.json | 2 +- .../books/nested_books_ddb.dischema.json | 6 +- 21 files changed, 164 insertions(+), 468 deletions(-) delete mode 100644 src/dve/pipeline/xml_linting.py create mode 100644 tests/testdata/books/books_xsd_fail.xml diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py index df43348..3998bf5 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py @@ -30,10 +30,12 @@ class DuckDBCSVReader(BaseFileReader): # TODO - stringify or not def __init__( self, + *, header: bool = True, delim: str = ",", quotechar: str = '"', connection: Optional[DuckDBPyConnection] = None, + **_, ): self.header = header self.delim = delim diff --git 
a/src/dve/core_engine/backends/implementations/duckdb/readers/json.py b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py index a706f11..b1a3ad4 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/json.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py @@ -20,7 +20,12 @@ class DuckDBJSONReader(BaseFileReader): """A reader for JSON files""" - def __init__(self, json_format: Optional[str] = "array"): + def __init__( + self, + *, + json_format: Optional[str] = "array", + **_, + ): self._json_format = json_format super().__init__() diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py index 398d6c9..a955946 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py @@ -8,21 +8,18 @@ from pydantic import BaseModel from dve.core_engine.backends.base.reader import read_function +from dve.core_engine.backends.exceptions import MessageBearingError from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet from dve.core_engine.backends.readers.xml import XMLStreamReader from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model from dve.core_engine.type_hints import URI -from dve.parser.file_handling.service import get_parent -from dve.pipeline.utils import dump_errors @duckdb_write_parquet class DuckDBXMLStreamReader(XMLStreamReader): """A reader for XML files""" - def __init__(self, - ddb_connection: Optional[DuckDBPyConnection] = None, - **kwargs): + def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs): self.ddb_connection = ddb_connection if ddb_connection else default_connection super().__init__(**kwargs) @@ -32,14 +29,12 @@ def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseMod if 
self.xsd_location: msg = self._run_xmllint(file_uri=resource) if msg: - working_folder = get_parent(resource) - dump_errors( - working_folder=working_folder, - step_name="file_transformation", - messages=[msg] - ) - - polars_schema: Dict[str, pl.DataType] = { # type: ignore + raise MessageBearingError( + "Submitted file failed XSD validation.", + messages=[msg], + ) + + polars_schema: dict[str, pl.DataType] = { # type: ignore fld.name: get_polars_type_from_annotation(fld.annotation) for fld in stringify_model(schema).__fields__.values() } diff --git a/src/dve/core_engine/backends/implementations/spark/readers/csv.py b/src/dve/core_engine/backends/implementations/spark/readers/csv.py index a95cad2..95db464 100644 --- a/src/dve/core_engine/backends/implementations/spark/readers/csv.py +++ b/src/dve/core_engine/backends/implementations/spark/readers/csv.py @@ -31,6 +31,7 @@ def __init__( multi_line: bool = False, encoding: str = "utf-8-sig", spark_session: Optional[SparkSession] = None, + **_, ) -> None: self.delimiter = delimiter diff --git a/src/dve/core_engine/backends/implementations/spark/readers/json.py b/src/dve/core_engine/backends/implementations/spark/readers/json.py index 56f394e..c336ee0 100644 --- a/src/dve/core_engine/backends/implementations/spark/readers/json.py +++ b/src/dve/core_engine/backends/implementations/spark/readers/json.py @@ -27,6 +27,7 @@ def __init__( encoding: Optional[str] = "utf-8", multi_line: Optional[bool] = True, spark_session: Optional[SparkSession] = None, + **_, ) -> None: self.encoding = encoding diff --git a/src/dve/core_engine/backends/implementations/spark/readers/xml.py b/src/dve/core_engine/backends/implementations/spark/readers/xml.py index bf376d5..66b22d2 100644 --- a/src/dve/core_engine/backends/implementations/spark/readers/xml.py +++ b/src/dve/core_engine/backends/implementations/spark/readers/xml.py @@ -12,7 +12,7 @@ from pyspark.sql.utils import AnalysisException from typing_extensions import Literal -from 
dve.core_engine.backends.base.reader import BaseFileReader, read_function +from dve.core_engine.backends.base.reader import read_function from dve.core_engine.backends.exceptions import EmptyFileError from dve.core_engine.backends.implementations.spark.spark_helpers import ( df_is_empty, @@ -20,11 +20,10 @@ spark_write_parquet, ) from dve.core_engine.backends.readers.xml import BasicXMLFileReader, XMLStreamReader -from dve.core_engine.backends.readers.xml_linting import run_xmllint +from dve.core_engine.backends.utilities import dump_errors from dve.core_engine.type_hints import URI, EntityName from dve.parser.file_handling import get_content_length, get_parent from dve.parser.file_handling.service import open_stream -from dve.pipeline.utils import dump_errors SparkXMLMode = Literal["PERMISSIVE", "FAILFAST", "DROPMALFORMED"] """The mode to use when parsing XML files with Spark.""" @@ -74,10 +73,11 @@ def __init__( trim_cells=True, xsd_location: Optional[URI] = None, xsd_error_code: Optional[str] = None, - xsd_error_message: Optional[str] = None + xsd_error_message: Optional[str] = None, + rules_location: Optional[URI] = None, **_, ) -> None: - + super().__init__( record_tag=record_tag, root_tag=root_tag, @@ -86,7 +86,8 @@ def __init__( sanitise_multiline=sanitise_multiline, xsd_location=xsd_location, xsd_error_code=xsd_error_code, - xsd_error_message=xsd_error_message + xsd_error_message=xsd_error_message, + rules_location=rules_location, ) self.spark_session = spark_session or SparkSession.builder.getOrCreate() @@ -117,16 +118,14 @@ def read_to_dataframe( """ if get_content_length(resource) == 0: raise EmptyFileError(f"File at {resource} is empty.") - + if self.xsd_location: msg = self._run_xmllint(file_uri=resource) if msg: working_folder = get_parent(resource) dump_errors( - working_folder=working_folder, - step_name="file_transformation", - messages=[msg] - ) + working_folder=working_folder, step_name="file_transformation", messages=[msg] + ) spark_schema: 
StructType = get_type_from_annotation(schema) kwargs = { @@ -165,7 +164,7 @@ def read_to_dataframe( kwargs["rowTag"] = f"{namespace}:{self.record_tag}" df = ( self.spark_session.read.format("xml") - .options(**kwargs) + .options(**kwargs) # type: ignore .load(resource, schema=read_schema) ) if self.root_tag and df.columns: diff --git a/src/dve/core_engine/backends/readers/csv.py b/src/dve/core_engine/backends/readers/csv.py index c0b6479..bc05b58 100644 --- a/src/dve/core_engine/backends/readers/csv.py +++ b/src/dve/core_engine/backends/readers/csv.py @@ -36,6 +36,7 @@ def __init__( trim_cells: bool = True, null_values: Collection[str] = frozenset({"NULL", "null", ""}), encoding: str = "utf-8-sig", + **_, ): """Init function for the base CSV reader. diff --git a/src/dve/core_engine/backends/readers/xml.py b/src/dve/core_engine/backends/readers/xml.py index d4883d3..e7480f1 100644 --- a/src/dve/core_engine/backends/readers/xml.py +++ b/src/dve/core_engine/backends/readers/xml.py @@ -103,7 +103,7 @@ def clear(self) -> None: def __iter__(self) -> Iterator["XMLElement"]: ... -class BasicXMLFileReader(BaseFileReader): +class BasicXMLFileReader(BaseFileReader): # pylint: disable=R0902 """A reader for XML files built atop LXML.""" def __init__( @@ -119,6 +119,7 @@ def __init__( xsd_location: Optional[URI] = None, xsd_error_code: Optional[str] = None, xsd_error_message: Optional[str] = None, + rules_location: Optional[URI] = None, **_, ): """Init function for the base XML reader. 
@@ -153,8 +154,11 @@ def __init__( """Encoding of the XML file.""" self.n_records_to_read = n_records_to_read """The maximum number of records to read from a document.""" - self.xsd_location = xsd_location - """The relative URI of the xsd file if wishing to perform xsd validation""" + if rules_location is not None and xsd_location is not None: + self.xsd_location = rules_location + xsd_location + else: + self.xsd_location = xsd_location # type: ignore + """The URI of the xsd file if wishing to perform xsd validation.""" self.xsd_error_code = xsd_error_code """The error code to be reported if xsd validation fails (if xsd)""" self.xsd_error_message = xsd_error_message @@ -269,12 +273,22 @@ def _parse_xml( for element in elements: yield self._parse_element(element, template_row) - - def _run_xmllint(self, file_uri: URI) -> FeedbackMessage: - return run_xmllint(file_uri=file_uri, - schema_uri=self.xsd_location, - error_code=self.xsd_error_code, - error_message=self.xsd_error_message) + + def _run_xmllint(self, file_uri: URI) -> FeedbackMessage | None: + """Run xmllint package to validate against a given xsd. 
Requires xmllint to be installed + onto the system to run successfully.""" + if self.xsd_location is None: + raise AttributeError("Trying to run XML lint with no `xsd_location` provided.") + if self.xsd_error_code is None: + raise AttributeError("Trying to run XML lint with no `xsd_error_code` provided.") + if self.xsd_error_message is None: + raise AttributeError("Trying to run XML lint with no `xsd_error_message` provided.") + return run_xmllint( + file_uri=file_uri, + schema_uri=self.xsd_location, + error_code=self.xsd_error_code, + error_message=self.xsd_error_message, + ) def read_to_py_iterator( self, diff --git a/src/dve/core_engine/backends/readers/xml_linting.py b/src/dve/core_engine/backends/readers/xml_linting.py index 3109a88..a5baf64 100644 --- a/src/dve/core_engine/backends/readers/xml_linting.py +++ b/src/dve/core_engine/backends/readers/xml_linting.py @@ -1,11 +1,13 @@ -"""Implement XML linting for files.""" +"""Implement XML linting for files. Please note that xml linting requires xmllint to be installed +onto your system.""" import shutil import tempfile +from collections.abc import Sequence from contextlib import ExitStack from pathlib import Path from subprocess import PIPE, STDOUT, Popen -from typing import Sequence, Union +from typing import Union from uuid import uuid4 from dve.core_engine.message import FeedbackMessage @@ -87,7 +89,7 @@ def run_xmllint( """ if not shutil.which("xmllint"): - raise OSError("Unable to find `xmllint` binary") + raise OSError("Unable to find `xmllint` binary. Please install to use this functionality.") if not get_resource_exists(file_uri): raise IOError(f"No resource accessible at file URI {file_uri!r}") @@ -128,8 +130,7 @@ def run_xmllint( # Close the input stream and await the response code. # Output will be written to the message file. process.stdin.close() - # TODO: Identify an appropriate timeout. 
- return_code = process.wait() + return_code = process.wait(10) if return_code == 0: return None diff --git a/src/dve/core_engine/backends/utilities.py b/src/dve/core_engine/backends/utilities.py index bfa6f90..37b2782 100644 --- a/src/dve/core_engine/backends/utilities.py +++ b/src/dve/core_engine/backends/utilities.py @@ -1,18 +1,21 @@ """Necessary, otherwise uncategorised backend functionality.""" +import json import sys from dataclasses import is_dataclass from datetime import date, datetime, time from decimal import Decimal from typing import GenericAlias # type: ignore -from typing import Any, ClassVar, Union +from typing import Any, ClassVar, Optional, Union import polars as pl # type: ignore from polars.datatypes.classes import DataTypeClass as PolarsType from pydantic import BaseModel, create_model +import dve.parser.file_handling as fh from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type -from dve.core_engine.type_hints import Messages +from dve.core_engine.type_hints import URI, Messages +from dve.reporting.error_report import conditional_cast # We need to rely on a Python typing implementation detail in Python <= 3.7. 
if sys.version_info[:2] <= (3, 7): @@ -176,3 +179,38 @@ def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType: if polars_type: return polars_type raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}") + + +def dump_errors( + working_folder: URI, + step_name: str, + messages: Messages, + key_fields: Optional[dict[str, list[str]]] = None, +): + """Write out to disk captured feedback error messages.""" + if not working_folder: + raise AttributeError("processed files path not passed") + + if not key_fields: + key_fields = {} + + errors = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json") + processed = [] + + for message in messages: + primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", []) + error = message.to_dict( + key_field=primary_keys, + value_separator=" -- ", + max_number_of_values=10, + record_converter=None, + ) + error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ") + processed.append(error) + + with fh.open_stream(errors, "a+") as f: + json.dump( + processed, + f, + default=str, + ) diff --git a/src/dve/core_engine/configuration/v1/__init__.py b/src/dve/core_engine/configuration/v1/__init__.py index 057abe3..89174be 100644 --- a/src/dve/core_engine/configuration/v1/__init__.py +++ b/src/dve/core_engine/configuration/v1/__init__.py @@ -24,7 +24,7 @@ from dve.core_engine.message import DataContractErrorDetail from dve.core_engine.type_hints import EntityName, ErrorCategory, ErrorType, TemplateVariables from dve.core_engine.validation import RowValidator -from dve.parser.file_handling import joinuri, open_stream +from dve.parser.file_handling import joinuri, open_stream, resolve_location from dve.parser.type_hints import URI, Extension TypeName = str @@ -202,6 +202,13 @@ def __init__(self, *args, **kwargs): uri = joinuri(uri_prefix, rule_store_config.filename) self._load_rule_store(uri) + for _, model_config in self.contract.datasets.items(): + for 
reader_config in model_config.reader_config.values(): + reader_config.kwargs_ = { + **reader_config.kwargs_, + "rules_location": f'{"/".join(self.location.split("/")[:-1])}/', + } + def _resolve_business_filter( self, config: BusinessFilterConfig ) -> tuple[ConcreteFilterConfig, TemplateVariables]: diff --git a/src/dve/parser/file_handling/service.py b/src/dve/parser/file_handling/service.py index b638eb6..0422b4c 100644 --- a/src/dve/parser/file_handling/service.py +++ b/src/dve/parser/file_handling/service.py @@ -57,7 +57,7 @@ ) """Supported URI schemes.""" -ALL_FILE_MODES: set[FileOpenMode] = {"r", "a", "w", "ab", "rb", "wb", "ba", "br", "bw"} +ALL_FILE_MODES: set[FileOpenMode] = {"r", "a", "a+", "w", "ab", "rb", "wb", "ba", "br", "bw"} """All supported file modes.""" TEXT_MODES: set[TextFileOpenMode] = {"r", "a", "w"} """Text file modes.""" diff --git a/src/dve/parser/type_hints.py b/src/dve/parser/type_hints.py index c8b2fb7..f18cc4f 100644 --- a/src/dve/parser/type_hints.py +++ b/src/dve/parser/type_hints.py @@ -19,7 +19,7 @@ """The path attribute of the URI.""" Extension = str """A file extension (e.g. 
'.csv').""" -TextFileOpenMode = Literal["r", "a", "w"] +TextFileOpenMode = Literal["r", "a", "w", "a+"] """An opening mode for a file in text mode.""" BinaryFileOpenMode = Literal["ab", "rb", "wb", "ba", "br", "bw"] """An opening mode for a file in binary mode.""" diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index 5dd119f..4a6a9f1 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -22,13 +22,13 @@ from dve.core_engine.backends.exceptions import MessageBearingError from dve.core_engine.backends.readers import BaseFileReader from dve.core_engine.backends.types import EntityType -from dve.core_engine.backends.utilities import stringify_model +from dve.core_engine.backends.utilities import dump_errors, stringify_model from dve.core_engine.loggers import get_logger from dve.core_engine.models import SubmissionInfo, SubmissionStatisticsRecord -from dve.core_engine.type_hints import URI, Failed, FileURI, InfoURI, Messages +from dve.core_engine.type_hints import URI, Failed, FileURI, InfoURI from dve.parser import file_handling as fh -from dve.pipeline.utils import SubmissionStatus, deadletter_file, dump_errors, load_config, load_reader -from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates, conditional_cast +from dve.pipeline.utils import SubmissionStatus, deadletter_file, load_config, load_reader +from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates PERMISSIBLE_EXCEPTIONS: tuple[type[Exception]] = ( FileNotFoundError, # type: ignore @@ -276,12 +276,10 @@ def file_transformation( ) if errors: dump_errors( - fh.joinuri( - self.processed_files_path, - submission_info.submission_id - ), + fh.joinuri(self.processed_files_path, submission_info.submission_id), "file_transformation", - errors) + errors, + ) return submission_info.dict() return submission_info except ValueError as exc: @@ -388,13 +386,10 @@ def apply_data_contract(self, submission_info: SubmissionInfo) -> 
tuple[Submissi key_fields = {model: conf.reporting_fields for model, conf in model_config.items()} if messages: dump_errors( - fh.joinuri( - self.processed_files_path, - submission_info.submission_id - ), + fh.joinuri(self.processed_files_path, submission_info.submission_id), "contract", messages, - key_fields=key_fields + key_fields=key_fields, ) failed = any(not rule_message.is_informational for rule_message in messages) @@ -493,13 +488,10 @@ def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool): if rule_messages: dump_errors( - fh.joinuri( - self.processed_files_path, - submission_info.submission_id - ), + fh.joinuri(self.processed_files_path, submission_info.submission_id), "business_rules", rule_messages, - key_fields + key_fields, ) failed = any(not rule_message.is_informational for rule_message in rule_messages) or failed diff --git a/src/dve/pipeline/utils.py b/src/dve/pipeline/utils.py index a0bc102..4fa9a02 100644 --- a/src/dve/pipeline/utils.py +++ b/src/dve/pipeline/utils.py @@ -2,7 +2,7 @@ import json from threading import Lock -from typing import Dict, List, Optional, Tuple, Union +from typing import Optional, Union from pydantic.main import ModelMetaclass from pyspark.sql import SparkSession @@ -12,9 +12,8 @@ import dve.parser.file_handling as fh from dve.core_engine.backends.readers import _READER_REGISTRY from dve.core_engine.configuration.v1 import SchemaName, V1EngineConfig, _ModelConfig -from dve.core_engine.type_hints import URI, Messages, SubmissionResult +from dve.core_engine.type_hints import URI, SubmissionResult from dve.metadata_parser.model_generator import JSONtoPyd -from dve.reporting.error_report import conditional_cast Dataset = dict[SchemaName, _ModelConfig] _configs: dict[str, tuple[dict[str, ModelMetaclass], V1EngineConfig, Dataset]] = {} @@ -68,40 +67,6 @@ def deadletter_file(source_uri: URI) -> None: except TypeError: return None -def dump_errors( - working_folder: URI, - step_name: str, - messages: 
Messages, - key_fields: Optional[Dict[str, List[str]]] = None, -): - if not working_folder: - raise AttributeError("processed files path not passed") - - if not key_fields: - key_fields = {} - - errors = fh.joinuri( - working_folder, "errors", f"{step_name}_errors.json" - ) - processed = [] - - for message in messages: - primary_keys: List[str] = key_fields.get(message.entity if message.entity else "", []) - error = message.to_dict( - key_field=primary_keys, - value_separator=" -- ", - max_number_of_values=10, - record_converter=None, - ) - error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ") - processed.append(error) - - with fh.open_stream(errors, "a+") as f: - json.dump( - processed, - f, - default=str, - ) class SubmissionStatus: """Submission status for a given submission.""" diff --git a/src/dve/pipeline/xml_linting.py b/src/dve/pipeline/xml_linting.py deleted file mode 100644 index fbf53a6..0000000 --- a/src/dve/pipeline/xml_linting.py +++ /dev/null @@ -1,363 +0,0 @@ -"""Implement XML linting for files.""" - -import argparse -import re -import shutil -import sys -import tempfile -from collections.abc import Iterable, Sequence -from contextlib import ExitStack -from pathlib import Path -from subprocess import PIPE, STDOUT, Popen -from typing import Optional -from uuid import uuid4 - -from typing_extensions import Literal - -from dve.core_engine.message import FeedbackMessage -from dve.core_engine.type_hints import Messages -from dve.parser.file_handling import copy_resource, get_file_name, get_resource_exists, open_stream -from dve.parser.file_handling.implementations.file import file_uri_to_local_path -from dve.parser.type_hints import URI - -Replacement = str -"""A replacement for a regex pattern.""" -Stage = Literal["Pre-validation", "Post-validation"] -"""The stage at which the XML linting is being performed.""" -ErrorMessage = str -"""Error message for xml issues""" -ErrorCode = str -"""Error code for xml feedback errors""" 
- -FIVE_MEBIBYTES = 5 * (1024**3) -"""The size of 5 binary megabytes, in bytes.""" - -# Patterns/strings for xmllint message sanitisation. -IGNORED_PATTERNS: list[re.Pattern] = [re.compile(r"^Unimplemented block at")] -"""Regex patterns for messages that should result in their omission.""" -ERRONEOUS_PATTERNS: list[tuple[re.Pattern, Replacement]] = [ - ( - re.compile(r"^XSD schema .+/(?P.+) failed to compile$"), - r"Missing required component of XSD schema '\g'", - ) -] -""" -Patterns for messages that should trigger errors. Replacement messages will be -raised as `ValueError`s. - -""" -INCORRECT_NAMESPACE_PATTERN = re.compile( - r"Element .*?{(?P.*?)}.*? " - r"No matching global declaration available for the validation root." -) -"""# 1 capture -Pattern to match incorrect namespace found in the xmllint errors. Captures incorrect namespace -""" -# match literal Element followed by anything up to opening curly brace. capture the contents -# of the braces if the literal "No matching global declaration available..." is present in the -# message. This is raised when the namespace doesn't match between the schema and the file - -MISSING_OR_OUT_OF_ORDER_PATTERN = re.compile( - r"Element .*?{.*?}(?P.*?): This element is not expected\. " - r"Expected is .*?}(?P.*?)\)" -) -"""# 2 captures -Pattern to match out of order elements found in the xmllint errors. Captures incorrect found -and expected fields (in that order) -""" -# match literal Element followed by anything up to opening curly brace. capture the field that -# comes after the braces if the literal "this element is not expected" is present in the -# message. capture the expected value too -# This is raised when the xml fields are out of order or a field is missing. - -MISSING_CHILD_ELEMENTS_PATTERN = re.compile( - r"Element .*?{.*?}.*?: Missing child element\(s\). Expected is.*?}(?P.*?)\)" -) -"""# 1 capture -Pattern to match when a tag has missing child elements. 
Captures the expected element -""" -FAILS_TO_VALIDATE_PATTERN = re.compile(r"fails to validate") -"""# 0 captures -Pattern to match the fails to validate xmllint message, doesn't capture anything""" - -UNEXPECTED_FIELD_PATTERN = re.compile( - r"Element .*?{.*?}(?P.*?): This element is not expected\.$" -) -"""# 1 capture -Pattern to match unexpected fields rather than out of order fields. Captures incorrect field -""" - -REMOVED_PATTERNS: list[re.Pattern] = [ - re.compile(r"\{.+?\}"), - re.compile(r"[\. ]+$"), -] -"""Regex patterns to remove from the xmllint output.""" -REPLACED_PATTERNS: list[tuple[re.Pattern, Replacement]] = [ - (re.compile(r":(?P\d+):"), r" on line \g:"), -] -"""Regex patterns to replace in the xmllint output.""" -REPLACED_STRINGS: list[tuple[str, Replacement]] = [ - ( - "No matching global declaration available for the validation root", - "Incorrect namespace version, please ensure you have the most recent namespace", - ), - ( - "fails to validate", - "Whole file has failed schema validation - please ensure your data matches the expected " - + "structure", - ), -] -"""Strings to replace in the xmllint output.""" - - -def _sanitise_lint_issue(issue: str, file_name: str) -> Optional[str]: - """Sanitise an xmllint lint message. If the message should be ignored, - this function will return `None` instead of a string. - - If messages are considered erroneous, a `ValueError` will be raised. - - """ - for pattern in IGNORED_PATTERNS: - if pattern.match(issue): - return None - for pattern, error_message in ERRONEOUS_PATTERNS: - if pattern.match(issue): - raise ValueError(pattern.sub(error_message, issue)) - - # Line will start with "-" because that's the file name for streamed XML files. 
- if issue.startswith("-"): - issue = "".join((file_name, issue[1:])) - - for pattern in REMOVED_PATTERNS: - issue = pattern.sub("", issue) - for pattern, replacement in REPLACED_PATTERNS: - issue = pattern.sub(replacement, issue) - for string, replacement in REPLACED_STRINGS: - issue = issue.replace(string, replacement) - - return issue - - -def _parse_lint_messages( - lint_messages: Iterable[str], - error_mapping: dict[re.Pattern, tuple[ErrorMessage, ErrorCode]], - stage: Stage = "Pre-validation", - file_name: Optional[str] = None, -) -> Messages: - """Parse a sequence of messages from `xmllint` into a deque of feedback messages.""" - messages: Messages = [] - for issue in lint_messages: - for regex, (error_message, error_code) in error_mapping.items(): - match_ = regex.search(issue) - if match_: - groups = {key: group.strip(",' ") for key, group in match_.groupdict().items()} - if not groups: - reporting_field = "Whole file" - groups[reporting_field] = file_name - elif len(groups) > 1: - reporting_field = list(groups) # type: ignore - else: - reporting_field = list(groups)[0] - - messages.append( - FeedbackMessage( - entity=stage, - record={"Wholefile": groups}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_code=error_code, - error_message=error_message.format(*groups.values()), - reporting_field=reporting_field, - category="Bad file", - ) - ) - return list(dict.fromkeys(messages)) # remove duplicate errors but preserve order - - -def _ensure_schema_and_resources( - schema_uri: URI, schema_resources: Sequence[URI], temp_dir: Path -) -> Path: - """Given the schema and schema resource URIs and a temp dir, if the resources - are remote or exist in different directories, copy them to the temp dir. - - Return the local schema path. 
- - """ - if not get_resource_exists(schema_uri): - raise IOError(f"No resource accessible at schema URI {schema_uri!r}") - - missing_resources = list( - filter(lambda resource: not get_resource_exists(resource), schema_resources) - ) - if missing_resources: - raise IOError(f"Some schema resources missing: {missing_resources!r}") - - all_resources = [schema_uri, *schema_resources] - - schemas_are_files = all(map(lambda resource: resource.startswith("file:"), all_resources)) - if schemas_are_files: - paths = list(map(file_uri_to_local_path, all_resources)) - all_paths_have_same_parent = len({path.parent for path in paths}) == 1 - - if all_paths_have_same_parent: - schema_path = paths[0] - return schema_path - - for resource_uri in all_resources: - local_path = temp_dir.joinpath(get_file_name(resource_uri)) - copy_resource(resource_uri, local_path.as_uri()) - - schema_path = temp_dir.joinpath(get_file_name(schema_uri)) - return schema_path - - -def run_xmllint( - file_uri: URI, - schema_uri: URI, - *schema_resources: URI, - error_mapping: dict[re.Pattern, tuple[ErrorMessage, ErrorCode]], - stage: Stage = "Pre-validation", -) -> Messages: - """Run `xmllint`, given a file and information about the schemas to apply. - - The schema and associated resources will be copied to a temporary directory - for validation, unless they are all already in the same local folder. - - Args: - - `file_uri`: the URI of the file to be streamed into `xmllint` - - `schema_uri`: the URI of the XSD schema for the file. - - `*schema_resources`: URIs for additional XSD files required by the schema. - - `stage` (keyword only): One of `{'Pre-validation', 'Post-validation'}` - - Returns a deque of messages produced by the linting. 
- - """ - if not shutil.which("xmllint"): - raise OSError("Unable to find `xmllint` binary") - - if not get_resource_exists(file_uri): - raise IOError(f"No resource accessible at file URI {file_uri!r}") - - # Ensure the schema and resources are local file paths so they can be - # read by xmllint. - # Lots of resources to manage here. - with tempfile.TemporaryDirectory() as temp_dir_str: - temp_dir = Path(temp_dir_str) - schema_path = _ensure_schema_and_resources(schema_uri, schema_resources, temp_dir) - message_file_path = temp_dir.joinpath(uuid4().hex) - - with ExitStack() as linting_context: - # Need to write lint output to a file to avoid deadlock. Kinder to mem this way anyway. - message_file_bytes = linting_context.enter_context(message_file_path.open("wb")) - - # Open an `xmllint` process to pipe into. - command = ["xmllint", "--stream", "--schema", str(schema_path), "-"] - process = linting_context.enter_context( - Popen(command, stdin=PIPE, stdout=message_file_bytes, stderr=STDOUT) - ) - # This should never trigger, bad typing in stdlib. - if process.stdin is None: - raise ValueError("Unable to pipe file into subprocess") - - # Pipe the XML file contents into xmllint. 
- block = b"" - try: - with open_stream(file_uri, "rb") as byte_stream: - while True: - block = byte_stream.read(FIVE_MEBIBYTES) - if not block: - break - process.stdin.write(block) - except BrokenPipeError: - file_name = get_file_name(file_uri) - return [ - FeedbackMessage( - entity=stage, - record={"Whole file": file_name}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_message="failed schema check", - category="Bad file", - ), - FeedbackMessage( - entity=stage, - record={"Whole file": block[:50].decode(errors="replace")}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_message="Failed xml validation", - reporting_field="Whole file", - category="Bad file", - ), - ] - - # Close the input stream and await the response code. - # Output will be written to the message file. - process.stdin.close() - # TODO: Identify an appropriate timeout. - return_code = process.wait() - - if return_code == 0: - return [] - with message_file_path.open("r", encoding="utf-8") as message_file: - lint_messages = (line for line in map(str.rstrip, message_file) if line) - file_name = get_file_name(file_uri) - messages = _parse_lint_messages(lint_messages, error_mapping, stage, file_name) - - # Nonzero exit code without messages _shouldn't_ happen, but it's possible - # if `xmllint` is killed (and presumably possible if it runs out of mem) - # so we should handle that possibility. - if not messages: - messages.append( - FeedbackMessage( - entity=stage, - record={"Whole file": file_name}, - failure_type="submission", - is_informational=False, - error_type=None, - error_location="Whole file", - error_message="failed schema check", - category="Bad file", - ) - ) - - return messages - - -def _main(cli_args: list[str]): - """Command line interface for XML linting. 
Useful for testing.""" - parser = argparse.ArgumentParser() - parser.add_argument("xml_file_path", help="The path to the XML file to be validated") - parser.add_argument( - "xsd_file_paths", help="The path to the XSD schemas (primary schema first)", nargs="+" - ) - args = parser.parse_args(cli_args) - - xml_path = Path(args.xml_file_path).resolve() - xsd_uris = list(map(lambda path_str: Path(path_str).resolve().as_uri(), args.xsd_file_paths)) - try: - messages = run_xmllint(xml_path.as_uri(), *xsd_uris, error_mapping={}) - except Exception as err: # pylint: disable=broad-except - print(f"Exception ({type(err).__name__}) raised in XML linting.", file=sys.stderr) - print(err, file=sys.stderr) - sys.exit(2) - - if not messages: - print(f"File {xml_path.name!r} validated successfully\n", file=sys.stderr) - sys.exit(0) - else: - print(f"File {xml_path.name!r} validation failed\n", file=sys.stderr) - - print("|".join(FeedbackMessage.HEADER)) - for message in messages: - print("|".join(map(lambda col: str(col) if col else "", message.to_row()))) - sys.exit(1) - - -if __name__ == "__main__": - _main(sys.argv[1:]) diff --git a/tests/features/books.feature b/tests/features/books.feature index 49cf2b3..6313a07 100644 --- a/tests/features/books.feature +++ b/tests/features/books.feature @@ -67,3 +67,13 @@ Feature: Pipeline tests using the books dataset Then the latest audit record for the submission is marked with processing status error_report When I run the error report phase Then An error report is produced + + Scenario: Handle a file that fails XSD validation (duckdb) + Given I submit the books file books_xsd_fail.xml for processing + And A duckdb pipeline is configured with schema file 'nested_books_ddb.dischema.json' + And I add initial audit entries for the submission + Then the latest audit record for the submission is marked with processing status file_transformation + When I run the file transformation phase + Then the latest audit record for the submission is marked 
with processing status error_report + When I run the error report phase + Then An error report is produced diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py index 5093150..e4c08ad 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py @@ -144,10 +144,10 @@ def test_duckdb_data_contract_xml(temp_xml_file): contract_dict = json.loads(contract_meta).get("contract") entities: Dict[str, DuckDBPyRelation] = { "test_header": DuckDBXMLStreamReader( - connection, root_tag="root", record_tag="Header" + ddb_connection=connection, root_tag="root", record_tag="Header" ).read_to_relation(str(uri), "header", header_model), "test_class_info": DuckDBXMLStreamReader( - connection, root_tag="root", record_tag="ClassData" + ddb_connection=connection, root_tag="root", record_tag="ClassData" ).read_to_relation(str(uri), "class_info", class_model), } diff --git a/tests/testdata/books/books_xsd_fail.xml b/tests/testdata/books/books_xsd_fail.xml new file mode 100644 index 0000000..bc818da --- /dev/null +++ b/tests/testdata/books/books_xsd_fail.xml @@ -0,0 +1,24 @@ + + + + + Gambardella, Matthew & Kent, Roy + XML Developer's Guide + Computer + 44.95 + 2000-10-01 + An in-depth look at creating applications + with XML. + + + Ralls, Kim + Midnight Rain + Fantasy + 5.95 + 2000-12-16 + A former architect battles corporate zombies, + an evil sorceress, and her own childhood to become queen + of the world. 
+ + + \ No newline at end of file diff --git a/tests/testdata/books/nested_books.dischema.json b/tests/testdata/books/nested_books.dischema.json index 86ca0bb..5e53fce 100644 --- a/tests/testdata/books/nested_books.dischema.json +++ b/tests/testdata/books/nested_books.dischema.json @@ -25,7 +25,7 @@ "kwargs": { "record_tag": "bookstore", "n_records_to_read": 1, - "xsd_location": "./nested_books.dischema.json", + "xsd_location": "nested_books.xsd", "xsd_error_code": "TESTXSDERROR", "xsd_error_message": "the xml is poorly structured" } diff --git a/tests/testdata/books/nested_books_ddb.dischema.json b/tests/testdata/books/nested_books_ddb.dischema.json index aa523aa..b9c3314 100644 --- a/tests/testdata/books/nested_books_ddb.dischema.json +++ b/tests/testdata/books/nested_books_ddb.dischema.json @@ -23,7 +23,11 @@ ".xml": { "reader": "DuckDBXMLStreamReader", "kwargs": { - "record_tag": "bookstore" + "record_tag": "bookstore", + "n_records_to_read": 1, + "xsd_location": "nested_books.xsd", + "xsd_error_code": "TESTXSDERROR", + "xsd_error_message": "the xml is poorly structured" } } } From 7a970c15b324797e23dd33c065633b353ffd75ff Mon Sep 17 00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Mon, 1 Dec 2025 16:12:54 +0000 Subject: [PATCH 3/5] ci: add missing xmllint package --- .github/workflows/ci_testing.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci_testing.yml b/.github/workflows/ci_testing.yml index f54eb67..5f9bc95 100644 --- a/.github/workflows/ci_testing.yml +++ b/.github/workflows/ci_testing.yml @@ -17,7 +17,7 @@ jobs: - name: Install extra dependencies for a python install run: | sudo apt-get update - sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev + sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev libxml2-utils - name: Install asdf cli uses: asdf-vm/actions/setup@v4 From 
c5bb8021463f0c25774ca2b0cab0b0e830745488 Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Tue, 2 Dec 2025 10:25:57 +0000 Subject: [PATCH 4/5] refactor: tweak of file mode for dump errors --- src/dve/core_engine/backends/utilities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dve/core_engine/backends/utilities.py b/src/dve/core_engine/backends/utilities.py index 37b2782..1a993e9 100644 --- a/src/dve/core_engine/backends/utilities.py +++ b/src/dve/core_engine/backends/utilities.py @@ -208,7 +208,7 @@ def dump_errors( error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ") processed.append(error) - with fh.open_stream(errors, "a+") as f: + with fh.open_stream(errors, "a") as f: json.dump( processed, f, From 6d6c1d2f36897a1d143d53b537abd9a5b7f49273 Mon Sep 17 00:00:00 2001 From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com> Date: Wed, 3 Dec 2025 08:34:10 +0000 Subject: [PATCH 5/5] fix: fix spark xsd check to raise messageBearingError --- .../implementations/spark/readers/xml.py | 17 ++++++++--------- .../core_engine/backends/readers/xml_linting.py | 7 +------ 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/src/dve/core_engine/backends/implementations/spark/readers/xml.py b/src/dve/core_engine/backends/implementations/spark/readers/xml.py index 66b22d2..30d6756 100644 --- a/src/dve/core_engine/backends/implementations/spark/readers/xml.py +++ b/src/dve/core_engine/backends/implementations/spark/readers/xml.py @@ -5,24 +5,23 @@ from typing import Any, Optional from pydantic import BaseModel +from pyspark.errors.exceptions.base import AnalysisException from pyspark.sql import DataFrame, SparkSession from pyspark.sql import functions as sf from pyspark.sql.column import Column from pyspark.sql.types import StringType, StructField, StructType -from pyspark.sql.utils import AnalysisException from typing_extensions import Literal from 
dve.core_engine.backends.base.reader import read_function -from dve.core_engine.backends.exceptions import EmptyFileError +from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError from dve.core_engine.backends.implementations.spark.spark_helpers import ( df_is_empty, get_type_from_annotation, spark_write_parquet, ) from dve.core_engine.backends.readers.xml import BasicXMLFileReader, XMLStreamReader -from dve.core_engine.backends.utilities import dump_errors from dve.core_engine.type_hints import URI, EntityName -from dve.parser.file_handling import get_content_length, get_parent +from dve.parser.file_handling import get_content_length from dve.parser.file_handling.service import open_stream SparkXMLMode = Literal["PERMISSIVE", "FAILFAST", "DROPMALFORMED"] @@ -44,7 +43,7 @@ def read_to_dataframe( ) -> DataFrame: """Stream an XML file into a Spark data frame""" if not self.spark: - self.spark = SparkSession.builder.getOrCreate() + self.spark = SparkSession.builder.getOrCreate() # type: ignore spark_schema = get_type_from_annotation(schema) return self.spark.createDataFrame( # type: ignore list(self.read_to_py_iterator(resource, entity_name, schema)), @@ -90,7 +89,7 @@ def __init__( rules_location=rules_location, ) - self.spark_session = spark_session or SparkSession.builder.getOrCreate() + self.spark_session = spark_session or SparkSession.builder.getOrCreate() # type: ignore self.sampling_ratio = sampling_ratio self.exclude_attribute = exclude_attribute self.mode = mode @@ -122,9 +121,9 @@ def read_to_dataframe( if self.xsd_location: msg = self._run_xmllint(file_uri=resource) if msg: - working_folder = get_parent(resource) - dump_errors( - working_folder=working_folder, step_name="file_transformation", messages=[msg] + raise MessageBearingError( + "Submitted file failed XSD validation.", + messages=[msg], ) spark_schema: StructType = get_type_from_annotation(schema) diff --git a/src/dve/core_engine/backends/readers/xml_linting.py 
b/src/dve/core_engine/backends/readers/xml_linting.py index a5baf64..529d8ee 100644 --- a/src/dve/core_engine/backends/readers/xml_linting.py +++ b/src/dve/core_engine/backends/readers/xml_linting.py @@ -11,12 +11,7 @@ from uuid import uuid4 from dve.core_engine.message import FeedbackMessage -from dve.parser.file_handling import ( - copy_resource, - get_file_name, - get_resource_exists, - open_stream, -) +from dve.parser.file_handling import copy_resource, get_file_name, get_resource_exists, open_stream from dve.parser.file_handling.implementations.file import file_uri_to_local_path from dve.parser.type_hints import URI