70 changes: 70 additions & 0 deletions pandas_gbq/dry_runs.py
@@ -0,0 +1,70 @@
# Copyright (c) 2025 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

from __future__ import annotations

import copy
from typing import Any, List

from google.cloud import bigquery
import pandas


def get_query_stats(
    query_job: bigquery.QueryJob,
) -> pandas.Series:
    """Returns important stats from the query job as a pandas Series."""

    index: List[Any] = []
    values: List[Any] = []

    # Add the raw BigQuery schema first.
    index.append("bigquerySchema")
    values.append(query_job.schema)

    job_api_repr = copy.deepcopy(query_job._properties)

    # jobReference might not be populated for "job optional" queries.
    job_ref = job_api_repr.get("jobReference", {})
    for key, val in job_ref.items():
        index.append(key)
        values.append(val)

    configuration = job_api_repr.get("configuration", {})
    query_config = configuration.get("query", {})

    index.append("jobType")
    values.append(configuration.get("jobType"))
    index.append("dispatchedSql")
    values.append(query_config.get("query"))

    for key in ("destinationTable", "useLegacySql"):
        index.append(key)
        values.append(query_config.get(key))

    statistics = job_api_repr.get("statistics", {})
    query_stats = statistics.get("query", {})
    for key in (
        "referencedTables",
        "totalBytesProcessed",
        "cacheHit",
        "statementType",
    ):
        index.append(key)
        values.append(query_stats.get(key))

    creation_time = statistics.get("creationTime")
    index.append("creationTime")
    values.append(
        pandas.Timestamp(creation_time, unit="ms", tz="UTC")
        if creation_time is not None
        else None
    )

    result = pandas.Series(values, index=index)

    # The API returns totalBytesProcessed as a string; normalize it to an
    # int and treat a missing value as zero bytes.
    if result["totalBytesProcessed"] is None:
        result["totalBytesProcessed"] = 0
    else:
        result["totalBytesProcessed"] = int(result["totalBytesProcessed"])

    return result
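
A minimal sketch of how the new helper could be exercised directly against the
client library; "my-project" is a placeholder project id and configured
credentials are assumed:

# Hedged sketch: drive dry_runs.get_query_stats() with a raw dry-run job.
from google.cloud import bigquery

from pandas_gbq import dry_runs

client = bigquery.Client(project="my-project")  # placeholder project id
config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
query_job = client.query("SELECT 1", job_config=config)

stats = dry_runs.get_query_stats(query_job)
print(stats["totalBytesProcessed"])  # bytes the query would scan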
13 changes: 10 additions & 3 deletions pandas_gbq/gbq.py
@@ -114,6 +114,7 @@ def read_gbq(
    *,
    col_order=None,
    bigquery_client=None,
    dry_run: bool = False,
):
    r"""Read data from Google BigQuery to a pandas DataFrame.

@@ -264,11 +265,13 @@ def read_gbq(
    bigquery_client : google.cloud.bigquery.Client, optional
        A Google Cloud BigQuery Python Client instance. If provided, it will
        be used for reading data, while the project and credentials
        parameters will be ignored.
    dry_run : bool, default False
        If True, submit the query as a dry run: BigQuery validates the query
        and estimates the bytes it would process without executing it.

    Returns
    -------
    df: DataFrame or Series
        DataFrame representing results of query. If ``dry_run=True``, returns
        a pandas Series that contains job statistics.
    """
    if dialect is None:
        dialect = context.dialect
@@ -323,7 +326,11 @@ def read_gbq(
            max_results=max_results,
            progress_bar_type=progress_bar_type,
            dtypes=dtypes,
            dry_run=dry_run,
        )
        # When dry_run=True, run_query returns a pandas Series of job stats.
        if dry_run:
            return final_df
    else:
        final_df = connector.download_table(
            query_or_table,
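
For context, the end-to-end flow this change enables might look like the
sketch below; the table and project id are placeholders and application
default credentials are assumed:

import pandas_gbq

stats = pandas_gbq.read_gbq(
    "SELECT name FROM `my-project.my_dataset.my_table`",  # placeholder table
    project_id="my-project",  # placeholder project
    dry_run=True,
)
print(stats["totalBytesProcessed"], "bytes would be processed")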
17 changes: 14 additions & 3 deletions pandas_gbq/gbq_connector.py
@@ -16,6 +16,7 @@
if typing.TYPE_CHECKING:  # pragma: NO COVER
    import pandas

from pandas_gbq import dry_runs
import pandas_gbq.constants
from pandas_gbq.contexts import context
import pandas_gbq.core.read
@@ -176,7 +177,14 @@ def download_table(
            user_dtypes=dtypes,
        )

    def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
    def run_query(
        self,
        query,
        max_results=None,
        progress_bar_type=None,
        dry_run: bool = False,
        **kwargs,
    ):
        from google.cloud import bigquery

        job_config_dict = {
@@ -212,6 +220,7 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):

        self._start_timer()
        job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict)
        job_config.dry_run = dry_run

        if FEATURES.bigquery_has_query_and_wait:
            rows_iter = pandas_gbq.query.query_and_wait_via_client_library(
@@ -236,12 +245,14 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
                timeout_ms=timeout_ms,
            )

        dtypes = kwargs.get("dtypes")
        if dry_run:
            return dry_runs.get_query_stats(rows_iter.job)

        return self._download_results(
            rows_iter,
            max_results=max_results,
            progress_bar_type=progress_bar_type,
            user_dtypes=dtypes,
            user_dtypes=kwargs.get("dtypes"),
        )

    def _download_results(
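
Since ``totalBytesProcessed`` is the figure most callers will care about, a
rough on-demand cost estimate can be derived from it. The sketch below
assumes a US on-demand rate of $6.25 per TiB, which varies by region and over
time, so the constant is an assumption rather than anything read from the API:

PRICE_PER_TIB_USD = 6.25  # assumed rate; check current BigQuery pricing

def estimate_cost_usd(total_bytes_processed: int) -> float:
    # On-demand pricing bills by tebibytes scanned.
    return (total_bytes_processed / 2**40) * PRICE_PER_TIB_USD

print(f"${estimate_cost_usd(15 * 2**30):.4f}")  # e.g. a 15 GiB scan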
33 changes: 32 additions & 1 deletion pandas_gbq/query.py
@@ -179,7 +179,12 @@ def query_and_wait(
    # getQueryResults() instead of tabledata.list, which returns the correct
    # response with DML/DDL queries.
    try:
        return query_reply.result(max_results=max_results)
        rows_iter = query_reply.result(max_results=max_results)
        # Keep a reference to the QueryJob on the RowIterator so that
        # dry-run statistics can be read from it later.
        if not hasattr(rows_iter, "job") or rows_iter.job is None:
            rows_iter.job = query_reply
        return rows_iter
    except connector.http_error as ex:
        connector.process_http_error(ex)

@@ -195,6 +200,27 @@ def query_and_wait_via_client_library(
    max_results: Optional[int],
    timeout_ms: Optional[int],
):
    # For dry runs, call client.query() directly so we get the QueryJob
    # itself; the job object is what carries the dry-run statistics.
    if job_config.dry_run:
        query_job = try_query(
            connector,
            functools.partial(
                client.query,
                query,
                job_config=job_config,
                location=location,
                project=project_id,
            ),
        )
        # Dry-run jobs complete immediately and return no rows, so no
        # separate wait is needed before fetching the result iterator.
        rows_iter = query_job.result(max_results=max_results)
        if not hasattr(rows_iter, "job") or rows_iter.job is None:
            rows_iter.job = query_job
        return rows_iter

    rows_iter = try_query(
        connector,
        functools.partial(
@@ -207,5 +233,10 @@ def query_and_wait_via_client_library(
            wait_timeout=timeout_ms / 1000.0 if timeout_ms else None,
        ),
    )

    logger.debug("Query done.\n")
    return rows_iter
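
As background for the special-cased dry-run branch above: with the raw client
library, the statistics live on the QueryJob rather than on the row iterator
that query_and_wait() returns, which is why the code fetches the job
explicitly. A standalone sketch, assuming application default credentials:

from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials
config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query("SELECT 1", job_config=config)

# Dry-run jobs finish immediately and return no rows; the scan estimate
# is available on the job object itself.
print(job.total_bytes_processed)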
12 changes: 12 additions & 0 deletions tests/system/test_gbq.py
@@ -654,6 +654,18 @@ def test_columns_and_col_order_raises_error(self, project_id):
                dialect="standard",
            )

    def test_read_gbq_with_dry_run(self, project_id):
        query = "SELECT 1"
        result = gbq.read_gbq(
            query,
            project_id=project_id,
            credentials=self.credentials,
            dialect="standard",
            dry_run=True,
        )
        assert isinstance(result, pandas.Series)
        assert result["totalBytesProcessed"] >= 0


class TestToGBQIntegration(object):
    @pytest.fixture(autouse=True, scope="function")
151 changes: 151 additions & 0 deletions tests/unit/test_dry_runs.py
@@ -0,0 +1,151 @@
# Copyright (c) 2025 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

from unittest import mock

from google.cloud import bigquery
import pandas
import pandas.testing

from pandas_gbq import dry_runs


def test_get_query_stats():
    mock_query_job = mock.create_autospec(bigquery.QueryJob)
    total_bytes_processed = 15
    mock_query_job._properties = {
        "kind": "bigquery#job",
        "etag": "e-tag",
        "id": "id",
        "selfLink": "self-link",
        "user_email": "user-email",
        "configuration": {
            "query": {
                "query": "SELECT * FROM `test_table`",
                "destinationTable": {
                    "projectId": "project-id",
                    "datasetId": "dataset-id",
                    "tableId": "table-id",
                },
                "writeDisposition": "WRITE_TRUNCATE",
                "priority": "INTERACTIVE",
                "useLegacySql": False,
            },
            "jobType": "QUERY",
        },
        "jobReference": {
            "projectId": "project-id",
            "jobId": "job-id",
            "location": "US",
        },
        "statistics": {
            "creationTime": 1767037135155.0,
            "startTime": 1767037135238.0,
            "endTime": 1767037135353.0,
            "totalBytesProcessed": f"{total_bytes_processed}",
            "query": {
                "totalBytesProcessed": f"{total_bytes_processed}",
                "totalBytesBilled": "0",
                "cacheHit": True,
                "statementType": "SELECT",
            },
            "reservation_id": "reservation_id",
            "edition": "ENTERPRISE",
            "reservationGroupPath": [""],
        },
        "status": {"state": "DONE"},
        "principal_subject": "principal_subject",
        "jobCreationReason": {"code": "REQUESTED"},
    }
    expected_index = pandas.Index(
        [
            "bigquerySchema",
            "projectId",
            "jobId",
            "location",
            "jobType",
            "dispatchedSql",
            "destinationTable",
            "useLegacySql",
            "referencedTables",
            "totalBytesProcessed",
            "cacheHit",
            "statementType",
            "creationTime",
        ]
    )

    result = dry_runs.get_query_stats(mock_query_job)

    assert isinstance(result, pandas.Series)
    pandas.testing.assert_index_equal(expected_index, result.index)
    assert result["totalBytesProcessed"] == total_bytes_processed


def test_get_query_stats_missing_bytes_use_zero():
    mock_query_job = mock.create_autospec(bigquery.QueryJob)
    mock_query_job._properties = {
        "kind": "bigquery#job",
        "etag": "e-tag",
        "id": "id",
        "selfLink": "self-link",
        "user_email": "user-email",
        "configuration": {
            "query": {
                "query": "SELECT * FROM `test_table`",
                "destinationTable": {
                    "projectId": "project-id",
                    "datasetId": "dataset-id",
                    "tableId": "table-id",
                },
                "writeDisposition": "WRITE_TRUNCATE",
                "priority": "INTERACTIVE",
                "useLegacySql": False,
            },
            "jobType": "QUERY",
        },
        "jobReference": {
            "projectId": "project-id",
            "jobId": "job-id",
            "location": "US",
        },
        "statistics": {
            "creationTime": 1767037135155.0,
            "startTime": 1767037135238.0,
            "endTime": 1767037135353.0,
            "query": {
                "cacheHit": True,
                "statementType": "SELECT",
            },
            "reservation_id": "reservation_id",
            "edition": "ENTERPRISE",
            "reservationGroupPath": [""],
        },
        "status": {"state": "DONE"},
        "principal_subject": "principal_subject",
        "jobCreationReason": {"code": "REQUESTED"},
    }
    expected_index = pandas.Index(
        [
            "bigquerySchema",
            "projectId",
            "jobId",
            "location",
            "jobType",
            "dispatchedSql",
            "destinationTable",
            "useLegacySql",
            "referencedTables",
            "totalBytesProcessed",
            "cacheHit",
            "statementType",
            "creationTime",
        ]
    )

    result = dry_runs.get_query_stats(mock_query_job)

    assert isinstance(result, pandas.Series)
    pandas.testing.assert_index_equal(expected_index, result.index)
    assert result["totalBytesProcessed"] == 0
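
The two tests above duplicate a large ``_properties`` payload; one possible
cleanup, sketched below with hypothetical names and a trimmed payload, is a
pytest fixture that hands each test a fresh copy of a shared base dict:

import copy

import pytest

_BASE_PROPERTIES = {  # hypothetical shared payload, trimmed for brevity
    "configuration": {"query": {"query": "SELECT 1", "useLegacySql": False}},
    "jobReference": {"projectId": "project-id", "jobId": "job-id", "location": "US"},
    "statistics": {"creationTime": 1767037135155.0, "query": {}},
}

@pytest.fixture
def job_properties():
    # Deep-copy so per-test mutations do not leak between tests.
    return copy.deepcopy(_BASE_PROPERTIES)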