diff --git a/src/dve/core_engine/backends/implementations/duckdb/__init__.py b/src/dve/core_engine/backends/implementations/duckdb/__init__.py index f51cbbb..5ee22d0 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/__init__.py +++ b/src/dve/core_engine/backends/implementations/duckdb/__init__.py @@ -3,13 +3,21 @@ from dve.core_engine.backends.readers import register_reader from .contract import DuckDBDataContract -from .readers import DuckDBCSVReader, DuckDBXMLStreamReader +from .readers import ( + DuckDBCSVReader, + DuckDBCSVRepeatingHeaderReader, + DuckDBJSONReader, + DuckDBXMLStreamReader, + PolarsToDuckDBCSVReader +) from .reference_data import DuckDBRefDataLoader from .rules import DuckDBStepImplementations register_reader(DuckDBCSVReader) +register_reader(DuckDBCSVRepeatingHeaderReader) register_reader(DuckDBJSONReader) register_reader(DuckDBXMLStreamReader) +register_reader(PolarsToDuckDBCSVReader) __all__ = [ "DuckDBDataContract", diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/__init__.py b/src/dve/core_engine/backends/implementations/duckdb/readers/__init__.py index 236009b..b41cc67 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/__init__.py +++ b/src/dve/core_engine/backends/implementations/duckdb/readers/__init__.py @@ -1,11 +1,13 @@ """Readers for use with duckdb backend""" -from .csv import DuckDBCSVReader +from .csv import DuckDBCSVReader, DuckDBCSVRepeatingHeaderReader, PolarsToDuckDBCSVReader from .json import DuckDBJSONReader from .xml import DuckDBXMLStreamReader __all__ = [ "DuckDBCSVReader", + "DuckDBCSVRepeatingHeaderReader", "DuckDBJSONReader", "DuckDBXMLStreamReader", + "PolarsToDuckDBCSVReader", ] diff --git a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py index 217d221..96f7d95 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py +++ 
b/src/dve/core_engine/backends/implementations/duckdb/readers/csv.py @@ -1,18 +1,24 @@ """A csv reader to create duckdb relations""" # pylint: disable=arguments-differ -from typing import Any, Dict, Iterator, Type +from typing import Any, Dict, Iterator, Optional, Type +import duckdb as ddb +import polars as pl from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection, read_csv from pydantic import BaseModel from dve.core_engine.backends.base.reader import BaseFileReader, read_function +from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( duckdb_write_parquet, get_duckdb_type_from_annotation, ) from dve.core_engine.backends.implementations.duckdb.types import SQLType +from dve.core_engine.backends.utilities import get_polars_type_from_annotation +from dve.core_engine.message import FeedbackMessage from dve.core_engine.type_hints import URI, EntityName +from dve.parser.file_handling import get_content_length @duckdb_write_parquet @@ -25,10 +31,12 @@ def __init__( self, header: bool = True, delim: str = ",", - connection: DuckDBPyConnection = None, + quotechar: str = '"', + connection: Optional[DuckDBPyConnection] = None, ): self.header = header self.delim = delim + self.quotechar = quotechar self._connection = connection if connection else default_connection super().__init__() @@ -44,9 +52,13 @@ def read_to_relation( # pylint: disable=unused-argument self, resource: URI, entity_name: EntityName, schema: Type[BaseModel] ) -> DuckDBPyRelation: """Returns a relation object from the source csv""" + if get_content_length(resource) == 0: + raise EmptyFileError(f"File at {resource} is empty.") + reader_options: Dict[str, Any] = { "header": self.header, "delimiter": self.delim, + "quotechar": self.quotechar, } ddb_schema: Dict[str, SQLType] = { @@ -56,3 +68,96 @@ def read_to_relation( # pylint: disable=unused-argument reader_options["columns"] = 
ddb_schema return read_csv(resource, **reader_options) + + +class PolarsToDuckDBCSVReader(DuckDBCSVReader): + """ + Utilises the polars lazy csv reader which is then converted into a DuckDBPyRelation object. + + The primary reason this reader exists is due to the limitation within duckdb csv reader and + it not being able to read partial content from a csv (i.e. select a, b NOT y). + """ + + @read_function(DuckDBPyRelation) + def read_to_relation( # pylint: disable=unused-argument + self, resource: URI, entity_name: EntityName, schema: Type[BaseModel] + ) -> DuckDBPyRelation: + """Returns a relation object from the source csv""" + if get_content_length(resource) == 0: + raise EmptyFileError(f"File at {resource} is empty.") + + reader_options: Dict[str, Any] = { + "has_header": self.header, + "separator": self.delim, + "quote_char": self.quotechar, + } + + polars_types = { + fld.name: get_polars_type_from_annotation(fld.annotation) # type: ignore + for fld in schema.__fields__.values() + } + reader_options["dtypes"] = polars_types + + # there is a raise_if_empty arg for 0.18+. Future reference when upgrading. Makes L85 + # redundant + df = pl.scan_csv(resource, **reader_options).select(list(polars_types.keys())) # type: ignore # pylint: disable=W0612 + + # use the configured connection (not the module-level default) so a caller-supplied + # DuckDBPyConnection is honoured; `df` is resolved via duckdb's replacement scan + return self._connection.sql("SELECT * FROM df") + + +class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader): + """A Reader for files with a `.csv` extension and where there are repeating "header" values + within the file. Header in this case is not the column names at the top of a csv, rather a + collection of unique records that would usually be structured in another entity. However, due + to the fact that `csv` is a semi-structured data format, you cannot define complex entities, + hence the values are then repeated on all rows. + + An example of repeating header data may look like this... 
+ + | headerCol1 | headerCol2 | headerCol3 | nonHeaderCol1 | nonHeaderCol2 | + | ---------- | ---------- | ---------- | ------------- | ------------- | + | shop 1 | clothes | 2025-01-01 | jeans | 20.39 | + | shop 1 | clothes | 2025-01-01 | shirt | 14.99 | + + This reader will just pull out the distinct values from the header columns. Where there is + more/less than one distinct combination of values across those columns, the reader will raise + a `MessageBearingError` carrying error code `NonUniqueHeader`. + + So using the example above, the expected entity would look like this... + | headerCol1 | headerCol2 | headerCol3 | + | ---------- | ---------- | ---------- | + | shop 1 | clothes | 2025-01-01 | + """ + @read_function(DuckDBPyRelation) + def read_to_relation( # pylint: disable=unused-argument + self, resource: URI, entity_name: EntityName, schema: Type[BaseModel] + ) -> DuckDBPyRelation: + entity = super().read_to_relation(resource=resource, entity_name=entity_name, schema=schema) + entity = entity.distinct() + no_records = entity.shape[0] + + if no_records != 1: + rows = entity.pl().to_dicts() + differing_values = [ + f"{key}: {', '.join(sorted(str(val) for val in values))}" + for key, *values in zip(rows[0], *map(dict.values, rows)) # type: ignore + if len(set(values)) > 1 + ] + raise MessageBearingError( + "More than one set of Headers found in CSV file", + messages=[ + FeedbackMessage( + record={entity_name: differing_values}, + entity="Pre-validation", + failure_type="submission", + error_message=( + f"Found {no_records} distinct combinations of header values." 
+ ), + error_location=entity_name, + category="Bad file", + error_code="NonUniqueHeader", + ) + ], + ) + + return entity diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py index 3e30832..23f1534 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py @@ -73,7 +73,7 @@ def test_duckdb_data_contract_csv(temp_csv_file): entities: Dict[str, DuckDBPyRelation] = { "test_ds": DuckDBCSVReader( header=True, delim=",", connection=connection - ).read_to_entity_type(DuckDBPyRelation, uri, "test_ds", stringify_model(mdl)) + ).read_to_entity_type(DuckDBPyRelation, str(uri), "test_ds", stringify_model(mdl)) } data_contract: DuckDBDataContract = DuckDBDataContract(connection) diff --git a/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py b/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py index 13846dc..8f9d40d 100644 --- a/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py +++ b/tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py @@ -6,13 +6,20 @@ from duckdb import DuckDBPyRelation, default_connection from pydantic import BaseModel +from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( get_duckdb_type_from_annotation, ) -from dve.core_engine.backends.implementations.duckdb.readers.csv import DuckDBCSVReader, SQLType +from dve.core_engine.backends.implementations.duckdb.readers.csv import ( + DuckDBCSVReader, + DuckDBCSVRepeatingHeaderReader, + PolarsToDuckDBCSVReader, +) from dve.core_engine.backends.utilities import stringify_model from tests.test_core_engine.test_backends.fixtures import duckdb_connection +# pylint: 
disable=C0116 + class SimpleModel(BaseModel): varchar_field: str @@ -21,6 +28,11 @@ class SimpleModel(BaseModel): timestamp_field: datetime +class SimpleHeaderModel(BaseModel): + header_1: str + header_2: str + + @pytest.fixture def temp_dir(): with TemporaryDirectory(prefix="ddb_test_csv_reader") as temp_dir: @@ -43,18 +55,19 @@ def temp_csv_file(temp_dir: Path): yield temp_dir.joinpath("dummy.csv"), header, typed_data, SimpleModel -class SimpleModel(BaseModel): - varchar_field: str - bigint_field: int - date_field: date - timestamp_field: datetime +@pytest.fixture +def temp_empty_csv_file(temp_dir: Path): + with open(temp_dir.joinpath("empty.csv"), mode="w"): + pass + + yield temp_dir.joinpath("empty.csv"), SimpleModel def test_ddb_csv_reader_all_str(temp_csv_file): uri, header, data, mdl = temp_csv_file reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection) rel: DuckDBPyRelation = reader.read_to_entity_type( - DuckDBPyRelation, uri, "test", stringify_model(mdl) + DuckDBPyRelation, str(uri), "test", stringify_model(mdl) ) assert rel.columns == header.split(",") assert dict(zip(rel.columns, rel.dtypes)) == {fld: "VARCHAR" for fld in header.split(",")} @@ -64,7 +77,7 @@ def test_ddb_csv_reader_all_str(temp_csv_file): def test_ddb_csv_reader_cast(temp_csv_file): uri, header, data, mdl = temp_csv_file reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection) - rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, uri, "test", mdl) + rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, str(uri), "test", mdl) assert rel.columns == header.split(",") assert dict(zip(rel.columns, rel.dtypes)) == { fld.name: str(get_duckdb_type_from_annotation(fld.annotation)) @@ -77,9 +90,70 @@ def test_ddb_csv_write_parquet(temp_csv_file): uri, header, data, mdl = temp_csv_file reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection) rel: DuckDBPyRelation = reader.read_to_entity_type( - 
DuckDBPyRelation, uri, "test", stringify_model(mdl) + DuckDBPyRelation, str(uri), "test", stringify_model(mdl) ) target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix() reader.write_parquet(rel, target_loc) parquet_rel = reader._connection.read_parquet(target_loc) assert parquet_rel.df().to_dict(orient="records") == rel.df().to_dict(orient="records") + + +def test_ddb_csv_read_empty_file(temp_empty_csv_file): + uri, mdl = temp_empty_csv_file + reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection) + + with pytest.raises(EmptyFileError): + reader.read_to_relation(str(uri), "test", mdl) + + +def test_polars_to_ddb_csv_reader(temp_csv_file): + uri, header, data, mdl = temp_csv_file + reader = PolarsToDuckDBCSVReader( + header=True, delim=",", quotechar='"', connection=default_connection + ) + entity = reader.read_to_relation(str(uri), "test", mdl) + + assert entity.shape[0] == 2 + + +def test_ddb_csv_repeating_header_reader_non_duplicate(temp_dir): + header = "header_1,header_2,non_header_1" + typed_data = [ + ["hvalue1", "hvalue1", "nhvalue1"], + ["hvalue1", "hvalue1", "nhvalue2"], + ["hvalue1", "hvalue1", "nhvalue3"], + ] + with open(temp_dir.joinpath("test_header.csv"), mode="w") as csv_file: + csv_file.write(header + "\n") + for rw in typed_data: + csv_file.write(",".join([str(val) for val in rw]) + "\n") + + file_uri = temp_dir.joinpath("test_header.csv") + + reader = DuckDBCSVRepeatingHeaderReader( + header=True, delim=",", quotechar='"', connection=default_connection + ) + entity = reader.read_to_relation(str(file_uri), "test", SimpleHeaderModel) + + assert entity.shape[0] == 1 + + +def test_ddb_csv_repeating_header_reader_with_more_than_one_set_of_distinct_values(temp_dir): + header = "header_1,header_2,non_header_1" + typed_data = [ + ["hvalue1", "hvalue2", "nhvalue1"], + ["hvalue2", "hvalue2", "nhvalue2"], + ["hvalue1", "hvalue1", "nhvalue3"], + ] + with open(temp_dir.joinpath("test_header.csv"), mode="w") as 
csv_file: + csv_file.write(header + "\n") + for rw in typed_data: + csv_file.write(",".join([str(val) for val in rw]) + "\n") + + file_uri = temp_dir.joinpath("test_header.csv") + reader = DuckDBCSVRepeatingHeaderReader( + header=True, delim=",", quotechar='"', connection=default_connection + ) + + with pytest.raises(MessageBearingError): + reader.read_to_relation(str(file_uri), "test", SimpleHeaderModel)