61 changes: 61 additions & 0 deletions pandas_gbq/core/biglake.py
@@ -0,0 +1,61 @@
# Copyright (c) 2026 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

from __future__ import annotations

import dataclasses

import google.auth.transport.requests
import google.oauth2.credentials

_ICEBERG_REST_CATALOG_URI = "https://biglake.googleapis.com/iceberg/v1/restcatalog"
_TABLE_METADATA_PATH = (
    "/v1/projects/{project}/catalogs/{catalog}/namespaces/{namespace}/tables/{table}"
)


@dataclasses.dataclass(frozen=True)
class BigLakeTableId:
    project: str
    catalog: str
    namespace: str
    table: str


def get_table_metadata(
    *,
    table_id: str,
    credentials: google.oauth2.credentials.Credentials,
    billing_project_id: str,
):
"""
Docstring for get_table_metadata
Comment on lines +32 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The docstring for get_table_metadata is a placeholder. Please add a proper docstring explaining the function's purpose, its parameters (table_id, credentials, billing_project_id), and what it returns. This is important for maintainability and usability.

https://iceberg.apache.org/spec/#metrics;
curl -X GET -H "Authorization: Bearer \"$(gcloud auth application-default print-access-token)\"" \
-H "Content-Type: application/json; charset=utf-8" \
-H 'x-goog-user-project: swast-scratch' \
-H 'X-Iceberg-Access-Delegation: vended-credentials' \
"""
    # https://iceberg.apache.org/spec/#metrics
    # total-files-size
    project, catalog, namespace, table = table_id.split(".")
high

Unpacking table_id.split('.') directly into four variables is unsafe. If table_id doesn't contain exactly three dots, this will raise a ValueError. It's better to validate the number of parts before unpacking to provide a more informative error message and prevent unexpected crashes.

    parts = table_id.split(".")
    if len(parts) != 4:
        raise ValueError(
            "Invalid BigLake table ID. Expected format: "
            "project.catalog.namespace.table"
        )
    project, catalog, namespace, table = parts

    session = google.auth.transport.requests.AuthorizedSession(credentials=credentials)
    path = _TABLE_METADATA_PATH.format(
        project=project,
        catalog=catalog,
        namespace=namespace,
        table=table,
    )
    return session.get(
        f"{_ICEBERG_REST_CATALOG_URI}.{path}",
        headers={
            "x-goog-user-project": billing_project_id,
            "Content-Type": "application/json; charset=utf-8",
            # TODO(tswast): parameter for this option (or get from catalog metadata?)
            # /iceberg/{$api_version}/restcatalog/extensions/{name=projects/*/catalogs/*}
            "X-Iceberg-Access-Delegation": "vended-credentials",
        },
    ).json()
Comment on lines +52 to +61
critical

The URL for the BigLake REST API is constructed incorrectly. There's an extra . between the base URI and the path, which will lead to a 404 Not Found error. The path already starts with a /. Additionally, the response from session.get is not checked for errors before attempting to parse it as JSON. This can lead to unhelpful JSONDecodeError exceptions on HTTP failures. You should call response.raise_for_status() to handle non-2xx responses gracefully.

    response = session.get(
        f"{_ICEBERG_REST_CATALOG_URI}{path}",
        headers={
            "x-goog-user-project": billing_project_id,
            "Content-Type": "application/json; charset=utf-8",
            # TODO(tswast): parameter for this option (or get from catalog metadata?)
            # /iceberg/{$api_version}/restcatalog/extensions/{name=projects/*/catalogs/*}
            "X-Iceberg-Access-Delegation": "vended-credentials",
        },
    )
    response.raise_for_status()
    return response.json()

142 changes: 93 additions & 49 deletions pandas_gbq/core/sample.py
@@ -14,6 +14,7 @@

import pandas_gbq.constants
import pandas_gbq.core.read
import pandas_gbq.core.biglake
import pandas_gbq.gbq_connector

# Only import at module-level at type checking time to avoid circular
@@ -156,7 +157,7 @@ def _download_results_in_parallel(


def _sample_with_tablesample(
-    table: google.cloud.bigquery.Table,
+    table_id: str,
    *,
    bqclient: google.cloud.bigquery.Client,
    proportion: float,
@@ -166,7 +167,7 @@
) -> Optional[pandas.DataFrame]:
    query = f"""
    SELECT *
-    FROM `{table.project}.{table.dataset_id}.{table.table_id}`
+    FROM `{table_id}`
    TABLESAMPLE SYSTEM ({float(proportion) * 100.0} PERCENT)
    ORDER BY RAND() DESC
    LIMIT {int(target_row_count)};
@@ -181,7 +182,7 @@


def _sample_with_limit(
-    table: google.cloud.bigquery.Table,
+    table_id: str,
    *,
    bqclient: google.cloud.bigquery.Client,
    target_row_count: int,
@@ -190,7 +191,7 @@
) -> Optional[pandas.DataFrame]:
    query = f"""
    SELECT *
-    FROM `{table.project}.{table.dataset_id}.{table.table_id}`
+    FROM `{table_id}`
    ORDER BY RAND() DESC
    LIMIT {int(target_row_count)};
    """
@@ -203,6 +204,82 @@
    )


def _sample_biglake_table(
    *,
    table_id: str,
    credentials: google.oauth2.credentials.Credentials,
    bqclient: google.cloud.bigquery.Client,
    target_bytes: int,
    progress_bar_type: str | None,
    use_bqstorage_api: bool,
) -> Optional[pandas.DataFrame]:
    pass
Comment on lines +207 to +216
critical

The function _sample_biglake_table is a stub with a pass statement. The PR title suggests support for BigLake tables is being added, but the core implementation is missing. This function needs to be implemented to fulfill the feature's goal. Additionally, corresponding unit tests should be added to verify its functionality.
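
For reference, one possible direction, offered only as a sketch: pull byte and row metrics from the Iceberg snapshot summary returned by the new biglake helper, then fall back to a LIMIT query. The response-shape navigation, the metric keys, the 10_000-row fallback, and the reuse of _sample_with_limit on the four-part ID are assumptions here, not something this PR establishes. A matching unit test could mock pandas_gbq.core.biglake.get_table_metadata and assert on the generated query, along the lines of the existing test_sample_with_limit.

def _sample_biglake_table(
    *,
    table_id: str,
    credentials: google.oauth2.credentials.Credentials,
    bqclient: google.cloud.bigquery.Client,
    target_bytes: int,
    progress_bar_type: str | None,
    use_bqstorage_api: bool,
) -> Optional[pandas.DataFrame]:
    metadata = pandas_gbq.core.biglake.get_table_metadata(
        table_id=table_id,
        credentials=credentials,
        billing_project_id=bqclient.project,
    )
    # Assumed response shape: snapshots whose "summary" carries
    # "total-files-size" and "total-records" (https://iceberg.apache.org/spec/#metrics).
    # Taking the last snapshot is a simplification; the current snapshot id
    # should really be honored.
    snapshots = metadata.get("metadata", {}).get("snapshots", [])
    summary = snapshots[-1].get("summary", {}) if snapshots else {}
    table_bytes = int(summary.get("total-files-size", 0)) or None
    table_rows = int(summary.get("total-records", 0)) or None

    # Rough row target from the snapshot metrics; the fallback when metrics
    # are missing is an arbitrary placeholder.
    if table_bytes and table_rows:
        target_row_count = max(int(target_bytes / (table_bytes / table_rows)), 1)
    else:
        target_row_count = 10_000

    # BigLake Iceberg tables aren't eligible for TABLESAMPLE or the Storage
    # Read API, so fall back to a randomly ordered LIMIT query.
    return _sample_with_limit(
        table_id,
        bqclient=bqclient,
        target_row_count=target_row_count,
        progress_bar_type=progress_bar_type,
        use_bqstorage_api=use_bqstorage_api,
    )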



def _sample_bq_table(
    *,
    table_id: str,
    bqclient: google.cloud.bigquery.Client,
    target_bytes: int,
    progress_bar_type: str | None,
    use_bqstorage_api: bool,
) -> Optional[pandas.DataFrame]:
    table = bqclient.get_table(table_id)
    num_rows = table.num_rows
    num_bytes = table.num_bytes
    table_type = table.table_type

    # Some tables such as views report 0 despite actually having rows.
    if num_bytes == 0:
        num_bytes = None

    # Table is small enough to download the whole thing.
    if (
        table_type in _READ_API_ELIGIBLE_TYPES
        and num_bytes is not None
        and num_bytes <= target_bytes
    ):
        rows_iter = bqclient.list_rows(table)
        return pandas_gbq.core.read.download_results(
            rows_iter,
            bqclient=bqclient,
            progress_bar_type=progress_bar_type,
            warn_on_large_results=False,
            max_results=None,
            user_dtypes=None,
            use_bqstorage_api=use_bqstorage_api,
        )

    target_row_count = _estimate_limit(
        target_bytes=target_bytes,
        table_bytes=num_bytes,
        table_rows=num_rows,
        fields=table.schema,
    )

    # Table is eligible for TABLESAMPLE.
    if num_bytes is not None and table_type in _TABLESAMPLE_ELIGIBLE_TYPES:
        proportion = target_bytes / num_bytes
        return _sample_with_tablesample(
            f"{table.project}.{table.dataset_id}.{table.table_id}",
            bqclient=bqclient,
            proportion=proportion,
            target_row_count=target_row_count,
            progress_bar_type=progress_bar_type,
            use_bqstorage_api=use_bqstorage_api,
        )

    # Not eligible for TABLESAMPLE or reading directly, so take a random sample
    # with a full table scan.
    return _sample_with_limit(
        f"{table.project}.{table.dataset_id}.{table.table_id}",
        bqclient=bqclient,
        target_row_count=target_row_count,
        progress_bar_type=progress_bar_type,
        use_bqstorage_api=use_bqstorage_api,
    )


def sample(
    table_id: str,
    *,
@@ -267,57 +344,24 @@ def sample(
    )
    credentials = cast(google.oauth2.credentials.Credentials, connector.credentials)
    bqclient = connector.get_client()
-    table = bqclient.get_table(table_id)
-    num_rows = table.num_rows
-    num_bytes = table.num_bytes
-    table_type = table.table_type
-
-    # Some tables such as views report 0 despite actually having rows.
-    if num_bytes == 0:
-        num_bytes = None
-
-    # Table is small enough to download the whole thing.
-    if (
-        table_type in _READ_API_ELIGIBLE_TYPES
-        and num_bytes is not None
-        and num_bytes <= target_bytes
-    ):
-        rows_iter = bqclient.list_rows(table)
-        return pandas_gbq.core.read.download_results(
-            rows_iter,
+    # BigLake tables can't be read directly by the BQ Storage Read API, so make
+    # sure we run a query first.
+    parts = table_id.split(".")
+    if len(parts) == 4:
medium

Relying on len(table_id.split('.')) == 4 to identify a BigLake table is fragile. BigQuery identifiers can contain dots if they are quoted (e.g., my-project.dataset.with.dot.table). This could lead to misidentifying a standard BigQuery table as a BigLake table. Consider a more robust detection mechanism. For example, you could attempt to parse the ID as a BigLake ID and handle failure, or introduce an explicit parameter to specify the table type.
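
A sketch of the "attempt to parse and handle failure" option (it assumes requests is imported in this module and that get_table_metadata adopts the raise_for_status() suggestion above, so a non-BigLake ID surfaces as an HTTP error):

    parts = table_id.split(".")
    if len(parts) == 4:
        try:
            return _sample_biglake_table(
                table_id=table_id,
                credentials=credentials,
                bqclient=bqclient,
                target_bytes=target_bytes,
                progress_bar_type=progress_bar_type,
                use_bqstorage_api=use_bqstorage_api,
            )
        except requests.exceptions.HTTPError:
            # Four dot-separated parts but not a BigLake table; fall back to
            # the standard BigQuery path below.
            pass
    return _sample_bq_table(
        table_id=table_id,
        bqclient=bqclient,
        target_bytes=target_bytes,
        progress_bar_type=progress_bar_type,
        use_bqstorage_api=use_bqstorage_api,
    )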

+        return _sample_biglake_table(
+            table_id=table_id,
+            credentials=credentials,
            bqclient=bqclient,
+            target_bytes=target_bytes,
            progress_bar_type=progress_bar_type,
-            warn_on_large_results=False,
-            max_results=None,
-            user_dtypes=None,
            use_bqstorage_api=use_bqstorage_api,
        )
-
-    target_row_count = _estimate_limit(
-        target_bytes=target_bytes,
-        table_bytes=num_bytes,
-        table_rows=num_rows,
-        fields=table.schema,
-    )
-
-    # Table is eligible for TABLESAMPLE.
-    if num_bytes is not None and table_type in _TABLESAMPLE_ELIGIBLE_TYPES:
-        proportion = target_bytes / num_bytes
-        return _sample_with_tablesample(
-            table,
+    else:
+        return _sample_bq_table(
+            table_id=table_id,
            bqclient=bqclient,
-            proportion=proportion,
-            target_row_count=target_row_count,
+            target_bytes=target_bytes,
            progress_bar_type=progress_bar_type,
            use_bqstorage_api=use_bqstorage_api,
        )
-
-    # Not eligible for TABLESAMPLE or reading directly, so take a random sample
-    # with a full table scan.
-    return _sample_with_limit(
-        table,
-        bqclient=bqclient,
-        target_row_count=target_row_count,
-        progress_bar_type=progress_bar_type,
-        use_bqstorage_api=use_bqstorage_api,
-    )
1 change: 1 addition & 0 deletions setup.py
@@ -40,6 +40,7 @@
    # allow pandas-gbq to detect invalid package versions at runtime.
    "google-cloud-bigquery >=3.20.0,<4.0.0",
    "packaging >=22.0.0",
    "requests >= 2.20.0, < 3.0.0",
]
extras = {
    "bqstorage": [
1 change: 1 addition & 0 deletions testing/constraints-3.9.txt
@@ -16,6 +16,7 @@ google-auth==2.14.1
google-auth-oauthlib==0.7.0
google-cloud-bigquery==3.20.0
packaging==22.0.0
requests==2.20.0
# Extras
google-cloud-bigquery-storage==2.16.2
tqdm==4.23.0
24 changes: 4 additions & 20 deletions tests/unit/test_core_sample.py
@@ -207,16 +207,11 @@ def test_estimate_limit(target_bytes, table_bytes, table_rows, fields, expected_

@mock.patch("pandas_gbq.core.read.download_results")
def test_sample_with_tablesample(mock_download_results, mock_bigquery_client):
-    mock_table = mock.Mock(spec=google.cloud.bigquery.Table)
-    mock_table.project = "test-project"
-    mock_table.dataset_id = "test_dataset"
-    mock_table.table_id = "test_table"
-
    proportion = 0.1
    target_row_count = 100

    pandas_gbq.core.sample._sample_with_tablesample(
-        mock_table,
+        "test-project.test_dataset.test_table",
        bqclient=mock_bigquery_client,
        proportion=proportion,
        target_row_count=target_row_count,
@@ -226,25 +221,17 @@ def test_sample_with_tablesample(mock_download_results, mock_bigquery_client):
    query = mock_bigquery_client.query_and_wait.call_args[0][0]
    assert "TABLESAMPLE SYSTEM (10.0 PERCENT)" in query
    assert "LIMIT 100" in query
-    assert (
-        f"FROM `{mock_table.project}.{mock_table.dataset_id}.{mock_table.table_id}`"
-        in query
-    )
+    assert "FROM `test-project.test_dataset.test_table`" in query

    mock_download_results.assert_called_once()


@mock.patch("pandas_gbq.core.read.download_results")
def test_sample_with_limit(mock_download_results, mock_bigquery_client):
-    mock_table = mock.Mock(spec=google.cloud.bigquery.Table)
-    mock_table.project = "test-project"
-    mock_table.dataset_id = "test_dataset"
-    mock_table.table_id = "test_table"
-
    target_row_count = 200

    pandas_gbq.core.sample._sample_with_limit(
-        mock_table,
+        "test-project.test_dataset.test_table",
        bqclient=mock_bigquery_client,
        target_row_count=target_row_count,
    )
@@ -253,10 +240,7 @@ def test_sample_with_limit(mock_download_results, mock_bigquery_client):
    query = mock_bigquery_client.query_and_wait.call_args[0][0]
    assert "TABLESAMPLE" not in query
    assert "LIMIT 200" in query
-    assert (
-        f"FROM `{mock_table.project}.{mock_table.dataset_id}.{mock_table.table_id}`"
-        in query
-    )
+    assert "FROM `test-project.test_dataset.test_table`" in query

    mock_download_results.assert_called_once()
