Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Utility objects for use with duckdb backend"""

import itertools

from dve.core_engine.backends.base.utilities import _split_multiexpr_string


Expand All @@ -24,7 +26,11 @@ def expr_mapping_to_columns(expressions: dict) -> list[str]:

def expr_array_to_columns(expressions: list[str]) -> list[str]:
    """Create a flat list of duckdb expressions from a list of expressions.

    Every entry is passed through ``_split_multiexpr_string`` and the resulting
    pieces are chained together in order, so a single entry that holds several
    expressions (e.g. ``"a_field, another_field"``) contributes one item per
    expression to the returned list.

    Args:
        expressions: Expression strings; an entry may hold more than one
            expression.

    Returns:
        A single flat list of expression strings, in input order.
    """
    # NOTE: the earlier implementation returned the entries untouched, which
    # left multi-expression entries unsplit; the split must happen here.
    return list(
        itertools.chain.from_iterable(
            _split_multiexpr_string(expression) for expression in expressions
        )
    )


def multiexpr_string_to_columns(expressions: str) -> list[str]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Some utilities which are useful for implementing Spark transformations."""

import datetime as dt
import itertools
from collections.abc import Callable
from json import JSONEncoder
from operator import and_, or_
Expand Down Expand Up @@ -70,7 +71,13 @@ def expr_mapping_to_columns(expressions: ExpressionMapping) -> list[Column]:

def expr_array_to_columns(expressions: ExpressionArray) -> list[Column]:
    """Convert an array of expressions to a flat list of columns.

    Every entry is first passed through ``_split_multiexpr_string`` so that an
    entry holding several expressions yields one column per expression; each
    resulting expression string is then turned into a column with ``sf.expr``.

    Args:
        expressions: Expression strings; an entry may hold more than one
            expression.

    Returns:
        One column per individual expression, in input order.
    """
    # Split before building columns: handing an unsplit multi-expression
    # string straight to ``sf.expr`` is what the previous implementation did.
    split_expressions = itertools.chain.from_iterable(
        _split_multiexpr_string(expression) for expression in expressions
    )
    return [sf.expr(expression) for expression in split_expressions]


def multiexpr_string_to_columns(expressions: MultiExpression) -> list[Column]:
Expand Down
2 changes: 0 additions & 2 deletions src/dve/core_engine/type_hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

# TODO - cannot remove List from typing. See L60 for details.



if TYPE_CHECKING: # pragma: no cover
from dve.core_engine.message import FeedbackMessage

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from typing import Dict, List
import pytest

from dve.core_engine.backends.implementations.duckdb.utilities import (
expr_mapping_to_columns,
expr_array_to_columns,
)


@pytest.mark.parametrize(
    ["expressions", "expected"],
    [
        (
            {
                "size(array_field)": "field_length",
                "another_field": "rename_another_field",
            },
            [
                "size(array_field) as field_length",
                "another_field as rename_another_field",
            ],
        ),
    ],
)
def test_expr_mapping_to_columns(expressions: Dict[str, str], expected: list[str]):
    """Each mapping item should be rendered as ``<expression> as <alias>``."""
    assert expr_mapping_to_columns(expressions) == expected


@pytest.mark.parametrize(
    ["expressions", "expected"],
    [
        # Entries holding a single expression pass through unchanged, including
        # one whose function call contains a comma between its arguments.
        (
            [
                "a_field",
                "another_field as renamed",
                "struct(a_field, another_field) as struct_field",
            ],
            [
                "a_field",
                "another_field as renamed",
                "struct(a_field, another_field) as struct_field",
            ],
        ),
        # An entry holding two comma-separated expressions is split in two.
        (
            [
                "size(array_field)",
                "another_field as rename_another_field",
                "a_dynamic_field, another_dynamic_field",
            ],
            [
                "size(array_field)",
                "another_field as rename_another_field",
                "a_dynamic_field",
                "another_dynamic_field",
            ],
        ),
    ],
)
def test_expr_array_to_columns(expressions: list[str], expected: list[str]):
    """An expression array should be flattened into individual expressions."""
    observed = expr_array_to_columns(expressions)
    assert observed == expected
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import pytest
from pyspark.sql.functions import expr

from dve.core_engine.backends.implementations.spark.utilities import (
expr_mapping_to_columns,
expr_array_to_columns,
)


@pytest.mark.parametrize(
    ["expressions"],
    [
        (
            {
                "size(array_field)": "field_length",
                "another_field": "rename_another_field",
            },
        ),
    ],
)
def test_expr_mapping_to_columns(spark, expressions: dict[str, str]):
    """Each mapping item should become the aliased column built by ``expr``."""
    columns = expr_mapping_to_columns(expressions)
    # Columns are compared through the string form of their underlying JVM
    # column objects.
    observed_strings = [column._jc.toString() for column in columns]
    expected_strings = [
        expr(expression).alias(rename)._jc.toString()
        for expression, rename in expressions.items()
    ]
    assert observed_strings == expected_strings


@pytest.mark.parametrize(
    ["expressions", "expected"],
    [
        # Entries holding a single expression pass through unchanged, including
        # one whose function call contains a comma between its arguments.
        (
            [
                "a_field",
                "another_field as renamed",
                "struct(a_field, another_field) as struct_field",
            ],
            [
                "a_field",
                "another_field as renamed",
                "struct(a_field, another_field) as struct_field",
            ],
        ),
        # An entry holding two comma-separated expressions is split in two.
        (
            [
                "size(array_field)",
                "another_field as rename_another_field",
                "a_dynamic_field, another_dynamic_field",
            ],
            [
                "size(array_field)",
                "another_field as rename_another_field",
                "a_dynamic_field",
                "another_dynamic_field",
            ],
        ),
    ],
)
def test_expr_array_to_columns(spark, expressions: list[str], expected: list[str]):
    """An expression array should yield one column per individual expression."""
    observed = expr_array_to_columns(expressions)
    # Columns are compared through the string form of their underlying JVM
    # column objects.
    observed_strings = [column._jc.toString() for column in observed]
    expected_strings = [expr(expression)._jc.toString() for expression in expected]
    assert observed_strings == expected_strings