From 8784664d2b5679fa051f826ba47183961acd09e0 Mon Sep 17 00:00:00 2001
From: Tim Swena
Date: Wed, 21 Jan 2026 18:33:56 +0000
Subject: [PATCH] feat: support biglake tables in pandas_gbq.sample

---
 pandas_gbq/core/biglake.py     |  61 ++++++++++++++
 pandas_gbq/core/sample.py      | 142 +++++++++++++++++++++------------
 setup.py                       |   1 +
 testing/constraints-3.9.txt    |   1 +
 tests/unit/test_core_sample.py |  24 +----
 5 files changed, 160 insertions(+), 69 deletions(-)
 create mode 100644 pandas_gbq/core/biglake.py

diff --git a/pandas_gbq/core/biglake.py b/pandas_gbq/core/biglake.py
new file mode 100644
index 00000000..f00863b0
--- /dev/null
+++ b/pandas_gbq/core/biglake.py
@@ -0,0 +1,61 @@
+# Copyright (c) 2026 pandas-gbq Authors All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+from __future__ import annotations
+
+import dataclasses
+
+import google.auth.transport.requests
+import google.oauth2.credentials
+
+_ICEBERG_REST_CATALOG_URI = "https://biglake.googleapis.com/iceberg/v1/restcatalog"
+_TABLE_METADATA_PATH = (
+    "/v1/projects/{project}/catalogs/{catalog}/namespaces/{namespace}/tables/{table}"
+)
+
+
+@dataclasses.dataclass(frozen=True)
+class BigLakeTableId:
+    project: str
+    catalog: str
+    namespace: str
+    table: str
+
+
+def get_table_metadata(
+    *,
+    table_id: str,
+    credentials: google.oauth2.credentials.Credentials,
+    billing_project_id: str,
+):
+    r"""Fetch Iceberg table metadata from the BigLake Iceberg REST catalog.
+
+    The returned metadata includes file statistics such as
+    ``total-files-size`` (https://iceberg.apache.org/spec/#metrics), which
+    callers can use to estimate a row limit for sampling.
+
+    Equivalent request with curl, where PROJECT is the billing project:
+
+        curl -X GET \
+            -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
+            -H "Content-Type: application/json; charset=utf-8" \
+            -H "x-goog-user-project: PROJECT" \
+            -H "X-Iceberg-Access-Delegation: vended-credentials" \
+            "https://biglake.googleapis.com/iceberg/v1/restcatalog/v1/projects/PROJECT/catalogs/CATALOG/namespaces/NAMESPACE/tables/TABLE"
+    """
+    project, catalog, namespace, table = table_id.split(".")
+    session = google.auth.transport.requests.AuthorizedSession(credentials=credentials)
+    path = _TABLE_METADATA_PATH.format(
+        project=project,
+        catalog=catalog,
+        namespace=namespace,
+        table=table,
+    )
+    response = session.get(
+        f"{_ICEBERG_REST_CATALOG_URI}{path}",
+        headers={
+            "x-goog-user-project": billing_project_id,
+            "Content-Type": "application/json; charset=utf-8",
+            # TODO(tswast): parameter for this option (or get from catalog metadata?)
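+            # Vended credentials ask the catalog to return short-lived storage
+            # credentials along with the table metadata, so the caller can read
+            # the table's data files without separate storage permissions.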
+            # /iceberg/{$api_version}/restcatalog/extensions/{name=projects/*/catalogs/*}
+            "X-Iceberg-Access-Delegation": "vended-credentials",
+        },
+    )
+    # Surface HTTP errors instead of trying to decode an error body as JSON.
+    response.raise_for_status()
+    return response.json()
diff --git a/pandas_gbq/core/sample.py b/pandas_gbq/core/sample.py
index 49eee4b5..b86ce501 100644
--- a/pandas_gbq/core/sample.py
+++ b/pandas_gbq/core/sample.py
@@ -14,6 +14,7 @@
 
 import pandas_gbq.constants
+import pandas_gbq.core.biglake
 import pandas_gbq.core.read
 import pandas_gbq.gbq_connector
 
 # Only import at module-level at type checking time to avoid circular
@@ -156,7 +157,7 @@ def _download_results_in_parallel(
 
 
 def _sample_with_tablesample(
-    table: google.cloud.bigquery.Table,
+    table_id: str,
     *,
     bqclient: google.cloud.bigquery.Client,
     proportion: float,
@@ -166,7 +167,7 @@
 ) -> Optional[pandas.DataFrame]:
     query = f"""
     SELECT *
-    FROM `{table.project}.{table.dataset_id}.{table.table_id}`
+    FROM `{table_id}`
     TABLESAMPLE SYSTEM ({float(proportion) * 100.0} PERCENT)
     ORDER BY RAND() DESC
     LIMIT {int(target_row_count)};
@@ -181,7 +182,7 @@
 
 
 def _sample_with_limit(
-    table: google.cloud.bigquery.Table,
+    table_id: str,
     *,
     bqclient: google.cloud.bigquery.Client,
     target_row_count: int,
@@ -190,7 +191,7 @@
 ) -> Optional[pandas.DataFrame]:
     query = f"""
     SELECT *
-    FROM `{table.project}.{table.dataset_id}.{table.table_id}`
+    FROM `{table_id}`
     ORDER BY RAND() DESC
     LIMIT {int(target_row_count)};
 """
@@ -203,6 +204,82 @@
     )
 
 
+def _sample_biglake_table(
+    *,
+    table_id: str,
+    credentials: google.oauth2.credentials.Credentials,
+    bqclient: google.cloud.bigquery.Client,
+    target_bytes: int,
+    progress_bar_type: str | None,
+    use_bqstorage_api: bool,
+) -> Optional[pandas.DataFrame]:
+    # TODO(tswast): use pandas_gbq.core.biglake.get_table_metadata to size the
+    # sample, then fetch rows with a query instead of the Storage Read API.
+    raise NotImplementedError("Sampling BigLake tables is not yet implemented.")
+
+
+def _sample_bq_table(
+    *,
+    table_id: str,
+    bqclient: google.cloud.bigquery.Client,
+    target_bytes: int,
+    progress_bar_type: str | None,
+    use_bqstorage_api: bool,
+) -> Optional[pandas.DataFrame]:
+    table = bqclient.get_table(table_id)
+    num_rows = table.num_rows
+    num_bytes = table.num_bytes
+    table_type = table.table_type
+
+    # Some tables such as views report 0 despite actually having rows.
+    if num_bytes == 0:
+        num_bytes = None
+
+    # Table is small enough to download the whole thing.
+    if (
+        table_type in _READ_API_ELIGIBLE_TYPES
+        and num_bytes is not None
+        and num_bytes <= target_bytes
+    ):
+        rows_iter = bqclient.list_rows(table)
+        return pandas_gbq.core.read.download_results(
+            rows_iter,
+            bqclient=bqclient,
+            progress_bar_type=progress_bar_type,
+            warn_on_large_results=False,
+            max_results=None,
+            user_dtypes=None,
+            use_bqstorage_api=use_bqstorage_api,
+        )
+
+    target_row_count = _estimate_limit(
+        target_bytes=target_bytes,
+        table_bytes=num_bytes,
+        table_rows=num_rows,
+        fields=table.schema,
+    )
+
+    # Table is eligible for TABLESAMPLE.
+    if num_bytes is not None and table_type in _TABLESAMPLE_ELIGIBLE_TYPES:
+        proportion = target_bytes / num_bytes
+        return _sample_with_tablesample(
+            f"{table.project}.{table.dataset_id}.{table.table_id}",
+            bqclient=bqclient,
+            proportion=proportion,
+            target_row_count=target_row_count,
+            progress_bar_type=progress_bar_type,
+            use_bqstorage_api=use_bqstorage_api,
+        )
+
+    # Not eligible for TABLESAMPLE or reading directly, so take a random sample
+    # with a full table scan.
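+    # Note: ORDER BY RAND() with a LIMIT keeps the download small, but the
+    # query itself still scans (and bills for) the whole table.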
+    return _sample_with_limit(
+        f"{table.project}.{table.dataset_id}.{table.table_id}",
+        bqclient=bqclient,
+        target_row_count=target_row_count,
+        progress_bar_type=progress_bar_type,
+        use_bqstorage_api=use_bqstorage_api,
+    )
+
+
 def sample(
     table_id: str,
     *,
@@ -267,57 +344,24 @@
     )
     credentials = cast(google.oauth2.credentials.Credentials, connector.credentials)
     bqclient = connector.get_client()
-    table = bqclient.get_table(table_id)
-    num_rows = table.num_rows
-    num_bytes = table.num_bytes
-    table_type = table.table_type
 
-    # Some tables such as views report 0 despite actually having rows.
-    if num_bytes == 0:
-        num_bytes = None
-
-    # Table is small enough to download the whole thing.
-    if (
-        table_type in _READ_API_ELIGIBLE_TYPES
-        and num_bytes is not None
-        and num_bytes <= target_bytes
-    ):
-        rows_iter = bqclient.list_rows(table)
-        return pandas_gbq.core.read.download_results(
-            rows_iter,
+    # BigLake tables can't be read directly by the BQ Storage Read API, so
+    # make sure we run a query first. Four-part IDs
+    # (project.catalog.namespace.table) identify BigLake Iceberg tables.
+    parts = table_id.split(".")
+    if len(parts) == 4:
+        return _sample_biglake_table(
+            table_id=table_id,
+            credentials=credentials,
             bqclient=bqclient,
+            target_bytes=target_bytes,
             progress_bar_type=progress_bar_type,
-            warn_on_large_results=False,
-            max_results=None,
-            user_dtypes=None,
             use_bqstorage_api=use_bqstorage_api,
         )
-
-    target_row_count = _estimate_limit(
-        target_bytes=target_bytes,
-        table_bytes=num_bytes,
-        table_rows=num_rows,
-        fields=table.schema,
-    )
-
-    # Table is eligible for TABLESAMPLE.
-    if num_bytes is not None and table_type in _TABLESAMPLE_ELIGIBLE_TYPES:
-        proportion = target_bytes / num_bytes
-        return _sample_with_tablesample(
-            table,
+    else:
+        return _sample_bq_table(
+            table_id=table_id,
             bqclient=bqclient,
-            proportion=proportion,
-            target_row_count=target_row_count,
+            target_bytes=target_bytes,
             progress_bar_type=progress_bar_type,
             use_bqstorage_api=use_bqstorage_api,
         )
-
-    # Not eligible for TABLESAMPLE or reading directly, so take a random sample
-    # with a full table scan.
-    return _sample_with_limit(
-        table,
-        bqclient=bqclient,
-        target_row_count=target_row_count,
-        progress_bar_type=progress_bar_type,
-        use_bqstorage_api=use_bqstorage_api,
-    )
diff --git a/setup.py b/setup.py
index 66d5edf2..c0a6e442 100644
--- a/setup.py
+++ b/setup.py
@@ -40,6 +40,7 @@
     # allow pandas-gbq to detect invalid package versions at runtime.
"google-cloud-bigquery >=3.20.0,<4.0.0", "packaging >=22.0.0", + "requests >= 2.20.0, < 3.0.0", ] extras = { "bqstorage": [ diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index aff46b28..f164eb37 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -16,6 +16,7 @@ google-auth==2.14.1 google-auth-oauthlib==0.7.0 google-cloud-bigquery==3.20.0 packaging==22.0.0 +requests==2.20.0 # Extras google-cloud-bigquery-storage==2.16.2 tqdm==4.23.0 diff --git a/tests/unit/test_core_sample.py b/tests/unit/test_core_sample.py index 5e5a15e7..7ed5b7ba 100644 --- a/tests/unit/test_core_sample.py +++ b/tests/unit/test_core_sample.py @@ -207,16 +207,11 @@ def test_estimate_limit(target_bytes, table_bytes, table_rows, fields, expected_ @mock.patch("pandas_gbq.core.read.download_results") def test_sample_with_tablesample(mock_download_results, mock_bigquery_client): - mock_table = mock.Mock(spec=google.cloud.bigquery.Table) - mock_table.project = "test-project" - mock_table.dataset_id = "test_dataset" - mock_table.table_id = "test_table" - proportion = 0.1 target_row_count = 100 pandas_gbq.core.sample._sample_with_tablesample( - mock_table, + "test-project.test_dataset.test_table", bqclient=mock_bigquery_client, proportion=proportion, target_row_count=target_row_count, @@ -226,25 +221,17 @@ def test_sample_with_tablesample(mock_download_results, mock_bigquery_client): query = mock_bigquery_client.query_and_wait.call_args[0][0] assert "TABLESAMPLE SYSTEM (10.0 PERCENT)" in query assert "LIMIT 100" in query - assert ( - f"FROM `{mock_table.project}.{mock_table.dataset_id}.{mock_table.table_id}`" - in query - ) + assert "FROM `test-project.test_dataset.test_table`" in query mock_download_results.assert_called_once() @mock.patch("pandas_gbq.core.read.download_results") def test_sample_with_limit(mock_download_results, mock_bigquery_client): - mock_table = mock.Mock(spec=google.cloud.bigquery.Table) - mock_table.project = "test-project" - mock_table.dataset_id = "test_dataset" - mock_table.table_id = "test_table" - target_row_count = 200 pandas_gbq.core.sample._sample_with_limit( - mock_table, + "test-project.test_dataset.test_table", bqclient=mock_bigquery_client, target_row_count=target_row_count, ) @@ -253,10 +240,7 @@ def test_sample_with_limit(mock_download_results, mock_bigquery_client): query = mock_bigquery_client.query_and_wait.call_args[0][0] assert "TABLESAMPLE" not in query assert "LIMIT 200" in query - assert ( - f"FROM `{mock_table.project}.{mock_table.dataset_id}.{mock_table.table_id}`" - in query - ) + assert "FROM `test-project.test_dataset.test_table`" in query mock_download_results.assert_called_once()