feat: support biglake tables in pandas_gbq.sample #1014
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| # Copyright (c) 2026 pandas-gbq Authors All rights reserved. | ||
| # Use of this source code is governed by a BSD-style | ||
| # license that can be found in the LICENSE file. | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import dataclasses | ||
|
|
||
| import google.auth.transport.requests | ||
| import google.oauth2.credentials | ||
|
|
||
| _ICEBERG_REST_CATALOG_URI = "https://biglake.googleapis.com/iceberg/v1/restcatalog" | ||
| _TABLE_METADATA_PATH = ( | ||
| "/v1/projects/{project}/catalogs/{catalog}/namespaces/{namespace}/tables/{table}" | ||
| ) | ||
|
|
||
|
|
||
| @dataclasses.dataclass(frozen=True) | ||
| class BigLakeTableId: | ||
| project: str | ||
| catalog: str | ||
| namespace: str | ||
| table: str | ||
|
|
||
|
|
||
| def get_table_metadata( | ||
| *, | ||
| table_id: str, | ||
| credentials: google.oauth2.credentials.Credentials, | ||
| billing_project_id: str, | ||
| ): | ||
| """ | ||
| Docstring for get_table_metadata | ||
| https://iceberg.apache.org/spec/#metrics; | ||
| curl -X GET -H "Authorization: Bearer \"$(gcloud auth application-default print-access-token)\"" \ | ||
| -H "Content-Type: application/json; charset=utf-8" \ | ||
| -H 'x-goog-user-project: swast-scratch' \ | ||
| -H 'X-Iceberg-Access-Delegation: vended-credentials' \ | ||
| """ | ||
| # https://iceberg.apache.org/spec/#metrics | ||
| # total-files-size | ||
| project, catalog, namespace, table = table_id.split(".") | ||
|
Contributor
Unpacking table_id.split(".") directly raises an unhelpful ValueError when the table ID does not have exactly four dot-separated parts. Consider validating the split before unpacking:

parts = table_id.split(".")
if len(parts) != 4:
raise ValueError(
"Invalid BigLake table ID. Expected format: "
"project.catalog.namespace.table"
)
project, catalog, namespace, table = parts |
||
| session = google.auth.transport.requests.AuthorizedSession(credentials=credentials) | ||
| path = _TABLE_METADATA_PATH.format( | ||
| project=project, | ||
| catalog=catalog, | ||
| namespace=namespace, | ||
| table=table, | ||
| ) | ||
| return session.get( | ||
| f"{_ICEBERG_REST_CATALOG_URI}.{path}", | ||
| headers={ | ||
| "x-goog-user-project": billing_project_id, | ||
| "Content-Type": "application/json; charset=utf-8", | ||
| # TODO(tswast): parameter for this option (or get from catalog metadata?) | ||
| # /iceberg/{$api_version}/restcatalog/extensions/{name=projects/*/catalogs/*} | ||
| "X-Iceberg-Access-Delegation": "vended-credentials", | ||
| }, | ||
| ).json() | ||
|
Comment on lines +52 to +61
Contributor
The URL for the BigLake REST API is constructed incorrectly. There's an extra "." between _ICEBERG_REST_CATALOG_URI and path, which produces an invalid URL. It is also worth raising on HTTP errors before parsing the JSON response:

response = session.get(
f"{_ICEBERG_REST_CATALOG_URI}{path}",
headers={
"x-goog-user-project": billing_project_id,
"Content-Type": "application/json; charset=utf-8",
# TODO(tswast): parameter for this option (or get from catalog metadata?)
# /iceberg/{$api_version}/restcatalog/extensions/{name=projects/*/catalogs/*}
"X-Iceberg-Access-Delegation": "vended-credentials",
},
)
response.raise_for_status()
return response.json() |
||
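For context, here is a minimal usage sketch of how the new helper might be called; it is not part of this PR. It assumes application-default credentials via google.auth.default(), and the table ID and variable names are illustrative only.

import google.auth

import pandas_gbq.core.biglake

# google.auth.default() returns a (credentials, project_id) pair.
credentials, project_id = google.auth.default()

metadata = pandas_gbq.core.biglake.get_table_metadata(
    table_id="my-project.my_catalog.my_namespace.my_table",  # illustrative ID
    credentials=credentials,
    billing_project_id=project_id,
)

# Per https://iceberg.apache.org/spec/#metrics, snapshot summaries in the
# returned metadata may include metrics such as "total-files-size".
print(metadata)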
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ | |
|
|
||
| import pandas_gbq.constants | ||
| import pandas_gbq.core.read | ||
| import pandas_gbq.core.biglake | ||
| import pandas_gbq.gbq_connector | ||
|
|
||
| # Only import at module-level at type checking time to avoid circular | ||
|
|
@@ -156,7 +157,7 @@ def _download_results_in_parallel( | |
|
|
||
|
|
||
| def _sample_with_tablesample( | ||
| table: google.cloud.bigquery.Table, | ||
| table_id: str, | ||
| *, | ||
| bqclient: google.cloud.bigquery.Client, | ||
| proportion: float, | ||
|
|
@@ -166,7 +167,7 @@ def _sample_with_tablesample( | |
| ) -> Optional[pandas.DataFrame]: | ||
| query = f""" | ||
| SELECT * | ||
| FROM `{table.project}.{table.dataset_id}.{table.table_id}` | ||
| FROM `{table_id}` | ||
| TABLESAMPLE SYSTEM ({float(proportion) * 100.0} PERCENT) | ||
| ORDER BY RAND() DESC | ||
| LIMIT {int(target_row_count)}; | ||
|
|
@@ -181,7 +182,7 @@ def _sample_with_tablesample( | |
|
|
||
|
|
||
| def _sample_with_limit( | ||
| table: google.cloud.bigquery.Table, | ||
| table_id: str, | ||
| *, | ||
| bqclient: google.cloud.bigquery.Client, | ||
| target_row_count: int, | ||
|
|
@@ -190,7 +191,7 @@ def _sample_with_limit( | |
| ) -> Optional[pandas.DataFrame]: | ||
| query = f""" | ||
| SELECT * | ||
| FROM `{table.project}.{table.dataset_id}.{table.table_id}` | ||
| FROM `{table_id}` | ||
| ORDER BY RAND() DESC | ||
| LIMIT {int(target_row_count)}; | ||
| """ | ||
|
|
@@ -203,6 +204,82 @@ def _sample_with_limit( | |
| ) | ||
|
|
||
|
|
||
| def _sample_biglake_table( | ||
| *, | ||
| table_id: str, | ||
| credentials: google.oauth2.credentials.Credentials, | ||
| bqclient: google.cloud.bigquery.Client, | ||
| target_bytes: int, | ||
| progress_bar_type: str | None, | ||
| use_bqstorage_api: bool, | ||
| ) -> Optional[pandas.DataFrame]: | ||
| pass | ||
|
Comment on lines +207 to +216
Contributor
The function _sample_biglake_table is currently an unimplemented stub that just passes and returns None; it needs a real implementation (or should raise NotImplementedError) before this can be merged. |
||
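For illustration only, one hypothetical way the stub could be filled in, reusing the module's existing _sample_with_limit helper and the new pandas_gbq.core.biglake.get_table_metadata. It assumes the Iceberg snapshot summary exposes "total-files-size" and "total-records" (https://iceberg.apache.org/spec/#metrics), that the most recent snapshot is representative, and that a four-part BigLake ID can be referenced directly in a BigQuery LIMIT query; none of this is the author's actual plan, and the module's existing imports (pandas, Optional, google.cloud.bigquery) are assumed.

def _sample_biglake_table(
    *,
    table_id: str,
    credentials: google.oauth2.credentials.Credentials,
    bqclient: google.cloud.bigquery.Client,
    target_bytes: int,
    progress_bar_type: str | None,
    use_bqstorage_api: bool,
) -> Optional[pandas.DataFrame]:
    # Fetch Iceberg metadata from the BigLake REST catalog.
    metadata = pandas_gbq.core.biglake.get_table_metadata(
        table_id=table_id,
        credentials=credentials,
        billing_project_id=bqclient.project,
    )

    # Illustrative only: take the most recent snapshot's summary. A real
    # implementation would match the snapshot against current-snapshot-id.
    snapshots = metadata.get("metadata", {}).get("snapshots") or [{}]
    summary = snapshots[-1].get("summary", {})
    total_bytes = int(summary.get("total-files-size", 0)) or None
    total_rows = int(summary.get("total-records", 0)) or None

    # Estimate how many rows fit in the byte budget.
    if total_bytes and total_rows:
        bytes_per_row = max(total_bytes / total_rows, 1.0)
        target_row_count = max(int(target_bytes / bytes_per_row), 1)
    else:
        # Conservative fallback when metrics are missing.
        target_row_count = 10_000

    # Assumption: the four-part BigLake ID is directly queryable from BigQuery;
    # a real implementation might need to map it to a BigQuery table reference.
    return _sample_with_limit(
        table_id,
        bqclient=bqclient,
        target_row_count=target_row_count,
        progress_bar_type=progress_bar_type,
        use_bqstorage_api=use_bqstorage_api,
    )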
|
|
||
|
|
||
| def _sample_bq_table( | ||
| *, | ||
| table_id: str, | ||
| bqclient: google.cloud.bigquery.Client, | ||
| target_bytes: int, | ||
| progress_bar_type: str | None, | ||
| use_bqstorage_api: bool, | ||
| ) -> Optional[pandas.DataFrame]: | ||
| table = bqclient.get_table(table_id) | ||
| num_rows = table.num_rows | ||
| num_bytes = table.num_bytes | ||
| table_type = table.table_type | ||
|
|
||
| # Some tables such as views report 0 despite actually having rows. | ||
| if num_bytes == 0: | ||
| num_bytes = None | ||
|
|
||
| # Table is small enough to download the whole thing. | ||
| if ( | ||
| table_type in _READ_API_ELIGIBLE_TYPES | ||
| and num_bytes is not None | ||
| and num_bytes <= target_bytes | ||
| ): | ||
| rows_iter = bqclient.list_rows(table) | ||
| return pandas_gbq.core.read.download_results( | ||
| rows_iter, | ||
| bqclient=bqclient, | ||
| progress_bar_type=progress_bar_type, | ||
| warn_on_large_results=False, | ||
| max_results=None, | ||
| user_dtypes=None, | ||
| use_bqstorage_api=use_bqstorage_api, | ||
| ) | ||
|
|
||
| target_row_count = _estimate_limit( | ||
| target_bytes=target_bytes, | ||
| table_bytes=num_bytes, | ||
| table_rows=num_rows, | ||
| fields=table.schema, | ||
| ) | ||
|
|
||
| # Table is eligible for TABLESAMPLE. | ||
| if num_bytes is not None and table_type in _TABLESAMPLE_ELIGIBLE_TYPES: | ||
| proportion = target_bytes / num_bytes | ||
| return _sample_with_tablesample( | ||
| f"{table.project}.{table.dataset_id}.{table.table_id}", | ||
| bqclient=bqclient, | ||
| proportion=proportion, | ||
| target_row_count=target_row_count, | ||
| progress_bar_type=progress_bar_type, | ||
| use_bqstorage_api=use_bqstorage_api, | ||
| ) | ||
|
|
||
| # Not eligible for TABLESAMPLE or reading directly, so take a random sample | ||
| # with a full table scan. | ||
| return _sample_with_limit( | ||
| f"{table.project}.{table.dataset_id}.{table.table_id}", | ||
| bqclient=bqclient, | ||
| target_row_count=target_row_count, | ||
| progress_bar_type=progress_bar_type, | ||
| use_bqstorage_api=use_bqstorage_api, | ||
| ) | ||
|
|
||
|
|
||
| def sample( | ||
| table_id: str, | ||
| *, | ||
|
|
@@ -267,57 +344,24 @@ def sample( | |
| ) | ||
| credentials = cast(google.oauth2.credentials.Credentials, connector.credentials) | ||
| bqclient = connector.get_client() | ||
| table = bqclient.get_table(table_id) | ||
| num_rows = table.num_rows | ||
| num_bytes = table.num_bytes | ||
| table_type = table.table_type | ||
|
|
||
| # Some tables such as views report 0 despite actually having rows. | ||
| if num_bytes == 0: | ||
| num_bytes = None | ||
|
|
||
| # Table is small enough to download the whole thing. | ||
| if ( | ||
| table_type in _READ_API_ELIGIBLE_TYPES | ||
| and num_bytes is not None | ||
| and num_bytes <= target_bytes | ||
| ): | ||
| rows_iter = bqclient.list_rows(table) | ||
| return pandas_gbq.core.read.download_results( | ||
| rows_iter, | ||
| # BigLake tables can't be read directly by the BQ Storage Read API, so make | ||
| # sure we run a query first. | ||
| parts = table_id.split(".") | ||
| if len(parts) == 4: | ||
|
Contributor
Relying on the number of dot-separated parts in table_id to decide whether it refers to a BigLake table is fragile; a malformed ID silently falls through to the BigQuery path. Consider a more explicit check or validation of the ID format. |
||
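A possible tightening of the routing check, sketched under the assumption that BigQuery IDs have two or three dot-separated parts and BigLake Iceberg REST IDs have exactly four; anything else is rejected explicitly rather than silently treated as a BigQuery table. This reuses only the keyword arguments shown in the diff.

parts = table_id.split(".")
if len(parts) == 4:
    # Four parts: project.catalog.namespace.table, treat as BigLake.
    return _sample_biglake_table(
        table_id=table_id,
        credentials=credentials,
        bqclient=bqclient,
        target_bytes=target_bytes,
        progress_bar_type=progress_bar_type,
        use_bqstorage_api=use_bqstorage_api,
    )
elif len(parts) in (2, 3):
    # Two or three parts: (project.)dataset.table, treat as BigQuery.
    return _sample_bq_table(
        table_id=table_id,
        bqclient=bqclient,
        target_bytes=target_bytes,
        progress_bar_type=progress_bar_type,
        use_bqstorage_api=use_bqstorage_api,
    )
else:
    raise ValueError(
        f"Unrecognized table ID {table_id!r}; expected "
        "'project.dataset.table' or 'project.catalog.namespace.table'."
    )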
| return _sample_biglake_table( | ||
| table_id=table_id, | ||
| credentials=credentials, | ||
| bqclient=bqclient, | ||
| target_bytes=target_bytes, | ||
| progress_bar_type=progress_bar_type, | ||
| warn_on_large_results=False, | ||
| max_results=None, | ||
| user_dtypes=None, | ||
| use_bqstorage_api=use_bqstorage_api, | ||
| ) | ||
|
|
||
| target_row_count = _estimate_limit( | ||
| target_bytes=target_bytes, | ||
| table_bytes=num_bytes, | ||
| table_rows=num_rows, | ||
| fields=table.schema, | ||
| ) | ||
|
|
||
| # Table is eligible for TABLESAMPLE. | ||
| if num_bytes is not None and table_type in _TABLESAMPLE_ELIGIBLE_TYPES: | ||
| proportion = target_bytes / num_bytes | ||
| return _sample_with_tablesample( | ||
| table, | ||
| else: | ||
| return _sample_bq_table( | ||
| table_id=table_id, | ||
| bqclient=bqclient, | ||
| proportion=proportion, | ||
| target_row_count=target_row_count, | ||
| target_bytes=target_bytes, | ||
| progress_bar_type=progress_bar_type, | ||
| use_bqstorage_api=use_bqstorage_api, | ||
| ) | ||
|
|
||
| # Not eligible for TABLESAMPLE or reading directly, so take a random sample | ||
| # with a full table scan. | ||
| return _sample_with_limit( | ||
| table, | ||
| bqclient=bqclient, | ||
| target_row_count=target_row_count, | ||
| progress_bar_type=progress_bar_type, | ||
| use_bqstorage_api=use_bqstorage_api, | ||
| ) | ||
The docstring for get_table_metadata is a placeholder. Please add a proper docstring explaining the function's purpose, its parameters (table_id, credentials, billing_project_id), and what it returns. This is important for maintainability and usability.
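One possible docstring, sketched from the signature and the REST path shown in the diff above; the wording is illustrative, not the author's.

def get_table_metadata(
    *,
    table_id: str,
    credentials: google.oauth2.credentials.Credentials,
    billing_project_id: str,
):
    """Fetch Iceberg table metadata from the BigLake Iceberg REST catalog.

    Args:
        table_id:
            Fully qualified BigLake table ID in the form
            ``project.catalog.namespace.table``.
        credentials:
            OAuth 2.0 credentials used to authorize the request.
        billing_project_id:
            Project billed for the request, sent in the
            ``x-goog-user-project`` header.

    Returns:
        dict: Parsed JSON response from the catalog's load-table endpoint,
        including snapshot summaries with metrics such as
        ``total-files-size`` (https://iceberg.apache.org/spec/#metrics).
    """
    ...  # body as shown in the diff above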