diff --git a/src/dve/core_engine/backends/implementations/duckdb/utilities.py b/src/dve/core_engine/backends/implementations/duckdb/utilities.py index 39e4929..6211500 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/utilities.py +++ b/src/dve/core_engine/backends/implementations/duckdb/utilities.py @@ -1,5 +1,7 @@ """Utility objects for use with duckdb backend""" +import itertools + from dve.core_engine.backends.base.utilities import _split_multiexpr_string @@ -24,7 +26,11 @@ def expr_mapping_to_columns(expressions: dict) -> list[str]: def expr_array_to_columns(expressions: list[str]) -> list[str]: """Create list of duckdb expressions from list of expressions""" - return [f"{expression}" for expression in expressions] + return list( + itertools.chain.from_iterable( + _split_multiexpr_string(expression) for expression in expressions + ) + ) def multiexpr_string_to_columns(expressions: str) -> list[str]: diff --git a/src/dve/core_engine/backends/implementations/spark/utilities.py b/src/dve/core_engine/backends/implementations/spark/utilities.py index bd5b02a..5eca158 100644 --- a/src/dve/core_engine/backends/implementations/spark/utilities.py +++ b/src/dve/core_engine/backends/implementations/spark/utilities.py @@ -1,6 +1,7 @@ """Some utilities which are useful for implementing Spark transformations.""" import datetime as dt +import itertools from collections.abc import Callable from json import JSONEncoder from operator import and_, or_ @@ -70,7 +71,13 @@ def expr_mapping_to_columns(expressions: ExpressionMapping) -> list[Column]: def expr_array_to_columns(expressions: ExpressionArray) -> list[Column]: """Convert an array of expressions to a list of columns.""" - return list(map(sf.expr, expressions)) + + _expr_list = list( + itertools.chain.from_iterable( + _split_multiexpr_string(expression) for expression in expressions + ) + ) + return list(map(sf.expr, _expr_list)) def multiexpr_string_to_columns(expressions: MultiExpression) -> list[Column]: diff --git a/src/dve/core_engine/type_hints.py b/src/dve/core_engine/type_hints.py index ac6cf2a..991f46f 100644 --- a/src/dve/core_engine/type_hints.py +++ b/src/dve/core_engine/type_hints.py @@ -13,8 +13,6 @@ # TODO - cannot remove List from Typing. See L60 for details. - - if TYPE_CHECKING: # pragma: no cover from dve.core_engine.message import FeedbackMessage diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py new file mode 100644 index 0000000..8490ab5 --- /dev/null +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_ddb_utils.py @@ -0,0 +1,56 @@ +from typing import Dict, List +import pytest + +from dve.core_engine.backends.implementations.duckdb.utilities import ( + expr_mapping_to_columns, + expr_array_to_columns, +) + + +@pytest.mark.parametrize( + ["expressions", "expected"], + [ + ( + {"size(array_field)": "field_length", "another_field": "rename_another_field"}, + ["size(array_field) as field_length", "another_field as rename_another_field"], + ), + ], +) +def test_expr_mapping_to_columns(expressions: Dict[str, str], expected: list[str]): + observed = expr_mapping_to_columns(expressions) + assert observed == expected + + +@pytest.mark.parametrize( + ["expressions", "expected"], + [ + ( + [ + "a_field", + "another_field as renamed", + "struct(a_field, another_field) as struct_field", + ], + [ + "a_field", + "another_field as renamed", + "struct(a_field, another_field) as struct_field", + ], + ), + ( + [ + "size(array_field)", + "another_field as rename_another_field", + "a_dynamic_field, another_dynamic_field", + ], + [ + "size(array_field)", + "another_field as rename_another_field", + "a_dynamic_field", + "another_dynamic_field", + ], + ), + ], +) +def test_expr_array_to_columns(expressions: Dict[str, str], expected: list[str]): + observed = expr_array_to_columns(expressions) + assert observed == expected diff --git a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_spark_utils.py b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_spark_utils.py new file mode 100644 index 0000000..d8c8d3d --- /dev/null +++ b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_spark_utils.py @@ -0,0 +1,38 @@ +import pytest +from pyspark.sql.functions import expr + +from dve.core_engine.backends.implementations.spark.utilities import ( + expr_mapping_to_columns, + expr_array_to_columns, +) + + +@pytest.mark.parametrize( + ["expressions"], + [ + ( + {"size(array_field)": "field_length", "another_field": "rename_another_field"}, + ), + ] +) +def test_expr_mapping_to_columns(spark, expressions: dict[str, str]): + observed = expr_mapping_to_columns(expressions) + assert [cl._jc.toString() for cl in observed] == [expr(expression).alias(rename)._jc.toString() for expression, rename in expressions.items()] + + +@pytest.mark.parametrize( + ["expressions", "expected"], + [ + ( + ["a_field", "another_field as renamed", "struct(a_field, another_field) as struct_field"], + ["a_field", "another_field as renamed", "struct(a_field, another_field) as struct_field"] + ), + ( + ["size(array_field)", "another_field as rename_another_field", "a_dynamic_field, another_dynamic_field"], + ["size(array_field)", "another_field as rename_another_field", "a_dynamic_field", "another_dynamic_field"], + ), + ], +) +def test_expr_array_to_columns(spark, expressions: dict[str, str], expected: list[str]): + observed = expr_array_to_columns(expressions) + assert [cl._jc.toString() for cl in observed] == [expr(expression)._jc.toString() for expression in expected] \ No newline at end of file