From 4c3454bf8f46eaba00871ecaee7932a4ef36305d Mon Sep 17 00:00:00 2001
From: stevenhsd <56357022+stevenhsd@users.noreply.github.com>
Date: Mon, 17 Nov 2025 16:50:51 +0000
Subject: [PATCH 1/2] refactor: small tweak to allow use of dynamic fields in
 select rules

---
 .../backends/implementations/duckdb/utilities.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/dve/core_engine/backends/implementations/duckdb/utilities.py b/src/dve/core_engine/backends/implementations/duckdb/utilities.py
index 39e4929..628238c 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/utilities.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/utilities.py
@@ -1,5 +1,6 @@
 """Utility objects for use with duckdb backend"""
 
+import itertools
 from dve.core_engine.backends.base.utilities import _split_multiexpr_string
 
 
@@ -24,7 +25,12 @@ def expr_mapping_to_columns(expressions: dict) -> list[str]:
 
 def expr_array_to_columns(expressions: list[str]) -> list[str]:
     """Create list of duckdb expressions from list of expressions"""
-    return [f"{expression}" for expression in expressions]
+    return list(
+        itertools.chain.from_iterable(
+            _split_multiexpr_string(expression)
+            for expression in expressions
+        )
+    )
 
 
 def multiexpr_string_to_columns(expressions: str) -> list[str]:
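Note on the change above: expr_array_to_columns previously passed each array
entry through unchanged, so a single entry holding several comma-separated
(dynamic) expressions came out as one malformed column. Routing every entry
through _split_multiexpr_string flattens such entries into individual
expressions. A minimal sketch of the intended behaviour, assuming
_split_multiexpr_string splits a multi-expression string on top-level commas
(it lives in backends/base/utilities.py and is not shown in this patch):

    from dve.core_engine.backends.implementations.duckdb.utilities import (
        expr_array_to_columns,
    )

    # The second entry carries two comma-separated dynamic fields and is
    # flattened into two independent column expressions.
    columns = expr_array_to_columns(
        ["size(array_field)", "a_dynamic_field, another_dynamic_field"]
    )
    assert columns == [
        "size(array_field)",
        "a_dynamic_field",
        "another_dynamic_field",
    ]

This matches the parametrised cases added in the second patch below.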
From 61827aeb8b1b4f8541021395c9167812b5ee22dc Mon Sep 17 00:00:00 2001
From: stevenhsd <56357022+stevenhsd@users.noreply.github.com>
Date: Wed, 19 Nov 2025 09:59:03 +0000
Subject: [PATCH 2/2] test: added small tests for duckdb and spark utils

---
 .../implementations/duckdb/__init__.py        |  1 +
 .../implementations/duckdb/utilities.py       |  6 +-
 .../implementations/spark/spark_helpers.py    |  9 +--
 .../implementations/spark/utilities.py        |  9 ++-
 src/dve/core_engine/backends/readers/xml.py   |  9 +--
 src/dve/core_engine/backends/utilities.py     |  6 +-
 src/dve/core_engine/message.py                |  2 +-
 src/dve/core_engine/type_hints.py             |  3 +-
 .../test_duckdb/test_ddb_utils.py             | 56 +++++++++++++++++++
 .../test_spark/test_spark_utils.py            | 38 +++++++++++++
 10 files changed, 115 insertions(+), 24 deletions(-)
 create mode 100644 tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py
 create mode 100644 tests/test_core_engine/test_backends/test_implementations/test_spark/test_spark_utils.py

diff --git a/src/dve/core_engine/backends/implementations/duckdb/__init__.py b/src/dve/core_engine/backends/implementations/duckdb/__init__.py
index d731064..996ec80 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/__init__.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/__init__.py
@@ -1,4 +1,5 @@
 """Implementation of duckdb backend"""
+
 from dve.core_engine.backends.implementations.duckdb.readers.json import DuckDBJSONReader
 from dve.core_engine.backends.readers import register_reader
 

diff --git a/src/dve/core_engine/backends/implementations/duckdb/utilities.py b/src/dve/core_engine/backends/implementations/duckdb/utilities.py
index 628238c..6211500 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/utilities.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/utilities.py
@@ -1,6 +1,7 @@
 """Utility objects for use with duckdb backend"""
 
 import itertools
+
 from dve.core_engine.backends.base.utilities import _split_multiexpr_string
 
 
@@ -27,10 +28,9 @@ def expr_array_to_columns(expressions: list[str]) -> list[str]:
     """Create list of duckdb expressions from list of expressions"""
     return list(
         itertools.chain.from_iterable(
-            _split_multiexpr_string(expression)
-            for expression in expressions
-        )
+            _split_multiexpr_string(expression) for expression in expressions
+        )
     )
 
 
 def multiexpr_string_to_columns(expressions: str) -> list[str]:

diff --git a/src/dve/core_engine/backends/implementations/spark/spark_helpers.py b/src/dve/core_engine/backends/implementations/spark/spark_helpers.py
index 921b04e..7cb7b17 100644
--- a/src/dve/core_engine/backends/implementations/spark/spark_helpers.py
+++ b/src/dve/core_engine/backends/implementations/spark/spark_helpers.py
@@ -12,14 +12,7 @@
 from dataclasses import dataclass, is_dataclass
 from decimal import Decimal
 from functools import wraps
-from typing import (
-    Any,
-    ClassVar,
-    Optional,
-    TypeVar,
-    Union,
-    overload,
-)
+from typing import Any, ClassVar, Optional, TypeVar, Union, overload
 
 from delta.exceptions import ConcurrentAppendException, DeltaConcurrentModificationException
 from pydantic import BaseModel

diff --git a/src/dve/core_engine/backends/implementations/spark/utilities.py b/src/dve/core_engine/backends/implementations/spark/utilities.py
index bd5b02a..5eca158 100644
--- a/src/dve/core_engine/backends/implementations/spark/utilities.py
+++ b/src/dve/core_engine/backends/implementations/spark/utilities.py
@@ -1,6 +1,7 @@
 """Some utilities which are useful for implementing Spark transformations."""
 
 import datetime as dt
+import itertools
 from collections.abc import Callable
 from json import JSONEncoder
 from operator import and_, or_
@@ -70,7 +71,13 @@ def expr_mapping_to_columns(expressions: ExpressionMapping) -> list[Column]:
 
 def expr_array_to_columns(expressions: ExpressionArray) -> list[Column]:
     """Convert an array of expressions to a list of columns."""
-    return list(map(sf.expr, expressions))
+
+    _expr_list = list(
+        itertools.chain.from_iterable(
+            _split_multiexpr_string(expression) for expression in expressions
+        )
+    )
+    return list(map(sf.expr, _expr_list))
 
 
 def multiexpr_string_to_columns(expressions: MultiExpression) -> list[Column]:
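The Spark change mirrors the DuckDB one: entries are flattened with
_split_multiexpr_string before being handed to pyspark's expr, so a
comma-joined entry no longer reaches sf.expr as one string that the
single-expression parser will not accept. Roughly, under the same
top-level-comma splitting assumption:

    from pyspark.sql import functions as sf

    # Before: sf.expr("a_dynamic_field, another_dynamic_field") passes the
    # whole comma-joined entry to the parser as a single expression and fails.
    # After: the entry is split first, yielding one Column per field.
    flattened = ["a_dynamic_field", "another_dynamic_field"]
    columns = [sf.expr(expression) for expression in flattened]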
diff --git a/src/dve/core_engine/backends/readers/xml.py b/src/dve/core_engine/backends/readers/xml.py
index bd7b8e4..5de23c4 100644
--- a/src/dve/core_engine/backends/readers/xml.py
+++ b/src/dve/core_engine/backends/readers/xml.py
@@ -3,14 +3,7 @@
 
 import re
 from collections.abc import Collection, Iterator
-from typing import (
-    IO,
-    Any,
-    GenericAlias,  # type: ignore
-    Optional,
-    Union,
-    overload
-)
+from typing import IO, Any, GenericAlias, Optional, Union, overload  # type: ignore
 
 import polars as pl
 from lxml import etree  # type: ignore

diff --git a/src/dve/core_engine/backends/utilities.py b/src/dve/core_engine/backends/utilities.py
index 9319780..1ef115c 100644
--- a/src/dve/core_engine/backends/utilities.py
+++ b/src/dve/core_engine/backends/utilities.py
@@ -4,8 +4,8 @@
 from dataclasses import is_dataclass
 from datetime import date, datetime
 from decimal import Decimal
-from typing import Any, ClassVar, Union
 from typing import GenericAlias  # type: ignore
+from typing import Any, ClassVar, Union
 
 import polars as pl  # type: ignore
 from polars.datatypes.classes import DataTypeClass as PolarsType
@@ -39,7 +39,9 @@
 
 def stringify_type(type_: Union[type, GenericAlias]) -> type:
     """Stringify an individual type."""
-    if isinstance(type_, type) and not isinstance(type_, GenericAlias):  # A model, return the contents.  # pylint: disable=C0301
+    if isinstance(type_, type) and not isinstance(
+        type_, GenericAlias
+    ):  # A model, return the contents.  # pylint: disable=C0301
         if issubclass(type_, BaseModel):
             return stringify_model(type_)
 

diff --git a/src/dve/core_engine/message.py b/src/dve/core_engine/message.py
index d81acde..7dd4f02 100644
--- a/src/dve/core_engine/message.py
+++ b/src/dve/core_engine/message.py
@@ -2,8 +2,8 @@
 
 import copy
 import datetime as dt
-import operator
 import json
+import operator
 from collections.abc import Callable
 from decimal import Decimal
 from functools import reduce

diff --git a/src/dve/core_engine/type_hints.py b/src/dve/core_engine/type_hints.py
index a6c0c44..0fc53ec 100644
--- a/src/dve/core_engine/type_hints.py
+++ b/src/dve/core_engine/type_hints.py
@@ -6,12 +6,13 @@
 from pathlib import Path
 from queue import Queue as ThreadQueue
 from typing import TYPE_CHECKING, Any, List, Optional, TypeVar, Union  # pylint: disable=W1901
-# TODO - cannot remove List from Typing. See L60 for details.
 
 from pyspark.sql import DataFrame
 from pyspark.sql.types import StructType
 from typing_extensions import Literal, ParamSpec, get_args
 
+# TODO - cannot remove List from Typing. See L60 for details.
+
 if TYPE_CHECKING:  # pragma: no cover
     from dve.core_engine.message import FeedbackMessage
 

diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py
new file mode 100644
index 0000000..8490ab5
--- /dev/null
+++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py
@@ -0,0 +1,56 @@
+from typing import Dict, List
+import pytest
+
+from dve.core_engine.backends.implementations.duckdb.utilities import (
+    expr_mapping_to_columns,
+    expr_array_to_columns,
+)
+
+
+@pytest.mark.parametrize(
+    ["expressions", "expected"],
+    [
+        (
+            {"size(array_field)": "field_length", "another_field": "rename_another_field"},
+            ["size(array_field) as field_length", "another_field as rename_another_field"],
+        ),
+    ],
+)
+def test_expr_mapping_to_columns(expressions: Dict[str, str], expected: list[str]):
+    observed = expr_mapping_to_columns(expressions)
+    assert observed == expected
+
+
+@pytest.mark.parametrize(
+    ["expressions", "expected"],
+    [
+        (
+            [
+                "a_field",
+                "another_field as renamed",
+                "struct(a_field, another_field) as struct_field",
+            ],
+            [
+                "a_field",
+                "another_field as renamed",
+                "struct(a_field, another_field) as struct_field",
+            ],
+        ),
+        (
+            [
+                "size(array_field)",
+                "another_field as rename_another_field",
+                "a_dynamic_field, another_dynamic_field",
+            ],
+            [
+                "size(array_field)",
+                "another_field as rename_another_field",
+                "a_dynamic_field",
+                "another_dynamic_field",
+            ],
+        ),
+    ],
+)
+def test_expr_array_to_columns(expressions: List[str], expected: list[str]):
+    observed = expr_array_to_columns(expressions)
+    assert observed == expected
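A note on the Spark assertions in the next file: pyspark's Column overloads
== to build a comparison expression rather than return a boolean, so the
tests compare each Column's JVM string form instead. The idiom, roughly,
assuming an active SparkSession (required for the underlying _jc object to
exist):

    from pyspark.sql.functions import expr

    left = expr("size(array_field)").alias("field_length")
    right = expr("size(array_field)").alias("field_length")
    # left == right would itself be a Column; compare serialised forms instead.
    assert left._jc.toString() == right._jc.toString()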
diff --git a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_spark_utils.py b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_spark_utils.py
new file mode 100644
index 0000000..d8c8d3d
--- /dev/null
+++ b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_spark_utils.py
@@ -0,0 +1,38 @@
+import pytest
+from pyspark.sql.functions import expr
+
+from dve.core_engine.backends.implementations.spark.utilities import (
+    expr_mapping_to_columns,
+    expr_array_to_columns,
+)
+
+
+@pytest.mark.parametrize(
+    ["expressions"],
+    [
+        (
+            {"size(array_field)": "field_length", "another_field": "rename_another_field"},
+        ),
+    ]
+)
+def test_expr_mapping_to_columns(spark, expressions: dict[str, str]):
+    observed = expr_mapping_to_columns(expressions)
+    assert [cl._jc.toString() for cl in observed] == [expr(expression).alias(rename)._jc.toString() for expression, rename in expressions.items()]
+
+
+@pytest.mark.parametrize(
+    ["expressions", "expected"],
+    [
+        (
+            ["a_field", "another_field as renamed", "struct(a_field, another_field) as struct_field"],
+            ["a_field", "another_field as renamed", "struct(a_field, another_field) as struct_field"]
+        ),
+        (
+            ["size(array_field)", "another_field as rename_another_field", "a_dynamic_field, another_dynamic_field"],
+            ["size(array_field)", "another_field as rename_another_field", "a_dynamic_field", "another_dynamic_field"],
+        ),
+    ],
+)
+def test_expr_array_to_columns(spark, expressions: list[str], expected: list[str]):
+    observed = expr_array_to_columns(expressions)
+    assert [cl._jc.toString() for cl in observed] == [expr(expression)._jc.toString() for expression in expected]
\ No newline at end of file
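To run just the new tests, assuming the project's pytest configuration
provides the spark session fixture used above:

    pytest tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py
    pytest tests/test_core_engine/test_backends/test_implementations/test_spark/test_spark_utils.py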