8 changes: 4 additions & 4 deletions README.md
@@ -21,7 +21,7 @@ Additionally, if you'd like to contribute a new backend implementation into the

## Installation and usage

-The DVE is a Python package and can be installed using `pip`. As of release v1.0.0 we currently only supports Python 3.7, with Spark version 3.2.1 and DuckDB version of 1.1.0. We are currently working on upgrading the DVE to work on Python 3.11+ and this will be made available asap with version 2.0.0 release.
+The DVE is a Python package and can be installed using `pip`. As of release v0.1.0 we currently only supports Python 3.7, with Spark version 3.2.1 and DuckDB version of 1.1.0. We are currently working on upgrading the DVE to work on Python 3.11+ and this will be made available asap with version 1.0.0 release.

In addition to a working Python 3.7+ installation you will need OpenJDK 11 installed if you're planning to use the Spark backend implementation.

@@ -30,7 +30,7 @@ Python dependencies are listed in `pyproject.toml`.
To install the DVE package you can simply install using a package manager such as [pip](https://pypi.org/project/pip/).

```
-pip install git+https://github.com/NHSDigital/data-validation-engine.git@v1.0.0
+pip install git+https://github.com/NHSDigital/data-validation-engine.git@v0.1.0
```

Once you have installed the DVE you are ready to use it. For guidance on how to create your dischema json document (configuration), please read the [documentation](./docs/).
@@ -48,8 +48,8 @@ If you have feature request then please follow the same process whilst using the
Below is a list of features that we would like to implement or have been requested.
| Feature | Release Version | Released? |
| ------- | --------------- | --------- |
-| Open source release | 1.0.0 | Yes |
-| Uplift to Python 3.11 | 2.0.0 | No |
+| Open source release | 0.1.0 | Yes |
+| Uplift to Python 3.11 | 1.0.0 | No |
| Upgrade to Pydantic 2.0 | Not yet confirmed | No |
| Create a more user friendly interface for building and modifying dischema files | Not yet confirmed | No |

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "nhs_dve"
version = "1.1.0"
version = "0.1.0"
description = "`nhs data validation engine` is a framework used to validate data"
authors = ["NHS England <england.contactus@nhs.net>"]
readme = "README.md"
2 changes: 1 addition & 1 deletion src/dve/core_engine/backends/base/core.py
@@ -76,7 +76,7 @@ def __init__(
raise ValueError(f"Entity name cannot start with 'refdata_', got {entity_name!r}")
self.entities[entity_name] = entity

-self.reference_data = reference_data or {}
+self.reference_data = reference_data if reference_data is not None else {}
"""The reference data mapping."""

@staticmethod
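This one-line change is behavioural, not cosmetic: `reference_data or {}` discards any falsy argument, including an explicitly supplied empty mapping, and silently replaces it with a fresh dict. A minimal sketch of the difference, assuming a caller that shares and later populates its own mapping (the names are invented for illustration):

```python
shared = {}  # caller-owned reference-data mapping, empty at construction time

# Old behaviour: an empty dict is falsy, so `or` swaps in a brand-new one.
ref_old = shared or {}
# New behaviour: only None triggers the default.
ref_new = shared if shared is not None else {}

shared["sequels"] = "refdata/movies_sequels.parquet"  # populated after construction

assert "sequels" not in ref_old  # the old code lost the caller's mapping
assert ref_new is shared         # the new code keeps it, so the update is visible
```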
@@ -7,7 +7,7 @@
DuckDBCSVReader,
DuckDBCSVRepeatingHeaderReader,
DuckDBXMLStreamReader,
-PolarsToDuckDBCSVReader
+PolarsToDuckDBCSVReader,
)
from .reference_data import DuckDBRefDataLoader
from .rules import DuckDBStepImplementations
@@ -128,6 +128,7 @@ class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
| ---------- | ---------- | ---------- |
| shop1 | clothes | 2025-01-01 |
"""

@read_function(DuckDBPyRelation)
def read_to_relation( # pylint: disable=unused-argument
self, resource: URI, entity_name: EntityName, schema: Type[BaseModel]
5 changes: 4 additions & 1 deletion src/dve/core_engine/backends/implementations/duckdb/rules.py
@@ -52,6 +52,7 @@
from dve.core_engine.constants import ROWID_COLUMN_NAME
from dve.core_engine.functions import implementations as functions
from dve.core_engine.message import FeedbackMessage
+from dve.core_engine.templating import template_object
from dve.core_engine.type_hints import Messages


@@ -510,12 +511,14 @@ def notify(self, entities: DuckDBEntities, *, config: Notification) -> Messages:
matched = matched.select(StarExpression(exclude=config.excluded_columns))

for record in matched.df().to_dict(orient="records"):
+# NOTE: only templates using values directly accessible in record - nothing nested
+# more complex extraction done in reporting module
messages.append(
FeedbackMessage(
entity=config.reporting.reporting_entity_override or config.entity_name,
record=record, # type: ignore
error_location=config.reporting.legacy_location,
-error_message=config.reporting.message,
+error_message=template_object(config.reporting.message, record),  # type: ignore
failure_type=config.reporting.legacy_error_type,
error_type=config.reporting.legacy_error_type,
error_code=config.reporting.code,
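With this change, per-record templating happens inside `notify` itself: the raw `failure_message` keeps its `{{placeholders}}` until a matching record is in hand (the `UNTEMPLATED_FIELDS` change further down is what preserves them that long). `template_object` is the engine's own helper, but since the reporting configs in this diff pass it `"jinja"`, a minimal stand-in for the rendering step, using plain `jinja2` and an invented record, might look like:

```python
from jinja2 import Environment

ENV = Environment()

def render_message(message: str, record: dict) -> str:
    """Render {{placeholders}} in a failure message from one record's columns."""
    return ENV.from_string(message).render(**record)

# A failure_message from the movies rule store, plus a record shaped like one row:
message = "Movie has too few ratings ({{ratings}})"
record = {"title": "The Greatest Movie Ever", "ratings": [6.1]}

print(render_message(message, record))  # -> Movie has too few ratings ([6.1])
```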
7 changes: 6 additions & 1 deletion src/dve/core_engine/backends/implementations/spark/rules.py
@@ -45,6 +45,7 @@
from dve.core_engine.constants import ROWID_COLUMN_NAME
from dve.core_engine.functions import implementations as functions
from dve.core_engine.message import FeedbackMessage
+from dve.core_engine.templating import template_object
from dve.core_engine.type_hints import Messages


@@ -406,11 +407,15 @@ def notify(self, entities: SparkEntities, *, config: Notification) -> Messages:

for record in matched.toLocalIterator():
messages.append(
+# NOTE: only templates using values directly accessible in record - nothing nested
+# more complex extraction done in reporting module
FeedbackMessage(
entity=config.reporting.reporting_entity_override or config.entity_name,
record=record.asDict(recursive=True),
error_location=config.reporting.legacy_location,
-error_message=config.reporting.message,
+error_message=template_object(
+    config.reporting.message, record.asDict(recursive=True)
+),
failure_type=config.reporting.legacy_error_type,
error_type=config.reporting.legacy_error_type,
error_code=config.reporting.code,
2 changes: 1 addition & 1 deletion src/dve/core_engine/backends/metadata/__init__.py
@@ -10,7 +10,7 @@
from pydantic import BaseModel

from dve.core_engine.backends.metadata.contract import DataContractMetadata, ReaderConfig
-from dve.core_engine.backends.metadata.reporting import ReportingConfig, UntemplatedReportingConfig
+from dve.core_engine.backends.metadata.reporting import LegacyReportingConfig, ReportingConfig
from dve.core_engine.backends.metadata.rules import (
AbstractStep,
Aggregation,
11 changes: 5 additions & 6 deletions src/dve/core_engine/backends/metadata/reporting.py
@@ -27,7 +27,7 @@ class BaseReportingConfig(BaseModel):

"""

-UNTEMPLATED_FIELDS: ClassVar[Set[str]] = set()
+UNTEMPLATED_FIELDS: ClassVar[Set[str]] = {"message"}
"""Fields that should not be templated."""

emit: Optional[str] = None
@@ -117,15 +117,13 @@ def template(
else:
variables = local_variables
templated = template_object(self.dict(exclude=self.UNTEMPLATED_FIELDS), variables, "jinja")
templated.update(self.dict(include=self.UNTEMPLATED_FIELDS))
return type_(**templated)


class ReportingConfig(BaseReportingConfig):
"""A base model defining the 'final' reporting config for a message."""

-UNTEMPLATED_FIELDS: ClassVar[Set[str]] = {"message"}
-"""Fields that should not be templated."""

emit: ErrorEmitValue = "record_failure"
category: ErrorCategory = "Bad value"

@@ -246,7 +244,7 @@ def get_location_value(
return self.get_location_selector()(record)


-class UntemplatedReportingConfig(BaseReportingConfig):
+class LegacyReportingConfig(BaseReportingConfig):
"""An untemplated reporting config. This _must_ be templated prior to use.

This class also enables the conversion of deprecated fields to their
@@ -356,7 +354,8 @@ def template(
else:
variables = local_variables

-templated = template_object(self.dict(), variables, "jinja")
+templated = template_object(self.dict(exclude=self.UNTEMPLATED_FIELDS), variables, "jinja")
+templated.update(self.dict(include=self.UNTEMPLATED_FIELDS))
error_location = templated.pop("legacy_location")
reporting_field = templated.pop("legacy_reporting_field")
if templated.get("location") is None:
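Both `template` implementations now follow the same two-step pattern: render every config field except those named in `UNTEMPLATED_FIELDS`, then copy the excluded fields back verbatim so their placeholders survive for the per-record rendering shown earlier. A stripped-down sketch of that exclude-then-merge step, with a stand-in `template_object` and invented field values:

```python
from typing import Any, Dict

from jinja2 import Environment

ENV = Environment()

def template_object(obj: Dict[str, Any], variables: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for the engine's Jinja helper: render every string value."""
    return {
        key: ENV.from_string(value).render(**variables) if isinstance(value, str) else value
        for key, value in obj.items()
    }

UNTEMPLATED_FIELDS = {"message"}
config = {
    "code": "LIMITED_RATINGS",
    "location": "{{entity_name}}.title",  # rendered now, from rule-level variables
    "message": "Movie has too few ratings ({{ratings}})",  # deferred to notify()
}
variables = {"entity_name": "movies"}

templated = template_object(
    {k: v for k, v in config.items() if k not in UNTEMPLATED_FIELDS}, variables
)
templated.update({k: v for k, v in config.items() if k in UNTEMPLATED_FIELDS})

assert templated["location"] == "movies.title"
assert templated["message"] == "Movie has too few ratings ({{ratings}})"  # intact
```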
4 changes: 2 additions & 2 deletions src/dve/core_engine/backends/metadata/rules.py
@@ -20,7 +20,7 @@
from typing_extensions import Literal

from dve.core_engine.backends.base.reference_data import ReferenceConfigUnion
-from dve.core_engine.backends.metadata.reporting import ReportingConfig, UntemplatedReportingConfig
+from dve.core_engine.backends.metadata.reporting import LegacyReportingConfig, ReportingConfig
from dve.core_engine.templating import template_object
from dve.core_engine.type_hints import (
Alias,
@@ -234,7 +234,7 @@ class DeferredFilter(AbstractStep):
removed from the source entity if the reporting level is a record-level error.

"""
-reporting: Union[ReportingConfig, UntemplatedReportingConfig]
+reporting: Union[ReportingConfig, LegacyReportingConfig]
"""The reporting information for the filter."""

def template(
4 changes: 2 additions & 2 deletions src/dve/core_engine/configuration/v1/filters.py
@@ -4,7 +4,7 @@

from pydantic import BaseModel, Field

-from dve.core_engine.backends.metadata.reporting import UntemplatedReportingConfig
+from dve.core_engine.backends.metadata.reporting import LegacyReportingConfig
from dve.core_engine.backends.metadata.rules import AbstractStep, DeferredFilter
from dve.core_engine.type_hints import ErrorCategory

@@ -27,7 +27,7 @@ class ConcreteFilterConfig(BaseModel):

def to_step(self) -> AbstractStep:
"""Create a deferred filter from the concrete filter config."""
-reporting = UntemplatedReportingConfig(
+reporting = LegacyReportingConfig(
code=self.error_code,
message=self.failure_message,
category=self.category,
9 changes: 2 additions & 7 deletions src/dve/core_engine/message.py
@@ -12,7 +12,7 @@
from pydantic.dataclasses import dataclass

from dve.core_engine.constants import CONTRACT_ERROR_VALUE_FIELD_NAME, ROWID_COLUMN_NAME
-from dve.core_engine.templating import ENVIRONMENT, template_object
+from dve.core_engine.templating import template_object
from dve.core_engine.type_hints import (
EntityName,
ErrorCategory,
@@ -270,12 +270,7 @@ def to_row(
if isinstance(reporting_field, list):
reporting_field = ", ".join(reporting_field)

-if self.record and self.error_message:
-    error_message: Optional[str] = ENVIRONMENT.from_string(self.error_message).render(
-        **self.__dict__
-    )
-else:
-    error_message = self.error_message
+error_message = self.error_message

return (
self.entity,
2 changes: 1 addition & 1 deletion src/dve/pipeline/pipeline.py
@@ -509,7 +509,7 @@ def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool):
)
reference_data.entity_cache["dve_submission_info"] = sub_info_entity

-entity_manager = EntityManager(entities, reference_data)
+entity_manager = EntityManager(entities=entities, reference_data=reference_data)

rule_messages = self.step_implementations.apply_rules(entity_manager, rules) # type: ignore
key_fields = {model: conf.reporting_fields for model, conf in model_config.items()}
80 changes: 42 additions & 38 deletions tests/features/movies.feature
@@ -1,44 +1,46 @@
Feature: Pipeline tests using the movies dataset
  Tests for the processing framework which use the movies dataset.

  This tests submissions in JSON format, with configuration in JSON config files.
  Complex types are tested (arrays, nested structs)

  Some validation of entity attributes is performed: SQL expressions and Python filter
  functions are used, and templatable business rules feature in the transformations.

  Scenario: Validate and filter movies (spark)
    Given I submit the movies file movies.json for processing
    And A spark pipeline is configured
    And I create the following reference data tables in the database movies_refdata
      | table_name | parquet_path |
      | sequels | tests/testdata/movies/refdata/movies_sequels.parquet |
    And I add initial audit entries for the submission
    Then the latest audit record for the submission is marked with processing status file_transformation
    When I run the file transformation phase
    Then the movies entity is stored as a parquet after the file_transformation phase
    And the latest audit record for the submission is marked with processing status data_contract
    When I run the data contract phase
    Then there are 3 record rejections from the data_contract phase
    And there are errors with the following details and associated error_count from the data_contract phase
      | ErrorCode | ErrorMessage | error_count |
      | BLANKYEAR | year not provided | 1 |
      | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 |
      | DODGYDATE | date_joined value is not valid: daft_date | 1 |
    And the movies entity is stored as a parquet after the data_contract phase
    And the latest audit record for the submission is marked with processing status business_rules
    When I run the business rules phase
    Then The rules restrict "movies" to 4 qualifying records
-   And At least one row from "movies" has generated error code "LIMITED_RATINGS"
-   And At least one row from "derived" has generated error code "RUBBISH_SEQUEL"
+   And there are errors with the following details and associated error_count from the business_rules phase
+     | ErrorCode | ErrorMessage | error_count |
+     | LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 |
+     | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
    And the latest audit record for the submission is marked with processing status error_report
    When I run the error report phase
    Then An error report is produced
    And The statistics entry for the submission shows the following information
      | parameter | value |
      | record_count | 5 |
      | number_record_rejections | 4 |
      | number_warnings | 1 |

  Scenario: Validate and filter movies (duckdb)
    Given I submit the movies file movies.json for processing
@@ -62,8 +64,10 @@ Feature: Pipeline tests using the movies dataset
    And the latest audit record for the submission is marked with processing status business_rules
    When I run the business rules phase
    Then The rules restrict "movies" to 4 qualifying records
-   And At least one row from "movies" has generated error code "LIMITED_RATINGS"
-   And At least one row from "derived" has generated error code "RUBBISH_SEQUEL"
+   And there are errors with the following details and associated error_count from the business_rules phase
+     | ErrorCode | ErrorMessage | error_count |
+     | LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 |
+     | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
    And the latest audit record for the submission is marked with processing status error_report
    When I run the error report phase
    Then An error report is produced
4 changes: 2 additions & 2 deletions tests/testdata/movies/movies_ddb_rule_store.json
@@ -23,7 +23,7 @@
"expression": "no_of_ratings > 1",
"error_code": "LIMITED_RATINGS",
"reporting_field": "title",
"failure_message": "Movie has too few ratings"
"failure_message": "Movie has too few ratings ({{ratings}})"
}
],
"post_filter_rules": [
@@ -76,7 +76,7 @@
"error_code": "RUBBISH_SEQUEL",
"reporting_entity": "derived",
"reporting_field": "title",
"failure_message": "Movie has rubbish sequel",
"failure_message": "The movie {{title}} has a rubbish sequel",
"is_informational": true
}
],
4 changes: 2 additions & 2 deletions tests/testdata/movies/movies_spark_rule_store.json
@@ -23,7 +23,7 @@
"expression": "no_of_ratings > 1",
"error_code": "LIMITED_RATINGS",
"reporting_field": "title",
"failure_message": "Movie has too few ratings"
"failure_message": "Movie has too few ratings ({{ratings}})"
}
],
"post_filter_rules": [
@@ -87,7 +87,7 @@
"error_code": "RUBBISH_SEQUEL",
"reporting_entity": "derived",
"reporting_field": "title",
"failure_message": "Movie has rubbish sequel",
"failure_message": "The movie {{title}} has a rubbish sequel",
"is_informational": true
}
],
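The new `failure_message` templates in both rule stores line up with the report text now asserted in `movies.feature`. Under the same jinja2 assumption as the earlier sketches, the round trip can be checked by hand with plausible records:

```python
from jinja2 import Environment

ENV = Environment()

cases = [
    # (failure_message template, example record, text expected by movies.feature)
    ("Movie has too few ratings ({{ratings}})",
     {"ratings": [6.1]},
     "Movie has too few ratings ([6.1])"),
    ("The movie {{title}} has a rubbish sequel",
     {"title": "The Greatest Movie Ever"},
     "The movie The Greatest Movie Ever has a rubbish sequel"),
]

for template, record, expected in cases:
    assert ENV.from_string(template).render(**record) == expected
```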