Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@
from dve.core_engine.backends.readers import register_reader

from .contract import DuckDBDataContract
from .readers import (
    DuckDBCSVReader,
    DuckDBCSVRepeatingHeaderReader,
    DuckDBJSONReader,  # was registered below without being imported -> NameError on import
    DuckDBXMLStreamReader,
    PolarsToDuckDBCSVReader,
)
from .reference_data import DuckDBRefDataLoader
from .rules import DuckDBStepImplementations

# Register every duckdb-backed reader with the global reader registry so the
# engine can resolve them by file type at runtime.
register_reader(DuckDBCSVReader)
register_reader(DuckDBCSVRepeatingHeaderReader)
register_reader(DuckDBJSONReader)
register_reader(DuckDBXMLStreamReader)
register_reader(PolarsToDuckDBCSVReader)

__all__ = [
"DuckDBDataContract",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Readers for use with duckdb backend"""

from .csv import DuckDBCSVReader
from .csv import DuckDBCSVReader, DuckDBCSVRepeatingHeaderReader, PolarsToDuckDBCSVReader
from .json import DuckDBJSONReader
from .xml import DuckDBXMLStreamReader

__all__ = [
"DuckDBCSVReader",
"DuckDBCSVRepeatingHeaderReader",
"DuckDBJSONReader",
"DuckDBXMLStreamReader",
"PolarsToDuckDBCSVReader",
]
109 changes: 107 additions & 2 deletions src/dve/core_engine/backends/implementations/duckdb/readers/csv.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
"""A csv reader to create duckdb relations"""

# pylint: disable=arguments-differ
from typing import Any, Dict, Iterator, Type
from typing import Any, Dict, Iterator, Optional, Type

import duckdb as ddb
import polars as pl
from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection, read_csv
from pydantic import BaseModel

from dve.core_engine.backends.base.reader import BaseFileReader, read_function
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
duckdb_write_parquet,
get_duckdb_type_from_annotation,
)
from dve.core_engine.backends.implementations.duckdb.types import SQLType
from dve.core_engine.backends.utilities import get_polars_type_from_annotation
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import URI, EntityName
from dve.parser.file_handling import get_content_length


@duckdb_write_parquet
Expand All @@ -25,10 +31,12 @@ def __init__(
self,
header: bool = True,
delim: str = ",",
connection: DuckDBPyConnection = None,
quotechar: str = '"',
connection: Optional[DuckDBPyConnection] = None,
):
self.header = header
self.delim = delim
self.quotechar = quotechar
self._connection = connection if connection else default_connection

super().__init__()
Expand All @@ -44,9 +52,13 @@ def read_to_relation( # pylint: disable=unused-argument
self, resource: URI, entity_name: EntityName, schema: Type[BaseModel]
) -> DuckDBPyRelation:
"""Returns a relation object from the source csv"""
if get_content_length(resource) == 0:
raise EmptyFileError(f"File at {resource} is empty.")

reader_options: Dict[str, Any] = {
"header": self.header,
"delimiter": self.delim,
"quotechar": self.quotechar,
}

ddb_schema: Dict[str, SQLType] = {
Expand All @@ -56,3 +68,96 @@ def read_to_relation( # pylint: disable=unused-argument

reader_options["columns"] = ddb_schema
return read_csv(resource, **reader_options)


class PolarsToDuckDBCSVReader(DuckDBCSVReader):
    """
    Utilises the polars lazy csv reader which is then converted into a DuckDBPyRelation object.

    The primary reason this reader exists is due to the limitation within duckdb csv reader and
    it not being able to read partial content from a csv (i.e. select a, b NOT y).
    """

    @read_function(DuckDBPyRelation)
    def read_to_relation(  # pylint: disable=unused-argument
        self, resource: URI, entity_name: EntityName, schema: Type[BaseModel]
    ) -> DuckDBPyRelation:
        """Returns a relation object from the source csv"""
        # Zero-byte files are rejected up-front: polars would otherwise fail downstream.
        if get_content_length(resource) == 0:
            raise EmptyFileError(f"File at {resource} is empty.")

        # Map each schema field to its polars dtype; the select() below also restricts
        # the scan to exactly these columns (the partial-read duckdb cannot do itself).
        dtype_map = {
            fld.name: get_polars_type_from_annotation(fld.annotation)  # type: ignore
            for fld in schema.__fields__.values()
        }

        scan_options: Dict[str, Any] = {
            "has_header": self.header,
            "separator": self.delim,
            "quote_char": self.quotechar,
            "dtypes": dtype_map,
        }

        # there is a raise_if_empty arg for 0.18+. Future reference when upgrading. Makes the
        # content-length guard above redundant.
        # NOTE: the local MUST be named `df` — duckdb's replacement scan resolves the
        # table in the SQL below by looking up that local variable name.
        df = pl.scan_csv(resource, **scan_options).select(list(dtype_map.keys()))  # type: ignore # pylint: disable=W0612

        return ddb.sql("SELECT * FROM df")


class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
    """A Reader for files with a `.csv` extension and where there are repeating "header" values
    within the file. Header in this case is not the column names at the top of a csv, rather a
    collection of unique records that would usually be structured in another entity. However, due
    to the fact that `csv` is a semi-structured data format, you cannot define complex entities,
    hence the values are then repeated on all rows.

    Example of a repeating header data may look like this...

    | headerCol1 | headerCol2 | headerCol3 | nonHeaderCol1 | nonHeaderCol2 |
    | ---------- | ---------- | ---------- | ------------- | ------------- |
    | shop 1     | clothes    | 2025-01-01 | jeans         | 20.39         |
    | shop 1     | clothes    | 2025-01-01 | shirt         | 14.99         |

    This reader will just pull out the distinct values from the header columns. Where there is
    more than one distinct combination of values, the reader raises a `MessageBearingError`
    carrying a `FeedbackMessage` with error code ``NonUniqueHeader``.

    So using the example above, the expected entity would look like this...

    | headerCol1 | headerCol2 | headerCol3 |
    | ---------- | ---------- | ---------- |
    | shop 1     | clothes    | 2025-01-01 |
    """

    @read_function(DuckDBPyRelation)
    def read_to_relation(  # pylint: disable=unused-argument
        self, resource: URI, entity_name: EntityName, schema: Type[BaseModel]
    ) -> DuckDBPyRelation:
        """Read the schema's columns from the csv and reduce them to the single distinct
        header record.

        Raises:
            MessageBearingError: if the number of distinct records is not exactly 1.
        """
        entity = super().read_to_relation(resource=resource, entity_name=entity_name, schema=schema)
        entity = entity.distinct()
        no_records = entity.shape[0]

        if no_records != 1:
            rows = entity.pl().to_dicts()
            # Transpose the distinct rows column-wise: zip(rows[0], *values-per-row) pairs
            # each column name with the value that column takes in every distinct record,
            # so we can report only the columns whose values actually conflict.
            differing_values = [
                f"{key}: {', '.join(sorted(str(val) for val in values))}"
                for key, *values in zip(rows[0], *map(dict.values, rows))  # type: ignore
                if len(set(values)) > 1
            ]
            raise MessageBearingError(
                "More than one set of Headers found in CSV file",
                messages=[
                    FeedbackMessage(
                        record={entity_name: differing_values},
                        entity="Pre-validation",
                        failure_type="submission",
                        error_message=(
                            f"Found {no_records} distinct combination of header values."
                        ),
                        error_location=entity_name,
                        category="Bad file",
                        error_code="NonUniqueHeader",
                    )
                ],
            )

        return entity
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def test_duckdb_data_contract_csv(temp_csv_file):
entities: Dict[str, DuckDBPyRelation] = {
"test_ds": DuckDBCSVReader(
header=True, delim=",", connection=connection
).read_to_entity_type(DuckDBPyRelation, uri, "test_ds", stringify_model(mdl))
).read_to_entity_type(DuckDBPyRelation, str(uri), "test_ds", stringify_model(mdl))
}

data_contract: DuckDBDataContract = DuckDBDataContract(connection)
Expand Down
92 changes: 83 additions & 9 deletions tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@
from duckdb import DuckDBPyRelation, default_connection
from pydantic import BaseModel

from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
get_duckdb_type_from_annotation,
)
from dve.core_engine.backends.implementations.duckdb.readers.csv import DuckDBCSVReader, SQLType
from dve.core_engine.backends.implementations.duckdb.readers.csv import (
DuckDBCSVReader,
DuckDBCSVRepeatingHeaderReader,
PolarsToDuckDBCSVReader,
)
from dve.core_engine.backends.utilities import stringify_model
from tests.test_core_engine.test_backends.fixtures import duckdb_connection

# pylint: disable=C0116


class SimpleModel(BaseModel):
varchar_field: str
Expand All @@ -21,6 +28,11 @@ class SimpleModel(BaseModel):
timestamp_field: datetime


class SimpleHeaderModel(BaseModel):
    """Minimal schema naming the two repeating "header" columns used by the header-reader tests."""

    header_1: str
    header_2: str


@pytest.fixture
def temp_dir():
with TemporaryDirectory(prefix="ddb_test_csv_reader") as temp_dir:
Expand All @@ -43,18 +55,19 @@ def temp_csv_file(temp_dir: Path):
yield temp_dir.joinpath("dummy.csv"), header, typed_data, SimpleModel


class SimpleModel(BaseModel):
varchar_field: str
bigint_field: int
date_field: date
timestamp_field: datetime
@pytest.fixture
def temp_empty_csv_file(temp_dir: Path):
    """Create a zero-byte csv and yield its path together with the model under test."""
    empty_path = temp_dir.joinpath("empty.csv")
    with open(empty_path, mode="w"):
        pass
    yield empty_path, SimpleModel


def test_ddb_csv_reader_all_str(temp_csv_file):
    """With a stringified model every column should be read back as VARCHAR."""
    # NOTE: this span previously contained both the pre- and post-change diff lines;
    # resolved to the post-change version (uri passed as str).
    uri, header, data, mdl = temp_csv_file
    reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection)
    rel: DuckDBPyRelation = reader.read_to_entity_type(
        DuckDBPyRelation, str(uri), "test", stringify_model(mdl)
    )
    assert rel.columns == header.split(",")
    assert dict(zip(rel.columns, rel.dtypes)) == {fld: "VARCHAR" for fld in header.split(",")}
Expand All @@ -64,7 +77,7 @@ def test_ddb_csv_reader_all_str(temp_csv_file):
def test_ddb_csv_reader_cast(temp_csv_file):
uri, header, data, mdl = temp_csv_file
reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection)
rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, uri, "test", mdl)
rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, str(uri), "test", mdl)
assert rel.columns == header.split(",")
assert dict(zip(rel.columns, rel.dtypes)) == {
fld.name: str(get_duckdb_type_from_annotation(fld.annotation))
Expand All @@ -77,9 +90,70 @@ def test_ddb_csv_write_parquet(temp_csv_file):
uri, header, data, mdl = temp_csv_file
reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection)
rel: DuckDBPyRelation = reader.read_to_entity_type(
DuckDBPyRelation, uri, "test", stringify_model(mdl)
DuckDBPyRelation, str(uri), "test", stringify_model(mdl)
)
target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix()
reader.write_parquet(rel, target_loc)
parquet_rel = reader._connection.read_parquet(target_loc)
assert parquet_rel.df().to_dict(orient="records") == rel.df().to_dict(orient="records")


def test_ddb_csv_read_empty_file(temp_empty_csv_file):
    """A zero-byte csv must be rejected up-front with an EmptyFileError."""
    uri, mdl = temp_empty_csv_file
    csv_reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection)
    with pytest.raises(EmptyFileError):
        csv_reader.read_to_relation(str(uri), "test", mdl)


def test_polars_to_ddb_csv_reader(temp_csv_file):
    """The polars-backed csv reader should surface both data rows as a relation."""
    uri, _header, _data, mdl = temp_csv_file
    polars_reader = PolarsToDuckDBCSVReader(
        header=True, delim=",", quotechar='"', connection=default_connection
    )
    relation = polars_reader.read_to_relation(str(uri), "test", mdl)
    row_count, _col_count = relation.shape
    assert row_count == 2


def test_ddb_csv_repeating_header_reader_non_duplicate(temp_dir):
    """All rows share one header-value combination, so distinct() leaves a single record."""
    csv_path = temp_dir.joinpath("test_header.csv")
    csv_lines = [
        "header_1,header_2,non_header_1",
        "hvalue1,hvalue1,nhvalue1",
        "hvalue1,hvalue1,nhvalue2",
        "hvalue1,hvalue1,nhvalue3",
    ]
    csv_path.write_text("\n".join(csv_lines) + "\n")

    header_reader = DuckDBCSVRepeatingHeaderReader(
        header=True, delim=",", quotechar='"', connection=default_connection
    )
    result = header_reader.read_to_relation(str(csv_path), "test", SimpleHeaderModel)

    assert result.shape[0] == 1


def test_ddb_csv_repeating_header_reader_with_more_than_one_set_of_distinct_values(temp_dir):
    """Conflicting header values across rows must raise a MessageBearingError."""
    csv_path = temp_dir.joinpath("test_header.csv")
    csv_lines = [
        "header_1,header_2,non_header_1",
        "hvalue1,hvalue2,nhvalue1",
        "hvalue2,hvalue2,nhvalue2",
        "hvalue1,hvalue1,nhvalue3",
    ]
    csv_path.write_text("\n".join(csv_lines) + "\n")

    header_reader = DuckDBCSVRepeatingHeaderReader(
        header=True, delim=",", quotechar='"', connection=default_connection
    )

    with pytest.raises(MessageBearingError):
        header_reader.read_to_relation(str(csv_path), "test", SimpleHeaderModel)