Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci_testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Install extra dependencies for a python install
run: |
sudo apt-get update
sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev
sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev libxml2-utils

- name: Install asdf cli
uses: asdf-vm/actions/setup@v4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ class DuckDBCSVReader(BaseFileReader):
# TODO - stringify or not
def __init__(
self,
*,
header: bool = True,
delim: str = ",",
quotechar: str = '"',
connection: Optional[DuckDBPyConnection] = None,
**_,
):
self.header = header
self.delim = delim
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
class DuckDBJSONReader(BaseFileReader):
"""A reader for JSON files"""

def __init__(self, json_format: Optional[str] = "array"):
def __init__(
self,
*,
json_format: Optional[str] = "array",
**_,
):
self._json_format = json_format

super().__init__()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pydantic import BaseModel

from dve.core_engine.backends.base.reader import read_function
from dve.core_engine.backends.exceptions import MessageBearingError
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet
from dve.core_engine.backends.readers.xml import XMLStreamReader
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
Expand All @@ -18,13 +19,21 @@
class DuckDBXMLStreamReader(XMLStreamReader):
"""A reader for XML files"""

def __init__(self, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
self.ddb_connection = ddb_connection if ddb_connection else default_connection
super().__init__(**kwargs)

@read_function(DuckDBPyRelation)
def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseModel]):
"""Returns a relation object from the source xml"""
if self.xsd_location:
msg = self._run_xmllint(file_uri=resource)
if msg:
raise MessageBearingError(
"Submitted file failed XSD validation.",
messages=[msg],
)

