@@ -527,7 +527,7 @@ def _(
else:
result = apply_window_if_present(result, window)

if op.should_floor_result:
if op.should_floor_result or column.dtype == dtypes.TIMEDELTA_DTYPE:
result = sge.Cast(this=sge.func("FLOOR", result), to="INT64")
return result

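The hunk above also floors results for TIMEDELTA columns before casting to INT64. A minimal sketch of that wrapping, built with sqlglot on its own (the column name ms_delta is a placeholder, not from this change):

    import sqlglot.expressions as sge

    # Wrap an aggregation result in FLOOR(...) and cast it to INT64, the same
    # shape the compiler emits for TIMEDELTA results; ms_delta is made up here.
    result = sge.func("FLOOR", sge.column("ms_delta"))
    casted = sge.cast(result, "INT64")
    print(casted.sql(dialect="bigquery"))  # CAST(FLOOR(ms_delta) AS INT64)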
10 changes: 8 additions & 2 deletions bigframes/core/compile/sqlglot/compiler.py
@@ -266,10 +266,16 @@ def compile_concat(node: nodes.ConcatNode, *children: ir.SQLGlotIR) -> ir.SQLGlo
assert len(children) >= 1
uid_gen = children[0].uid_gen

output_ids = [id.sql for id in node.output_ids]
# A BigQuery `UNION` query takes its column names from the first `SELECT` clause.
default_output_ids = [field.id.sql for field in node.child_nodes[0].fields]
output_aliases = [
(default_output_id, output_id.sql)
for default_output_id, output_id in zip(default_output_ids, node.output_ids)
]

return ir.SQLGlotIR.from_union(
[child.expr for child in children],
output_ids=output_ids,
output_aliases=output_aliases,
uid_gen=uid_gen,
)

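Because BigQuery names the UNION output columns after the first SELECT, compile_concat now pairs the first child's column ids with the requested output ids rather than passing the output ids alone. A small illustration of that pairing, with invented ids:

    # Pair the first child's column ids (the names BigQuery keeps for the UNION)
    # with the requested output ids; both lists are placeholders for this sketch.
    default_output_ids = ["bfcol_9", "bfcol_10"]   # stands in for node.child_nodes[0].fields
    requested_ids = ["bfcol_30", "bfcol_31"]       # stands in for node.output_ids
    output_aliases = list(zip(default_output_ids, requested_ids))
    print(output_aliases)  # [('bfcol_9', 'bfcol_30'), ('bfcol_10', 'bfcol_31')]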
@@ -252,7 +252,7 @@ def _cast_to_json(expr: TypedExpr, op: ops.AsTypeOp) -> sge.Expression:
sg_expr = expr.expr

if from_type == dtypes.STRING_DTYPE:
func_name = "PARSE_JSON_IN_SAFE" if op.safe else "PARSE_JSON"
func_name = "SAFE.PARSE_JSON" if op.safe else "PARSE_JSON"
return sge.func(func_name, sg_expr)
if from_type in (dtypes.INT_DTYPE, dtypes.BOOL_DTYPE, dtypes.FLOAT_DTYPE):
sg_expr = sge.Cast(this=sg_expr, to="STRING")
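BigQuery exposes the error-tolerant variant of PARSE_JSON through the SAFE. function prefix; there is no PARSE_JSON_IN_SAFE function, which is what the change above corrects. A minimal sqlglot sketch of the call it now builds:

    import sqlglot.expressions as sge

    # Build SAFE.PARSE_JSON(string_col); the dotted name is not a known sqlglot
    # function, so it is kept as an anonymous call and rendered verbatim.
    expr = sge.func("SAFE.PARSE_JSON", sge.column("string_col"))
    print(expr.sql(dialect="bigquery"))  # SAFE.PARSE_JSON(string_col)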
52 changes: 22 additions & 30 deletions bigframes/core/compile/sqlglot/sqlglot_ir.py
@@ -170,7 +170,9 @@ def from_query_string(
cls,
query_string: str,
) -> SQLGlotIR:
"""Builds a SQLGlot expression from a query string"""
"""Builds a SQLGlot expression from a query string. Wrapping the query
in a CTE can avoid the query parsing issue for unsupported syntax in
SQLGlot."""
uid_gen: guid.SequentialUIDGenerator = guid.SequentialUIDGenerator()
cte_name = sge.to_identifier(
next(uid_gen.get_uid_stream("bfcte_")), quoted=cls.quoted
@@ -187,7 +189,7 @@ def from_query_string(
def from_union(
cls,
selects: typing.Sequence[sge.Select],
output_ids: typing.Sequence[str],
output_aliases: typing.Sequence[typing.Tuple[str, str]],
uid_gen: guid.SequentialUIDGenerator,
) -> SQLGlotIR:
"""Builds a SQLGlot expression by unioning of multiple select expressions."""
@@ -196,7 +198,7 @@
), f"At least two select expressions must be provided, but got {selects}."

existing_ctes: list[sge.CTE] = []
union_selects: list[sge.Expression] = []
union_selects: list[sge.Select] = []
for select in selects:
assert isinstance(
select, sge.Select
@@ -205,37 +207,27 @@ def from_union(
select_expr = select.copy()
select_expr, select_ctes = _pop_query_ctes(select_expr)
existing_ctes = [*existing_ctes, *select_ctes]

new_cte_name = sge.to_identifier(
next(uid_gen.get_uid_stream("bfcte_")), quoted=cls.quoted
)
new_cte = sge.CTE(
this=select_expr,
alias=new_cte_name,
union_selects.append(select_expr)

union_expr: sge.Query = union_selects[0].subquery()
for select in union_selects[1:]:
union_expr = sge.Union(
this=union_expr,
expression=select.subquery(),
distinct=False,
copy=False,
)
existing_ctes = [*existing_ctes, new_cte]

selections = [
sge.Alias(
this=sge.to_identifier(expr.alias_or_name, quoted=cls.quoted),
alias=sge.to_identifier(output_id, quoted=cls.quoted),
)
for expr, output_id in zip(select_expr.expressions, output_ids)
]
union_selects.append(
sge.Select().select(*selections).from_(sge.Table(this=new_cte_name))
selections = [
sge.Alias(
this=sge.to_identifier(old_name, quoted=cls.quoted),
alias=sge.to_identifier(new_name, quoted=cls.quoted),
)

union_expr = typing.cast(
sge.Select,
functools.reduce(
lambda x, y: sge.Union(
this=x, expression=y, distinct=False, copy=False
),
union_selects,
),
for old_name, new_name in output_aliases
]
final_select_expr = (
sge.Select().select(*selections).from_(union_expr.subquery())
)
final_select_expr = sge.Select().select(sge.Star()).from_(union_expr.subquery())
final_select_expr = _set_query_ctes(final_select_expr, existing_ctes)
return cls(expr=final_select_expr, uid_gen=uid_gen)

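The rewritten from_union above drops the per-branch CTEs: each child SELECT becomes a subquery, the branches are chained with UNION ALL, and one outer SELECT re-aliases the first branch's column names to the requested output ids. A condensed, self-contained sketch of that shape with throwaway table and column names:

    import sqlglot.expressions as sge

    # Two child selects standing in for the compiled children of a concat.
    selects = [
        sge.select("a", "b").from_("t1"),
        sge.select("c", "d").from_("t2"),
    ]

    # Chain the branches as UNION ALL over subqueries rather than materializing
    # each branch as its own CTE.
    union_expr = selects[0].subquery()
    for child in selects[1:]:
        union_expr = sge.Union(this=union_expr, expression=child.subquery(), distinct=False)

    # Re-alias the first branch's column names (the ones BigQuery keeps for the
    # UNION) to the requested output ids; these pairs are placeholders.
    output_aliases = [("a", "bfcol_0"), ("b", "bfcol_1")]
    selections = [
        sge.Alias(this=sge.to_identifier(old), alias=sge.to_identifier(new))
        for old, new in output_aliases
    ]
    final_select = sge.Select().select(*selections).from_(union_expr.subquery())
    print(final_select.sql(dialect="bigquery"))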
2 changes: 1 addition & 1 deletion tests/system/small/test_session.py
@@ -352,7 +352,7 @@ def test_read_gbq_w_primary_keys_table(
pd.testing.assert_frame_equal(result, sorted_result)

# Verify that we're working from a snapshot rather than a copy of the table.
assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql
assert "FOR SYSTEM_TIME AS OF" in df.sql


def test_read_gbq_w_primary_keys_table_and_filters(
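The assertion is relaxed, presumably because the time-travel clause can carry its timestamp in more than one literal style while the clause itself stays stable. A tiny self-contained illustration (both SQL strings are invented for this example):

    # Two plausible renderings of the same snapshot clause; only the clause text
    # itself is stable, so the test checks for that substring alone.
    candidates = [
        "SELECT * FROM `ds.tbl` FOR SYSTEM_TIME AS OF TIMESTAMP '2025-01-01 00:00:00+00'",
        "SELECT * FROM `ds.tbl` FOR SYSTEM_TIME AS OF CAST('2025-01-01 00:00:00+00' AS TIMESTAMP)",
    ]
    assert all("FOR SYSTEM_TIME AS OF" in sql for sql in candidates)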
@@ -13,7 +13,7 @@ WITH `bfcte_0` AS (
PARSE_JSON(CAST(`bool_col` AS STRING)) AS `bfcol_6`,
PARSE_JSON(`string_col`) AS `bfcol_7`,
PARSE_JSON(CAST(`bool_col` AS STRING)) AS `bfcol_8`,
PARSE_JSON_IN_SAFE(`string_col`) AS `bfcol_9`
SAFE.PARSE_JSON(`string_col`) AS `bfcol_9`
FROM `bfcte_0`
)
SELECT
@@ -14,15 +14,6 @@ WITH `bfcte_1` AS (
*,
0 AS `bfcol_8`
FROM `bfcte_3`
), `bfcte_6` AS (
SELECT
`rowindex` AS `bfcol_9`,
`rowindex` AS `bfcol_10`,
`int64_col` AS `bfcol_11`,
`string_col` AS `bfcol_12`,
`bfcol_8` AS `bfcol_13`,
`bfcol_7` AS `bfcol_14`
FROM `bfcte_5`
), `bfcte_0` AS (
SELECT
`int64_col`,
@@ -39,44 +30,44 @@ WITH `bfcte_1` AS (
*,
1 AS `bfcol_23`
FROM `bfcte_2`
), `bfcte_7` AS (
SELECT
`rowindex` AS `bfcol_24`,
`rowindex` AS `bfcol_25`,
`int64_col` AS `bfcol_26`,
`string_col` AS `bfcol_27`,
`bfcol_23` AS `bfcol_28`,
`bfcol_22` AS `bfcol_29`
FROM `bfcte_4`
), `bfcte_8` AS (
), `bfcte_6` AS (
SELECT
*
`bfcol_9` AS `bfcol_30`,
`bfcol_10` AS `bfcol_31`,
`bfcol_11` AS `bfcol_32`,
`bfcol_12` AS `bfcol_33`,
`bfcol_13` AS `bfcol_34`,
`bfcol_14` AS `bfcol_35`
FROM (
SELECT
`bfcol_9` AS `bfcol_30`,
`bfcol_10` AS `bfcol_31`,
`bfcol_11` AS `bfcol_32`,
`bfcol_12` AS `bfcol_33`,
`bfcol_13` AS `bfcol_34`,
`bfcol_14` AS `bfcol_35`
FROM `bfcte_6`
(
SELECT
`rowindex` AS `bfcol_9`,
`rowindex` AS `bfcol_10`,
`int64_col` AS `bfcol_11`,
`string_col` AS `bfcol_12`,
`bfcol_8` AS `bfcol_13`,
`bfcol_7` AS `bfcol_14`
FROM `bfcte_5`
)
UNION ALL
SELECT
`bfcol_24` AS `bfcol_30`,
`bfcol_25` AS `bfcol_31`,
`bfcol_26` AS `bfcol_32`,
`bfcol_27` AS `bfcol_33`,
`bfcol_28` AS `bfcol_34`,
`bfcol_29` AS `bfcol_35`
FROM `bfcte_7`
(
SELECT
`rowindex` AS `bfcol_24`,
`rowindex` AS `bfcol_25`,
`int64_col` AS `bfcol_26`,
`string_col` AS `bfcol_27`,
`bfcol_23` AS `bfcol_28`,
`bfcol_22` AS `bfcol_29`
FROM `bfcte_4`
)
)
)
SELECT
`bfcol_30` AS `rowindex`,
`bfcol_31` AS `rowindex_1`,
`bfcol_32` AS `int64_col`,
`bfcol_33` AS `string_col`
FROM `bfcte_8`
FROM `bfcte_6`
ORDER BY
`bfcol_34` ASC NULLS LAST,
`bfcol_35` ASC NULLS LAST
@@ -13,13 +13,6 @@ WITH `bfcte_2` AS (
*,
0 AS `bfcol_5`
FROM `bfcte_6`
), `bfcte_13` AS (
SELECT
`float64_col` AS `bfcol_6`,
`int64_col` AS `bfcol_7`,
`bfcol_5` AS `bfcol_8`,
`bfcol_4` AS `bfcol_9`
FROM `bfcte_10`
), `bfcte_0` AS (
SELECT
`bool_col`,
@@ -42,13 +35,6 @@
*,
1 AS `bfcol_16`
FROM `bfcte_8`
), `bfcte_14` AS (
SELECT
`float64_col` AS `bfcol_17`,
`int64_too` AS `bfcol_18`,
`bfcol_16` AS `bfcol_19`,
`bfcol_15` AS `bfcol_20`
FROM `bfcte_12`
), `bfcte_1` AS (
SELECT
`float64_col`,
@@ -64,13 +50,6 @@
*,
2 AS `bfcol_26`
FROM `bfcte_5`
), `bfcte_15` AS (
SELECT
`float64_col` AS `bfcol_27`,
`int64_col` AS `bfcol_28`,
`bfcol_26` AS `bfcol_29`,
`bfcol_25` AS `bfcol_30`
FROM `bfcte_9`
), `bfcte_0` AS (
SELECT
`bool_col`,
@@ -93,50 +72,54 @@
*,
3 AS `bfcol_37`
FROM `bfcte_7`
), `bfcte_16` AS (
SELECT
`float64_col` AS `bfcol_38`,
`int64_too` AS `bfcol_39`,
`bfcol_37` AS `bfcol_40`,
`bfcol_36` AS `bfcol_41`
FROM `bfcte_11`
), `bfcte_17` AS (
), `bfcte_13` AS (
SELECT
*
`bfcol_6` AS `bfcol_42`,
`bfcol_7` AS `bfcol_43`,
`bfcol_8` AS `bfcol_44`,
`bfcol_9` AS `bfcol_45`
FROM (
SELECT
`bfcol_6` AS `bfcol_42`,
`bfcol_7` AS `bfcol_43`,
`bfcol_8` AS `bfcol_44`,
`bfcol_9` AS `bfcol_45`
FROM `bfcte_13`
(
SELECT
`float64_col` AS `bfcol_6`,
`int64_col` AS `bfcol_7`,
`bfcol_5` AS `bfcol_8`,
`bfcol_4` AS `bfcol_9`
FROM `bfcte_10`
)
UNION ALL
SELECT
`bfcol_17` AS `bfcol_42`,
`bfcol_18` AS `bfcol_43`,
`bfcol_19` AS `bfcol_44`,
`bfcol_20` AS `bfcol_45`
FROM `bfcte_14`
(
SELECT
`float64_col` AS `bfcol_17`,
`int64_too` AS `bfcol_18`,
`bfcol_16` AS `bfcol_19`,
`bfcol_15` AS `bfcol_20`
FROM `bfcte_12`
)
UNION ALL
SELECT
`bfcol_27` AS `bfcol_42`,
`bfcol_28` AS `bfcol_43`,
`bfcol_29` AS `bfcol_44`,
`bfcol_30` AS `bfcol_45`
FROM `bfcte_15`
(
SELECT
`float64_col` AS `bfcol_27`,
`int64_col` AS `bfcol_28`,
`bfcol_26` AS `bfcol_29`,
`bfcol_25` AS `bfcol_30`
FROM `bfcte_9`
)
UNION ALL
SELECT
`bfcol_38` AS `bfcol_42`,
`bfcol_39` AS `bfcol_43`,
`bfcol_40` AS `bfcol_44`,
`bfcol_41` AS `bfcol_45`
FROM `bfcte_16`
(
SELECT
`float64_col` AS `bfcol_38`,
`int64_too` AS `bfcol_39`,
`bfcol_37` AS `bfcol_40`,
`bfcol_36` AS `bfcol_41`
FROM `bfcte_11`
)
)
)
SELECT
`bfcol_42` AS `float64_col`,
`bfcol_43` AS `int64_col`
FROM `bfcte_17`
FROM `bfcte_13`
ORDER BY
`bfcol_44` ASC NULLS LAST,
`bfcol_45` ASC NULLS LAST
5 changes: 0 additions & 5 deletions tests/unit/core/compile/sqlglot/test_compile_readlocal.py
@@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import sys

import numpy as np
import pandas as pd
import pytest
@@ -36,7 +34,6 @@ def test_compile_readlocal_w_structs_df(
compiler_session_w_nested_structs_types: bigframes.Session,
snapshot,
):
# TODO(b/427306734): Check why the output is different from the expected output.
bf_df = bpd.DataFrame(
nested_structs_pandas_df, session=compiler_session_w_nested_structs_types
)
@@ -66,8 +63,6 @@ def test_compile_readlocal_w_json_df(
def test_compile_readlocal_w_special_values(
compiler_session: bigframes.Session, snapshot
):
if sys.version_info < (3, 12):
pytest.skip("Skipping test due to inconsistent SQL formatting")
df = pd.DataFrame(
{
"col_none": [None, 1, 2],