diff --git a/bigframes/core/compile/configs.py b/bigframes/core/compile/configs.py index 5ffca0cf43..62c28f87ca 100644 --- a/bigframes/core/compile/configs.py +++ b/bigframes/core/compile/configs.py @@ -34,3 +34,4 @@ class CompileResult: sql: str sql_schema: typing.Sequence[google.cloud.bigquery.SchemaField] row_order: typing.Optional[ordering.RowOrdering] + encoded_type_refs: str diff --git a/bigframes/core/compile/ibis_compiler/ibis_compiler.py b/bigframes/core/compile/ibis_compiler/ibis_compiler.py index 31cd9a0456..9e209ea3b3 100644 --- a/bigframes/core/compile/ibis_compiler/ibis_compiler.py +++ b/bigframes/core/compile/ibis_compiler/ibis_compiler.py @@ -29,6 +29,7 @@ import bigframes.core.compile.concat as concat_impl import bigframes.core.compile.configs as configs import bigframes.core.compile.explode +from bigframes.core.logging import data_types as data_type_logger import bigframes.core.nodes as nodes import bigframes.core.ordering as bf_ordering import bigframes.core.rewrite as rewrites @@ -56,15 +57,20 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult: ) if request.sort_rows: result_node = cast(nodes.ResultNode, rewrites.column_pruning(result_node)) + encoded_type_refs = data_type_logger.encode_type_refs(result_node) sql = compile_result_node(result_node) return configs.CompileResult( - sql, result_node.schema.to_bigquery(), result_node.order_by + sql, + result_node.schema.to_bigquery(), + result_node.order_by, + encoded_type_refs, ) ordering: Optional[bf_ordering.RowOrdering] = result_node.order_by result_node = dataclasses.replace(result_node, order_by=None) result_node = cast(nodes.ResultNode, rewrites.column_pruning(result_node)) result_node = cast(nodes.ResultNode, rewrites.defer_selection(result_node)) + encoded_type_refs = data_type_logger.encode_type_refs(result_node) sql = compile_result_node(result_node) # Return the ordering iff no extra columns are needed to define the row order if ordering is not None: @@ -72,7 
+78,9 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult: ordering if ordering.referenced_columns.issubset(result_node.ids) else None ) assert (not request.materialize_all_order_keys) or (output_order is not None) - return configs.CompileResult(sql, result_node.schema.to_bigquery(), output_order) + return configs.CompileResult( + sql, result_node.schema.to_bigquery(), output_order, encoded_type_refs + ) def _replace_unsupported_ops(node: nodes.BigFrameNode): diff --git a/bigframes/core/compile/sqlglot/compiler.py b/bigframes/core/compile/sqlglot/compiler.py index e77370892c..0276f2a879 100644 --- a/bigframes/core/compile/sqlglot/compiler.py +++ b/bigframes/core/compile/sqlglot/compiler.py @@ -34,6 +34,7 @@ from bigframes.core.compile.sqlglot.expressions import typed_expr import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler import bigframes.core.compile.sqlglot.sqlglot_ir as ir +from bigframes.core.logging import data_types as data_type_logger import bigframes.core.ordering as bf_ordering from bigframes.core.rewrite import schema_binding @@ -65,9 +66,13 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult: result_node = typing.cast( nodes.ResultNode, rewrite.defer_selection(result_node) ) + encoded_type_refs = data_type_logger.encode_type_refs(result_node) sql = _compile_result_node(result_node, uid_gen) return configs.CompileResult( - sql, result_node.schema.to_bigquery(), result_node.order_by + sql, + result_node.schema.to_bigquery(), + result_node.order_by, + encoded_type_refs, ) ordering: typing.Optional[bf_ordering.RowOrdering] = result_node.order_by @@ -76,6 +81,7 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult: result_node = _remap_variables(result_node, uid_gen) result_node = typing.cast(nodes.ResultNode, rewrite.defer_selection(result_node)) + encoded_type_refs = data_type_logger.encode_type_refs(result_node) sql = _compile_result_node(result_node, uid_gen) # 
Return the ordering iff no extra columns are needed to define the row order if ordering is not None: @@ -83,7 +89,9 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult: ordering if ordering.referenced_columns.issubset(result_node.ids) else None ) assert (not request.materialize_all_order_keys) or (output_order is not None) - return configs.CompileResult(sql, result_node.schema.to_bigquery(), output_order) + return configs.CompileResult( + sql, result_node.schema.to_bigquery(), output_order, encoded_type_refs + ) def _remap_variables( diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index ca19d1be86..81c25c8bc4 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -318,6 +318,8 @@ def _export_gbq( clustering_fields=spec.cluster_cols if spec.cluster_cols else None, ) + # Attach data type usage to the job labels + job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs # TODO(swast): plumb through the api_name of the user-facing api that # caused this query. iterator, job = self._run_execute_query( @@ -661,6 +663,8 @@ def _execute_plan_gbq( ) job_config.destination = destination_table + # Attach data type usage to the job labels + job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs iterator, query_job = self._run_execute_query( sql=compiled.sql, job_config=job_config, diff --git a/tests/system/small/session/test_session_logging.py b/tests/system/small/session/test_session_logging.py new file mode 100644 index 0000000000..b951582309 --- /dev/null +++ b/tests/system/small/session/test_session_logging.py @@ -0,0 +1,40 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +from bigframes.core.logging import data_types +import bigframes.session._io.bigquery as bq_io + + +def test_data_type_logging(scalars_df_index): + s = scalars_df_index["int64_col"] + 1.5 + + # We want to check the job_config passed to start_query_with_client + with mock.patch( + "bigframes.session._io.bigquery.start_query_with_client", + wraps=bq_io.start_query_with_client, + ) as mock_query: + s.to_pandas() + + # Fetch job labels sent to the BQ client and verify their values + assert mock_query.called + call_args = mock_query.call_args + job_config = call_args.kwargs.get("job_config") + assert job_config is not None + job_labels = job_config.labels + assert "bigframes-dtypes" in job_labels + assert job_labels["bigframes-dtypes"] == data_types.encode_type_refs( + s._block._expr.node + )