diff --git a/docs/README.md b/docs/README.md index 041f077..a7feef1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -18,7 +18,7 @@ DVE configuration can be instantiated from a json (dischema) file which might be { "contract": { "cache_originals": true, - "contract_error_codes": null, + "error_details": null, "types": {}, "schemas": {}, "datasets": { diff --git a/docs/detailed_guidance/data_contract.md b/docs/detailed_guidance/data_contract.md index accc95a..5be63d3 100644 --- a/docs/detailed_guidance/data_contract.md +++ b/docs/detailed_guidance/data_contract.md @@ -4,7 +4,7 @@ Lets look at the data contract configuration from [Introduction to DVE](../READM { "contract": { "cache_originals": true, - "contract_error_codes": null, + "error_details": null, "types": {}, "schemas": {}, "datasets": { @@ -78,7 +78,7 @@ Here we have only filled out datasets. We've added a few more fields such as `Pe { "contract": { "cache_originals": true, - "contract_error_codes": null, + "error_details": null, "types": { "isodate": { "description": "an isoformatted date type", @@ -172,7 +172,7 @@ We can see here that the Activity has a number of fields. `startdate`, `enddate` { "contract": { "cache_originals": true, - "contract_error_codes": null, + "error_details": null, "types": { "isodate": { "description": "an isoformatted date type", diff --git a/docs/json_schemas/contract/components/contact_error_details.schema.json b/docs/json_schemas/contract/components/contact_error_details.schema.json new file mode 100644 index 0000000..2635ac4 --- /dev/null +++ b/docs/json_schemas/contract/components/contact_error_details.schema.json @@ -0,0 +1,10 @@ +{ + "$schema": "https://json-schema.org/draft-07/schema", + "$id": "data-ingest:contract/components/contract_error_details.schema.json", + "title": "base_entity", + "description": "A mapping of field names to the custom error code and message required if these fields were to fail validation during the data contract phase. For nested fields, these should be specified using struct '.' notation (eg. fieldA.fieldB.fieldC)", + "type": "object", + "additionalProperties": { + "$ref": "field_error_type.schema.json" + } +} \ No newline at end of file diff --git a/docs/json_schemas/contract/components/field_error_detail.schema.json b/docs/json_schemas/contract/components/field_error_detail.schema.json new file mode 100644 index 0000000..a0cb547 --- /dev/null +++ b/docs/json_schemas/contract/components/field_error_detail.schema.json @@ -0,0 +1,27 @@ +{ + "$schema": "https://json-schema.org/draft-07/schema", + "$id": "data-ingest:contract/components/field_error_detail.schema.json", + "title": "field_error_detail", + "description": "The custom details to be used for a field when a validation error is raised during the data contract phase", + "type": "object", + "properties": { + "error_code": { + "description": "The code to be used for the field and error type specified", + "type": "string" + }, + "error_message": { + "description": "The message to be used for the field and error type specified. This can include templating (specified using jinja2 conventions). 
During templating, the full record will be available with an additional __error_value to easily obtain nested offending values.", + "type": "string", + "enum": [ + "record_rejection", + "file_rejection", + "warning" + ] + } + }, + "required": [ + "error_code", + "error_message" + ], + "additionalProperties": false +} \ No newline at end of file diff --git a/docs/json_schemas/contract/components/field_error_type.schema copy.json b/docs/json_schemas/contract/components/field_error_type.schema copy.json new file mode 100644 index 0000000..694948a --- /dev/null +++ b/docs/json_schemas/contract/components/field_error_type.schema copy.json @@ -0,0 +1,21 @@ +{ + "$schema": "https://json-schema.org/draft-07/schema", + "$id": "data-ingest:contract/components/field_error_type.schema.json", + "title": "field_error_detail", + "description": "The error type for a field when a validation error is raised during the data contract phase", + "type": "object", + "properties": { + "error_type": { + "description": "The type of error the details are for", + "type": "string", + "enum": [ + "Blank", + "Bad value", + "Wrong format" + ], + "additionalProperties": { + "$ref": "field_error_detail.schema.json" + } + } + } +} \ No newline at end of file diff --git a/src/dve/core_engine/backends/implementations/duckdb/__init__.py b/src/dve/core_engine/backends/implementations/duckdb/__init__.py index e69de29..f51cbbb 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/__init__.py +++ b/src/dve/core_engine/backends/implementations/duckdb/__init__.py @@ -0,0 +1,18 @@ +"""Implementation of duckdb backend""" +from dve.core_engine.backends.implementations.duckdb.readers.json import DuckDBJSONReader +from dve.core_engine.backends.readers import register_reader + +from .contract import DuckDBDataContract +from .readers import DuckDBCSVReader, DuckDBXMLStreamReader +from .reference_data import DuckDBRefDataLoader +from .rules import DuckDBStepImplementations + +register_reader(DuckDBCSVReader) +register_reader(DuckDBJSONReader) +register_reader(DuckDBXMLStreamReader) + +__all__ = [ + "DuckDBDataContract", + "DuckDBRefDataLoader", + "DuckDBStepImplementations", +] diff --git a/src/dve/core_engine/backends/implementations/duckdb/auditing.py b/src/dve/core_engine/backends/implementations/duckdb/auditing.py index 423eb44..7119548 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/auditing.py +++ b/src/dve/core_engine/backends/implementations/duckdb/auditing.py @@ -13,9 +13,9 @@ ) from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( PYTHON_TYPE_TO_DUCKDB_TYPE, - PYTHON_TYPE_TO_POLARS_TYPE, table_exists, ) +from dve.core_engine.backends.utilities import PYTHON_TYPE_TO_POLARS_TYPE from dve.core_engine.models import ( AuditRecord, ProcessingStatusRecord, diff --git a/src/dve/core_engine/backends/implementations/duckdb/contract.py b/src/dve/core_engine/backends/implementations/duckdb/contract.py index f3b653b..97ae258 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/contract.py +++ b/src/dve/core_engine/backends/implementations/duckdb/contract.py @@ -20,13 +20,12 @@ duckdb_read_parquet, duckdb_write_parquet, get_duckdb_type_from_annotation, - get_polars_type_from_annotation, relation_is_empty, ) from dve.core_engine.backends.implementations.duckdb.types import DuckDBEntities from dve.core_engine.backends.metadata.contract import DataContractMetadata from dve.core_engine.backends.types import StageSuccessful -from dve.core_engine.backends.utilities import stringify_model 
+from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model from dve.core_engine.message import FeedbackMessage from dve.core_engine.type_hints import URI, Messages from dve.core_engine.validation import RowValidator @@ -95,8 +94,8 @@ def generate_ddb_cast_statement( Current duckdb python API doesn't play well with this currently. """ if not null_flag: - return f"try_cast({column_name} AS {dtype}) AS {column_name}" - return f"cast(NULL AS {dtype}) AS {column_name}" + return f'try_cast("{column_name}" AS {dtype}) AS "{column_name}"' + return f'cast(NULL AS {dtype}) AS "{column_name}"' def apply_data_contract( self, entities: DuckDBEntities, contract_metadata: DataContractMetadata diff --git a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py index 8ea462e..a8656cc 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py +++ b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py @@ -6,16 +6,14 @@ from datetime import date, datetime from decimal import Decimal from pathlib import Path -from typing import Any, ClassVar, Dict, Set, Union +from typing import Any, ClassVar, Dict, Generator, Iterator, Set, Union from urllib.parse import urlparse import duckdb.typing as ddbtyp import numpy as np -import polars as pl # type: ignore from duckdb import DuckDBPyConnection, DuckDBPyRelation from duckdb.typing import DuckDBPyType from pandas import DataFrame -from polars.datatypes.classes import DataTypeClass as PolarsType from pydantic import BaseModel from typing_extensions import Annotated, get_args, get_origin, get_type_hints @@ -91,19 +89,6 @@ def __call__(self): } """A mapping of Python types to the equivalent DuckDB types.""" -PYTHON_TYPE_TO_POLARS_TYPE: Dict[type, PolarsType] = { - # issue with decimal conversion at the moment... - str: pl.Utf8, # type: ignore - int: pl.Int64, # type: ignore - bool: pl.Boolean, # type: ignore - float: pl.Float64, # type: ignore - bytes: pl.Binary, # type: ignore - date: pl.Date, # type: ignore - datetime: pl.Datetime, # type: ignore - Decimal: pl.Utf8, # type: ignore -} -"""A mapping of Python types to the equivalent Polars types.""" - def table_exists(connection: DuckDBPyConnection, table_name: str) -> bool: """check if a table exists in a given DuckDBPyConnection""" @@ -205,98 +190,6 @@ def get_duckdb_type_from_annotation(type_annotation: Any) -> DuckDBPyType: raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}") -def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType: - """Get a polars type from a Python type annotation. - - Supported types are any of the following (this definition is recursive): - - Supported basic Python types. These are: - * `str`: pl.Utf8 - * `int`: pl.Int64 - * `bool`: pl.Boolean - * `float`: pl.Float64 - * `bytes`: pl.Binary - * `datetime.date`: pl.Date - * `datetime.datetime`: pl.Datetime - * `decimal.Decimal`: pl.Decimal with precision of 38 and scale of 18 - - A list of supported types (e.g. `List[str]` or `typing.List[str]`). - This will return a pl.List type (variable length) - - A `typing.Optional` type or a `typing.Union` of the type and `None` (e.g. - `typing.Optional[str]`, `typing.Union[List[str], None]`). This will remove the - 'optional' wrapper and return the inner type - - A subclass of `typing.TypedDict` with values typed using supported types. This - will parse the value types as Polars types and return a Polars Struct. 
- - A dataclass or `pydantic.main.ModelMetaClass` with values typed using supported types. - This will parse the field types as Polars types and return a Polars Struct. - - Any supported type, with a `typing_extensions.Annotated` wrapper. - - A `decimal.Decimal` wrapped with `typing_extensions.Annotated` with a `DecimalConfig` - indicating precision and scale. This will return a Polars Decimal - with the specfied scale and precision. - - A `pydantic.types.condecimal` created type. - - Any `ClassVar` types within `TypedDict`s, dataclasses, or `pydantic` models will be - ignored. - - """ - type_origin = get_origin(type_annotation) - - # An `Optional` or `Union` type, check to ensure non-heterogenity. - if type_origin is Union: - python_type = _get_non_heterogenous_type(get_args(type_annotation)) - return get_polars_type_from_annotation(python_type) - - # Type hint is e.g. `List[str]`, check to ensure non-heterogenity. - if type_origin is list or (isinstance(type_origin, type) and issubclass(type_origin, list)): - element_type = _get_non_heterogenous_type(get_args(type_annotation)) - return pl.List(get_polars_type_from_annotation(element_type)) # type: ignore - - if type_origin is Annotated: - python_type, *other_args = get_args(type_annotation) # pylint: disable=unused-variable - return get_polars_type_from_annotation(python_type) - # Ensure that we have a concrete type at this point. - if not isinstance(type_annotation, type): - raise ValueError(f"Unsupported type annotation {type_annotation!r}") - - if ( - # Type hint is a dict subclass, but not dict. Possibly a `TypedDict`. - (issubclass(type_annotation, dict) and type_annotation is not dict) - # Type hint is a dataclass. - or is_dataclass(type_annotation) - # Type hint is a `pydantic` model. - or (type_origin is None and issubclass(type_annotation, BaseModel)) - ): - fields: Dict[str, PolarsType] = {} - for field_name, field_annotation in get_type_hints(type_annotation).items(): - # Technically non-string keys are disallowed, but people are bad. - if not isinstance(field_name, str): - raise ValueError( - f"Dictionary/Dataclass keys must be strings, got {type_annotation!r}" - ) # pragma: no cover - if get_origin(field_annotation) is ClassVar: - continue - - fields[field_name] = get_polars_type_from_annotation(field_annotation) - - if not fields: - raise ValueError( - f"No type annotations in dict/dataclass type (got {type_annotation!r})" - ) - - return pl.Struct(fields) # type: ignore - - if type_annotation is list: - raise ValueError( - f"List must have type annotation (e.g. 
`List[str]`), got {type_annotation!r}" - ) - if type_annotation is dict or type_origin is dict: - raise ValueError(f"Dict must be `typing.TypedDict` subclass, got {type_annotation!r}") - - for type_ in type_annotation.mro(): - polars_type = PYTHON_TYPE_TO_POLARS_TYPE.get(type_) - if polars_type: - return polars_type - raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}") - - def coerce_inferred_numpy_array_to_list(pandas_df: DataFrame) -> DataFrame: """Function to modify numpy inferred array when cnverting from duckdb relation to pandas dataframe - these cause issues with pydantic models @@ -331,7 +224,7 @@ def _ddb_read_parquet( def _ddb_write_parquet( # pylint: disable=unused-argument - self, entity: DuckDBPyRelation, target_location: URI, **kwargs + self, entity: Union[Iterator[Dict[str, Any]], DuckDBPyRelation], target_location: URI, **kwargs ) -> URI: """Method to write parquet files from type cast entities following data contract application @@ -339,7 +232,12 @@ def _ddb_write_parquet( # pylint: disable=unused-argument if isinstance(_get_implementation(target_location), LocalFilesystemImplementation): Path(target_location).parent.mkdir(parents=True, exist_ok=True) - entity.to_parquet(file_name=target_location, compression="snappy", **kwargs) + if isinstance(entity, Generator): + entity = self._connection.query( + "select dta.* from (select unnest($data) as dta)", params={"data": list(entity)} + ) + + entity.to_parquet(file_name=target_location, compression="snappy", **kwargs) # type: ignore return target_location diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/__init__.py b/src/dve/core_engine/backends/implementations/duckdb/readers/__init__.py index c322f53..236009b 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/__init__.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/__init__.py @@ -1,9 +1,11 @@ """Readers for use with duckdb backend""" from .csv import DuckDBCSVReader +from .json import DuckDBJSONReader from .xml import DuckDBXMLStreamReader __all__ = [ "DuckDBCSVReader", + "DuckDBJSONReader", "DuckDBXMLStreamReader", ] diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py index f0672b4..217d221 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py @@ -3,41 +3,17 @@ # pylint: disable=arguments-differ from typing import Any, Dict, Iterator, Type -from duckdb import DuckDBPyConnection, DuckDBPyRelation, read_csv +from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection, read_csv from pydantic import BaseModel -from typing_extensions import Literal from dve.core_engine.backends.base.reader import BaseFileReader, read_function from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( duckdb_write_parquet, get_duckdb_type_from_annotation, ) +from dve.core_engine.backends.implementations.duckdb.types import SQLType from dve.core_engine.type_hints import URI, EntityName -SQLType = Literal[ - "BIGINT", - "BIT", - "BLOB", - "BOOLEAN", - "DATE", - "DECIMAL", - "DOUBLE", - "HUGEINT", - "INTEGER", - "INTERVAL", - "REAL", - "SMALLINT", - "TIME", - "UBIGINT", - "UHUGEINT", - "UINTEGER", - "USMALLINT", - "UTINYINT", - "UUID", - "VARCHAR", -] -"""SQL types recognised in duckdb""" - @duckdb_write_parquet class DuckDBCSVReader(BaseFileReader): @@ -47,13 +23,13 @@ class 
DuckDBCSVReader(BaseFileReader): # TODO - stringify or not def __init__( self, - header: bool, - delim: str, - connection: DuckDBPyConnection, + header: bool = True, + delim: str = ",", + connection: DuckDBPyConnection = None, ): self.header = header self.delim = delim - self._connection = connection + self._connection = connection if connection else default_connection super().__init__() @@ -61,7 +37,7 @@ def read_to_py_iterator( self, resource: URI, entity_name: EntityName, schema: Type[BaseModel] ) -> Iterator[Dict[str, Any]]: """Creates an iterable object of rows as dictionaries""" - return self.read_to_relation(resource, entity_name, schema).pl().iter_rows(named=True) + yield from self.read_to_relation(resource, entity_name, schema).pl().iter_rows(named=True) @read_function(DuckDBPyRelation) def read_to_relation( # pylint: disable=unused-argument diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/json.py b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py new file mode 100644 index 0000000..f8f3a77 --- /dev/null +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/json.py @@ -0,0 +1,44 @@ +"""A csv reader to create duckdb relations""" + +# pylint: disable=arguments-differ +from typing import Any, Dict, Iterator, Optional, Type + +from duckdb import DuckDBPyRelation, read_json +from pydantic import BaseModel + +from dve.core_engine.backends.base.reader import BaseFileReader, read_function +from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( + duckdb_write_parquet, + get_duckdb_type_from_annotation, +) +from dve.core_engine.backends.implementations.duckdb.types import SQLType +from dve.core_engine.type_hints import URI, EntityName + + +@duckdb_write_parquet +class DuckDBJSONReader(BaseFileReader): + """A reader for JSON files""" + + def __init__(self, json_format: Optional[str] = "array"): + self._json_format = json_format + + super().__init__() + + def read_to_py_iterator( + self, resource: URI, entity_name: EntityName, schema: Type[BaseModel] + ) -> Iterator[Dict[str, Any]]: + """Creates an iterable object of rows as dictionaries""" + return self.read_to_relation(resource, entity_name, schema).pl().iter_rows(named=True) + + @read_function(DuckDBPyRelation) + def read_to_relation( # pylint: disable=unused-argument + self, resource: URI, entity_name: EntityName, schema: Type[BaseModel] + ) -> DuckDBPyRelation: + """Returns a relation object from the source json""" + + ddb_schema: Dict[str, SQLType] = { + fld.name: str(get_duckdb_type_from_annotation(fld.annotation)) # type: ignore + for fld in schema.__fields__.values() + } + + return read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py index 5d66c95..af1147f 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/xml.py @@ -1,19 +1,16 @@ # mypy: disable-error-code="attr-defined" """An xml reader to create duckdb relations""" -from typing import Dict, Type +from typing import Dict, Optional, Type import polars as pl -from duckdb import DuckDBPyConnection, DuckDBPyRelation +from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection from pydantic import BaseModel from dve.core_engine.backends.base.reader import read_function -from 
dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( - duckdb_write_parquet, - get_polars_type_from_annotation, -) +from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet from dve.core_engine.backends.readers.xml import XMLStreamReader -from dve.core_engine.backends.utilities import stringify_model +from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model from dve.core_engine.type_hints import URI @@ -21,15 +18,15 @@ class DuckDBXMLStreamReader(XMLStreamReader): """A reader for XML files""" - def __init__(self, ddb_connection: DuckDBPyConnection, **kwargs): - self.ddb_connection = ddb_connection + def __init__(self, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs): + self.ddb_connection = ddb_connection if ddb_connection else default_connection super().__init__(**kwargs) @read_function(DuckDBPyRelation) def read_to_relation(self, resource: URI, entity_name: str, schema: Type[BaseModel]): """Returns a relation object from the source xml""" polars_schema: Dict[str, pl.DataType] = { # type: ignore - fld.name: get_polars_type_from_annotation(fld.type_) + fld.name: get_polars_type_from_annotation(fld.annotation) for fld in stringify_model(schema).__fields__.values() } diff --git a/src/dve/core_engine/backends/implementations/duckdb/reference_data.py b/src/dve/core_engine/backends/implementations/duckdb/reference_data.py index 406f95d..98e80bb 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/reference_data.py +++ b/src/dve/core_engine/backends/implementations/duckdb/reference_data.py @@ -42,7 +42,7 @@ def __init__( def load_table(self, config: ReferenceTable) -> DuckDBPyRelation: """Load reference entity from a database table""" - return self.connection.table(f"{config.fq_table_name}") + return self.connection.sql(f"select * from {config.fq_table_name}") def load_file(self, config: ReferenceFile) -> DuckDBPyRelation: "Load reference entity from a relative file path" diff --git a/src/dve/core_engine/backends/implementations/duckdb/rules.py b/src/dve/core_engine/backends/implementations/duckdb/rules.py index a0a0cbd..e4db6a1 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/rules.py +++ b/src/dve/core_engine/backends/implementations/duckdb/rules.py @@ -57,7 +57,7 @@ @duckdb_write_parquet @duckdb_read_parquet -class DuckDBStepImplemetations(BaseStepImplementations[DuckDBPyRelation]): +class DuckDBStepImplementations(BaseStepImplementations[DuckDBPyRelation]): """An implementation of transformation steps in duckdb.""" def __init__(self, connection: DuckDBPyConnection, **kwargs): diff --git a/src/dve/core_engine/backends/implementations/duckdb/types.py b/src/dve/core_engine/backends/implementations/duckdb/types.py index d321d3b..21e2615 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/types.py +++ b/src/dve/core_engine/backends/implementations/duckdb/types.py @@ -4,9 +4,34 @@ from typing import MutableMapping from duckdb import DuckDBPyRelation +from typing_extensions import Literal from dve.core_engine.type_hints import EntityName +SQLType = Literal[ + "BIGINT", + "BIT", + "BLOB", + "BOOLEAN", + "DATE", + "DECIMAL", + "DOUBLE", + "HUGEINT", + "INTEGER", + "INTERVAL", + "REAL", + "SMALLINT", + "TIME", + "UBIGINT", + "UHUGEINT", + "UINTEGER", + "USMALLINT", + "UTINYINT", + "UUID", + "VARCHAR", +] +"""SQL types recognised in duckdb""" + Source = DuckDBPyRelation """The source entity for a join. 
This will be aliased to the source entity name.""" Target = DuckDBPyRelation diff --git a/src/dve/core_engine/backends/implementations/spark/__init__.py b/src/dve/core_engine/backends/implementations/spark/__init__.py index 4043ce2..80cf4e4 100644 --- a/src/dve/core_engine/backends/implementations/spark/__init__.py +++ b/src/dve/core_engine/backends/implementations/spark/__init__.py @@ -4,11 +4,15 @@ from .backend import SparkBackend from .contract import SparkDataContract -from .readers import SparkXMLReader +from .readers import SparkCSVReader, SparkJSONReader, SparkXMLReader, SparkXMLStreamReader from .reference_data import SparkRefDataLoader from .rules import SparkStepImplementations +register_reader(SparkCSVReader) +register_reader(SparkJSONReader) register_reader(SparkXMLReader) +register_reader(SparkXMLStreamReader) + __all__ = [ "SparkBackend", diff --git a/src/dve/core_engine/backends/implementations/spark/readers/__init__.py b/src/dve/core_engine/backends/implementations/spark/readers/__init__.py index 30a38e8..e89dc91 100644 --- a/src/dve/core_engine/backends/implementations/spark/readers/__init__.py +++ b/src/dve/core_engine/backends/implementations/spark/readers/__init__.py @@ -1,11 +1,15 @@ """Spark-specific readers.""" +from dve.core_engine.backends.implementations.spark.readers.csv import SparkCSVReader +from dve.core_engine.backends.implementations.spark.readers.json import SparkJSONReader from dve.core_engine.backends.implementations.spark.readers.xml import ( SparkXMLReader, SparkXMLStreamReader, ) __all__ = [ + "SparkCSVReader", + "SparkJSONReader", "SparkXMLReader", "SparkXMLStreamReader", ] diff --git a/src/dve/core_engine/backends/implementations/spark/readers/csv.py b/src/dve/core_engine/backends/implementations/spark/readers/csv.py new file mode 100644 index 0000000..5e21714 --- /dev/null +++ b/src/dve/core_engine/backends/implementations/spark/readers/csv.py @@ -0,0 +1,76 @@ +"""A reader implementation using the Databricks Spark CSV reader.""" + + +from typing import Any, Dict, Iterator, Type + +from pydantic import BaseModel +from pyspark.sql import DataFrame, SparkSession +from pyspark.sql.types import StructType + +from dve.core_engine.backends.base.reader import BaseFileReader, read_function +from dve.core_engine.backends.exceptions import EmptyFileError +from dve.core_engine.backends.implementations.spark.spark_helpers import ( + get_type_from_annotation, + spark_write_parquet, +) +from dve.core_engine.type_hints import URI, EntityName +from dve.parser.file_handling import get_content_length + + +@spark_write_parquet +class SparkCSVReader(BaseFileReader): + """A Spark reader for CSV files.""" + + def __init__( + self, + *, + delimiter: str = ",", + escape_char: str = "\\", + quote_char: str = '"', + header: bool = True, + multi_line: bool = False, + encoding: str = "utf-8-sig", + spark_session: SparkSession = None, + ) -> None: + + self.delimiter = delimiter + self.escape_char = escape_char + self.encoding = encoding + self.quote_char = quote_char + self.header = header + self.multi_line = multi_line + self.spark_session = spark_session if spark_session else SparkSession.builder.getOrCreate() + + super().__init__() + + def read_to_py_iterator( + self, resource: URI, entity_name: EntityName, schema: Type[BaseModel] + ) -> Iterator[Dict[URI, Any]]: + df = self.read_to_dataframe(resource, entity_name, schema) + yield from (record.asDict(True) for record in df.toLocalIterator()) + + @read_function(DataFrame) + def read_to_dataframe( + self, + resource: URI, + 
entity_name: EntityName, # pylint: disable=unused-argument + schema: Type[BaseModel], + ) -> DataFrame: + """Read a CSV file directly to a Spark DataFrame.""" + if get_content_length(resource) == 0: + raise EmptyFileError(f"File at {resource} is empty.") + + spark_schema: StructType = get_type_from_annotation(schema) + kwargs = { + "sep": self.delimiter, + "header": self.header, + "escape": self.escape_char, + "quote": self.quote_char, + "multiLine": self.multi_line, + } + + return ( + self.spark_session.read.format("csv") + .options(**kwargs) # type: ignore + .load(resource, schema=spark_schema) + ) diff --git a/src/dve/core_engine/backends/implementations/spark/readers/json.py b/src/dve/core_engine/backends/implementations/spark/readers/json.py new file mode 100644 index 0000000..6c1902b --- /dev/null +++ b/src/dve/core_engine/backends/implementations/spark/readers/json.py @@ -0,0 +1,65 @@ +"""A reader implementation using the Databricks Spark JSON reader.""" + + +from typing import Any, Dict, Iterator, Optional, Type + +from pydantic import BaseModel +from pyspark.sql import DataFrame, SparkSession +from pyspark.sql.types import StructType + +from dve.core_engine.backends.base.reader import BaseFileReader, read_function +from dve.core_engine.backends.exceptions import EmptyFileError +from dve.core_engine.backends.implementations.spark.spark_helpers import ( + get_type_from_annotation, + spark_write_parquet, +) +from dve.core_engine.type_hints import URI, EntityName +from dve.parser.file_handling import get_content_length + + +@spark_write_parquet +class SparkJSONReader(BaseFileReader): + """A Spark reader for JSON files.""" + + def __init__( + self, + *, + encoding: Optional[str] = "utf-8", + multi_line: Optional[bool] = True, + spark_session: Optional[SparkSession] = None, + ) -> None: + + self.encoding = encoding + self.multi_line = multi_line + self.spark_session = spark_session if spark_session else SparkSession.builder.getOrCreate() + + super().__init__() + + def read_to_py_iterator( + self, resource: URI, entity_name: EntityName, schema: Type[BaseModel] + ) -> Iterator[Dict[URI, Any]]: + df = self.read_to_dataframe(resource, entity_name, schema) + yield from (record.asDict(True) for record in df.toLocalIterator()) + + @read_function(DataFrame) + def read_to_dataframe( + self, + resource: URI, + entity_name: EntityName, # pylint: disable=unused-argument + schema: Type[BaseModel], + ) -> DataFrame: + """Read a JSON file directly to a Spark DataFrame.""" + if get_content_length(resource) == 0: + raise EmptyFileError(f"File at {resource} is empty.") + + spark_schema: StructType = get_type_from_annotation(schema) + kwargs = { + "encoding": self.encoding, + "multiline": self.multi_line, + } + + return ( + self.spark_session.read.format("json") + .options(**kwargs) # type: ignore + .load(resource, schema=spark_schema) + ) diff --git a/src/dve/core_engine/backends/implementations/spark/spark_helpers.py b/src/dve/core_engine/backends/implementations/spark/spark_helpers.py index 62467cf..12f32d3 100644 --- a/src/dve/core_engine/backends/implementations/spark/spark_helpers.py +++ b/src/dve/core_engine/backends/implementations/spark/spark_helpers.py @@ -16,6 +16,8 @@ Callable, ClassVar, Dict, + Generator, + Iterator, List, Optional, Set, @@ -345,18 +347,19 @@ def _spark_read_parquet(self, path: URI, **kwargs) -> DataFrame: def _spark_write_parquet( # pylint: disable=unused-argument - self, entity: DataFrame, target_location: URI, **kwargs + self, entity: Union[Iterator[Dict[str, Any]], 
DataFrame], target_location: URI, **kwargs ) -> URI: """Method to write parquet files from type cast entities following data contract application """ - _options = {"schema": entity.schema, **kwargs} - ( - entity.write.options(**_options) # type: ignore - .format("parquet") - .mode("overwrite") - .save(target_location) - ) + _options: Dict[str, Any] = {**kwargs} + if isinstance(entity, Generator): + _writer = self.spark_session.createDataFrame(entity).write + else: + _options["schema"] = entity.schema # type: ignore + _writer = entity.write # type: ignore + + (_writer.options(**_options).format("parquet").mode("overwrite").save(target_location)) return target_location diff --git a/src/dve/core_engine/backends/readers/csv.py b/src/dve/core_engine/backends/readers/csv.py index 8f80fae..6969a32 100644 --- a/src/dve/core_engine/backends/readers/csv.py +++ b/src/dve/core_engine/backends/readers/csv.py @@ -14,10 +14,7 @@ FieldCountMismatch, MissingHeaderError, ) -from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( - get_polars_type_from_annotation, -) -from dve.core_engine.backends.utilities import stringify_model +from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model from dve.core_engine.type_hints import EntityName from dve.parser.file_handling import get_content_length, open_stream from dve.parser.file_handling.implementations.file import file_uri_to_local_path @@ -221,7 +218,7 @@ def write_parquet( # type: ignore target_location = file_uri_to_local_path(target_location).as_posix() if schema: polars_schema: Dict[str, pl.DataType] = { # type: ignore - fld.name: get_polars_type_from_annotation(fld.type_) + fld.name: get_polars_type_from_annotation(fld.annotation) for fld in stringify_model(schema).__fields__.values() } diff --git a/src/dve/core_engine/backends/readers/xml.py b/src/dve/core_engine/backends/readers/xml.py index b6708c6..ca55eb9 100644 --- a/src/dve/core_engine/backends/readers/xml.py +++ b/src/dve/core_engine/backends/readers/xml.py @@ -11,10 +11,7 @@ from dve.core_engine.backends.base.reader import BaseFileReader from dve.core_engine.backends.exceptions import EmptyFileError -from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( - get_polars_type_from_annotation, -) -from dve.core_engine.backends.utilities import stringify_model +from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model from dve.core_engine.loggers import get_logger from dve.core_engine.type_hints import URI, EntityName from dve.parser.file_handling import NonClosingTextIOWrapper, get_content_length, open_stream diff --git a/src/dve/core_engine/backends/utilities.py b/src/dve/core_engine/backends/utilities.py index 7c1cb98..3086c81 100644 --- a/src/dve/core_engine/backends/utilities.py +++ b/src/dve/core_engine/backends/utilities.py @@ -1,10 +1,16 @@ """Necessary, otherwise uncategorised backend functionality.""" import sys -from typing import Type +from dataclasses import is_dataclass +from datetime import date, datetime +from decimal import Decimal +from typing import Any, ClassVar, Dict, Type, Union +import polars as pl # type: ignore +from polars.datatypes.classes import DataTypeClass as PolarsType from pydantic import BaseModel, create_model +from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type from dve.core_engine.type_hints import Messages # We need to rely on a Python typing implementation detail in Python <= 3.7. 
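A minimal usage sketch of the relocated get_polars_type_from_annotation helper (now exported from dve.core_engine.backends.utilities). The model below is illustrative only and assumes a pydantic version that exposes ModelField.annotation, which the reader changes in this diff rely on:

    from typing import List, Optional

    from pydantic import BaseModel

    from dve.core_engine.backends.utilities import get_polars_type_from_annotation


    class Example(BaseModel):
        # Illustrative model: str -> pl.Utf8, Optional[int] -> pl.Int64,
        # List[float] -> pl.List(pl.Float64), per PYTHON_TYPE_TO_POLARS_TYPE.
        name: str
        year: Optional[int]
        ratings: List[float]


    # Mirrors the dict comprehension used by the CSV/XML readers in this change.
    polars_schema = {
        fld.name: get_polars_type_from_annotation(fld.annotation)
        for fld in Example.__fields__.values()
    }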
@@ -12,9 +18,22 @@ # Crimes against typing. from typing import _GenericAlias # type: ignore - from typing_extensions import get_args, get_origin + from typing_extensions import Annotated, get_args, get_origin, get_type_hints else: - from typing import get_args, get_origin + from typing import Annotated, get_args, get_origin, get_type_hints + +PYTHON_TYPE_TO_POLARS_TYPE: Dict[type, PolarsType] = { + # issue with decimal conversion at the moment... + str: pl.Utf8, # type: ignore + int: pl.Int64, # type: ignore + bool: pl.Boolean, # type: ignore + float: pl.Float64, # type: ignore + bytes: pl.Binary, # type: ignore + date: pl.Date, # type: ignore + datetime: pl.Datetime, # type: ignore + Decimal: pl.Utf8, # type: ignore +} +"""A mapping of Python types to the equivalent Polars types.""" def stringify_type(type_: type) -> type: @@ -61,3 +80,95 @@ def dedup_messages(messages: Messages) -> Messages: """ return list(dict.fromkeys(messages)) + + +def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType: + """Get a polars type from a Python type annotation. + + Supported types are any of the following (this definition is recursive): + - Supported basic Python types. These are: + * `str`: pl.Utf8 + * `int`: pl.Int64 + * `bool`: pl.Boolean + * `float`: pl.Float64 + * `bytes`: pl.Binary + * `datetime.date`: pl.Date + * `datetime.datetime`: pl.Datetime + * `decimal.Decimal`: pl.Decimal with precision of 38 and scale of 18 + - A list of supported types (e.g. `List[str]` or `typing.List[str]`). + This will return a pl.List type (variable length) + - A `typing.Optional` type or a `typing.Union` of the type and `None` (e.g. + `typing.Optional[str]`, `typing.Union[List[str], None]`). This will remove the + 'optional' wrapper and return the inner type + - A subclass of `typing.TypedDict` with values typed using supported types. This + will parse the value types as Polars types and return a Polars Struct. + - A dataclass or `pydantic.main.ModelMetaClass` with values typed using supported types. + This will parse the field types as Polars types and return a Polars Struct. + - Any supported type, with a `typing_extensions.Annotated` wrapper. + - A `decimal.Decimal` wrapped with `typing_extensions.Annotated` with a `DecimalConfig` + indicating precision and scale. This will return a Polars Decimal + with the specfied scale and precision. + - A `pydantic.types.condecimal` created type. + + Any `ClassVar` types within `TypedDict`s, dataclasses, or `pydantic` models will be + ignored. + + """ + type_origin = get_origin(type_annotation) + + # An `Optional` or `Union` type, check to ensure non-heterogenity. + if type_origin is Union: + python_type = _get_non_heterogenous_type(get_args(type_annotation)) + return get_polars_type_from_annotation(python_type) + + # Type hint is e.g. `List[str]`, check to ensure non-heterogenity. + if type_origin is list or (isinstance(type_origin, type) and issubclass(type_origin, list)): + element_type = _get_non_heterogenous_type(get_args(type_annotation)) + return pl.List(get_polars_type_from_annotation(element_type)) # type: ignore + + if type_origin is Annotated: + python_type, *other_args = get_args(type_annotation) # pylint: disable=unused-variable + return get_polars_type_from_annotation(python_type) + # Ensure that we have a concrete type at this point. + if not isinstance(type_annotation, type): + raise ValueError(f"Unsupported type annotation {type_annotation!r}") + + if ( + # Type hint is a dict subclass, but not dict. Possibly a `TypedDict`. 
+ (issubclass(type_annotation, dict) and type_annotation is not dict) + # Type hint is a dataclass. + or is_dataclass(type_annotation) + # Type hint is a `pydantic` model. + or (type_origin is None and issubclass(type_annotation, BaseModel)) + ): + fields: Dict[str, PolarsType] = {} + for field_name, field_annotation in get_type_hints(type_annotation).items(): + # Technically non-string keys are disallowed, but people are bad. + if not isinstance(field_name, str): + raise ValueError( + f"Dictionary/Dataclass keys must be strings, got {type_annotation!r}" + ) # pragma: no cover + if get_origin(field_annotation) is ClassVar: + continue + + fields[field_name] = get_polars_type_from_annotation(field_annotation) + + if not fields: + raise ValueError( + f"No type annotations in dict/dataclass type (got {type_annotation!r})" + ) + + return pl.Struct(fields) # type: ignore + + if type_annotation is list: + raise ValueError( + f"List must have type annotation (e.g. `List[str]`), got {type_annotation!r}" + ) + if type_annotation is dict or type_origin is dict: + raise ValueError(f"Dict must be `typing.TypedDict` subclass, got {type_annotation!r}") + + for type_ in type_annotation.mro(): + polars_type = PYTHON_TYPE_TO_POLARS_TYPE.get(type_) + if polars_type: + return polars_type + raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}") diff --git a/src/dve/core_engine/configuration/v1/__init__.py b/src/dve/core_engine/configuration/v1/__init__.py index c1389bd..ba1b9b6 100644 --- a/src/dve/core_engine/configuration/v1/__init__.py +++ b/src/dve/core_engine/configuration/v1/__init__.py @@ -21,7 +21,8 @@ BusinessRuleSpecConfig, ) from dve.core_engine.configuration.v1.steps import StepConfigUnion -from dve.core_engine.type_hints import EntityName, TemplateVariables +from dve.core_engine.message import DataContractErrorDetail +from dve.core_engine.type_hints import EntityName, ErrorCategory, ErrorType, TemplateVariables from dve.core_engine.validation import RowValidator from dve.parser.file_handling import joinuri, open_stream from dve.parser.type_hints import URI, Extension @@ -136,8 +137,8 @@ class V1DataContractConfig(BaseModel): cache_originals: bool = False """Whether to cache the original entities after loading.""" - contract_error_codes: Optional[URI] = None - """Optional URI to json file containing data contract error codes""" + error_details: Optional[URI] = None + """Optional URI containing custom data contract error codes and messages""" types: Dict[TypeName, TypeOrDef] = Field(default_factory=dict) """Dataset specific types defined within the config.""" schemas: Dict[SchemaName, _SchemaConfig] = Field(default_factory=dict) @@ -302,9 +303,9 @@ def get_contract_metadata(self) -> DataContractMetadata: reporting_fields = {} contract_dict = self.contract.dict() - error_codes = {} - if self.contract.contract_error_codes: - error_codes = self.load_error_codes(self.contract.contract_error_codes) + error_info = {} + if self.contract.error_details: + error_info = self.load_error_message_info(self.contract.error_details) for entity_name, dataset_config in self.contract.datasets.items(): reader_metadata[entity_name] = { ext: ReaderConfig(reader=config.reader, parameters=config.kwargs_) @@ -312,7 +313,7 @@ def get_contract_metadata(self) -> DataContractMetadata: } reporting_fields[entity_name] = dataset_config.reporting_fields validators[entity_name] = RowValidator( - contract_dict, entity_name, error_codes=error_codes + contract_dict, entity_name, error_info=error_info ) return 
DataContractMetadata( @@ -322,8 +323,8 @@ def get_contract_metadata(self) -> DataContractMetadata: cache_originals=self.contract.cache_originals, ) - def load_error_codes(self, uri): - """Load data contract error codes from json file""" + def load_error_message_info(self, uri): + """Load data contract error info from json file""" uri_prefix = self.location.rsplit("/", 1)[0] with open_stream(joinuri(uri_prefix, uri)) as stream: return json.load(stream) diff --git a/src/dve/core_engine/constants.py b/src/dve/core_engine/constants.py index 8629d44..d452c9b 100644 --- a/src/dve/core_engine/constants.py +++ b/src/dve/core_engine/constants.py @@ -2,3 +2,7 @@ ROWID_COLUMN_NAME: str = "__rowid__" """The name of the column containing the row ID for each entity.""" + +CONTRACT_ERROR_VALUE_FIELD_NAME: str = "__error_value" +"""The name of the field that can be used to extract the field value that caused + a pydantic validation error""" diff --git a/src/dve/core_engine/message.py b/src/dve/core_engine/message.py index 8afe9fd..9b37239 100644 --- a/src/dve/core_engine/message.py +++ b/src/dve/core_engine/message.py @@ -1,25 +1,66 @@ """Functionality to represent messages.""" +import copy import datetime as dt import json +import operator from decimal import Decimal -from typing import Any, Callable, ClassVar, Dict, List, Optional, Set, Type, Union +from functools import reduce +from typing import Any, Callable, ClassVar, Dict, List, Optional, Set, Tuple, Type, Union -from pydantic import ValidationError, validator +from pydantic import BaseModel, ValidationError, validator from pydantic.dataclasses import dataclass -from dve.core_engine.constants import ROWID_COLUMN_NAME -from dve.core_engine.templating import ENVIRONMENT +from dve.core_engine.constants import CONTRACT_ERROR_VALUE_FIELD_NAME, ROWID_COLUMN_NAME +from dve.core_engine.templating import ENVIRONMENT, template_object from dve.core_engine.type_hints import ( EntityName, ErrorCategory, - ErrorCode, FailureType, - Field, Messages, MessageTuple, Record, ) +from dve.parser.type_hints import FieldName + + +class DataContractErrorDetail(BaseModel): + """Define custom error codes for validation issues raised during the data contract phase""" + + error_code: str + error_message: Optional[str] = None + + def template_message( + self, + variables: Dict[str, Any], + error_location: Optional[Tuple[Union[str, int], ...]] = None, + ) -> Optional[str]: + """Template error messages with values from the record""" + if error_location: + variables = self.extract_error_value(variables, error_location) + return template_object(self.error_message, variables) + + @staticmethod + def extract_error_value(records, error_location): + """For nested errors, extract the offending value for easy access during templating.""" + _records = copy.copy(records) + try: + _records[CONTRACT_ERROR_VALUE_FIELD_NAME] = reduce( + operator.getitem, error_location, _records + ) + except KeyError: + pass + return _records + + +DEFAULT_ERROR_DETAIL: Dict[ErrorCategory, DataContractErrorDetail] = { + "Blank": DataContractErrorDetail(error_code="FieldBlank", error_message="cannot be blank"), + "Bad value": DataContractErrorDetail(error_code="BadValue", error_message="is invalid"), + "Wrong format": DataContractErrorDetail( + error_code="WrongFormat", error_message="has wrong format" + ), +} + INTEGRITY_ERROR_CODES: Set[str] = {"blockingsubmission"} """ @@ -153,16 +194,17 @@ def from_pydantic_error( entity: str, record: Record, error: ValidationError, - error_codes: Dict[Field, ErrorCode], + 
error_details: Optional[ + Dict[FieldName, Dict[ErrorCategory, DataContractErrorDetail]] + ] = None, ) -> Messages: """Create messages from a `pydantic` validation error.""" + error_details = {} if not error_details else error_details messages: Messages = [] for error_dict in error.errors(): error_type = error_dict["type"] - msg = "is invalid" if "none.not_allowed" in error_type or "value_error.missing" in error_type: category = "Blank" - msg = "cannot be blank" else: category = "Bad value" error_code = error_type @@ -176,9 +218,15 @@ def from_pydantic_error( else: failure_type = "record" + error_field = ".".join([idx for idx in error_dict["loc"] if not isinstance(idx, int)]) + is_informational = False if error_code.endswith("warning"): is_informational = True + error_detail: DataContractErrorDetail = error_details.get( # type: ignore + error_field, DEFAULT_ERROR_DETAIL + ).get(category) + messages.append( cls( entity=entity, @@ -187,10 +235,10 @@ def from_pydantic_error( is_informational=is_informational, error_type=error_type, error_location=error_dict["loc"], # type: ignore - error_message=msg, + error_message=error_detail.template_message(record, error_dict["loc"]), reporting_field=error_dict["loc"][-1], # type: ignore category=category, # type: ignore - error_code=error_codes.get(error_dict["loc"][-1]), # type: ignore + error_code=error_detail.error_code, # type: ignore ) ) diff --git a/src/dve/core_engine/templating.py b/src/dve/core_engine/templating.py index 049fbfd..4fc39f7 100644 --- a/src/dve/core_engine/templating.py +++ b/src/dve/core_engine/templating.py @@ -83,7 +83,7 @@ def template_object( if isinstance(object_, str): if method == "jinja": - return ENVIRONMENT.from_string(object_).render(**variables) # type: ignore + return ENVIRONMENT.from_string(object_).render(variables) # type: ignore return object_.format(**variables) # type: ignore parameterise = partial(template_object, variables=variables, method=method) diff --git a/src/dve/core_engine/validation.py b/src/dve/core_engine/validation.py index d2f3fda..d21e7e2 100644 --- a/src/dve/core_engine/validation.py +++ b/src/dve/core_engine/validation.py @@ -1,15 +1,16 @@ """XML schema/contract configuration.""" import warnings -from typing import List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from pydantic import ValidationError from pydantic.main import ModelMetaclass -from dve.core_engine.message import FeedbackMessage -from dve.core_engine.type_hints import ContractContents, EntityName, Messages, Record +from dve.core_engine.message import DEFAULT_ERROR_DETAIL, DataContractErrorDetail, FeedbackMessage +from dve.core_engine.type_hints import ContractContents, EntityName, ErrorCategory, Messages, Record from dve.metadata_parser.exc import EntityNotFoundError from dve.metadata_parser.model_generator import JSONtoPyd +from dve.parser.type_hints import FieldName class RowValidator: @@ -27,16 +28,20 @@ def __init__( model_definition: ContractContents, entity_name: EntityName, validators: Optional[dict] = None, - error_codes: Optional[dict] = None, + error_info: Optional[dict] = None, ): self._model_definition = model_definition self._validators = validators self.entity_name = entity_name self._model: Optional[ModelMetaclass] = None - self.error_codes = error_codes or {} + self._error_info = error_info or {} + self._error_details: Optional[ + Dict[FieldName, Dict[ErrorCategory, DataContractErrorDetail]] + ] = None - def __reduce__(self): # Don't attempt to pickle the Pydantic model. 
+ def __reduce__(self): # Don't attempt to pickle Pydantic models. self._model = None + self._error_details = None return super().__reduce__() @property @@ -55,6 +60,20 @@ def model(self) -> ModelMetaclass: self._model = model return self._model + @property + def error_details(self) -> Dict[FieldName, Dict[ErrorCategory, DataContractErrorDetail]]: + """Custom error code and message mapping for contract phase""" + if not self._error_details: + _error_details = { + field: { + err_type: DataContractErrorDetail(**detail) + for err_type, detail in err_details.items() + } + for field, err_details in self._error_info.items() + } + self._error_details = _error_details + return self._error_details + def __call__(self, record: Record) -> Tuple[Optional[Record], Messages]: """Take a record, returning a validated record (is successful) and a list of messages.""" with warnings.catch_warnings(record=True) as caught_warnings: @@ -69,7 +88,7 @@ def __call__(self, record: Record) -> Tuple[Optional[Record], Messages]: messages.extend(self.handle_warnings(record, caught_warnings)) messages.extend( FeedbackMessage.from_pydantic_error( - self.entity_name, record, err, self.error_codes + self.entity_name, record, err, self.error_details ) ) return None, messages @@ -106,6 +125,11 @@ def handle_warnings(self, record, caught_warnings) -> List[FeedbackMessage]: break else: error_location = None + error_code = ( + self.error_details.get(error_location, DEFAULT_ERROR_DETAIL) # type: ignore + .get("Wrong format") + .error_code + ) messages.append( FeedbackMessage( @@ -117,7 +141,7 @@ def handle_warnings(self, record, caught_warnings) -> List[FeedbackMessage]: error_location=error_location, error_message=error_message, category="Wrong format", - error_code=self.error_codes.get(error_location, ""), + error_code=error_code, ) ) return messages diff --git a/src/dve/pipeline/duckdb_pipeline.py b/src/dve/pipeline/duckdb_pipeline.py index 54336bc..1287d6b 100644 --- a/src/dve/pipeline/duckdb_pipeline.py +++ b/src/dve/pipeline/duckdb_pipeline.py @@ -2,13 +2,14 @@ from typing import Optional, Type -from duckdb import DuckDBPyConnection +from duckdb import DuckDBPyConnection, DuckDBPyRelation from dve.core_engine.backends.base.reference_data import BaseRefDataLoader from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager from dve.core_engine.backends.implementations.duckdb.contract import DuckDBDataContract from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count -from dve.core_engine.backends.implementations.duckdb.rules import DuckDBStepImplemetations +from dve.core_engine.backends.implementations.duckdb.rules import DuckDBStepImplementations +from dve.core_engine.models import SubmissionInfo from dve.core_engine.type_hints import URI from dve.pipeline.pipeline import BaseDVEPipeline @@ -35,9 +36,17 @@ def __init__( audit_tables, job_run_id, DuckDBDataContract(connection=self._connection), - DuckDBStepImplemetations.register_udfs(connection=self._connection), + DuckDBStepImplementations.register_udfs(connection=self._connection), rules_path, processed_files_path, submitted_files_path, reference_data_loader, ) + + # pylint: disable=arguments-differ + def write_file_to_parquet( # type: ignore + self, submission_file_uri: URI, submission_info: SubmissionInfo, output: URI + ): + return super().write_file_to_parquet( + submission_file_uri, submission_info, output, DuckDBPyRelation + ) diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py 
index a696e35..507ce3b 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -188,7 +188,7 @@ def write_file_to_parquet( submission_file_uri: URI, submission_info: SubmissionInfo, output: URI, - entity_type: Optional[EntityType] = None, # type: ignore + entity_type: Optional[EntityType] = None, ): """Takes a submission file and a valid submission_info and converts the file to parquet""" @@ -202,9 +202,7 @@ def write_file_to_parquet( errors = [] for model_name, model in models.items(): - reader: BaseFileReader = load_reader( - dataset, model_name, submission_info.dataset_id, ext - ) + reader: BaseFileReader = load_reader(dataset, model_name, ext) try: if not entity_type: reader.write_parquet( @@ -500,9 +498,9 @@ def apply_business_rules(self, submission_info: SubmissionInfo, failed: bool): for parquet_uri, _ in fh.iter_prefix(contract): file_name = fh.get_file_name(parquet_uri) - entities[file_name] = self.step_implementations.read_parquet(parquet_uri) # type: ignore - entities[file_name] = self.step_implementations.add_row_id(entities[file_name]) # type: ignore - entities[f"Original{file_name}"] = self.step_implementations.read_parquet(parquet_uri) # type: ignore + entities[file_name] = self.step_implementations.read_parquet(parquet_uri) # type: ignore + entities[file_name] = self.step_implementations.add_row_id(entities[file_name]) # type: ignore + entities[f"Original{file_name}"] = self.step_implementations.read_parquet(parquet_uri) # type: ignore sub_info_entity = ( self._audit_tables._submission_info.conv_to_entity( # pylint: disable=protected-access diff --git a/src/dve/pipeline/spark_pipeline.py b/src/dve/pipeline/spark_pipeline.py index 335ee86..60b7b18 100644 --- a/src/dve/pipeline/spark_pipeline.py +++ b/src/dve/pipeline/spark_pipeline.py @@ -3,7 +3,7 @@ from concurrent.futures import Executor from typing import List, Optional, Tuple, Type -from pyspark.sql import SparkSession +from pyspark.sql import DataFrame, SparkSession from dve.core_engine.backends.base.reference_data import BaseRefDataLoader from dve.core_engine.backends.implementations.spark.auditing import SparkAuditingManager @@ -45,6 +45,14 @@ def __init__( reference_data_loader, ) + # pylint: disable=arguments-differ + def write_file_to_parquet( # type: ignore + self, submission_file_uri: URI, submission_info: SubmissionInfo, output: URI + ): + return super().write_file_to_parquet( + submission_file_uri, submission_info, output, DataFrame + ) + def business_rule_step( self, pool: Executor, diff --git a/src/dve/pipeline/utils.py b/src/dve/pipeline/utils.py index 9a948ff..f4b6620 100644 --- a/src/dve/pipeline/utils.py +++ b/src/dve/pipeline/utils.py @@ -6,10 +6,10 @@ from pydantic.main import ModelMetaclass from pyspark.sql import SparkSession +import dve.core_engine.backends.implementations.duckdb # pylint: disable=unused-import +import dve.core_engine.backends.implementations.spark # pylint: disable=unused-import import dve.parser.file_handling as fh -from dve.core_engine.backends.base.reader import BaseFileReader -from dve.core_engine.backends.readers.csv import CSVFileReader -from dve.core_engine.backends.readers.xml import BasicXMLFileReader, XMLStreamReader +from dve.core_engine.backends.readers import _READER_REGISTRY from dve.core_engine.configuration.v1 import SchemaName, V1EngineConfig, _ModelConfig from dve.core_engine.type_hints import URI, SubmissionResult from dve.metadata_parser.model_generator import JSONtoPyd @@ -17,14 +17,6 @@ Dataset = Dict[SchemaName, _ModelConfig] _configs: 
Dict[str, Tuple[Dict[str, ModelMetaclass], V1EngineConfig, Dataset]] = {} locks = Lock() -reader_lock = Lock() -_readers: Dict[Tuple[str, str, str], BaseFileReader] = {} - -reader_map = { - "BasicXMLFileReader": BasicXMLFileReader, - "CSVFileReader": CSVFileReader, - "XMLStreamReader": XMLStreamReader, -} def load_config( @@ -48,15 +40,10 @@ def load_config( return models, config, dataset -def load_reader(dataset: Dataset, model_name: str, dataset_id: str, file_extension: str): +def load_reader(dataset: Dataset, model_name: str, file_extension: str): """Loads the readers for the diven feed, model name and file extension""" - key = (dataset_id, model_name, file_extension) - if key in _readers: - return _readers[key] reader_config = dataset[model_name].reader_config[f".{file_extension}"] - reader = reader_map[reader_config.reader](**reader_config.kwargs_) - with reader_lock: - _readers[key] = reader + reader = _READER_REGISTRY[reader_config.reader](**reader_config.kwargs_) return reader diff --git a/tests/features/demographics.feature b/tests/features/demographics.feature index 810ece4..aa59bfc 100644 --- a/tests/features/demographics.feature +++ b/tests/features/demographics.feature @@ -32,7 +32,7 @@ Feature: Pipeline tests using the ambsys dataset Scenario: Validate PID data with custom types (duckdb) Given I submit the demographics file basic_demographics.csv for processing - And A duckdb pipeline is configured with schema file 'basic_demographics.dischema.json' + And A duckdb pipeline is configured with schema file 'basic_demographics_ddb.dischema.json' And I add initial audit entries for the submission Then the latest audit record for the submission is marked with processing status file_transformation When I run the file transformation phase diff --git a/tests/features/movies.feature b/tests/features/movies.feature new file mode 100644 index 0000000..e48a122 --- /dev/null +++ b/tests/features/movies.feature @@ -0,0 +1,75 @@ +Feature: Pipeline tests using the movies dataset + Tests for the processing framework which use the movies dataset. + + This tests submissions in JSON format, with configuration in JSON config files. + Complex types are tested (arrays, nested structs) + + Some validation of entity attributes is performed: SQL expressions and Python filter + functions are used, and templatable business rules feature in the transformations. 
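The custom data contract error details exercised by the scenarios below can be sketched in isolation. The field name and record here are hypothetical; the error code and message mirror the DODGYYEAR row in the error tables that follow, and the templating convention (__error_value) is the one described in field_error_detail.schema.json:

    from dve.core_engine.message import DataContractErrorDetail

    # Illustrative custom detail for a nested field; jinja2 templating is assumed.
    detail = DataContractErrorDetail(
        error_code="DODGYYEAR",
        error_message="year value ({{ __error_value }}) is invalid",
    )

    record = {"title": "Example Movie", "release": {"year": "NOT_A_NUMBER"}}
    # error_location follows pydantic's error "loc" tuple for the offending field;
    # template_message injects the nested value as __error_value before rendering.
    rendered = detail.template_message(record, error_location=("release", "year"))
    # expected: "year value (NOT_A_NUMBER) is invalid"

This is the same path taken by FeedbackMessage.from_pydantic_error when a field listed in the error_details mapping fails validation during the data contract phase.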
+ + Scenario: Validate and filter movies (spark) + Given I submit the movies file movies.json for processing + And A spark pipeline is configured + And I create the following reference data tables in the database movies_refdata + | table_name | parquet_path | + | sequels | tests/testdata/movies/refdata/movies_sequels.parquet | + And I add initial audit entries for the submission + Then the latest audit record for the submission is marked with processing status file_transformation + When I run the file transformation phase + Then the movies entity is stored as a parquet after the file_transformation phase + And the latest audit record for the submission is marked with processing status data_contract + When I run the data contract phase + Then there are 3 record rejections from the data_contract phase + And there are errors with the following details and associated error_count from the data_contract phase + | ErrorCode | ErrorMessage | error_count | + | BLANKYEAR | year not provided | 1 | + | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 | + | DODGYDATE | date_joined value is not valid: daft_date | 1 | + And the movies entity is stored as a parquet after the data_contract phase + And the latest audit record for the submission is marked with processing status business_rules + When I run the business rules phase + Then The rules restrict "movies" to 4 qualifying records + And At least one row from "movies" has generated error code "LIMITED_RATINGS" + And At least one row from "derived" has generated error code "RUBBISH_SEQUEL" + And the latest audit record for the submission is marked with processing status error_report + When I run the error report phase + Then An error report is produced + And The statistics entry for the submission shows the following information + | parameter | value | + | record_count | 5 | + | number_record_rejections | 4 | + | number_warnings | 1 | + + Scenario: Validate and filter movies (duckdb) + Given I submit the movies file movies.json for processing + And A duckdb pipeline is configured with schema file 'movies_ddb.dischema.json' + And I create the following reference data tables in the database "movies_refdata" + | table_name | parquet_path | + | sequels | tests/testdata/movies/refdata/movies_sequels.parquet | + And I add initial audit entries for the submission + Then the latest audit record for the submission is marked with processing status file_transformation + When I run the file transformation phase + Then the movies entity is stored as a parquet after the file_transformation phase + And the latest audit record for the submission is marked with processing status data_contract + When I run the data contract phase + Then there are 3 record rejections from the data_contract phase + And there are errors with the following details and associated error_count from the data_contract phase + | ErrorCode | ErrorMessage | error_count | + | BLANKYEAR | year not provided | 1 | + | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 | + | DODGYDATE | date_joined value is not valid: daft_date | 1 | + And the movies entity is stored as a parquet after the data_contract phase + And the latest audit record for the submission is marked with processing status business_rules + When I run the business rules phase + Then The rules restrict "movies" to 4 qualifying records + And At least one row from "movies" has generated error code "LIMITED_RATINGS" + And At least one row from "derived" has generated error code "RUBBISH_SEQUEL" + And the latest audit record for the 
submission is marked with processing status error_report + When I run the error report phase + Then An error report is produced + And The statistics entry for the submission shows the following information + | parameter | value | + | record_count | 5 | + | number_record_rejections | 4 | + | number_warnings | 1 | + diff --git a/tests/features/planets.feature b/tests/features/planets.feature index b00050b..321f047 100644 --- a/tests/features/planets.feature +++ b/tests/features/planets.feature @@ -64,7 +64,7 @@ Feature: Pipeline tests using the planets dataset Scenario: Validate and filter planets (duckdb) Given I submit the planets file planets_demo.csv for processing - And A duckdb pipeline is configured + And A duckdb pipeline is configured with schema file 'planets_ddb.dischema.json' And I add initial audit entries for the submission Then the latest audit record for the submission is marked with processing status file_transformation When I run the file transformation phase @@ -92,7 +92,7 @@ Feature: Pipeline tests using the planets dataset Scenario: Handle a file with malformed header provided (duckdb) Given I submit the planets file malformed_planets.csv for processing - And A duckdb pipeline is configured + And A duckdb pipeline is configured with schema file 'planets_ddb.dischema.json' And I add initial audit entries for the submission Then the latest audit record for the submission is marked with processing status file_transformation When I run the file transformation phase @@ -101,7 +101,6 @@ Feature: Pipeline tests using the planets dataset Then the latest audit record for the submission is marked with processing status business_rules When I run the business rules phase Then The rules restrict "planets" to 0 qualifying records - And At least one row from "planets" has generated error category "Blank" And At least one row from "planets" has generated error code "STRONG_GRAVITY" And the latest audit record for the submission is marked with processing status error_report When I run the error report phase diff --git a/tests/features/steps/steps_pipeline.py b/tests/features/steps/steps_pipeline.py index 6968388..3dbc2b2 100644 --- a/tests/features/steps/steps_pipeline.py +++ b/tests/features/steps/steps_pipeline.py @@ -7,10 +7,11 @@ """ # pylint: disable=no-name-in-module from concurrent.futures import ThreadPoolExecutor -from functools import partial +from functools import partial, reduce from itertools import chain +import operator from pathlib import Path -from typing import Callable, Dict, Optional +from typing import Any, Callable, Dict, List, Optional, Tuple from uuid import uuid4 from behave import given, then, when # type: ignore from behave.model import Row, Table @@ -163,7 +164,28 @@ def get_record_rejects_from_service(context: Context, service: str, expected_num message_df = load_errors_from_service(processing_path, service) num_rejections = message_df.filter(pl.col("FailureType").eq("record")).shape[0] assert num_rejections == expected_num_errors, f"Got {num_rejections} actual rejections" + +@then("there are errors with the following details and associated error_count from the {service} phase") +def check_error_record_details_from_service(context: Context, service:str): + processing_path = ctxt.get_processing_location(context) + table: Optional[Table] = context.table + if table is None: + raise ValueError("No table supplied in step") + error_details: List[Tuple[pl.Expr, int]] = [] + row: Row + for row in table: + record = row.as_dict() + error_count = 
int(record.pop("error_count")) + filter_expr = reduce(operator.and_, + [pl.col(k).eq(v) for k, v in record.items()]) + error_details.append((filter_expr, error_count)) + + message_df = load_errors_from_service(processing_path, service) + for err_details in error_details: + filter_expr, error_count = err_details + assert message_df.filter(filter_expr).shape[0] == error_count + @given("A {implementation} pipeline is configured") @given("A {implementation} pipeline is configured with schema file '{schema_file_name}'") @@ -264,7 +286,7 @@ def check_rows_removed_with_error_code(context: Context, entity_name: str, error assert recs_with_err_code >= 1 -@then('At least one row from "{entity_name}" has generated error category "{category}"') +@then('At least one row from "{entity_name}" has generated error category "{category}"') def check_rows_eq_to_category(context: Context, entity_name: str, category: str): """Check number error message rows equivalent to a given value against a given category.""" err_df = get_all_errors_df(context) @@ -273,3 +295,30 @@ def check_rows_eq_to_category(context: Context, entity_name: str, category: str) (pl.col("Entity").eq(entity_name)) & (pl.col("Category").eq(category)) ).shape[0] assert recs_with_err_code >= 1 + +@given("I create the following reference data tables in the database {database}") +def create_refdata_tables(context: Context, database: str): + table: Optional[Table] = context.table + refdata_tables: Dict[str, URI] = {} + row: Row + for row in table: + record = row.as_dict() + refdata_tables[record["table_name"]] = record["parquet_path"] + pipeline = ctxt.get_pipeline(context) + refdata_loader = getattr(pipeline, "_reference_data_loader") + if refdata_loader == SparkRefDataLoader: + refdata_loader.spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}") + for tbl, source in refdata_tables.items(): + (refdata_loader.spark.read.parquet(source) + .write.saveAsTable(f"{database}.{tbl}")) + + if refdata_loader == DuckDBRefDataLoader: + ref_db_file = Path(ctxt.get_processing_location(context), f"{database}.duckdb").as_posix() + refdata_loader.connection.sql(f"ATTACH '{ref_db_file}' AS {database}") + for tbl, source in refdata_tables.items(): + refdata_loader.connection.read_parquet(source).to_table(f"{database}.{tbl}") + + + + + diff --git a/tests/test_core_engine/test_backends/fixtures.py b/tests/test_core_engine/test_backends/fixtures.py index 7d684c6..1f9ac23 100644 --- a/tests/test_core_engine/test_backends/fixtures.py +++ b/tests/test_core_engine/test_backends/fixtures.py @@ -460,3 +460,130 @@ def nested_typecast_parquet(temp_dir) -> Iterator[Tuple[URI, List[Dict[str, Any] ) _df.coalesce(1).write.format("parquet").save(output_location) yield output_location, data + +@pytest.fixture(scope="function") +def nested_all_string_parquet_w_errors(temp_dir, + nested_parquet_custom_dc_err_details) -> Iterator[Tuple[URI, str, List[Dict[str, Any]]]]: + contract_meta = json.dumps( + { + "contract": { + "error_details": f"{nested_parquet_custom_dc_err_details.as_posix()}", + "schemas": { + "SubField": { + "fields": { + "id": "int", + "substrfield": "str", + "subarrayfield": {"type": "date", "is_array": True}, + }, + "mandatory_fields": ["id"], + } + }, + "datasets": { + "nested_model": { + "fields": { + "id": "int", + "strfield": "str", + "datetimefield": "datetime", + "subfield": {"model": "SubField", "is_array": True}, + }, + "reader_config": { + ".xml": { + "reader": "DuckDBXMLStreamReader", + "parameters": {"root_tag": "root", "record_tag": "NestedModel"}, + } + 
}, + "key_field": "id", + } + }, + } + } + ) + + _spark: SparkSession = SparkSession.builder.getOrCreate() + data: List[Dict[str, Any]] = [ + dict( + id=1, + strfield="hi", + datetimefield=str(datetime(2020, 9, 20, 12, 34, 56)), + subfield=[ + dict( + id=1, + substrfield="bye", + subarrayfield=[str(date(2020, 9, 20)), str(date(2020, 9, 21))], + ) + ], + ), + dict( + id="WRONG", + strfield="hello", + datetimefield=str(datetime(2020, 9, 21, 12, 34, 56)), + subfield=[ + dict( + id=2, + substrfield="bye", + subarrayfield=[str(date(2020, 9, 20)), str(date(2020, 9, 21))], + ), + dict( + id="WRONG", + substrfield="aurevoir", + subarrayfield=[str(date(2020, 9, 22)), str(date(2020, 9, 23))], + ), + ], + ), + ] + + output_location: URI = str(Path(temp_dir).joinpath("nested_parquet").as_posix()) + "/" + + _df: DataFrame = _spark.createDataFrame( + data, + schema=StructType( + [ + StructField("id", StringType()), + StructField("strfield", StringType()), + StructField("datetimefield", StringType()), + StructField( + "subfield", + ArrayType( + StructType( + [ + StructField("id", StringType()), + StructField("substrfield", StringType()), + StructField("subarrayfield", ArrayType(StringType())), + ] + ) + ), + ), + ] + ), + ) + _df.coalesce(1).write.format("parquet").save(output_location) + yield output_location, contract_meta, data + + +@pytest.fixture() +def nested_parquet_custom_dc_err_details(temp_dir): + file_path = Path(temp_dir).joinpath("nested_parquet_data_contract_codes.json") + err_details = { + "id": { + "Blank": {"error_code": "TESTIDBLANK", + "error_message": "id cannot be null"}, + "Bad value": {"error_code": "TESTIDBAD", + "error_message": "id is invalid: id - {{id}}"} + }, + "datetimefield": { + "Bad value": {"error_code": "TESTDTFIELDBAD", + "error_message": "datetimefield is invalid: id - {{id}}, datetimefield - {{datetimefield}}"} + }, + "subfield.id": { + "Blank": {"error_code": "SUBFIELDTESTIDBLANK", + "error_message": "subfield id cannot be null"}, + "Bad value": {"error_code": "SUBFIELDTESTIDBAD", + "error_message": "subfield id is invalid: subfield.id - {{__error_value}}"} + }, + } + with open(file_path, mode="w") as fle: + json.dump(err_details, fle) + + yield file_path + + diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py index decd77f..989a928 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py @@ -174,7 +174,7 @@ def test_dve_audit_using_thread_pool(ddb_audit_manager_threaded: DDBAuditingMana aud.add_new_submissions([_sub_info]) while not aud.queue.empty(): - time.sleep(2) + time.sleep(0.2) at_entry = list( aud._processing_status.get_relation() @@ -188,7 +188,7 @@ def test_dve_audit_using_thread_pool(ddb_audit_manager_threaded: DDBAuditingMana assert len(at_entry) == 1 aud.mark_transform([_sub_info.submission_id]) while not aud.queue.empty(): - time.sleep(2) + time.sleep(0.2) file_trans = aud.get_all_file_transformation_submissions() assert [rw.get("submission_id") for rw in file_trans.pl().iter_rows(named=True)] == [ diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py index 8d1f54d..3e30832 100644 --- 
a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py @@ -19,12 +19,13 @@ from tests.test_core_engine.test_backends.fixtures import ( nested_all_string_parquet, simple_all_string_parquet, + nested_all_string_parquet_w_errors, + nested_parquet_custom_dc_err_details, temp_csv_file, temp_duckdb_dir, temp_xml_file, ) - def test_duckdb_data_contract_csv(temp_csv_file): uri, _, _, mdl = temp_csv_file connection = default_connection @@ -306,3 +307,49 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet): "datetimefield": "TIMESTAMP", "subfield": "STRUCT(id BIGINT, substrfield VARCHAR, subarrayfield DATE[])[]", } + +def test_duckdb_data_contract_custom_error_details(nested_all_string_parquet_w_errors, + nested_parquet_custom_dc_err_details): + parquet_uri, contract_meta, _ = nested_all_string_parquet_w_errors + connection = default_connection + data_contract = DuckDBDataContract(connection) + + entity = data_contract.read_parquet(path=parquet_uri) + assert entity.count("*").fetchone()[0] == 2 + + # check processes entity + contract_dict = json.loads(contract_meta).get("contract") + entities: Dict[str, DuckDBPyRelation] = { + "nested_model": entity, + } + + with open(nested_parquet_custom_dc_err_details) as err_dets: + custom_error_details = json.load(err_dets) + + dc_meta = DataContractMetadata( + reader_metadata={ + "nested_model": { + ".xml": ReaderConfig( + **contract_dict.get("datasets", {}) + .get("nested_model", {}) + .get("reader_config", {}) + .get(".xml") + ) + } + }, + validators={ + "nested_model": RowValidator(contract_dict, + "nested_model", + error_info=custom_error_details) + }, + reporting_fields={"nested_model": ["id"]}, + ) + + entities, messages, stage_successful = data_contract.apply_data_contract(entities, dc_meta) + assert stage_successful + assert len(messages) == 2 + messages = sorted(messages, key= lambda x: x.error_code) + assert messages[0].error_code == "SUBFIELDTESTIDBAD" + assert messages[0].error_message == "subfield id is invalid: subfield.id - WRONG" + assert messages[1].error_code == "TESTIDBAD" + assert messages[1].error_message == "id is invalid: id - WRONG" \ No newline at end of file diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py index 5122637..dafac4d 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py @@ -17,7 +17,7 @@ from dve.core_engine.backends.base.core import EntityManager from dve.core_engine.backends.exceptions import MissingEntity -from dve.core_engine.backends.implementations.duckdb.rules import DuckDBStepImplemetations +from dve.core_engine.backends.implementations.duckdb.rules import DuckDBStepImplementations from dve.core_engine.backends.metadata.reporting import ReportingConfig from dve.core_engine.backends.metadata.rules import ( Aggregation, @@ -49,7 +49,7 @@ simple_typecast_parquet, ) -DUCKDB_STEP_BACKEND = DuckDBStepImplemetations(default_connection) +DUCKDB_STEP_BACKEND = DuckDBStepImplementations(default_connection) """The backend for the duckdb steps.""" diff --git a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py 
b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py index 4d6b72f..4045a90 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py @@ -194,7 +194,7 @@ def test_dve_audit_using_thread_pool(spark_audit_manager_threaded: SparkAuditing _sub_info.submission_id = uuid4().hex aud.add_new_submissions([_sub_info]) while not aud.queue.empty(): - time.sleep(2) + time.sleep(0.5) assert _sub_info.submission_id at_entry = ( @@ -206,7 +206,7 @@ def test_dve_audit_using_thread_pool(spark_audit_manager_threaded: SparkAuditing assert len(at_entry) == 1 aud.mark_transform([_sub_info.submission_id]) while not aud.queue.empty(): - time.sleep(2) + time.sleep(0.5) file_trans = aud.get_all_file_transformation_submissions() assert [rw.submission_id for rw in file_trans.collect()] == [_sub_info.submission_id] diff --git a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py index 1f8fbf5..fac6cdf 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py @@ -20,7 +20,9 @@ from dve.core_engine.validation import RowValidator from tests.test_core_engine.test_backends.fixtures import ( nested_all_string_parquet, + nested_all_string_parquet_w_errors, simple_all_string_parquet, + nested_parquet_custom_dc_err_details ) @@ -88,7 +90,7 @@ def test_spark_data_contract_read_and_write_basic_parquet( ) -def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet): +def test_spark_data_contract_read_nested_parquet(nested_all_string_parquet): # can we read in a stringified parquet and run the data contract on it? 
# more complex file - nested, arrays of structs parquet_uri, contract_meta, _ = nested_all_string_parquet @@ -170,3 +172,68 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet): ), ] ) + +def test_spark_data_contract_custom_error_details(nested_all_string_parquet_w_errors, + nested_parquet_custom_dc_err_details): + parquet_uri, contract_meta, _ = nested_all_string_parquet_w_errors + data_contract = SparkDataContract() + + entity = data_contract.read_parquet(path=parquet_uri) + assert entity.count() == 2 + assert entity.schema == StructType( + [ + StructField("id", StringType()), + StructField("strfield", StringType()), + StructField("datetimefield", StringType()), + StructField( + "subfield", + ArrayType( + StructType( + [ + StructField("id", StringType()), + StructField("substrfield", StringType()), + StructField("subarrayfield", ArrayType(StringType())), + ] + ) + ), + ), + ] + ) + # check processes entity + contract_dict = json.loads(contract_meta).get("contract") + entities: Dict[str, DataFrame] = { + "nested_model": entity, + } + + with open(nested_parquet_custom_dc_err_details) as err_dets: + custom_error_details = json.load(err_dets) + + dc_meta = DataContractMetadata( + reader_metadata={ + "nested_model": { + ".xml": ReaderConfig( + **contract_dict.get("datasets", {}) + .get("nested_model", {}) + .get("reader_config", {}) + .get(".xml") + ) + } + }, + validators={ + "nested_model": RowValidator(contract_dict, + "nested_model", + error_info=custom_error_details) + }, + reporting_fields={"nested_model": ["id"]}, + ) + + entities, messages, stage_successful = data_contract.apply_data_contract(entities, dc_meta) + assert stage_successful + assert len(messages) == 2 + messages = sorted(messages, key= lambda x: x.error_code) + assert messages[0].error_code == "SUBFIELDTESTIDBAD" + assert messages[0].error_message == "subfield id is invalid: subfield.id - WRONG" + assert messages[1].error_code == "TESTIDBAD" + assert messages[1].error_message == "id is invalid: id - WRONG" + + \ No newline at end of file diff --git a/tests/test_core_engine/test_backends/test_readers/test_duckdb/test_csv.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py similarity index 100% rename from tests/test_core_engine/test_backends/test_readers/test_duckdb/test_csv.py rename to tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py diff --git a/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py new file mode 100644 index 0000000..900632d --- /dev/null +++ b/tests/test_core_engine/test_backends/test_readers/test_ddb_json.py @@ -0,0 +1,102 @@ +from datetime import date, datetime +import json +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import List + +import pytest +from duckdb import DuckDBPyRelation, default_connection +from pydantic import BaseModel + +from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( + get_duckdb_type_from_annotation, +) +from dve.core_engine.backends.implementations.duckdb.readers.json import DuckDBJSONReader +from dve.core_engine.backends.utilities import stringify_model +from tests.test_core_engine.test_backends.fixtures import duckdb_connection + + +class SimpleModel(BaseModel): + varchar_field: str + bigint_field: int + date_field: date + timestamp_field: datetime + + +@pytest.fixture +def temp_dir(): + with TemporaryDirectory(prefix="ddb_test_json_reader") as temp_dir: + yield Path(temp_dir) 
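The JSON reader tests that follow resolve readers through the new string-keyed registry rather than the old per-dataset cache (see the `load_reader` change in `dve/pipeline/utils.py`). The registry implementation is not shown in this diff, so the snippet below is only a sketch of the assumed pattern, with a stand-in reader class:

```python
# Sketch only: the string-keyed reader registry assumed by load_reader().
# The real register_reader and _READER_REGISTRY live in
# dve.core_engine.backends.readers and are not part of this diff.
from typing import Dict, Type

_READER_REGISTRY: Dict[str, Type] = {}

def register_reader(reader_cls: Type) -> Type:
    """Register a reader class under its class name so configs can name it as a string."""
    _READER_REGISTRY[reader_cls.__name__] = reader_cls
    return reader_cls

class DuckDBJSONReader:  # stand-in for the real reader class
    def __init__(self, **kwargs):
        self.kwargs = kwargs

register_reader(DuckDBJSONReader)

# Mirrors load_reader(): look the class up by the "reader" string from
# reader_config and instantiate it with the configured kwargs.
reader = _READER_REGISTRY["DuckDBJSONReader"]()
```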
+ + +@pytest.fixture +def temp_json_file(temp_dir: Path): + field_names: List[str] = ["varchar_field","bigint_field","date_field","timestamp_field"] + typed_data = [ + ["hi", 1, date(2023, 1, 3), datetime(2023, 1, 3, 12, 0, 3)], + ["bye", 2, date(2023, 3, 7), datetime(2023, 5, 9, 15, 21, 53)], + ] + + test_data = [dict(zip(field_names, rw)) for rw in typed_data] + + with open(temp_dir.joinpath("test.json"), mode="w") as json_file: + json.dump(test_data, json_file, default=str) + + yield temp_dir.joinpath("test.json"), test_data, SimpleModel + + +class SimpleModel(BaseModel): + varchar_field: str + bigint_field: int + date_field: date + timestamp_field: datetime + + +def test_ddb_json_reader_all_str(temp_json_file): + uri, data, mdl = temp_json_file + expected_fields = [fld for fld in mdl.__fields__] + reader = DuckDBJSONReader() + rel: DuckDBPyRelation = reader.read_to_entity_type( + DuckDBPyRelation, uri, "test", stringify_model(mdl) + ) + assert rel.columns == expected_fields + assert dict(zip(rel.columns, rel.dtypes)) == {fld: "VARCHAR" for fld in expected_fields} + assert rel.fetchall() == [tuple(str(val) for val in rw.values()) for rw in data] + + +def test_ddb_json_reader_cast(temp_json_file): + uri, data, mdl = temp_json_file + expected_fields = [fld for fld in mdl.__fields__] + reader = DuckDBJSONReader() + rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, uri, "test", mdl) + + assert rel.columns == expected_fields + assert dict(zip(rel.columns, rel.dtypes)) == { + fld.name: str(get_duckdb_type_from_annotation(fld.annotation)) + for fld in mdl.__fields__.values() + } + assert rel.fetchall() == [tuple(rw.values()) for rw in data] + + +def test_ddb_csv_write_parquet(temp_json_file): + uri, _, mdl = temp_json_file + reader = DuckDBJSONReader() + rel: DuckDBPyRelation = reader.read_to_entity_type( + DuckDBPyRelation, uri, "test", stringify_model(mdl) + ) + target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix() + reader.write_parquet(rel, target_loc) + parquet_rel = default_connection.read_parquet(target_loc) + assert parquet_rel.df().to_dict(orient="records") == rel.df().to_dict(orient="records") + +def test_ddb_json_write_parquet_py_iterator(temp_json_file): + uri, _, mdl = temp_json_file + reader = DuckDBJSONReader() + data = list(reader.read_to_py_iterator(uri.as_posix(), "test", stringify_model(mdl))) + target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix() + reader.write_parquet(default_connection.query("select dta.* from (select unnest($data) as dta)", + params={"data": data}), + target_loc) + parquet_data = sorted(default_connection.read_parquet(target_loc).pl().iter_rows(named=True), + key= lambda x: x.get("bigint_field")) + assert parquet_data == list(data) diff --git a/tests/test_core_engine/test_backends/test_readers/test_duckdb/test_xml.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py similarity index 100% rename from tests/test_core_engine/test_backends/test_readers/test_duckdb/test_xml.py rename to tests/test_core_engine/test_backends/test_readers/test_ddb_xml.py diff --git a/tests/test_core_engine/test_backends/test_readers/test_spark_json.py b/tests/test_core_engine/test_backends/test_readers/test_spark_json.py new file mode 100644 index 0000000..3cbecb8 --- /dev/null +++ b/tests/test_core_engine/test_backends/test_readers/test_spark_json.py @@ -0,0 +1,98 @@ +from datetime import date, datetime +import json +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import 
List + +import pytest +from pydantic import BaseModel +from pyspark.sql import DataFrame +from pyspark.sql.types import StructType, StructField, StringType + +from dve.core_engine.backends.implementations.spark.spark_helpers import ( + get_type_from_annotation, +) +from dve.core_engine.backends.implementations.spark.readers.json import SparkJSONReader +from dve.core_engine.backends.utilities import stringify_model + + +class SimpleModel(BaseModel): + varchar_field: str + bigint_field: int + date_field: date + timestamp_field: datetime + + +@pytest.fixture +def temp_dir(): + with TemporaryDirectory(prefix="spark_test_json_reader") as temp_dir: + yield Path(temp_dir) + + +@pytest.fixture +def temp_json_file(temp_dir: Path): + field_names: List[str] = ["varchar_field","bigint_field","date_field","timestamp_field"] + typed_data = [ + ["hi", 1, date(2023, 1, 3), datetime(2023, 1, 3, 12, 0, 3)], + ["bye", 2, date(2023, 3, 7), datetime(2023, 5, 9, 15, 21, 53)], + ] + + test_data = [dict(zip(field_names, rw)) for rw in typed_data] + + with open(temp_dir.joinpath("test.json"), mode="w") as json_file: + json.dump(test_data, json_file, default=str) + + yield temp_dir.joinpath("test.json"), test_data, SimpleModel + + +class SimpleModel(BaseModel): + varchar_field: str + bigint_field: int + date_field: date + timestamp_field: datetime + + +def test_spark_json_reader_all_str(temp_json_file): + uri, data, mdl = temp_json_file + expected_fields = [fld for fld in mdl.__fields__] + reader = SparkJSONReader() + df: DataFrame = reader.read_to_entity_type( + DataFrame, uri.as_posix(), "test", stringify_model(mdl) + ) + assert df.columns == expected_fields + assert df.schema == StructType([StructField(nme, StringType()) for nme in expected_fields]) + assert [rw.asDict() for rw in df.collect()] == [{k: str(v) for k, v in rw.items()} for rw in data] + +def test_spark_json_reader_cast(temp_json_file): + uri, data, mdl = temp_json_file + expected_fields = [fld for fld in mdl.__fields__] + reader = SparkJSONReader() + df: DataFrame = reader.read_to_entity_type(DataFrame, uri.as_posix(), "test", mdl) + + assert df.columns == expected_fields + assert df.schema == StructType([StructField(fld.name, get_type_from_annotation(fld.annotation)) + for fld in mdl.__fields__.values()]) + assert [rw.asDict() for rw in df.collect()] == data + + +def test_spark_json_write_parquet(spark, temp_json_file): + uri, _, mdl = temp_json_file + reader = SparkJSONReader() + df: DataFrame = reader.read_to_entity_type( + DataFrame, uri.as_posix(), "test", stringify_model(mdl) + ) + target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix() + reader.write_parquet(df, target_loc) + parquet_df = spark.read.parquet(target_loc) + assert parquet_df.collect() == df.collect() + +def test_spark_json_write_parquet_py_iterator(spark, temp_json_file): + uri, _, mdl = temp_json_file + reader = SparkJSONReader() + data = list(reader.read_to_py_iterator(uri.as_posix(), "test", stringify_model(mdl))) + target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix() + reader.write_parquet(spark.createDataFrame(data), target_loc) + parquet_data = sorted([rw.asDict() for rw + in spark.read.parquet(target_loc).collect()], + key= lambda x: x.get("bigint_field")) + assert parquet_data == list(data) diff --git a/tests/test_core_engine/test_message.py b/tests/test_core_engine/test_message.py index b48c185..edf89fc 100644 --- a/tests/test_core_engine/test_message.py +++ b/tests/test_core_engine/test_message.py @@ -1,11 +1,15 @@ """Tests for 
feedback messages.""" +from datetime import date +import json from string import ascii_letters +from typing import Dict, List, Optional +from pydantic import BaseModel, ValidationError import pytest from dve.core_engine.constants import ROWID_COLUMN_NAME -from dve.core_engine.message import FeedbackMessage +from dve.core_engine.message import DEFAULT_ERROR_DETAIL, DataContractErrorDetail, FeedbackMessage def test_rowid_column_stripped(): @@ -139,3 +143,154 @@ def test_max_values(max_value, expected_len): if multiple != expected_len: # make sure message that only the first n were shown assert "only first 3" in val[-1] + + +def test_from_pydantic_error(): + + bad_val_default = DEFAULT_ERROR_DETAIL.get("Bad value") + blank_default = DEFAULT_ERROR_DETAIL.get("Blank") + + class TestModel(BaseModel): + idx: int + other_field: str + + _bad_value_data = {"idx": "ABC", "other_field": 123} + _blank_value_data = {"other_field": "hi"} + + try: + TestModel(**_bad_value_data) + except ValidationError as e: + _error_bad_value = e + + try: + TestModel(**_blank_value_data) + except ValidationError as e: + _error_blank = e + + msgs_bad= FeedbackMessage.from_pydantic_error(entity="test_entity", + record = _bad_value_data, + error=_error_bad_value) + + assert len(msgs_bad) == 1 + assert msgs_bad[0].error_code == bad_val_default.error_code + assert msgs_bad[0].error_message == bad_val_default.error_message + + msgs_blank = FeedbackMessage.from_pydantic_error(entity="test_entity", + record = _blank_value_data, + error=_error_blank) + + assert len(msgs_blank) == 1 + assert msgs_blank[0].error_code == blank_default.error_code + assert msgs_blank[0].error_message == blank_default.error_message + +def test_from_pydantic_error_custom_error_details(): + + bad_val_default = DEFAULT_ERROR_DETAIL.get("Bad value") + blank_default = DEFAULT_ERROR_DETAIL.get("Blank") + class TestModel(BaseModel): + idx: int + str_field: str + date_field: Optional[date] + unimportant_field: Optional[int] + + custom_error_details: str = """ + {"idx": {"Blank": {"error_code": "IDBLANKERRCODE", + "error_message": "idx is a mandatory field"}, + "Bad value": {"error_code": "IDDODGYVALCODE", + "error_message": "idx value is dodgy: {{idx}}"}}, + "date_field": {"Bad value": {"error_code": "DATEDODGYVALCODE", + "error_message": "date_field value is dodgy: idx: {{idx}}, date_field: {{date_field}}"}}} + """ + error_details: Dict[str, Dict[str, DataContractErrorDetail]] = {field: {err_type: DataContractErrorDetail(**detail) + for err_type, detail in err_details.items()} + for field, err_details in json.loads(custom_error_details).items()} + + _bad_value_data = {"idx": "ABC", "str_field": "test", "date_field": "terry", "unimportant_field": "dog"} + _blank_value_data = {} + + try: + TestModel(**_bad_value_data) + except ValidationError as e: + _error_bad_value = e + + try: + TestModel(**_blank_value_data) + except ValidationError as e: + _error_blank = e + + msgs_bad= FeedbackMessage.from_pydantic_error(entity="test_entity", + record = _bad_value_data, + error=_error_bad_value, + error_details=error_details) + + msgs_bad = sorted(msgs_bad, key=lambda x: x.error_location) + + assert len(msgs_bad) == 3 + assert msgs_bad[0].error_code == error_details.get("date_field").get("Bad value").error_code + assert msgs_bad[0].error_message == error_details.get("date_field").get("Bad value").template_message(_bad_value_data) + assert msgs_bad[1].error_code == error_details.get("idx").get("Bad value").error_code + assert msgs_bad[1].error_message == 
error_details.get("idx").get("Bad value").template_message(_bad_value_data) + assert msgs_bad[2].error_code == bad_val_default.error_code + assert msgs_bad[2].error_message == bad_val_default.error_message + + msgs_blank = FeedbackMessage.from_pydantic_error(entity="test_entity", + record = _blank_value_data, + error=_error_blank, + error_details=error_details) + + + msgs_blank = sorted(msgs_blank, key=lambda x: x.error_location) + + assert len(msgs_blank) == 2 + assert msgs_blank[0].error_code == error_details.get("idx").get("Blank").error_code + assert msgs_blank[0].error_message == error_details.get("idx").get("Blank").template_message(_blank_value_data) + assert msgs_blank[1].error_code == blank_default.error_code + assert msgs_blank[1].error_message == blank_default.error_message + +def test_from_pydantic_error_custom_codes_nested(): + + class LowestModel(BaseModel): + nested_field_3: str + test_date: Optional[date] + class SubTestModel(BaseModel): + nested_field_1: int + nested_field_2: List[LowestModel] + class TestModel(BaseModel): + a_field: str + sub_field: List[SubTestModel] + + + test_record = {"a_field": "test", + "sub_field": [{"nested_field_1": 5, + "nested_field_2": [{"nested_field_3": "hi"}, + {"nested_field_3": "bye", + "test_date": date(2020,1,1)}] + }, + {"nested_field_1": 6, + "nested_field_2": [{"nested_field_3": "bonjour", + "test_date": "Barry"}, + {"nested_field_3": "aurevoir"}] + } + ] + } + custom_error_details: str = """{"sub_field.nested_field_2.test_date": {"Bad value": {"error_code": "DATEDODGYVALCODE", + "error_message": "date_field value is dodgy: a_field: {{a_field}}, date_field: {{__error_value}}"}}}""" + error_details: Dict[str, Dict[str, DataContractErrorDetail]] = {field: {err_type: DataContractErrorDetail(**detail) + for err_type, detail in err_details.items()} + for field, err_details in json.loads(custom_error_details).items()} + try: + TestModel(**test_record) + except ValidationError as err: + error = err + + msg = FeedbackMessage.from_pydantic_error( + entity="test_entity", + record=test_record, + error=error, + error_details=error_details + ) + assert len(msg) == 1 + msg = msg[0] + assert msg.error_code == "DATEDODGYVALCODE" + assert msg.error_message == "date_field value is dodgy: a_field: test, date_field: Barry" + diff --git a/tests/test_core_engine/test_register_udfs.py b/tests/test_core_engine/test_register_udfs.py index 89e134b..118c9aa 100644 --- a/tests/test_core_engine/test_register_udfs.py +++ b/tests/test_core_engine/test_register_udfs.py @@ -6,7 +6,7 @@ from pyspark.sql.types import Row from dve.core_engine.backends.base.core import EntityManager -from dve.core_engine.backends.implementations.duckdb.rules import DuckDBStepImplemetations +from dve.core_engine.backends.implementations.duckdb.rules import DuckDBStepImplementations from dve.core_engine.backends.implementations.spark.rules import SparkStepImplementations from dve.core_engine.backends.metadata.rules import ColumnAddition from tests.test_core_engine.test_backends.fixtures import duckdb_connection @@ -66,7 +66,7 @@ def test_register_udfs_spark(spark: SparkSession): def test_register_udfs_duckdb(duckdb_connection): """Test that UDFs can be registered as duckdb functions.""" - step_impl = DuckDBStepImplemetations.register_udfs(duckdb_connection) + step_impl = DuckDBStepImplementations.register_udfs(duckdb_connection) df: DataFrame = pd.DataFrame( # type: ignore [{"Key": "Over", "A": 10}, {"Key": "Under", "A": 2}] diff --git a/tests/test_pipeline/pipeline_helpers.py 
b/tests/test_pipeline/pipeline_helpers.py index 00eb3c5..3b3c706 100644 --- a/tests/test_pipeline/pipeline_helpers.py +++ b/tests/test_pipeline/pipeline_helpers.py @@ -17,8 +17,13 @@ from dve.core_engine.models import SubmissionInfo from dve.pipeline.utils import SubmissionStatus +import dve.pipeline.utils + from ..conftest import get_test_file_path +def clear_config_cache(): + dve.pipeline.utils._configs = {} + PLANET_POLARS_SCHEMA = { "planet": pl.Utf8(), "mass": pl.Float32(), @@ -56,6 +61,7 @@ @pytest.fixture(scope="function") def planet_test_files() -> Iterator[str]: + clear_config_cache() with tempfile.TemporaryDirectory() as tdir: shutil.copytree(get_test_file_path("planets/"), Path(tdir, "planets")) yield tdir + "/planets" diff --git a/tests/test_pipeline/test_duckdb_pipeline.py b/tests/test_pipeline/test_duckdb_pipeline.py index a6bea24..5619735 100644 --- a/tests/test_pipeline/test_duckdb_pipeline.py +++ b/tests/test_pipeline/test_duckdb_pipeline.py @@ -78,7 +78,7 @@ def test_file_transformation_step( audit_tables=audit_manager, job_run_id=1, connection=conn, - rules_path=PLANETS_RULES_PATH, + rules_path=get_test_file_path("planets/planets_ddb.dischema.json").as_posix(), processed_files_path=planet_test_files, submitted_files_path=planet_test_files, ) diff --git a/tests/test_pipeline/test_pipeline.py b/tests/test_pipeline/test_pipeline.py index adb65bc..429e947 100644 --- a/tests/test_pipeline/test_pipeline.py +++ b/tests/test_pipeline/test_pipeline.py @@ -6,6 +6,7 @@ import tempfile from pathlib import Path +from pyspark.sql import DataFrame from uuid import uuid4 from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager @@ -62,6 +63,7 @@ def test_write_file_to_parquet(planet_test_files): # pylint: disable=redefined- ), submitted_file_info, planet_test_files, + DataFrame ) assert not errors diff --git a/tests/test_pipeline/test_spark_pipeline.py b/tests/test_pipeline/test_spark_pipeline.py index 2912e00..126f6de 100644 --- a/tests/test_pipeline/test_spark_pipeline.py +++ b/tests/test_pipeline/test_spark_pipeline.py @@ -166,7 +166,7 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name "ErrorType": "value_error.any_str.max_length", "ErrorLocation": "planet", "ErrorMessage": "is invalid", - "ErrorCode": None, + "ErrorCode": "BadValue", "ReportingField": "planet", "Value": "EarthEarthEarthEarthEarthEarthEarthEarthEarth", "Category": "Bad value", @@ -179,7 +179,7 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name "ErrorType": "value_error.number.not_ge", "ErrorLocation": "numberOfMoons", "ErrorMessage": "is invalid", - "ErrorCode": None, + "ErrorCode": "BadValue", "ReportingField": "numberOfMoons", "Value": "-1", "Category": "Bad value", @@ -192,7 +192,7 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name "ErrorType": "type_error.bool", "ErrorLocation": "hasGlobalMagneticField", "ErrorMessage": "is invalid", - "ErrorCode": None, + "ErrorCode": "BadValue", "ReportingField": "hasGlobalMagneticField", "Value": "sometimes", "Category": "Bad value", diff --git a/tests/testdata/books/nested_books.dischema.json b/tests/testdata/books/nested_books.dischema.json index 9b40b2b..ee83cbf 100644 --- a/tests/testdata/books/nested_books.dischema.json +++ b/tests/testdata/books/nested_books.dischema.json @@ -21,7 +21,7 @@ }, "reader_config": { ".xml": { - "reader": "BasicXMLFileReader", + "reader": "SparkXMLReader", "kwargs": { "record_tag": "bookstore", "n_records_to_read": 1 @@ -39,7 +39,7 
@@ }, "reader_config": { ".xml": { - "reader": "BasicXMLFileReader", + "reader": "SparkXMLReader", "kwargs": { "record_tag": "author" } diff --git a/tests/testdata/books/nested_books_ddb.dischema.json b/tests/testdata/books/nested_books_ddb.dischema.json index 108465d..aa523aa 100644 --- a/tests/testdata/books/nested_books_ddb.dischema.json +++ b/tests/testdata/books/nested_books_ddb.dischema.json @@ -21,10 +21,9 @@ }, "reader_config": { ".xml": { - "reader": "BasicXMLFileReader", + "reader": "DuckDBXMLStreamReader", "kwargs": { - "record_tag": "bookstore", - "n_records_to_read": 1 + "record_tag": "bookstore" } } } @@ -39,7 +38,7 @@ }, "reader_config": { ".xml": { - "reader": "BasicXMLFileReader", + "reader": "DuckDBXMLStreamReader", "kwargs": { "record_tag": "author" } diff --git a/tests/testdata/demographics/basic_demographics.dischema.json b/tests/testdata/demographics/basic_demographics.dischema.json index b228497..e9c8944 100644 --- a/tests/testdata/demographics/basic_demographics.dischema.json +++ b/tests/testdata/demographics/basic_demographics.dischema.json @@ -16,7 +16,7 @@ }, "reader_config": { ".csv": { - "reader": "CSVFileReader" + "reader": "SparkCSVReader" } }, "mandatory_fields": [ diff --git a/tests/testdata/demographics/basic_demographics_ddb.dischema.json b/tests/testdata/demographics/basic_demographics_ddb.dischema.json new file mode 100644 index 0000000..44e2f77 --- /dev/null +++ b/tests/testdata/demographics/basic_demographics_ddb.dischema.json @@ -0,0 +1,49 @@ +{ + "contract": { + "datasets": { + "demographics": { + "fields": { + "Key": "NonNegativeInt", + "NHS_Number": "nhsnumber", + "NHS_Number_Valid": "str", + "Forename": "str", + "Surname": "str", + "Date_Of_Birth": "date", + "Postcode": "postcode", + "Postcode_Country": "str", + "Postcode_Format_Valid": "str", + "Postcode_Valid": "str" + }, + "reader_config": { + ".csv": { + "reader": "DuckDBCSVReader" + } + }, + "mandatory_fields": [ + "Key", + "NHS_Number", + "NHS_Number_Valid", + "Forename", + "Surname", + "Date_Of_Birth", + "Postcode", + "Postcode_Country", + "Postcode_Format_Valid", + "Postcode_Valid" + ] + } + } + }, + "transformations": { + "filters": [ + { + "entity": "demographics", + "name": "NHS_number_valid", + "expression": "NHS_Number_Valid=='TRUE'", + "error_code": "BAD_NHS", + "failure_message": "NHS number must be valid" + } + ] + } +} + diff --git a/tests/testdata/movies/movies.dischema.json b/tests/testdata/movies/movies.dischema.json new file mode 100644 index 0000000..aa55882 --- /dev/null +++ b/tests/testdata/movies/movies.dischema.json @@ -0,0 +1,73 @@ +{ + "contract": { + "schemas": { + "cast": { + "fields": { + "name": "str", + "role": "str", + "date_joined": "date" + } + } + }, + "error_details": "movies_contract_error_details.json", + "datasets": { + "movies": { + "fields": { + "title": "str", + "year": "int", + "genre": { + "type": "str", + "is_array": true + }, + "duration_minutes": "int", + "ratings": { + "type": "NonNegativeFloat", + "is_array": true + }, + "cast": { + "model": "cast", + "is_array": true} + }, + "reader_config": { + ".json": { + "reader": "SparkJSONReader" + } + }, + "mandatory_fields": [ + "title", + "year" + ] + } + } + }, + "transformations": { + "parameters": {"entity": "movies"}, + "reference_data": { + "sequels": { + "type": "table", + "database": "movies_refdata", + "table_name": "sequels" + } + }, + "rule_stores": [ + { + "store_type": "json", + "filename": "movies_spark_rule_store.json" + } + ], + "complex_rules": [ + { + "rule_name": "ratings_count" + }, + 
{ + "rule_name": "poor_sequel_check", + "parameters": { + "sequel_entity": "refdata_sequels" + } + } + ] + } +} + + + \ No newline at end of file diff --git a/tests/testdata/movies/movies.json b/tests/testdata/movies/movies.json new file mode 100644 index 0000000..87f1149 --- /dev/null +++ b/tests/testdata/movies/movies.json @@ -0,0 +1,56 @@ +[ + { + "title": "The Greatest Movie Ever", + "year": "NOT_A_NUMBER", + "genre": ["Adventure", "Family"], + "duration_minutes": 102, + "ratings": [ 7.4, 8.5 ], + "cast": [ + { "name": "A. Star", "role": "Mom", "date_joined": "2019-03-17" }, + { "name": "C. Actor", "role": "Son", "date_joined": "daft_date" } + ] + }, + { + "title": "Not a great one", + "genre": ["Animation", "Comedy", "Family"], + "duration_minutes": 88, + "ratings": [6.9, 7.8], + "cast": [ + { "name": "J. Smith", "role": "Lion", "date_joined": "2015-11-12" }, + { "name": "T. Car", "role": "Mouse", "date_joined": "2015-11-12" } + ] + }, + { + "title": "Good family movie", + "year": 2022, + "genre": ["Sci-Fi", "Family"], + "duration_minutes": 95, + "ratings": [7, 8.2, 6.3], + "cast": [ + { "name": "D. Farnesbarnes", "role": "Robot" }, + { "name": "G. Adams", "role": "Alien", "date_joined": "2017-08-01" } + ] + }, + { + "title": "One with a cat and a dog", + "year": 2020, + "genre": ["Fantasy", "Family"], + "duration_minutes": 110, + "ratings": [6.1], + "cast": [ + { "name": "R. Williams", "role": "Cat", "date_joined": "2016-05-06" }, + { "name": "T. Brown", "role": "Dog", "date_joined": "2016-05-07" } + ] + }, + { + "title": "A bad 'un", + "year": 2011, + "genre": ["Mystery", "Family"], + "duration_minutes": 97, + "ratings": [1.2, 3.4, 5.6, 3.4], + "cast": [ + { "name": "R. Green", "role": "Baby", "date_joined": "2013-11-12" }, + { "name": "P. Plum", "role": "Dad", "date_joined": "2013-10-08" } + ] + } +] \ No newline at end of file diff --git a/tests/testdata/movies/movies_contract_error_details.json b/tests/testdata/movies/movies_contract_error_details.json new file mode 100644 index 0000000..8c94c92 --- /dev/null +++ b/tests/testdata/movies/movies_contract_error_details.json @@ -0,0 +1,24 @@ +{ + "title": { + "Blank": { + "error_code": "BLANKTITLE", + "error_message": "title should not be blank" + } + }, + "year": { + "Blank": { + "error_code": "BLANKYEAR", + "error_message": "year not provided" + }, + "Bad value": { + "error_code": "DODGYYEAR", + "error_message": "year value ({{year}}) is invalid" + } + }, + "cast.date_joined": { + "Bad value": { + "error_code": "DODGYDATE", + "error_message": "date_joined value is not valid: {{__error_value}}" + } + } +} \ No newline at end of file diff --git a/tests/testdata/movies/movies_ddb.dischema.json b/tests/testdata/movies/movies_ddb.dischema.json new file mode 100644 index 0000000..39eb6ad --- /dev/null +++ b/tests/testdata/movies/movies_ddb.dischema.json @@ -0,0 +1,73 @@ +{ + "contract": { + "schemas": { + "cast": { + "fields": { + "name": "str", + "role": "str", + "date_joined": "date" + } + } + }, + "error_details": "movies_contract_error_details.json", + "datasets": { + "movies": { + "fields": { + "title": "str", + "year": "int", + "genre": { + "type": "str", + "is_array": true + }, + "duration_minutes": "int", + "ratings": { + "type": "NonNegativeFloat", + "is_array": true + }, + "cast": { + "model": "cast", + "is_array": true} + }, + "reader_config": { + ".json": { + "reader": "DuckDBJSONReader" + } + }, + "mandatory_fields": [ + "title", + "year" + ] + } + } + }, + "transformations": { + "parameters": {"entity": "movies"}, + 
"reference_data": { + "sequels": { + "type": "table", + "database": "movies_refdata", + "table_name": "sequels" + } + }, + "rule_stores": [ + { + "store_type": "json", + "filename": "movies_ddb_rule_store.json" + } + ], + "complex_rules": [ + { + "rule_name": "ratings_count" + }, + { + "rule_name": "poor_sequel_check", + "parameters": { + "sequel_entity": "refdata_sequels" + } + } + ] + } +} + + + \ No newline at end of file diff --git a/tests/testdata/movies/movies_ddb_rule_store.json b/tests/testdata/movies/movies_ddb_rule_store.json new file mode 100644 index 0000000..55ea3c2 --- /dev/null +++ b/tests/testdata/movies/movies_ddb_rule_store.json @@ -0,0 +1,92 @@ +{ + "ratings_count": { + "description": "Ensure more than 1 rating", + "type": "complex_rule", + "parameter_descriptions": { + "entity": "The entity to apply the workflow to." + }, + "parameter_defaults": {}, + "rule_config": { + "rules": [ + { + "name": "Get count of ratings", + "operation": "add", + "entity": "{{entity}}", + "column_name": "no_of_ratings", + "expression": "length(ratings)" + } + ], + "filters": [ + { + "name": "filter_too_few_ratings", + "entity": "{{entity}}", + "expression": "no_of_ratings > 1", + "error_code": "LIMITED_RATINGS", + "reporting_field": "title", + "failure_message": "Movie has too few ratings" + } + ], + "post_filter_rules": [ + { + "name": "Remove the no_of_ratings field", + "operation": "remove", + "entity": "{{entity}}", + "column_name": "no_of_ratings" + } + ] + } + }, + "poor_sequel_check": { + "description": "check if bad sequel exists", + "type": "complex_rule", + "parameter_descriptions": { + "entity": "The entity to apply the workflow to.", + "sequel_entity": "The entity containing sequel data" + }, + "parameter_defaults": {}, + "rule_config": { + "rules": [ + { + "name": "Join sequel data", + "operation": "inner_join", + "entity": "{{entity}}", + "target": "{{sequel_entity}}", + "join_condition": "{{entity}}.title = {{sequel_entity}}.sequel_to", + "new_entity_name": "with_sequels", + "new_columns": { + "{{sequel_entity}}.ratings": "sequel_rating" + } + }, + { + "name": "Get median sequel rating", + "operation": "group_by", + "entity": "with_sequels", + "group_by": "title", + "agg_columns": { + "list_aggregate(sequel_rating, 'median')": "median_sequel_rating" + } + } + + ], + "filters": [ + { + "name": "filter_rubbish_sequel", + "entity": "with_sequels", + "expression": "median_sequel_rating > 5", + "error_code": "RUBBISH_SEQUEL", + "reporting_entity": "derived", + "reporting_field": "title", + "failure_message": "Movie has rubbish sequel", + "is_informational": true + } + ], + "post_filter_rules": [ + { + "name": "Remove the with_sequel entity", + "operation": "remove_entity", + "entity": "with_sequels" + } + ] + } + } +} \ No newline at end of file diff --git a/tests/testdata/movies/movies_spark_rule_store.json b/tests/testdata/movies/movies_spark_rule_store.json new file mode 100644 index 0000000..49ba9b9 --- /dev/null +++ b/tests/testdata/movies/movies_spark_rule_store.json @@ -0,0 +1,103 @@ +{ + "ratings_count": { + "description": "Ensure more than 1 rating", + "type": "complex_rule", + "parameter_descriptions": { + "entity": "The entity to apply the workflow to." 
+ }, + "parameter_defaults": {}, + "rule_config": { + "rules": [ + { + "name": "Get count of ratings", + "operation": "add", + "entity": "{{entity}}", + "column_name": "no_of_ratings", + "expression": "size(ratings)" + } + ], + "filters": [ + { + "name": "filter_too_few_ratings", + "entity": "{{entity}}", + "expression": "no_of_ratings > 1", + "error_code": "LIMITED_RATINGS", + "reporting_field": "title", + "failure_message": "Movie has too few ratings" + } + ], + "post_filter_rules": [ + { + "name": "Remove the no_of_ratings field", + "operation": "remove", + "entity": "{{entity}}", + "column_name": "no_of_ratings" + } + ] + } + }, + "poor_sequel_check": { + "description": "check if bad sequel exists", + "type": "complex_rule", + "parameter_descriptions": { + "entity": "The entity to apply the workflow to.", + "sequel_entity": "The entity containing sequel data" + }, + "parameter_defaults": {}, + "rule_config": { + "rules": [ + { + "name": "Join sequel data", + "operation": "inner_join", + "entity": "{{entity}}", + "target": "{{sequel_entity}}", + "join_condition": "{{entity}}.title = {{sequel_entity}}.sequel_to", + "new_entity_name": "with_sequels", + "new_columns": { + "{{sequel_entity}}.ratings": "sequel_rating" + } + }, + { + "name": "Explode sequel rating", + "operation": "select", + "entity": "with_sequels", + "columns": { + "title": "title", + "explode(sequel_rating)": "sequel_rating" + } + }, + { + "name": "Get median sequel rating", + "operation": "group_by", + "entity": "with_sequels", + "group_by": "title", + "agg_columns": { + "percentile_approx(sequel_rating, 0.5)": "median_sequel_rating" + } + } + + + + ], + "filters": [ + { + "name": "filter_rubbish_sequel", + "entity": "with_sequels", + "expression": "median_sequel_rating > 5", + "error_code": "RUBBISH_SEQUEL", + "reporting_entity": "derived", + "reporting_field": "title", + "failure_message": "Movie has rubbish sequel", + "is_informational": true + } + ], + "post_filter_rules": [ + { + "name": "Remove the with_sequel entity", + "operation": "remove_entity", + "entity": "with_sequels" + } + ] + } + } +} \ No newline at end of file diff --git a/tests/testdata/movies/refdata/movies_sequels.parquet b/tests/testdata/movies/refdata/movies_sequels.parquet new file mode 100644 index 0000000..bd4cee2 Binary files /dev/null and b/tests/testdata/movies/refdata/movies_sequels.parquet differ diff --git a/tests/testdata/planets/planets.dischema.json b/tests/testdata/planets/planets.dischema.json index 36b2969..7a0387c 100644 --- a/tests/testdata/planets/planets.dischema.json +++ b/tests/testdata/planets/planets.dischema.json @@ -47,7 +47,7 @@ "key_field": "planet", "reader_config": { ".csv": { - "reader": "CSVFileReader" + "reader": "SparkCSVReader" } }, "mandatory_fields": [ diff --git a/tests/testdata/planets/planets_ddb.dischema.json b/tests/testdata/planets/planets_ddb.dischema.json new file mode 100644 index 0000000..51e6650 --- /dev/null +++ b/tests/testdata/planets/planets_ddb.dischema.json @@ -0,0 +1,155 @@ +{ + "contract": { + "types": { + "Planet": { + "callable": "constr", + "constraints": { + "max_length": 30, + "min_length": 1, + "strip_whitespace": true + } + }, + "Pressure": { + "type": "NonNegativeFloat" + }, + "BoolExcludeUnknown": { + "type": "bool" + }, + "FloatDroppingUncertainty": { + "type": "float" + } + }, + "datasets": { + "planets": { + "fields": { + "planet": "Planet", + "mass": "PositiveFloat", + "diameter": "PositiveFloat", + "density": "PositiveFloat", + "gravity": "PositiveFloat", + "escapeVelocity": 
"PositiveFloat", + "rotationPeriod": "float", + "lengthOfDay": "PositiveFloat", + "distanceFromSun": "PositiveFloat", + "perihelion": "PositiveFloat", + "aphelion": "PositiveFloat", + "orbitalPeriod": "PositiveFloat", + "orbitalVelocity": "PositiveFloat", + "orbitalInclination": "NonNegativeFloat", + "orbitalEccentricity": "PositiveFloat", + "obliquityToOrbit": "PositiveFloat", + "meanTemperature": "float", + "surfacePressure": "Pressure", + "numberOfMoons": "NonNegativeInt", + "hasRingSystem": "bool", + "hasGlobalMagneticField": "BoolExcludeUnknown" + }, + "key_field": "planet", + "reader_config": { + ".csv": { + "reader": "DuckDBCSVReader" + } + }, + "mandatory_fields": [ + "planet", + "mass", + "diameter" + ] + } + } + }, + "transformations": { + "reference_data": { + "satellites": { + "type": "filename", + "filename": "./refdata/parquet/satellites.parquet" + } + }, + "rule_stores": [ + { + "store_type": "json", + "filename": "planet_ruleset.json" + } + ], + "rules": [ + { + "operation": "left_join", + "entity": "planets", + "target": "refdata_satellites", + "join_condition": "planets.planet == refdata_satellites.planet", + "new_columns": { + "refdata_satellites.gm": "satellite_gm", + "refdata_satellites.density": "satellite_density", + "refdata_satellites.radius": "satellite_radius", + "refdata_satellites.magnitude": "satellite_magnitude", + "refdata_satellites.albedo": "satellite_albedo", + "COALESCE(planets.meanTemperature BETWEEN 10 AND 30, False)": "OrbitsPlanetWithNiceTemp" + } + }, + { + "operation": "group_by", + "entity": "planets", + "group_by": { + "planet": "planet" + }, + "agg_columns": { + "max(satellite_gm)": "gm", + "max_by(satellite_radius, satellite_gm)": "radius", + "max_by(satellite_density, satellite_gm)": "density", + "max_by(satellite_magnitude, satellite_gm)": "magnitude", + "max_by(satellite_albedo, satellite_gm)": "albedo", + "first(OrbitsPlanetWithNiceTemp)": "OrbitsPlanetWithNiceTemp" + }, + "new_entity_name": "largest_satellites" + } + ], + "filters": [ + { + "entity": "planets", + "name": "weak_escape", + "expression": "escapeVelocity < 25", + "error_code": "WEAK_ESCAPE", + "reporting_field": "escapeVelocity", + "failure_message": "Planet has weak escape velocity" + }, + { + "entity": "planets", + "name": "has_row_id", + "expression": "__rowid__ IS NOT NULL" + }, + { + "entity": "planets", + "name": "long_orbit", + "expression": "orbitalPeriod > 1000", + "error_code": "LONG_ORBIT", + "reporting_field": "orbitalPeriod", + "failure_message": "Planet has long orbital period" + }, + { + "rule_name": "field_over_1000", + "parameters": { + "entity": "planets", + "field": "density" + }, + "error_code": "DENSITY_OVER_1000", + "reporting_field": "density" + }, + { + "entity": "planets", + "name": "strong_gravity", + "expression": "over_10(gravity)", + "error_code": "STRONG_GRAVITY", + "reporting_field": "gravity", + "failure_message": "Planet has too strong gravity" + } + ], + "complex_rules": [ + { + "rule_name": "add_1_col_and_remove_it", + "parameters": { + "entity": "planets" + } + } + ] + } +}