polars_schema: dict[str, pl.DataType] = { # type: ignore
fld.name: get_polars_type_from_annotation(fld.annotation)
for fld in stringify_model(schema).__fields__.values()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(
multi_line: bool = False,
encoding: str = "utf-8-sig",
spark_session: Optional[SparkSession] = None,
**_,
) -> None:

self.delimiter = delimiter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(
encoding: Optional[str] = "utf-8",
multi_line: Optional[bool] = True,
spark_session: Optional[SparkSession] = None,
**_,
) -> None:

self.encoding = encoding
Expand Down
46 changes: 33 additions & 13 deletions src/dve/core_engine/backends/implementations/spark/readers/xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@
from typing import Any, Optional

from pydantic import BaseModel
from pyspark.errors.exceptions.base import AnalysisException
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.column import Column
from pyspark.sql.types import StringType, StructField, StructType
from pyspark.sql.utils import AnalysisException
from typing_extensions import Literal

from dve.core_engine.backends.base.reader import BaseFileReader, read_function
from dve.core_engine.backends.exceptions import EmptyFileError
from dve.core_engine.backends.base.reader import read_function
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
from dve.core_engine.backends.implementations.spark.spark_helpers import (
df_is_empty,
get_type_from_annotation,
spark_write_parquet,
)
from dve.core_engine.backends.readers.xml import XMLStreamReader
from dve.core_engine.backends.readers.xml import BasicXMLFileReader, XMLStreamReader
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import get_content_length
from dve.parser.file_handling.service import open_stream
Expand All @@ -43,7 +43,7 @@
) -> DataFrame:
"""Stream an XML file into a Spark data frame"""
if not self.spark:
self.spark = SparkSession.builder.getOrCreate()
self.spark = SparkSession.builder.getOrCreate() # type: ignore
spark_schema = get_type_from_annotation(schema)
return self.spark.createDataFrame( # type: ignore
list(self.read_to_py_iterator(resource, entity_name, schema)),
Expand All @@ -52,39 +52,51 @@


@spark_write_parquet
class SparkXMLReader(BaseFileReader): # pylint: disable=too-many-instance-attributes
class SparkXMLReader(BasicXMLFileReader): # pylint: disable=too-many-instance-attributes
"""A reader for XML files built atop Spark-XML."""

def __init__(
self,
*,
record_tag: str,
root_tag: Optional[str] = None,
spark_session: Optional[SparkSession] = None,
sampling_ratio: int = 1,
exclude_attribute: bool = True,
mode: SparkXMLMode = "PERMISSIVE",
infer_schema: bool = False,
ignore_namespace: bool = True,
null_values: Collection[str] = frozenset(("NULL", "null", "")),
sanitise_multiline: bool = True,
namespace=None,
trim_cells=True,
xsd_location: Optional[URI] = None,
xsd_error_code: Optional[str] = None,
xsd_error_message: Optional[str] = None,
rules_location: Optional[URI] = None,
**_,

Check warning on line 77 in src/dve/core_engine/backends/implementations/spark/readers/xml.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Method "__init__" has 17 parameters, which is greater than the 13 authorized.

See more on https://sonarcloud.io/project/issues?id=NHSDigital_data-validation-engine&issues=AZrA-ejpT-V0F1DOJ4tF&open=AZrA-ejpT-V0F1DOJ4tF&pullRequest=15
) -> None:
self.record_tag = record_tag
self.spark_session = spark_session or SparkSession.builder.getOrCreate()

super().__init__(
record_tag=record_tag,
root_tag=root_tag,
trim_cells=trim_cells,
null_values=null_values,
sanitise_multiline=sanitise_multiline,
xsd_location=xsd_location,
xsd_error_code=xsd_error_code,
xsd_error_message=xsd_error_message,
rules_location=rules_location,
)

self.spark_session = spark_session or SparkSession.builder.getOrCreate() # type: ignore

Check warning on line 92 in src/dve/core_engine/backends/implementations/spark/readers/xml.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Specify both "master" and "appName" parameters to initialize this SparkSession.

See more on https://sonarcloud.io/project/issues?id=NHSDigital_data-validation-engine&issues=AZrjWbBVu-4h0EyNNpHP&open=AZrjWbBVu-4h0EyNNpHP&pullRequest=15
self.sampling_ratio = sampling_ratio
self.exclude_attribute = exclude_attribute
self.mode = mode
self.infer_schema = infer_schema
self.ignore_namespace = ignore_namespace
self.root_tag = root_tag
self.sanitise_multiline = sanitise_multiline
self.null_values = null_values
self.namespace = namespace
self.trim_cells = trim_cells
super().__init__()

def read_to_py_iterator(
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
Expand All @@ -106,6 +118,14 @@
if get_content_length(resource) == 0:
raise EmptyFileError(f"File at {resource} is empty.")

if self.xsd_location:
msg = self._run_xmllint(file_uri=resource)
if msg:
raise MessageBearingError(
"Submitted file failed XSD validation.",
messages=[msg],
)

spark_schema: StructType = get_type_from_annotation(schema)
kwargs = {
"rowTag": self.record_tag,
Expand Down Expand Up @@ -143,7 +163,7 @@
kwargs["rowTag"] = f"{namespace}:{self.record_tag}"
df = (
self.spark_session.read.format("xml")
.options(**kwargs)
.options(**kwargs) # type: ignore
.load(resource, schema=read_schema)
)
if self.root_tag and df.columns:
Expand Down
1 change: 1 addition & 0 deletions src/dve/core_engine/backends/readers/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
trim_cells: bool = True,
null_values: Collection[str] = frozenset({"NULL", "null", ""}),
encoding: str = "utf-8-sig",
**_,
):
"""Init function for the base CSV reader.

Expand Down
33 changes: 32 additions & 1 deletion src/dve/core_engine/backends/readers/xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@

from dve.core_engine.backends.base.reader import BaseFileReader
from dve.core_engine.backends.exceptions import EmptyFileError
from dve.core_engine.backends.readers.xml_linting import run_xmllint
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
from dve.core_engine.loggers import get_logger
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import NonClosingTextIOWrapper, get_content_length, open_stream
from dve.parser.file_handling.implementations.file import (
Expand Down Expand Up @@ -101,7 +103,7 @@ def clear(self) -> None:
def __iter__(self) -> Iterator["XMLElement"]: ...


class BasicXMLFileReader(BaseFileReader):
class BasicXMLFileReader(BaseFileReader): # pylint: disable=R0902
"""A reader for XML files built atop LXML."""

def __init__(
Expand All @@ -114,6 +116,10 @@ def __init__(
sanitise_multiline: bool = True,
encoding: str = "utf-8-sig",
n_records_to_read: Optional[int] = None,
xsd_location: Optional[URI] = None,
xsd_error_code: Optional[str] = None,
xsd_error_message: Optional[str] = None,
rules_location: Optional[URI] = None,
**_,
):
"""Init function for the base XML reader.
Expand Down Expand Up @@ -148,6 +154,15 @@ def __init__(
"""Encoding of the XML file."""
self.n_records_to_read = n_records_to_read
"""The maximum number of records to read from a document."""
if rules_location is not None and xsd_location is not None:
self.xsd_location = rules_location + xsd_location
else:
self.xsd_location = xsd_location # type: ignore
"""The URI of the xsd file if wishing to perform xsd validation."""
self.xsd_error_code = xsd_error_code
"""The error code to be reported if xsd validation fails (if xsd)"""
self.xsd_error_message = xsd_error_message
"""The error message to be reported if xsd validation fails"""
super().__init__()
self._logger = get_logger(__name__)

Expand Down Expand Up @@ -259,6 +274,22 @@ def _parse_xml(
for element in elements:
yield self._parse_element(element, template_row)

def _run_xmllint(self, file_uri: URI) -> Optional[FeedbackMessage]:
    """Validate an XML file against the configured XSD using xmllint.

    Requires the `xmllint` executable to be installed on the system to run
    successfully.

    Args:
        file_uri: URI of the XML file to validate.

    Returns:
        Whatever `run_xmllint` returns for the file: a `FeedbackMessage`
        describing the failure, or `None` (presumably when the file is
        valid - confirm against `run_xmllint`).

    Raises:
        AttributeError: If `xsd_location`, `xsd_error_code` or
            `xsd_error_message` has not been configured on the reader.
    """
    # Each check is spelled out (rather than looped over with getattr) so
    # static type checkers can narrow the Optional attributes before they
    # are handed to `run_xmllint`.
    if self.xsd_location is None:
        raise AttributeError("Trying to run XML lint with no `xsd_location` provided.")
    if self.xsd_error_code is None:
        raise AttributeError("Trying to run XML lint with no `xsd_error_code` provided.")
    if self.xsd_error_message is None:
        raise AttributeError("Trying to run XML lint with no `xsd_error_message` provided.")

    return run_xmllint(
        file_uri=file_uri,
        schema_uri=self.xsd_location,
        error_code=self.xsd_error_code,
        error_message=self.xsd_error_message,
    )

def read_to_py_iterator(
self,
resource: URI,
Expand Down
Loading