Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci_testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- name: Install extra dependencies for a python install
run: |
sudo apt-get update
sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev
sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev libxml2-utils

- name: Install asdf cli
uses: asdf-vm/actions/setup@v4
Expand Down
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
## v0.4.0 (2025-12-17)

### Feat

- add persistence of error aggregates to pipeline
- add Foundry pipeline

### Fix

- issue where templated error messages would not correctly format when passing in parameter values

### Refactor

- include submission status for services passthrough

## v0.3.0 (2025-11-19)

### Feat
Expand Down
2 changes: 1 addition & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,10 @@ audit_manager = SparkAuditingManager(

# Setting up the Pipeline (in this case the Spark implemented one)
pipeline = SparkDVEPipeline(
processed_files_path="path/where/my/processed_files/should_go/",
audit_tables=audit_manager,
job_run_id=1,
rules_path="path/to/my_dischema",
processed_files_path="path/where/my/processed_files/should_go/",
submitted_files_path="path/to/my/cwt_files/",
reference_data_loader=SparkParquetRefDataLoader,
spark=spark
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "nhs_dve"
version = "0.3.0"
version = "0.4.0"
description = "`nhs data validation engine` is a framework used to validate data"
authors = ["NHS England <england.contactus@nhs.net>"]
readme = "README.md"
Expand Down Expand Up @@ -39,7 +39,7 @@ requests = "2.32.4" # Mitigates security vuln in < 2.31.0
schedula = "1.2.19"
sqlalchemy = "2.0.19"
typing_extensions = "4.6.2"
urllib3 = "2.5.0" # Mitigates security vuln in < 1.26.19
urllib3 = "2.6.0" # Mitigates security vuln in < 2.5.0
xmltodict = "0.13.0"

[tool.poetry.group.dev]
Expand Down
35 changes: 33 additions & 2 deletions src/dve/core_engine/backends/base/auditing.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
QueueType,
SubmissionResult,
)
from dve.pipeline.utils import SubmissionStatus

AuditReturnType = TypeVar("AuditReturnType") # pylint: disable=invalid-name

Expand Down Expand Up @@ -329,7 +330,7 @@ def mark_business_rules(self, submissions: list[tuple[str, bool]], **kwargs):
ProcessingStatusRecord(
submission_id=submission_id,
processing_status="business_rules",
submission_result="failed" if failed else None,
submission_result="validation_failed" if failed else None,
**kwargs,
)
for submission_id, failed in submissions
Expand Down Expand Up @@ -379,7 +380,10 @@ def mark_failed(self, submissions: list[str], **kwargs):
"""Update submission processing_status to failed."""
recs = [
ProcessingStatusRecord(
submission_id=submission_id, processing_status="failed", **kwargs
submission_id=submission_id,
processing_status="failed",
submission_result="processing_failed",
**kwargs,
)
for submission_id in submissions
]
Expand Down Expand Up @@ -494,6 +498,33 @@ def get_submission_statistics(self, submission_id: str) -> Optional[SubmissionSt
except StopIteration:
return None

def get_submission_status(self, submission_id: str) -> Optional[SubmissionStatus]:
"""Get the latest submission status for a submission"""

try:
processing_rec: ProcessingStatusRecord = next( # type: ignore
self._processing_status.conv_to_records(
self._processing_status.get_most_recent_records(
order_criteria=[OrderCriteria("time_updated", True)],
pre_filter_criteria=[FilterCriteria("submission_id", submission_id)],
)
)
)
except StopIteration:
return None
sub_status = SubmissionStatus()
sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics(
submission_id
)
if processing_rec.submission_result == "processing_failed":
sub_status.processing_failed = True
if processing_rec.submission_result == "validation_failed":
sub_status.validation_failed = True
if sub_stats_rec:
sub_status.number_of_records = sub_stats_rec.record_count

return sub_status

def __enter__(self):
"""Use audit table as context manager"""
if self.pool and self.pool_result.done():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ class DuckDBCSVReader(BaseFileReader):
# TODO - stringify or not
def __init__(
self,
*,
header: bool = True,
delim: str = ",",
quotechar: str = '"',
connection: Optional[DuckDBPyConnection] = None,
**_,
):
self.header = header
self.delim = delim
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
class DuckDBJSONReader(BaseFileReader):
"""A reader for JSON files"""

def __init__(self, json_format: Optional[str] = "array"):
    def __init__(
        self,
        *,
        json_format: Optional[str] = "array",
        **_,
    ):
        """Initialise the JSON reader.

        Args:
            json_format: JSON layout option stored for use when reading;
                defaults to ``"array"``.
            **_: Extra keyword arguments are accepted and ignored,
                presumably so all reader constructors share a common
                call signature — confirm against the reader factory.
        """
        self._json_format = json_format

        super().__init__()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pydantic import BaseModel

from dve.core_engine.backends.base.reader import read_function
from dve.core_engine.backends.exceptions import MessageBearingError
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet
from dve.core_engine.backends.readers.xml import XMLStreamReader
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
Expand All @@ -18,13 +19,21 @@
class DuckDBXMLStreamReader(XMLStreamReader):
"""A reader for XML files"""

def __init__(self, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
    def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
        """Initialise the reader with an optional DuckDB connection.

        Args:
            ddb_connection: Connection to use for DuckDB operations; when
                ``None`` (or falsy), falls back to ``default_connection``
                (presumably a module-level shared connection — verify).
            **kwargs: Forwarded unchanged to ``XMLStreamReader.__init__``.
        """
        self.ddb_connection = ddb_connection if ddb_connection else default_connection
        super().__init__(**kwargs)

@read_function(DuckDBPyRelation)
def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseModel]):
"""Returns a relation object from the source xml"""
if self.xsd_location:
msg = self._run_xmllint(file_uri=resource)
if msg:
raise MessageBearingError(
"Submitted file failed XSD validation.",
messages=[msg],
)

polars_schema: dict[str, pl.DataType] = { # type: ignore
fld.name: get_polars_type_from_annotation(fld.annotation)
for fld in stringify_model(schema).__fields__.values()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(
multi_line: bool = False,
encoding: str = "utf-8-sig",
spark_session: Optional[SparkSession] = None,
**_,
) -> None:

self.delimiter = delimiter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(
encoding: Optional[str] = "utf-8",
multi_line: Optional[bool] = True,
spark_session: Optional[SparkSession] = None,
**_,
) -> None:

self.encoding = encoding
Expand Down
46 changes: 33 additions & 13 deletions src/dve/core_engine/backends/implementations/spark/readers/xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@
from typing import Any, Optional

from pydantic import BaseModel
from pyspark.errors.exceptions.base import AnalysisException
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.column import Column
from pyspark.sql.types import StringType, StructField, StructType
from pyspark.sql.utils import AnalysisException
from typing_extensions import Literal

from dve.core_engine.backends.base.reader import BaseFileReader, read_function
from dve.core_engine.backends.exceptions import EmptyFileError
from dve.core_engine.backends.base.reader import read_function
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
from dve.core_engine.backends.implementations.spark.spark_helpers import (
df_is_empty,
get_type_from_annotation,
spark_write_parquet,
)
from dve.core_engine.backends.readers.xml import XMLStreamReader
from dve.core_engine.backends.readers.xml import BasicXMLFileReader, XMLStreamReader
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import get_content_length
from dve.parser.file_handling.service import open_stream
Expand All @@ -43,7 +43,7 @@ def read_to_dataframe(
) -> DataFrame:
"""Stream an XML file into a Spark data frame"""
if not self.spark:
self.spark = SparkSession.builder.getOrCreate()
self.spark = SparkSession.builder.getOrCreate() # type: ignore
spark_schema = get_type_from_annotation(schema)
return self.spark.createDataFrame( # type: ignore
list(self.read_to_py_iterator(resource, entity_name, schema)),
Expand All @@ -52,7 +52,7 @@ def read_to_dataframe(


@spark_write_parquet
class SparkXMLReader(BaseFileReader): # pylint: disable=too-many-instance-attributes
class SparkXMLReader(BasicXMLFileReader): # pylint: disable=too-many-instance-attributes
"""A reader for XML files built atop Spark-XML."""

def __init__(
Expand All @@ -70,21 +70,33 @@ def __init__(
sanitise_multiline: bool = True,
namespace=None,
trim_cells=True,
xsd_location: Optional[URI] = None,
xsd_error_code: Optional[str] = None,
xsd_error_message: Optional[str] = None,
rules_location: Optional[URI] = None,
**_,
) -> None:
self.record_tag = record_tag
self.spark_session = spark_session or SparkSession.builder.getOrCreate()

super().__init__(
record_tag=record_tag,
root_tag=root_tag,
trim_cells=trim_cells,
null_values=null_values,
sanitise_multiline=sanitise_multiline,
xsd_location=xsd_location,
xsd_error_code=xsd_error_code,
xsd_error_message=xsd_error_message,
rules_location=rules_location,
)

self.spark_session = spark_session or SparkSession.builder.getOrCreate() # type: ignore
self.sampling_ratio = sampling_ratio
self.exclude_attribute = exclude_attribute
self.mode = mode
self.infer_schema = infer_schema
self.ignore_namespace = ignore_namespace
self.root_tag = root_tag
self.sanitise_multiline = sanitise_multiline
self.null_values = null_values
self.namespace = namespace
self.trim_cells = trim_cells
super().__init__()

def read_to_py_iterator(
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
Expand All @@ -106,6 +118,14 @@ def read_to_dataframe(
if get_content_length(resource) == 0:
raise EmptyFileError(f"File at {resource} is empty.")

if self.xsd_location:
msg = self._run_xmllint(file_uri=resource)
if msg:
raise MessageBearingError(
"Submitted file failed XSD validation.",
messages=[msg],
)

spark_schema: StructType = get_type_from_annotation(schema)
kwargs = {
"rowTag": self.record_tag,
Expand Down Expand Up @@ -143,7 +163,7 @@ def read_to_dataframe(
kwargs["rowTag"] = f"{namespace}:{self.record_tag}"
df = (
self.spark_session.read.format("xml")
.options(**kwargs)
.options(**kwargs) # type: ignore
.load(resource, schema=read_schema)
)
if self.root_tag and df.columns:
Expand Down
2 changes: 1 addition & 1 deletion src/dve/core_engine/backends/metadata/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class BaseReportingConfig(BaseModel):

"""

UNTEMPLATED_FIELDS: ClassVar[set[str]] = {"message"}
UNTEMPLATED_FIELDS: ClassVar[set[str]] = set()
"""Fields that should not be templated."""

emit: Optional[str] = None
Expand Down
1 change: 1 addition & 0 deletions src/dve/core_engine/backends/readers/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
trim_cells: bool = True,
null_values: Collection[str] = frozenset({"NULL", "null", ""}),
encoding: str = "utf-8-sig",
**_,
):
"""Init function for the base CSV reader.

Expand Down
33 changes: 32 additions & 1 deletion src/dve/core_engine/backends/readers/xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@

from dve.core_engine.backends.base.reader import BaseFileReader
from dve.core_engine.backends.exceptions import EmptyFileError
from dve.core_engine.backends.readers.xml_linting import run_xmllint
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
from dve.core_engine.loggers import get_logger
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import NonClosingTextIOWrapper, get_content_length, open_stream
from dve.parser.file_handling.implementations.file import (
Expand Down Expand Up @@ -101,7 +103,7 @@ def clear(self) -> None:
def __iter__(self) -> Iterator["XMLElement"]: ...


class BasicXMLFileReader(BaseFileReader):
class BasicXMLFileReader(BaseFileReader): # pylint: disable=R0902
"""A reader for XML files built atop LXML."""

def __init__(
Expand All @@ -114,6 +116,10 @@ def __init__(
sanitise_multiline: bool = True,
encoding: str = "utf-8-sig",
n_records_to_read: Optional[int] = None,
xsd_location: Optional[URI] = None,
xsd_error_code: Optional[str] = None,
xsd_error_message: Optional[str] = None,
rules_location: Optional[URI] = None,
**_,
):
"""Init function for the base XML reader.
Expand Down Expand Up @@ -148,6 +154,15 @@ def __init__(
"""Encoding of the XML file."""
self.n_records_to_read = n_records_to_read
"""The maximum number of records to read from a document."""
if rules_location is not None and xsd_location is not None:
self.xsd_location = rules_location + xsd_location
else:
self.xsd_location = xsd_location # type: ignore
"""The URI of the xsd file if wishing to perform xsd validation."""
self.xsd_error_code = xsd_error_code
"""The error code to be reported if xsd validation fails (if xsd)"""
self.xsd_error_message = xsd_error_message
"""The error message to be reported if xsd validation fails"""
super().__init__()
self._logger = get_logger(__name__)

Expand Down Expand Up @@ -259,6 +274,22 @@ def _parse_xml(
for element in elements:
yield self._parse_element(element, template_row)

def _run_xmllint(self, file_uri: URI) -> FeedbackMessage | None:
"""Run xmllint package to validate against a given xsd. Requires xmlint to be installed
onto the system to run succesfully."""
if self.xsd_location is None:
raise AttributeError("Trying to run XML lint with no `xsd_location` provided.")
if self.xsd_error_code is None:
raise AttributeError("Trying to run XML with no `xsd_error_code` provided.")
if self.xsd_error_message is None:
raise AttributeError("Trying to run XML with no `xsd_error_message` provided.")
return run_xmllint(
file_uri=file_uri,
schema_uri=self.xsd_location,
error_code=self.xsd_error_code,
error_message=self.xsd_error_message,
)

def read_to_py_iterator(
self,
resource: URI,
Expand Down
Loading