15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,18 @@
## 1.1.0 (2025-10-28)

### Feat

- Added ability to define custom error codes and templated messages for data contract feedback messages
- Added new JSON readers
- Added SparkCSVReader
- Added PolarsToDuckDBCSVReader and DuckDBCSVRepeatingReader
- Added quotechar option to DuckDBCSVReader

### Fix

- Fixed issues with refdata loader table implementations
- Fixed duckdb try_cast statements in the data contract phase (column identifiers are now quoted)
- Allowed use of entity type in file transformation

## 1.0.0 (2025-10-09)

### Refactor
2 changes: 1 addition & 1 deletion docs/README.md
@@ -18,7 +18,7 @@ DVE configuration can be instantiated from a json (dischema) file which might be
{
"contract": {
"cache_originals": true,
"contract_error_codes": null,
"error_details": null,
"types": {},
"schemas": {},
"datasets": {
6 changes: 3 additions & 3 deletions docs/detailed_guidance/data_contract.md
@@ -4,7 +4,7 @@ Let's look at the data contract configuration from [Introduction to DVE](../READM
{
"contract": {
"cache_originals": true,
"contract_error_codes": null,
"error_details": null,
"types": {},
"schemas": {},
"datasets": {
@@ -78,7 +78,7 @@ Here we have only filled out datasets. We've added a few more fields such as `Pe
{
"contract": {
"cache_originals": true,
"contract_error_codes": null,
"error_details": null,
"types": {
"isodate": {
"description": "an isoformatted date type",
@@ -172,7 +172,7 @@ We can see here that the Activity has a number of fields. `startdate`, `enddate`
{
"contract": {
"cache_originals": true,
"contract_error_codes": null,
"error_details": null,
"types": {
"isodate": {
"description": "an isoformatted date type",
@@ -0,0 +1,10 @@
{
    "$schema": "https://json-schema.org/draft-07/schema",
    "$id": "data-ingest:contract/components/contract_error_details.schema.json",
    "title": "contract_error_details",
    "description": "A mapping of field names to the custom error code and message to be used if these fields fail validation during the data contract phase. For nested fields, these should be specified using struct '.' notation (e.g. fieldA.fieldB.fieldC)",
    "type": "object",
    "additionalProperties": {
        "$ref": "field_error_type.schema.json"
    }
}
@@ -0,0 +1,27 @@
{
    "$schema": "https://json-schema.org/draft-07/schema",
    "$id": "data-ingest:contract/components/field_error_detail.schema.json",
    "title": "field_error_detail",
    "description": "The custom details to be used for a field when a validation error is raised during the data contract phase",
    "type": "object",
    "properties": {
        "error_code": {
            "description": "The code to be used for the field and error type specified",
            "type": "string"
        },
        "error_message": {
            "description": "The message to be used for the field and error type specified. This can include templating (specified using jinja2 conventions). During templating, the full record will be available, along with an additional __error_value key for easy access to nested offending values.",
            "type": "string"
        }
    },
    "required": [
        "error_code",
        "error_message"
    ],
    "additionalProperties": false
}
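The templating behaviour described on `error_message` is easiest to see in a short sketch. Everything here is illustrative: the record layout and field names are invented, and only the jinja2 conventions and the `__error_value` key come from the schema description above.

```python
from jinja2 import Template

# Hypothetical failing record; the engine is described as exposing the full
# record plus an extra __error_value holding the offending (possibly nested) value.
record = {"PersonId": "123", "Activity": {"startdate": "20251301"}}
context = {**record, "__error_value": record["Activity"]["startdate"]}

template = Template("Record {{ PersonId }}: '{{ __error_value }}' is not a valid ISO date")
print(template.render(**context))
# Record 123: '20251301' is not a valid ISO date
```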
@@ -0,0 +1,21 @@
{
    "$schema": "https://json-schema.org/draft-07/schema",
    "$id": "data-ingest:contract/components/field_error_type.schema.json",
    "title": "field_error_type",
    "description": "A mapping of error types to the custom details to be used when a validation error of that type is raised for a field during the data contract phase",
    "type": "object",
    "propertyNames": {
        "enum": [
            "Blank",
            "Bad value",
            "Wrong format"
        ]
    },
    "additionalProperties": {
        "$ref": "field_error_detail.schema.json"
    }
}
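Taken together, the three component schemas describe a mapping from field name (dot notation for nested fields), through error type, to a custom code and message. A hypothetical `error_details` value that would validate against them, with all field names and codes invented for illustration:

```python
# Invented example of an error_details mapping:
# field name -> error type -> {error_code, error_message}.
error_details = {
    "Activity.startdate": {
        "Wrong format": {
            "error_code": "ERR042",
            "error_message": "'{{ __error_value }}' is not a valid ISO date",
        },
        "Blank": {
            "error_code": "ERR043",
            "error_message": "Activity.startdate must be populated",
        },
    }
}
```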
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "nhs_dve"
version = "1.0.0"
version = "1.1.0"
description = "`nhs data validation engine` is a framework used to validate data"
authors = ["NHS England <england.contactus@nhs.net>"]
readme = "README.md"
25 changes: 25 additions & 0 deletions src/dve/core_engine/backends/implementations/duckdb/__init__.py
@@ -0,0 +1,25 @@
"""Implementation of duckdb backend"""
from dve.core_engine.backends.readers import register_reader

from .contract import DuckDBDataContract
from .readers import (
    DuckDBCSVReader,
    DuckDBCSVRepeatingHeaderReader,
    DuckDBJSONReader,
    DuckDBXMLStreamReader,
    PolarsToDuckDBCSVReader,
)
from .reference_data import DuckDBRefDataLoader
from .rules import DuckDBStepImplementations

register_reader(DuckDBCSVReader)
register_reader(DuckDBCSVRepeatingHeaderReader)
register_reader(DuckDBJSONReader)
register_reader(DuckDBXMLStreamReader)
register_reader(PolarsToDuckDBCSVReader)

__all__ = [
"DuckDBDataContract",
"DuckDBRefDataLoader",
"DuckDBStepImplementations",
]
@@ -13,9 +13,9 @@
)
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
PYTHON_TYPE_TO_DUCKDB_TYPE,
PYTHON_TYPE_TO_POLARS_TYPE,
table_exists,
)
from dve.core_engine.backends.utilities import PYTHON_TYPE_TO_POLARS_TYPE
from dve.core_engine.models import (
AuditRecord,
ProcessingStatusRecord,
@@ -20,13 +20,12 @@
duckdb_read_parquet,
duckdb_write_parquet,
get_duckdb_type_from_annotation,
get_polars_type_from_annotation,
relation_is_empty,
)
from dve.core_engine.backends.implementations.duckdb.types import DuckDBEntities
from dve.core_engine.backends.metadata.contract import DataContractMetadata
from dve.core_engine.backends.types import StageSuccessful
from dve.core_engine.backends.utilities import stringify_model
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, Messages
from dve.core_engine.validation import RowValidator
@@ -95,8 +94,8 @@ def generate_ddb_cast_statement(
The duckdb Python API doesn't currently play well with this.
"""
if not null_flag:
return f"try_cast({column_name} AS {dtype}) AS {column_name}"
return f"cast(NULL AS {dtype}) AS {column_name}"
return f'try_cast("{column_name}" AS {dtype}) AS "{column_name}"'
return f'cast(NULL AS {dtype}) AS "{column_name}"'

def apply_data_contract(
self, entities: DuckDBEntities, contract_metadata: DataContractMetadata
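The quoting change in `generate_ddb_cast_statement` matters for column names containing spaces or reserved words. A minimal standalone sketch of the generated SQL fragment, using plain duckdb and an invented table and column name:

```python
import duckdb

conn = duckdb.connect()
conn.sql('create table t ("start date" varchar)')
conn.sql("insert into t values ('2025-01-01'), ('not a date')")

# try_cast yields NULL for unparseable values instead of raising, and the
# double-quoted identifier tolerates spaces and reserved words.
print(conn.sql('select try_cast("start date" as date) as "start date" from t').fetchall())
# [(datetime.date(2025, 1, 1),), (None,)]
```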
118 changes: 8 additions & 110 deletions src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py
@@ -6,16 +6,14 @@
from datetime import date, datetime
from decimal import Decimal
from pathlib import Path
from typing import Any, ClassVar, Dict, Set, Union
from typing import Any, ClassVar, Dict, Generator, Iterator, Set, Union
from urllib.parse import urlparse

import duckdb.typing as ddbtyp
import numpy as np
import polars as pl # type: ignore
from duckdb import DuckDBPyConnection, DuckDBPyRelation
from duckdb.typing import DuckDBPyType
from pandas import DataFrame
from polars.datatypes.classes import DataTypeClass as PolarsType
from pydantic import BaseModel
from typing_extensions import Annotated, get_args, get_origin, get_type_hints

@@ -91,19 +89,6 @@ def __call__(self):
}
"""A mapping of Python types to the equivalent DuckDB types."""

PYTHON_TYPE_TO_POLARS_TYPE: Dict[type, PolarsType] = {
# issue with decimal conversion at the moment...
str: pl.Utf8, # type: ignore
int: pl.Int64, # type: ignore
bool: pl.Boolean, # type: ignore
float: pl.Float64, # type: ignore
bytes: pl.Binary, # type: ignore
date: pl.Date, # type: ignore
datetime: pl.Datetime, # type: ignore
Decimal: pl.Utf8, # type: ignore
}
"""A mapping of Python types to the equivalent Polars types."""


def table_exists(connection: DuckDBPyConnection, table_name: str) -> bool:
"""check if a table exists in a given DuckDBPyConnection"""
@@ -205,98 +190,6 @@ def get_duckdb_type_from_annotation(type_annotation: Any) -> DuckDBPyType:
raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")


def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType:
"""Get a polars type from a Python type annotation.

Supported types are any of the following (this definition is recursive):
- Supported basic Python types. These are:
* `str`: pl.Utf8
* `int`: pl.Int64
* `bool`: pl.Boolean
* `float`: pl.Float64
* `bytes`: pl.Binary
* `datetime.date`: pl.Date
* `datetime.datetime`: pl.Datetime
* `decimal.Decimal`: pl.Decimal with precision of 38 and scale of 18
- A list of supported types (e.g. `List[str]` or `typing.List[str]`).
This will return a pl.List type (variable length)
- A `typing.Optional` type or a `typing.Union` of the type and `None` (e.g.
`typing.Optional[str]`, `typing.Union[List[str], None]`). This will remove the
'optional' wrapper and return the inner type
- A subclass of `typing.TypedDict` with values typed using supported types. This
will parse the value types as Polars types and return a Polars Struct.
- A dataclass or `pydantic.main.ModelMetaClass` with values typed using supported types.
This will parse the field types as Polars types and return a Polars Struct.
- Any supported type, with a `typing_extensions.Annotated` wrapper.
- A `decimal.Decimal` wrapped with `typing_extensions.Annotated` with a `DecimalConfig`
indicating precision and scale. This will return a Polars Decimal
with the specfied scale and precision.
- A `pydantic.types.condecimal` created type.

Any `ClassVar` types within `TypedDict`s, dataclasses, or `pydantic` models will be
ignored.

"""
type_origin = get_origin(type_annotation)

# An `Optional` or `Union` type, check to ensure non-heterogenity.
if type_origin is Union:
python_type = _get_non_heterogenous_type(get_args(type_annotation))
return get_polars_type_from_annotation(python_type)

# Type hint is e.g. `List[str]`, check to ensure non-heterogenity.
if type_origin is list or (isinstance(type_origin, type) and issubclass(type_origin, list)):
element_type = _get_non_heterogenous_type(get_args(type_annotation))
return pl.List(get_polars_type_from_annotation(element_type)) # type: ignore

if type_origin is Annotated:
python_type, *other_args = get_args(type_annotation) # pylint: disable=unused-variable
return get_polars_type_from_annotation(python_type)
# Ensure that we have a concrete type at this point.
if not isinstance(type_annotation, type):
raise ValueError(f"Unsupported type annotation {type_annotation!r}")

if (
# Type hint is a dict subclass, but not dict. Possibly a `TypedDict`.
(issubclass(type_annotation, dict) and type_annotation is not dict)
# Type hint is a dataclass.
or is_dataclass(type_annotation)
# Type hint is a `pydantic` model.
or (type_origin is None and issubclass(type_annotation, BaseModel))
):
fields: Dict[str, PolarsType] = {}
for field_name, field_annotation in get_type_hints(type_annotation).items():
# Technically non-string keys are disallowed, but people are bad.
if not isinstance(field_name, str):
raise ValueError(
f"Dictionary/Dataclass keys must be strings, got {type_annotation!r}"
) # pragma: no cover
if get_origin(field_annotation) is ClassVar:
continue

fields[field_name] = get_polars_type_from_annotation(field_annotation)

if not fields:
raise ValueError(
f"No type annotations in dict/dataclass type (got {type_annotation!r})"
)

return pl.Struct(fields) # type: ignore

if type_annotation is list:
raise ValueError(
f"List must have type annotation (e.g. `List[str]`), got {type_annotation!r}"
)
if type_annotation is dict or type_origin is dict:
raise ValueError(f"Dict must be `typing.TypedDict` subclass, got {type_annotation!r}")

for type_ in type_annotation.mro():
polars_type = PYTHON_TYPE_TO_POLARS_TYPE.get(type_)
if polars_type:
return polars_type
raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")


def coerce_inferred_numpy_array_to_list(pandas_df: DataFrame) -> DataFrame:
"""Function to modify numpy inferred array when cnverting from duckdb relation to
pandas dataframe - these cause issues with pydantic models
@@ -331,15 +224,20 @@ def _ddb_read_parquet(


def _ddb_write_parquet( # pylint: disable=unused-argument
self, entity: DuckDBPyRelation, target_location: URI, **kwargs
self, entity: Union[Iterator[Dict[str, Any]], DuckDBPyRelation], target_location: URI, **kwargs
) -> URI:
"""Method to write parquet files from type cast entities
following data contract application
"""
if isinstance(_get_implementation(target_location), LocalFilesystemImplementation):
Path(target_location).parent.mkdir(parents=True, exist_ok=True)

entity.to_parquet(file_name=target_location, compression="snappy", **kwargs)
# materialise non-relation inputs (e.g. a generator of dicts) into a relation
if not isinstance(entity, DuckDBPyRelation):
entity = self._connection.query(
"select dta.* from (select unnest($data) as dta)", params={"data": list(entity)}
)

entity.to_parquet(file_name=target_location, compression="snappy", **kwargs) # type: ignore
return target_location


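The generator branch added to `_ddb_write_parquet` materialises rows through DuckDB's `unnest` before writing. Roughly, outside the class, the same trick looks like this (the connection and rows are stand-ins for the method's internals):

```python
import duckdb

conn = duckdb.connect()
rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]  # stand-in for the generator's output

# A list of dicts binds as a LIST of STRUCTs; unnest expands it into rows and
# dta.* unpacks the struct fields back into ordinary columns.
rel = conn.query(
    "select dta.* from (select unnest($data) as dta)",
    params={"data": rows},
)
print(rel.fetchall())  # [(1, 'a'), (2, 'b')]
```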
@@ -1,9 +1,13 @@
"""Readers for use with duckdb backend"""

from .csv import DuckDBCSVReader
from .csv import DuckDBCSVReader, DuckDBCSVRepeatingHeaderReader, PolarsToDuckDBCSVReader
from .json import DuckDBJSONReader
from .xml import DuckDBXMLStreamReader

__all__ = [
"DuckDBCSVReader",
"DuckDBCSVRepeatingHeaderReader",
"DuckDBJSONReader",
"DuckDBXMLStreamReader",
"PolarsToDuckDBCSVReader",
]