From cfc1869d79f2817b7e805245f398a2335bd60058 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 27 Jan 2026 21:02:39 +0000 Subject: [PATCH] feat: Initial support for biglake iceberg tables --- bigframes/core/array_value.py | 13 +- bigframes/core/bq_data.py | 99 +++++++- .../compile/ibis_compiler/ibis_compiler.py | 4 +- bigframes/core/compile/sqlglot/compiler.py | 18 +- bigframes/core/nodes.py | 4 +- bigframes/core/schema.py | 27 +-- bigframes/core/sql/__init__.py | 3 +- bigframes/dtypes.py | 4 +- bigframes/operations/aggregations.py | 4 +- .../session/_io/bigquery/read_gbq_table.py | 216 +++++------------- bigframes/session/bq_caching_executor.py | 2 +- bigframes/session/dry_runs.py | 27 ++- bigframes/session/iceberg.py | 179 +++++++++++++++ bigframes/session/loader.py | 123 ++++++++-- bigframes/session/read_api_execution.py | 5 +- bigframes/streaming/dataframe.py | 2 +- tests/system/small/test_iceberg.py | 23 ++ tests/unit/test_planner.py | 4 +- 18 files changed, 516 insertions(+), 241 deletions(-) create mode 100644 bigframes/session/iceberg.py create mode 100644 tests/system/small/test_iceberg.py diff --git a/bigframes/core/array_value.py b/bigframes/core/array_value.py index 7901243e4b0..ccec1f9b954 100644 --- a/bigframes/core/array_value.py +++ b/bigframes/core/array_value.py @@ -17,9 +17,8 @@ import datetime import functools import typing -from typing import Iterable, List, Mapping, Optional, Sequence, Tuple +from typing import Iterable, List, Mapping, Optional, Sequence, Tuple, Union -import google.cloud.bigquery import pandas import pyarrow as pa @@ -91,7 +90,7 @@ def from_range(cls, start, end, step): @classmethod def from_table( cls, - table: google.cloud.bigquery.Table, + table: Union[bq_data.BiglakeIcebergTable, bq_data.GbqNativeTable], session: Session, *, columns: Optional[Sequence[str]] = None, @@ -103,8 +102,6 @@ def from_table( ): if offsets_col and primary_key: raise ValueError("must set at most one of 'offests', 'primary_key'") - # define data source only for needed columns, this makes row-hashing cheaper - table_def = bq_data.GbqTable.from_table(table, columns=columns or ()) # create ordering from info ordering = None @@ -115,7 +112,9 @@ def from_table( [ids.ColumnId(key_part) for key_part in primary_key] ) - bf_schema = schemata.ArraySchema.from_bq_table(table, columns=columns) + bf_schema = schemata.ArraySchema.from_bq_schema( + table.physical_schema, columns=columns + ) # Scan all columns by default, we define this list as it can be pruned while preserving source_def scan_list = nodes.ScanList( tuple( @@ -124,7 +123,7 @@ def from_table( ) ) source_def = bq_data.BigqueryDataSource( - table=table_def, + table=table, schema=bf_schema, at_time=at_time, sql_predicate=predicate, diff --git a/bigframes/core/bq_data.py b/bigframes/core/bq_data.py index 3b42ff7c031..42d883e0cd3 100644 --- a/bigframes/core/bq_data.py +++ b/bigframes/core/bq_data.py @@ -22,7 +22,7 @@ import queue import threading import typing -from typing import Any, Iterator, Optional, Sequence, Tuple +from typing import Any, Iterator, List, Optional, Sequence, Tuple, Union from google.cloud import bigquery_storage_v1 import google.cloud.bigquery as bq @@ -37,23 +37,48 @@ import bigframes.core.ordering as orderings +# what is the line between metadata and core fields? 
Mostly metadata fields are optional or unreliable, but its fuzzy @dataclasses.dataclass(frozen=True) -class GbqTable: +class TableMetadata: + # this size metadata might be stale, don't use where strict correctness is needed + numBytes: Optional[int] = None + numRows: Optional[int] = None + location: Optional[str] = None + type: Optional[str] = None + created_time: Optional[datetime.datetime] = None + modified_time: Optional[datetime.datetime] = None + + +@dataclasses.dataclass(frozen=True) +class GbqNativeTable: project_id: str = dataclasses.field() dataset_id: str = dataclasses.field() table_id: str = dataclasses.field() physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field() is_physically_stored: bool = dataclasses.field() - cluster_cols: typing.Optional[Tuple[str, ...]] + partition_col: Optional[str] = None + cluster_cols: typing.Optional[Tuple[str, ...]] = None + primary_key: Optional[Tuple[str, ...]] = None + metadata: TableMetadata = TableMetadata() @staticmethod - def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: + def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqNativeTable: # Subsetting fields with columns can reduce cost of row-hash default ordering if columns: schema = tuple(item for item in table.schema if item.name in columns) else: schema = tuple(table.schema) - return GbqTable( + + metadata = TableMetadata( + numBytes=table.num_bytes, + numRows=table.num_rows, + location=table.location, # type: ignore + type=table.table_type, # type: ignore + created_time=table.created, + modified_time=table.modified, + ) + + return GbqNativeTable( project_id=table.project, dataset_id=table.dataset_id, table_id=table.table_id, @@ -62,6 +87,8 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: cluster_cols=None if table.clustering_fields is None else tuple(table.clustering_fields), + primary_key=tuple(_get_primary_keys(table)), + metadata=metadata, ) @staticmethod @@ -69,8 +96,8 @@ def from_ref_and_schema( table_ref: bq.TableReference, schema: Sequence[bq.SchemaField], cluster_cols: Optional[Sequence[str]] = None, - ) -> GbqTable: - return GbqTable( + ) -> GbqNativeTable: + return GbqNativeTable( project_id=table_ref.project, dataset_id=table_ref.dataset_id, table_id=table_ref.table_id, @@ -84,12 +111,48 @@ def get_table_ref(self) -> bq.TableReference: bq.DatasetReference(self.project_id, self.dataset_id), self.table_id ) + def get_full_id(self, quoted: bool = False) -> str: + if quoted: + return f"`{self.project_id}`.`{self.dataset_id}`.`{self.table_id}`" + return f"{self.project_id}.{self.dataset_id}.{self.table_id}" + @property @functools.cache def schema_by_id(self): return {col.name: col for col in self.physical_schema} +@dataclasses.dataclass(frozen=True) +class BiglakeIcebergTable: + project_id: str = dataclasses.field() + catalog_id: str = dataclasses.field() + namespace_id: str = dataclasses.field() + table_id: str = dataclasses.field() + physical_schema: Tuple[bq.SchemaField, ...] 
= dataclasses.field() + cluster_cols: typing.Optional[Tuple[str, ...]] + metadata: TableMetadata + + def get_full_id(self, quoted: bool = False) -> str: + if quoted: + return f"`{self.project_id}`.`{self.catalog_id}`.`{self.namespace_id}`.`{self.table_id}`" + return ( + f"{self.project_id}.{self.catalog_id}.{self.namespace_id}.{self.table_id}" + ) + + @property + @functools.cache + def schema_by_id(self): + return {col.name: col for col in self.physical_schema} + + @property + def partition_col(self) -> Optional[str]: + return None + + @property + def primary_key(self) -> Optional[Tuple[str, ...]]: + return None + + @dataclasses.dataclass(frozen=True) class BigqueryDataSource: """ @@ -104,7 +167,7 @@ def __post_init__(self): self.schema.names ) - table: GbqTable + table: Union[GbqNativeTable, BiglakeIcebergTable] schema: bigframes.core.schema.ArraySchema at_time: typing.Optional[datetime.datetime] = None # Added for backwards compatibility, not validated @@ -188,6 +251,8 @@ def get_arrow_batches( project_id: str, sample_rate: Optional[float] = None, ) -> ReadResult: + assert isinstance(data.table, GbqNativeTable) + table_mod_options = {} read_options_dict: dict[str, Any] = {"selected_fields": list(columns)} @@ -245,3 +310,21 @@ def process_batch(pa_batch): return ReadResult( batches, session.estimated_row_count, session.estimated_total_bytes_scanned ) + + +def _get_primary_keys( + table: bq.Table, +) -> List[str]: + """Get primary keys from table if they are set.""" + + primary_keys: List[str] = [] + if ( + (table_constraints := getattr(table, "table_constraints", None)) is not None + and (primary_key := table_constraints.primary_key) is not None + # This will be False for either None or empty list. + # We want primary_keys = None if no primary keys are set. 
+ and (columns := primary_key.columns) + ): + primary_keys = columns if columns is not None else [] + + return primary_keys diff --git a/bigframes/core/compile/ibis_compiler/ibis_compiler.py b/bigframes/core/compile/ibis_compiler/ibis_compiler.py index 31cd9a0456b..c8ed82fc728 100644 --- a/bigframes/core/compile/ibis_compiler/ibis_compiler.py +++ b/bigframes/core/compile/ibis_compiler/ibis_compiler.py @@ -207,9 +207,7 @@ def _table_to_ibis( source: bq_data.BigqueryDataSource, scan_cols: typing.Sequence[str], ) -> ibis_types.Table: - full_table_name = ( - f"{source.table.project_id}.{source.table.dataset_id}.{source.table.table_id}" - ) + full_table_name = source.table.get_full_id(quoted=False) # Physical schema might include unused columns, unsupported datatypes like JSON physical_schema = ibis_bigquery.BigQuerySchema.to_ibis( list(source.table.physical_schema) diff --git a/bigframes/core/compile/sqlglot/compiler.py b/bigframes/core/compile/sqlglot/compiler.py index e77370892c0..ae5fe8cdb37 100644 --- a/bigframes/core/compile/sqlglot/compiler.py +++ b/bigframes/core/compile/sqlglot/compiler.py @@ -21,6 +21,7 @@ from bigframes.core import ( agg_expressions, + bq_data, expression, guid, identifiers, @@ -173,10 +174,21 @@ def compile_readlocal(node: nodes.ReadLocalNode, child: ir.SQLGlotIR) -> ir.SQLG @_compile_node.register def compile_readtable(node: nodes.ReadTableNode, child: ir.SQLGlotIR): table = node.source.table + if isinstance(table, bq_data.GbqNativeTable): + project, dataset, table_id = table.project_id, table.dataset_id, table.table_id + elif isinstance(table, bq_data.BiglakeIcebergTable): + project, dataset, table_id = ( + table.project_id, + table.catalog_id, + f"{table.namespace_id}.{table.table_id}", + ) + + else: + raise ValueError(f"Unrecognized table type: {table}") return ir.SQLGlotIR.from_table( - table.project_id, - table.dataset_id, - table.table_id, + project, + dataset, + table_id, col_names=[col.source_id for col in node.scan_list.items], alias_names=[col.id.sql for col in node.scan_list.items], uid_gen=child.uid_gen, diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index ddccb39ef98..4b1efcb285c 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -825,9 +825,7 @@ def variables_introduced(self) -> int: @property def row_count(self) -> typing.Optional[int]: - if self.source.sql_predicate is None and self.source.table.is_physically_stored: - return self.source.n_rows - return None + return self.source.n_rows @property def node_defined_ids(self) -> Tuple[identifiers.ColumnId, ...]: diff --git a/bigframes/core/schema.py b/bigframes/core/schema.py index 395ad55f492..d0c6d8656cb 100644 --- a/bigframes/core/schema.py +++ b/bigframes/core/schema.py @@ -17,7 +17,7 @@ from dataclasses import dataclass import functools import typing -from typing import Dict, List, Optional, Sequence +from typing import Dict, Optional, Sequence import google.cloud.bigquery import pyarrow @@ -40,31 +40,16 @@ class ArraySchema: def __iter__(self): yield from self.items - @classmethod - def from_bq_table( - cls, - table: google.cloud.bigquery.Table, - column_type_overrides: Optional[ - typing.Dict[str, bigframes.dtypes.Dtype] - ] = None, - columns: Optional[Sequence[str]] = None, - ): - if not columns: - fields = table.schema - else: - lookup = {field.name: field for field in table.schema} - fields = [lookup[col] for col in columns] - - return ArraySchema.from_bq_schema( - fields, column_type_overrides=column_type_overrides - ) - @classmethod def from_bq_schema( cls, 
- schema: List[google.cloud.bigquery.SchemaField], + schema: Sequence[google.cloud.bigquery.SchemaField], column_type_overrides: Optional[Dict[str, bigframes.dtypes.Dtype]] = None, + columns: Optional[Sequence[str]] = None, ): + if columns: + lookup = {field.name: field for field in schema} + schema = [lookup[col] for col in columns] if column_type_overrides is None: column_type_overrides = {} items = tuple( diff --git a/bigframes/core/sql/__init__.py b/bigframes/core/sql/__init__.py index ccd2a16ddcd..de633b7b845 100644 --- a/bigframes/core/sql/__init__.py +++ b/bigframes/core/sql/__init__.py @@ -28,7 +28,6 @@ import bigframes.core.compile.googlesql as googlesql if TYPE_CHECKING: - import google.cloud.bigquery as bigquery import bigframes.core.ordering @@ -131,7 +130,7 @@ def infix_op(opname: str, left_arg: str, right_arg: str): return f"{left_arg} {opname} {right_arg}" -def is_distinct_sql(columns: Iterable[str], table_ref: bigquery.TableReference) -> str: +def is_distinct_sql(columns: Iterable[str], table_ref) -> str: is_unique_sql = f"""WITH full_table AS ( {googlesql.Select().from_(table_ref).select(columns).sql()} ), diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 29e1be1acea..8caddcdb002 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -800,7 +800,7 @@ def convert_to_schema_field( name, inner_field.field_type, mode="REPEATED", fields=inner_field.fields ) if pa.types.is_struct(bigframes_dtype.pyarrow_dtype): - inner_fields: list[pa.Field] = [] + inner_fields: list[google.cloud.bigquery.SchemaField] = [] struct_type = typing.cast(pa.StructType, bigframes_dtype.pyarrow_dtype) for i in range(struct_type.num_fields): field = struct_type.field(i) @@ -823,7 +823,7 @@ def convert_to_schema_field( def bf_type_from_type_kind( - bq_schema: list[google.cloud.bigquery.SchemaField], + bq_schema: Sequence[google.cloud.bigquery.SchemaField], ) -> typing.Dict[str, Dtype]: """Converts bigquery sql type to the default bigframes dtype.""" return {name: dtype for name, dtype in map(convert_schema_field, bq_schema)} diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 5fe83302638..eee710b2882 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -205,7 +205,7 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT return dtypes.TIMEDELTA_DTYPE if dtypes.is_numeric(input_types[0]): - if pd.api.types.is_bool_dtype(input_types[0]): + if pd.api.types.is_bool_dtype(input_types[0]): # type: ignore return dtypes.INT_DTYPE return input_types[0] @@ -224,7 +224,7 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT # These will change if median is changed to exact implementation. 
if not dtypes.is_orderable(input_types[0]): raise TypeError(f"Type {input_types[0]} is not orderable") - if pd.api.types.is_bool_dtype(input_types[0]): + if pd.api.types.is_bool_dtype(input_types[0]): # type: ignore return dtypes.INT_DTYPE else: return input_types[0] diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index e12fe502c0f..76c019a5594 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -20,15 +20,15 @@ import datetime import typing -from typing import Dict, Iterable, List, Optional, Sequence, Tuple +from typing import Dict, Iterable, Optional, Sequence, Tuple, Union import warnings import bigframes_vendored.constants as constants import google.api_core.exceptions import google.cloud.bigquery as bigquery -import google.cloud.bigquery.table import bigframes.core +from bigframes.core import bq_data import bigframes.core.events import bigframes.exceptions as bfe import bigframes.session._io.bigquery @@ -98,81 +98,6 @@ def get_information_schema_metadata( return table -def get_table_metadata( - bqclient: bigquery.Client, - *, - table_id: str, - default_project: Optional[str], - bq_time: datetime.datetime, - cache: Dict[str, Tuple[datetime.datetime, bigquery.Table]], - use_cache: bool = True, - publisher: bigframes.core.events.Publisher, -) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]: - """Get the table metadata, either from cache or via REST API.""" - - cached_table = cache.get(table_id) - if use_cache and cached_table is not None: - snapshot_timestamp, table = cached_table - - if is_time_travel_eligible( - bqclient=bqclient, - table=table, - columns=None, - snapshot_time=snapshot_timestamp, - filter_str=None, - # Don't warn, because that will already have been taken care of. - should_warn=False, - should_dry_run=False, - publisher=publisher, - ): - # This warning should only happen if the cached snapshot_time will - # have any effect on bigframes (b/437090788). For example, with - # cached query results, such as after re-running a query, time - # travel won't be applied and thus this check is irrelevent. - # - # In other cases, such as an explicit read_gbq_table(), Cache hit - # could be unexpected. See internal issue 329545805. Raise a - # warning with more information about how to avoid the problems - # with the cache. - msg = bfe.format_message( - f"Reading cached table from {snapshot_timestamp} to avoid " - "incompatibilies with previous reads of this table. To read " - "the latest version, set `use_cache=False` or close the " - "current session with Session.close() or " - "bigframes.pandas.close_session()." - ) - # There are many layers before we get to (possibly) the user's code: - # pandas.read_gbq_table - # -> with_default_session - # -> Session.read_gbq_table - # -> _read_gbq_table - # -> _get_snapshot_sql_and_primary_key - # -> get_snapshot_datetime_and_table_metadata - warnings.warn(msg, category=bfe.TimeTravelCacheWarning, stacklevel=7) - - return cached_table - - if is_information_schema(table_id): - table = get_information_schema_metadata( - bqclient=bqclient, table_id=table_id, default_project=default_project - ) - else: - table_ref = google.cloud.bigquery.table.TableReference.from_string( - table_id, default_project=default_project - ) - table = bqclient.get_table(table_ref) - - # local time will lag a little bit do to network latency - # make sure it is at least table creation time. 
- # This is relevant if the table was created immediately before loading it here. - if (table.created is not None) and (table.created > bq_time): - bq_time = table.created - - cached_table = (bq_time, table) - cache[table_id] = cached_table - return cached_table - - def is_information_schema(table_id: str): table_id_casefold = table_id.casefold() # Include the "."s to ensure we don't have false positives for some user @@ -184,9 +109,13 @@ def is_information_schema(table_id: str): ) +def is_iceberg_table(table_id: str): + return len(table_id.split(".")) == 4 + + def is_time_travel_eligible( bqclient: bigquery.Client, - table: google.cloud.bigquery.table.Table, + table: Union[bq_data.GbqNativeTable, bq_data.BiglakeIcebergTable], columns: Optional[Sequence[str]], snapshot_time: datetime.datetime, filter_str: Optional[str] = None, @@ -220,43 +149,48 @@ def is_time_travel_eligible( # -> is_time_travel_eligible stacklevel = 7 - # Anonymous dataset, does not support snapshot ever - if table.dataset_id.startswith("_"): - return False + if isinstance(table, bq_data.GbqNativeTable): + # Anonymous dataset, does not support snapshot ever + if table.dataset_id.startswith("_"): + return False - # Only true tables support time travel - if table.table_id.endswith("*"): - if should_warn: - msg = bfe.format_message( - "Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. " - "Attempting query without time travel. Be aware that " - "modifications to the underlying data may result in errors or " - "unexpected behavior." - ) - warnings.warn( - msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel - ) - return False - elif table.table_type != "TABLE": - if table.table_type == "MATERIALIZED_VIEW": + # Only true tables support time travel + if table.table_id.endswith("*"): if should_warn: msg = bfe.format_message( - "Materialized views do not support FOR SYSTEM_TIME AS OF queries. " - "Attempting query without time travel. Be aware that as materialized views " - "are updated periodically, modifications to the underlying data in the view may " - "result in errors or unexpected behavior." + "Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. " + "Attempting query without time travel. Be aware that " + "modifications to the underlying data may result in errors or " + "unexpected behavior." ) warnings.warn( msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel ) return False - elif table.table_type == "VIEW": - return False + elif table.metadata.type != "TABLE": + if table.metadata.type == "MATERIALIZED_VIEW": + if should_warn: + msg = bfe.format_message( + "Materialized views do not support FOR SYSTEM_TIME AS OF queries. " + "Attempting query without time travel. Be aware that as materialized views " + "are updated periodically, modifications to the underlying data in the view may " + "result in errors or unexpected behavior." 
+ ) + warnings.warn( + msg, + category=bfe.TimeTravelDisabledWarning, + stacklevel=stacklevel, + ) + return False + elif table.metadata.type == "VIEW": + return False # table might support time travel, lets do a dry-run query with time travel if should_dry_run: snapshot_sql = bigframes.session._io.bigquery.to_query( - query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}", + query_or_table=table.get_full_id( + quoted=False + ), # to_query will quote for us columns=columns or (), sql_predicate=filter_str, time_travel_timestamp=snapshot_time, @@ -299,8 +233,8 @@ def is_time_travel_eligible( def infer_unique_columns( - table: google.cloud.bigquery.table.Table, - index_cols: List[str], + table: Union[bq_data.GbqNativeTable, bq_data.BiglakeIcebergTable], + index_cols: Sequence[str], ) -> Tuple[str, ...]: """Return a set of columns that can provide a unique row key or empty if none can be inferred. @@ -309,7 +243,7 @@ def infer_unique_columns( """ # If index_cols contain the primary_keys, the query engine assumes they are # provide a unique index. - primary_keys = tuple(_get_primary_keys(table)) + primary_keys = table.primary_key or () if (len(primary_keys) > 0) and frozenset(primary_keys) <= frozenset(index_cols): # Essentially, just reordering the primary key to match the index col order return tuple(index_col for index_col in index_cols if index_col in primary_keys) @@ -322,8 +256,8 @@ def infer_unique_columns( def check_if_index_columns_are_unique( bqclient: bigquery.Client, - table: google.cloud.bigquery.table.Table, - index_cols: List[str], + table: Union[bq_data.GbqNativeTable, bq_data.BiglakeIcebergTable], + index_cols: Sequence[str], *, publisher: bigframes.core.events.Publisher, ) -> Tuple[str, ...]: @@ -332,7 +266,9 @@ def check_if_index_columns_are_unique( # TODO(b/337925142): Avoid a "SELECT *" subquery here by ensuring # table_expression only selects just index_cols. - is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table.reference) + is_unique_sql = bigframes.core.sql.is_distinct_sql( + index_cols, table.get_full_id(quoted=False) + ) job_config = bigquery.QueryJobConfig() results, _ = bigframes.session._io.bigquery.start_query_with_client( bq_client=bqclient, @@ -352,49 +288,8 @@ def check_if_index_columns_are_unique( return () -def _get_primary_keys( - table: google.cloud.bigquery.table.Table, -) -> List[str]: - """Get primary keys from table if they are set.""" - - primary_keys: List[str] = [] - if ( - (table_constraints := getattr(table, "table_constraints", None)) is not None - and (primary_key := table_constraints.primary_key) is not None - # This will be False for either None or empty list. - # We want primary_keys = None if no primary keys are set. - and (columns := primary_key.columns) - ): - primary_keys = columns if columns is not None else [] - - return primary_keys - - -def _is_table_clustered_or_partitioned( - table: google.cloud.bigquery.table.Table, -) -> bool: - """Returns True if the table is clustered or partitioned.""" - - # Could be None or an empty tuple if it's not clustered, both of which are - # falsey. 
- if table.clustering_fields: - return True - - if ( - time_partitioning := table.time_partitioning - ) is not None and time_partitioning.type_ is not None: - return True - - if ( - range_partitioning := table.range_partitioning - ) is not None and range_partitioning.field is not None: - return True - - return False - - def get_index_cols( - table: google.cloud.bigquery.table.Table, + table: Union[bq_data.GbqNativeTable, bq_data.BiglakeIcebergTable], index_col: Iterable[str] | str | Iterable[int] @@ -403,7 +298,7 @@ def get_index_cols( *, rename_to_schema: Optional[Dict[str, str]] = None, default_index_type: bigframes.enums.DefaultIndexKind = bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64, -) -> List[str]: +) -> Sequence[str]: """ If we can get a total ordering from the table, such as via primary key column(s), then return those too so that ordering generation can be @@ -411,9 +306,9 @@ def get_index_cols( """ # Transform index_col -> index_cols so we have a variable that is # always a list of column names (possibly empty). - schema_len = len(table.schema) + schema_len = len(table.physical_schema) - index_cols: List[str] = [] + index_cols = [] if isinstance(index_col, bigframes.enums.DefaultIndexKind): if index_col == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64: # User has explicity asked for a default, sequential index. @@ -438,7 +333,7 @@ def get_index_cols( f"Integer index {index_col} is out of bounds " f"for table with {schema_len} columns (must be >= 0 and < {schema_len})." ) - index_cols = [table.schema[index_col].name] + index_cols = [table.physical_schema[index_col].name] elif isinstance(index_col, Iterable): for item in index_col: if isinstance(item, str): @@ -451,7 +346,7 @@ def get_index_cols( f"Integer index {item} is out of bounds " f"for table with {schema_len} columns (must be >= 0 and < {schema_len})." ) - index_cols.append(table.schema[item].name) + index_cols.append(table.physical_schema[item].name) else: raise TypeError( "If index_col is an iterable, it must contain either strings " @@ -466,19 +361,20 @@ def get_index_cols( # If the isn't an index selected, use the primary keys of the table as the # index. If there are no primary keys, we'll return an empty list. if len(index_cols) == 0: - primary_keys = _get_primary_keys(table) + primary_keys = table.primary_key or () # If table has clustering/partitioning, fail if we haven't been able to # find index_cols to use. This is to avoid unexpected performance and # resource utilization because of the default sequential index. See # internal issue 335727141. if ( - _is_table_clustered_or_partitioned(table) + table.partition_col is not None + or table.cluster_cols and not primary_keys and default_index_type == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64 ): msg = bfe.format_message( - f"Table '{str(table.reference)}' is clustered and/or " + f"Table '{str(table.get_full_id())}' is clustered and/or " "partitioned, but BigQuery DataFrames was not able to find a " "suitable index. To avoid this warning, set at least one of: " # TODO(b/338037499): Allow max_results to override this too, @@ -490,6 +386,6 @@ def get_index_cols( # If there are primary keys defined, the query engine assumes these # columns are unique, even if the constraint is not enforced. We make # the same assumption and use these columns as the total ordering keys. 
- index_cols = primary_keys + index_cols = list(primary_keys) return index_cols diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index ca19d1be86f..45730651aac 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -675,7 +675,7 @@ def _execute_plan_gbq( result_bf_schema = _result_schema(og_schema, list(compiled.sql_schema)) dst = query_job.destination result_bq_data = bq_data.BigqueryDataSource( - table=bq_data.GbqTable( + table=bq_data.GbqNativeTable( dst.project, dst.dataset_id, dst.table_id, diff --git a/bigframes/session/dry_runs.py b/bigframes/session/dry_runs.py index bd54bb65d7b..99ac2b360e3 100644 --- a/bigframes/session/dry_runs.py +++ b/bigframes/session/dry_runs.py @@ -14,16 +14,18 @@ from __future__ import annotations import copy -from typing import Any, Dict, List, Sequence +from typing import Any, Dict, List, Sequence, Union from google.cloud import bigquery import pandas from bigframes import dtypes -from bigframes.core import bigframe_node, nodes +from bigframes.core import bigframe_node, bq_data, nodes -def get_table_stats(table: bigquery.Table) -> pandas.Series: +def get_table_stats( + table: Union[bq_data.GbqNativeTable, bq_data.BiglakeIcebergTable] +) -> pandas.Series: values: List[Any] = [] index: List[Any] = [] @@ -32,7 +34,7 @@ def get_table_stats(table: bigquery.Table) -> pandas.Series: values.append(False) # Populate column and index types - col_dtypes = dtypes.bf_type_from_type_kind(table.schema) + col_dtypes = dtypes.bf_type_from_type_kind(table.physical_schema) index.append("columnCount") values.append(len(col_dtypes)) index.append("columnDtypes") @@ -40,17 +42,22 @@ def get_table_stats(table: bigquery.Table) -> pandas.Series: # Add raw BQ schema index.append("bigquerySchema") - values.append(table.schema) + values.append(table.physical_schema) - for key in ("numBytes", "numRows", "location", "type"): - index.append(key) - values.append(table._properties[key]) + index.append("numBytes") + values.append(table.metadata.numBytes) + index.append("numRows") + values.append(table.metadata.numRows) + index.append("location") + values.append(table.metadata.location) + index.append("type") + values.append(table.metadata.type) index.append("creationTime") - values.append(table.created) + values.append(table.metadata.created_time) index.append("lastModifiedTime") - values.append(table.modified) + values.append(table.metadata.modified_time) return pandas.Series(values, index=index) diff --git a/bigframes/session/iceberg.py b/bigframes/session/iceberg.py new file mode 100644 index 00000000000..0137c85b4d7 --- /dev/null +++ b/bigframes/session/iceberg.py @@ -0,0 +1,179 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+from __future__ import annotations
+
+import datetime
+from typing import List
+
+import google.cloud.bigquery as bq
+import pyiceberg
+from pyiceberg.catalog import load_catalog
+import pyiceberg.schema
+import pyiceberg.types
+
+from bigframes.core import bq_data
+
+
+def get_table(
+    user_project_id: str, full_table_id: str, credentials
+) -> bq_data.BiglakeIcebergTable:
+    table_parts = full_table_id.split(".")
+    if len(table_parts) != 4:
+        raise ValueError("Iceberg catalog table must contain exactly 4 parts")
+
+    catalog_project_id, catalog_id, namespace, table = table_parts
+
+    token = credentials.token
+
+    base_uri = "https://biglake.googleapis.com/iceberg/v1/restcatalog"
+    catalog_prefix = f"/v1/projects/{catalog_project_id}/catalogs/{catalog_id}"
+
+    catalog = load_catalog(
+        f"{catalog_project_id}.{catalog_id}",
+        **{
+            "uri": base_uri,
+            "prefix": catalog_prefix,
+            "header.x-goog-user-project": user_project_id,
+            "oauth2-server-uri": "https://oauth2.googleapis.com/token",
+            "token": token,
+            "warehouse": f"gs://{catalog_id}",
+        },
+    )
+
+    iceberg_table = catalog.load_table(f"{namespace}.{table}")
+    bq_schema = pyiceberg.schema.visit(iceberg_table.schema(), SchemaVisitor())
+    # TODO: Handle physical layout to help optimize
+    # TODO: Use snapshot metadata to get row, byte counts
+    return bq_data.BiglakeIcebergTable(
+        catalog_project_id,
+        catalog_id,
+        namespace,
+        table,
+        physical_schema=bq_schema,
+        cluster_cols=(),
+        metadata=bq_data.TableMetadata(
+            type="TABLE",
+            modified_time=datetime.datetime.fromtimestamp(
+                iceberg_table.metadata.last_updated_ms / 1000.0
+            ),
+        ),
+    )
+
+
+class SchemaVisitor(pyiceberg.schema.SchemaVisitorPerPrimitiveType[bq.SchemaField]):
+    def schema(self, schema: pyiceberg.schema.Schema, struct_result: bq.SchemaField) -> tuple[bq.SchemaField, ...]:  # type: ignore
+        return tuple(f for f in struct_result.fields)
+
+    def struct(
+        self, struct: pyiceberg.types.StructType, field_results: List[bq.SchemaField]
+    ) -> bq.SchemaField:
+        return bq.SchemaField("", "RECORD", fields=field_results)
+
+    def field(
+        self, field: pyiceberg.types.NestedField, field_result: bq.SchemaField
+    ) -> bq.SchemaField:
+        return bq.SchemaField(
+            field.name,
+            field_result.field_type,
+            mode=field_result.mode or "NULLABLE",
+            fields=field_result.fields,
+        )
+
+    def map(
+        self,
+        map_type: pyiceberg.types.MapType,
+        key_result: bq.SchemaField,
+        value_result: bq.SchemaField,
+    ) -> bq.SchemaField:
+        return bq.SchemaField("", "UNKNOWN")
+
+    def list(
+        self, list_type: pyiceberg.types.ListType, element_result: bq.SchemaField
+    ) -> bq.SchemaField:
+        return bq.SchemaField(
+            "", element_result.field_type, mode="REPEATED", fields=element_result.fields
+        )
+
+    def visit_fixed(self, fixed_type: pyiceberg.types.FixedType) -> bq.SchemaField:
+        return bq.SchemaField("", "UNKNOWN")
+
+    def visit_decimal(
+        self, decimal_type: pyiceberg.types.DecimalType
+    ) -> bq.SchemaField:
+        # BIGNUMERIC not supported in iceberg tables yet, so just assume numeric
+        return bq.SchemaField("", "NUMERIC")
+
+    def visit_boolean(
+        self, boolean_type: pyiceberg.types.BooleanType
+    ) -> bq.SchemaField:
+        return bq.SchemaField("", "BOOLEAN")
+
+    def visit_integer(
+        self, integer_type: pyiceberg.types.IntegerType
+    ) -> bq.SchemaField:
+        return bq.SchemaField("", "INTEGER")
+
+    def visit_long(self, long_type: pyiceberg.types.LongType) -> bq.SchemaField:
+        return bq.SchemaField("", "INTEGER")
+
+    def visit_float(self, float_type: pyiceberg.types.FloatType) -> bq.SchemaField:
+        # 32-bit IEEE 754 floating
point
+        return bq.SchemaField("", "FLOAT")
+
+    def visit_double(self, double_type: pyiceberg.types.DoubleType) -> bq.SchemaField:
+        # 64-bit IEEE 754 floating point
+        return bq.SchemaField("", "FLOAT")
+
+    def visit_date(self, date_type: pyiceberg.types.DateType) -> bq.SchemaField:
+        # Date encoded as an int
+        return bq.SchemaField("", "DATE")
+
+    def visit_time(self, time_type: pyiceberg.types.TimeType) -> bq.SchemaField:
+        return bq.SchemaField("", "TIME")
+
+    def visit_timestamp(
+        self, timestamp_type: pyiceberg.types.TimestampType
+    ) -> bq.SchemaField:
+        return bq.SchemaField("", "DATETIME")
+
+    def visit_timestamp_ns(
+        self, timestamp_type: pyiceberg.types.TimestampNanoType
+    ) -> bq.SchemaField:
+        return bq.SchemaField("", "UNKNOWN")
+
+    def visit_timestamptz(
+        self, timestamptz_type: pyiceberg.types.TimestamptzType
+    ) -> bq.SchemaField:
+        return bq.SchemaField("", "TIMESTAMP")
+
+    def visit_timestamptz_ns(
+        self, timestamptz_ns_type: pyiceberg.types.TimestamptzNanoType
+    ) -> bq.SchemaField:
+        return bq.SchemaField("", "UNKNOWN")
+
+    def visit_string(self, string_type: pyiceberg.types.StringType) -> bq.SchemaField:
+        return bq.SchemaField("", "STRING")
+
+    def visit_uuid(self, uuid_type: pyiceberg.types.UUIDType) -> bq.SchemaField:
+        return bq.SchemaField("", "UNKNOWN")
+
+    def visit_unknown(
+        self, unknown_type: pyiceberg.types.UnknownType
+    ) -> bq.SchemaField:
+        """Type `UnknownType` can be promoted to any primitive type in V3+ tables per the Iceberg spec."""
+        return bq.SchemaField("", "UNKNOWN")
+
+    def visit_binary(self, binary_type: pyiceberg.types.BinaryType) -> bq.SchemaField:
+        return bq.SchemaField("", "BYTES")
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 9c18d727c80..44cec704730 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -39,7 +39,9 @@
     Sequence,
     Tuple,
     TypeVar,
+    Union,
 )
+import warnings
 
 import bigframes_vendored.constants as constants
 import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
@@ -68,11 +70,13 @@
 import bigframes.core.events
 import bigframes.core.schema as schemata
 import bigframes.dtypes
+import bigframes.exceptions as bfe
 import bigframes.formatting_helpers as formatting_helpers
 from bigframes.session import dry_runs
 import bigframes.session._io.bigquery as bf_io_bigquery
 import bigframes.session._io.bigquery.read_gbq_query as bf_read_gbq_query
 import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table
+import bigframes.session.iceberg
 import bigframes.session.metrics
 import bigframes.session.temporary_storage
 import bigframes.session.time as session_time
@@ -98,6 +102,8 @@
     bigframes.dtypes.TIMEDELTA_DTYPE: "INTEGER",
 }
 
+TABLE_TYPE = Union[bq_data.GbqNativeTable, bq_data.BiglakeIcebergTable]
+
 
 def _to_index_cols(
     index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
@@ -287,7 +293,7 @@ def __init__(
         self._default_index_type = default_index_type
         self._scan_index_uniqueness = scan_index_uniqueness
         self._force_total_order = force_total_order
-        self._df_snapshot: Dict[str, Tuple[datetime.datetime, bigquery.Table]] = {}
+        self._df_snapshot: Dict[str, Tuple[datetime.datetime, TABLE_TYPE]] = {}
         self._metrics = metrics
         self._publisher = publisher
         # Unfortunate circular reference, but need to pass reference when constructing objects
@@ -391,7 +397,7 @@ def load_data(
         # must get table metadata after load job for accurate metadata
         destination_table = self._bqclient.get_table(load_table_destination)
         return bq_data.BigqueryDataSource(
-
bq_data.GbqTable.from_table(destination_table), + bq_data.GbqNativeTable.from_table(destination_table), schema=schema_w_offsets, ordering=ordering.TotalOrdering.from_offset_col(offsets_col), n_rows=data.metadata.row_count, @@ -445,7 +451,7 @@ def stream_data( ) destination_table = self._bqclient.get_table(load_table_destination) return bq_data.BigqueryDataSource( - bq_data.GbqTable.from_table(destination_table), + bq_data.GbqNativeTable.from_table(destination_table), schema=schema_w_offsets, ordering=ordering.TotalOrdering.from_offset_col(offsets_col), n_rows=data.metadata.row_count, @@ -544,7 +550,7 @@ def request_generator(): for error in response.stream_errors: raise ValueError(f"Errors commiting stream {error}") - result_table = bq_data.GbqTable.from_ref_and_schema( + result_table = bq_data.GbqNativeTable.from_ref_and_schema( bq_table_ref, schema=bq_schema, cluster_cols=[offsets_col] ) return bq_data.BigqueryDataSource( @@ -716,33 +722,35 @@ def read_gbq_table( # Fetch table metadata and validate # --------------------------------- - time_travel_timestamp, table = bf_read_gbq_table.get_table_metadata( - self._bqclient, + time_travel_timestamp, table = self._get_table_metadata( table_id=table_id, default_project=self._bqclient.project, bq_time=self._clock.get_time(), - cache=self._df_snapshot, use_cache=use_cache, - publisher=self._publisher, ) - if table.location.casefold() != self._storage_manager.location.casefold(): + if ( + isinstance(table, bq_data.GbqNativeTable) + and (table.metadata.location or "").casefold() + != self._storage_manager.location.casefold() + ): raise ValueError( - f"Current session is in {self._storage_manager.location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}" + f"Current session is in {self._storage_manager.location} but table '{table.get_full_id()}' is located in {table.metadata.location}" ) - table_column_names = [field.name for field in table.schema] + table_column_names = [field.name for field in table.physical_schema] rename_to_schema: Optional[Dict[str, str]] = None if names is not None: _check_names_param(names, index_col, columns, table_column_names) # Additional unnamed columns is going to set as index columns len_names = len(list(names)) - len_schema = len(table.schema) + len_schema = len(table.physical_schema) if len(columns) == 0 and len_names < len_schema: index_col = range(len_schema - len_names) names = [ - field.name for field in table.schema[: len_schema - len_names] + field.name + for field in table.physical_schema[: len_schema - len_names] ] + list(names) assert len_schema >= len_names @@ -799,7 +807,7 @@ def read_gbq_table( itertools.chain(index_cols, columns) if columns else () ) query = bf_io_bigquery.to_query( - f"{table.project}.{table.dataset_id}.{table.table_id}", + table.get_full_id(quoted=False), columns=all_columns, sql_predicate=bf_io_bigquery.compile_filters(filters) if filters @@ -884,7 +892,7 @@ def read_gbq_table( bigframes.core.events.ExecutionFinished(), ) - selected_cols = None if include_all_columns else index_cols + columns + selected_cols = None if include_all_columns else (*index_cols, *columns) array_value = core.ArrayValue.from_table( table, columns=selected_cols, @@ -959,6 +967,90 @@ def read_gbq_table( df.sort_index() return df + def _get_table_metadata( + self, + *, + table_id: str, + default_project: Optional[str], + bq_time: datetime.datetime, + use_cache: bool = True, + ) -> Tuple[ + datetime.datetime, Union[bq_data.GbqNativeTable, bq_data.BiglakeIcebergTable] + ]: + """Get 
the table metadata, either from cache or via REST API."""
+
+        cached_table = self._df_snapshot.get(table_id)
+        if use_cache and cached_table is not None:
+            snapshot_timestamp, table = cached_table
+
+            if bf_read_gbq_table.is_time_travel_eligible(
+                bqclient=self._bqclient,
+                table=table,
+                columns=None,
+                snapshot_time=snapshot_timestamp,
+                filter_str=None,
+                # Don't warn, because that will already have been taken care of.
+                should_warn=False,
+                should_dry_run=False,
+                publisher=self._publisher,
+            ):
+                # This warning should only happen if the cached snapshot_time will
+                # have any effect on bigframes (b/437090788). For example, with
+                # cached query results, such as after re-running a query, time
+                # travel won't be applied and thus this check is irrelevant.
+                #
+                # In other cases, such as an explicit read_gbq_table(), a cache hit
+                # could be unexpected. See internal issue 329545805. Raise a
+                # warning with more information about how to avoid the problems
+                # with the cache.
+                msg = bfe.format_message(
+                    f"Reading cached table from {snapshot_timestamp} to avoid "
+                    "incompatibilities with previous reads of this table. To read "
+                    "the latest version, set `use_cache=False` or close the "
+                    "current session with Session.close() or "
+                    "bigframes.pandas.close_session()."
+                )
+                # There are many layers before we get to (possibly) the user's code:
+                # pandas.read_gbq_table
+                # -> with_default_session
+                # -> Session.read_gbq_table
+                # -> _read_gbq_table
+                # -> _get_snapshot_sql_and_primary_key
+                # -> get_snapshot_datetime_and_table_metadata
+                warnings.warn(msg, category=bfe.TimeTravelCacheWarning, stacklevel=7)
+
+            return cached_table
+
+        if bf_read_gbq_table.is_information_schema(table_id):
+            client_table = bf_read_gbq_table.get_information_schema_metadata(
+                bqclient=self._bqclient,
+                table_id=table_id,
+                default_project=default_project,
+            )
+            table = bq_data.GbqNativeTable.from_table(client_table)
+        elif bf_read_gbq_table.is_iceberg_table(table_id):
+            table = bigframes.session.iceberg.get_table(
+                self._bqclient.project, table_id, self._bqclient._credentials
+            )
+        else:
+            table_ref = google.cloud.bigquery.table.TableReference.from_string(
+                table_id, default_project=default_project
+            )
+            client_table = self._bqclient.get_table(table_ref)
+            table = bq_data.GbqNativeTable.from_table(client_table)
+
+        # Local time will lag a little bit due to network latency, so
+        # make sure it is at least table creation time.
+        # This is relevant if the table was created immediately before loading it here.
+        if (table.metadata.created_time is not None) and (
+            table.metadata.created_time > bq_time
+        ):
+            bq_time = table.metadata.created_time
+
+        cached_table = (bq_time, table)
+        self._df_snapshot[table_id] = cached_table
+        return cached_table
+
     def load_file(
         self,
         filepath_or_buffer: str | IO["bytes"],
@@ -1359,6 +1451,7 @@ def _start_query_with_job(
 
 
 def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict:
+    """
     For backwards-compatibility, convert any previously client-side only
     parameters such as timeoutMs to the property name expected by the REST API.
diff --git a/bigframes/session/read_api_execution.py b/bigframes/session/read_api_execution.py index c7138f7b307..9f2d196ce8e 100644 --- a/bigframes/session/read_api_execution.py +++ b/bigframes/session/read_api_execution.py @@ -17,7 +17,7 @@ from google.cloud import bigquery_storage_v1 -from bigframes.core import bigframe_node, nodes, rewrite +from bigframes.core import bigframe_node, bq_data, nodes, rewrite from bigframes.session import executor, semi_executor @@ -47,6 +47,9 @@ def execute( if node.explicitly_ordered and ordered: return None + if not isinstance(node.source.table, bq_data.GbqNativeTable): + return None + if not node.source.table.is_physically_stored: return None diff --git a/bigframes/streaming/dataframe.py b/bigframes/streaming/dataframe.py index b7b67178cea..1dfd0529c7e 100644 --- a/bigframes/streaming/dataframe.py +++ b/bigframes/streaming/dataframe.py @@ -251,7 +251,7 @@ def _from_table_df(cls, df: dataframe.DataFrame) -> StreamingDataFrame: def _original_table(self): def traverse(node: nodes.BigFrameNode): if isinstance(node, nodes.ReadTableNode): - return f"{node.source.table.project_id}.{node.source.table.dataset_id}.{node.source.table.table_id}" + return node.source.table.get_full_id(quoted=False) for child in node.child_nodes: original_table = traverse(child) if original_table: diff --git a/tests/system/small/test_iceberg.py b/tests/system/small/test_iceberg.py new file mode 100644 index 00000000000..271ecde210e --- /dev/null +++ b/tests/system/small/test_iceberg.py @@ -0,0 +1,23 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import bigframes.pandas as bpd + + +def test_read_iceberg_table(): + bpd.reset_session() + bpd.config.options.bigquery.location = "us-central1" + df = bpd.read_gbq( + "bigquery-public-data.biglake-public-nyc-taxi-iceberg.public_data.nyc_taxicab_2021" + ) + assert df.shape == (30904427, 20) diff --git a/tests/unit/test_planner.py b/tests/unit/test_planner.py index 66d83f362dd..36a568a4165 100644 --- a/tests/unit/test_planner.py +++ b/tests/unit/test_planner.py @@ -19,9 +19,9 @@ import pandas as pd import bigframes.core as core +import bigframes.core.bq_data import bigframes.core.expression as ex import bigframes.core.identifiers as ids -import bigframes.core.schema import bigframes.operations as ops import bigframes.session.planner as planner @@ -38,7 +38,7 @@ type(FAKE_SESSION)._strictly_ordered = mock.PropertyMock(return_value=True) LEAF: core.ArrayValue = core.ArrayValue.from_table( session=FAKE_SESSION, - table=TABLE, + table=bigframes.core.bq_data.GbqNativeTable.from_table(TABLE), )
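
Usage sketch: with this change, read_gbq accepts a four-part BigLake Iceberg
identifier of the form project.catalog.namespace.table. The sketch below mirrors
the new system test in tests/system/small/test_iceberg.py; the public table name,
location, and row count are taken from that test and may change over time.

    import bigframes.pandas as bpd

    bpd.reset_session()
    bpd.config.options.bigquery.location = "us-central1"

    # Four-part identifier: project.catalog.namespace.table
    df = bpd.read_gbq(
        "bigquery-public-data.biglake-public-nyc-taxi-iceberg.public_data.nyc_taxicab_2021"
    )
    print(df.shape)  # (30904427, 20) per the system test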