Skip to content

perf: defer query in read_gbq with wildcard tables #1661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a9edb2a
perf: defer query in `read_gbq` with wildcard tables
tswast Apr 27, 2025
df795b1
remove obsolete comments
tswast Apr 27, 2025
f81fe4e
Merge remote-tracking branch 'origin/main' into b405773140-wildcard
tswast Apr 28, 2025
79f4c58
use sql node instead of ibis table node to keep select * from omittin…
tswast Apr 28, 2025
5b0d0a0
test with cache and to_gbq
tswast Apr 29, 2025
118964b
rename columns before caching
tswast Apr 29, 2025
ca33463
remove unnecessary comment
tswast Apr 29, 2025
e546745
Merge remote-tracking branch 'origin/main' into b405773140-wildcard
tswast Apr 29, 2025
4897ca4
add missing import
tswast Apr 29, 2025
e1a7341
do not materialize _TABLE_SUFFIX
tswast Apr 29, 2025
af06200
fix unit tests
tswast Apr 29, 2025
af5c036
Merge branch 'main' into b405773140-wildcard
tswast Apr 29, 2025
f26574b
correct number of columns in cache with offsets
tswast Apr 29, 2025
dd05c2d
Merge branch 'main' into b405773140-wildcard
tswast Apr 29, 2025
ab0e50a
fix formatting
tswast Apr 29, 2025
89535e2
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 29, 2025
8bb09d5
Merge branch 'b405773140-wildcard' of https://github.com/googleapis/p…
gcf-owl-bot[bot] Apr 29, 2025
40e2e77
Merge branch 'main' into b405773140-wildcard
tswast Apr 29, 2025
d37bf5e
revert datetime change, max_results change
tswast Apr 29, 2025
2f25f8d
Merge remote-tracking branch 'origin/b405773140-wildcard' into b40577…
tswast Apr 29, 2025
4bf66b6
add pseudocolumns to node
tswast Apr 29, 2025
8c96498
fix unit tests
tswast Apr 29, 2025
e1780a6
actually fix unit tests
tswast Apr 29, 2025
b027b51
try to rename as part of compile
tswast Apr 29, 2025
00fbd91
add renames to as cached table
tswast Apr 30, 2025
9a778db
use correct node for table schema
tswast Apr 30, 2025
d076cd3
Merge branch 'main' into b405773140-wildcard
tswast Apr 30, 2025
f3d5b7b
Merge branch 'main' into b405773140-wildcard
tswast May 5, 2025
7d8ddcc
Merge remote-tracking branch 'origin/main' into b405773140-pseudocolumns
tswast May 5, 2025
0722229
revert pseudocolumn addition
tswast May 5, 2025
80ce9c6
revert pseudocolumn fix
tswast May 5, 2025
c2ffc02
Merge remote-tracking branch 'origin/b405773140-wildcard' into b40577…
tswast May 5, 2025
2f2dcd6
add test for warning
tswast May 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,10 +708,12 @@ class GbqTable:
@staticmethod
def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
# Subsetting fields with columns can reduce cost of row-hash default ordering
table_schema = bigframes.core.tools.bigquery.get_schema_and_pseudocolumns(table)

if columns:
schema = tuple(item for item in table.schema if item.name in columns)
schema = tuple(item for item in table_schema if item.name in columns)
else:
schema = tuple(table.schema)
schema = tuple(table_schema)
return GbqTable(
project_id=table.project,
dataset_id=table.dataset_id,
Expand Down
12 changes: 8 additions & 4 deletions bigframes/core/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,19 @@ def from_bq_table(
typing.Dict[str, bigframes.dtypes.Dtype]
] = None,
):
# Avoid circular imports.
import bigframes.core.tools.bigquery

if column_type_overrides is None:
column_type_overrides = {}
items = tuple(
items = [
SchemaItem(name, column_type_overrides.get(name, dtype))
for name, dtype in bigframes.dtypes.bf_type_from_type_kind(
table.schema
bigframes.core.tools.bigquery.get_schema_and_pseudocolumns(table)
).items()
)
return ArraySchema(items)
]

return ArraySchema(tuple(items))

@property
def names(self) -> typing.Tuple[str, ...]:
Expand Down
39 changes: 39 additions & 0 deletions bigframes/core/tools/bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Private helpers for loading a BigQuery table as a BigQuery DataFrames DataFrame.
"""

from __future__ import annotations

import google.cloud.bigquery as bigquery


def get_schema_and_pseudocolumns(
    table: bigquery.table.Table,
) -> list[bigquery.SchemaField]:
    """Return *table*'s schema fields plus any applicable pseudocolumns.

    BigQuery wildcard tables (table IDs ending in ``*``) expose a
    ``_TABLE_SUFFIX`` STRING pseudocolumn, so a matching ``SchemaField``
    is appended for them.

    Args:
        table: Table metadata as returned by the BigQuery client.

    Returns:
        A new list of schema fields; the table's own schema is not mutated.
    """
    # TODO(tswast): Add _PARTITIONTIME and/or _PARTITIONDATE for
    # ingestion-time partitioned tables.
    columns = list(table.schema)

    if table.table_id.endswith("*"):
        columns.append(bigquery.SchemaField("_TABLE_SUFFIX", "STRING"))

    return columns
3 changes: 3 additions & 0 deletions bigframes/session/_io/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ def to_query(
else:
select_clause = "SELECT *"

if query_or_table.endswith("*"):
select_clause += ", _TABLE_SUFFIX"

time_travel_clause = ""
if time_travel_timestamp is not None:
time_travel_literal = bigframes.core.sql.simple_literal(time_travel_timestamp)
Expand Down
9 changes: 9 additions & 0 deletions bigframes/session/_io/bigquery/read_gbq_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,16 @@ def validate_table(
# Anonymous dataset, does not support snapshot ever
if table.dataset_id.startswith("_"):
pass

# Only true tables support time travel
elif table.table_id.endswith("*"):
msg = bfe.format_message(
"Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. "
"Attempting query without time travel. Be aware that "
"modifications to the underlying data may result in errors or "
"unexpected behavior."
)
warnings.warn(msg, category=bfe.TimeTravelDisabledWarning)
elif table.table_type != "TABLE":
if table.table_type == "MATERIALIZED_VIEW":
msg = bfe.format_message(
Expand Down
1 change: 1 addition & 0 deletions bigframes/session/bq_caching_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def export_gbq(
# Only update schema if this is not modifying an existing table, and the
# new table contains timedelta columns.
table = self.bqclient.get_table(destination)
# TODO(tswast): What to do with pseudocolumns?
table.schema = array_value.schema.to_bigquery()
self.bqclient.update_table(table, ["schema"])

Expand Down
6 changes: 1 addition & 5 deletions bigframes/session/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,7 @@ def read_gbq_table(
# clustered tables, so fallback to a query. We do this here so that
# the index is consistent with tables that have primary keys, even
# when max_results is set.
# TODO(b/338419730): We don't need to fallback to a query for wildcard
# tables if we allow some non-determinism when time travel isn't supported.
if max_results is not None or bf_io_bigquery.is_table_with_wildcard_suffix(
query
):
if max_results is not None:
# TODO(b/338111344): If we are running a query anyway, we might as
# well generate ROW_NUMBER() at the same time.
all_columns: Iterable[str] = (
Expand Down
14 changes: 14 additions & 0 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import io
import operator
import re
import sys
import tempfile
import typing
Expand Down Expand Up @@ -5284,6 +5285,19 @@ def test_to_gbq_and_create_dataset(session, scalars_df_index, dataset_id_not_cre
assert not loaded_scalars_df_index.empty


def test_read_gbq_to_pandas_wildcard(unordered_session: bigframes.Session):
    """Reading a wildcard table warns about time travel and still executes.

    ``read_gbq`` on a ``*``-suffixed table ID must emit
    ``TimeTravelDisabledWarning`` (wildcard tables don't support
    FOR SYSTEM_TIME AS OF), and the resulting DataFrame must expose the
    ``_TABLE_SUFFIX`` pseudocolumn for filtering.
    """
    with pytest.warns(
        bigframes.exceptions.TimeTravelDisabledWarning,
        match=re.escape("Wildcard tables do not support FOR SYSTEM_TIME"),
    ):
        frame = unordered_session.read_gbq("bigquery-public-data.noaa_gsod.gsod*")
    frame = frame[frame["_TABLE_SUFFIX"] == "1929"][["da", "mo", "year", "max"]]
    frame.to_pandas()
    num_rows, num_columns = frame.shape
    assert num_rows > 0
    assert num_columns == 4


def test_read_gbq_to_pandas_no_exec(unordered_session: bigframes.Session):
metrics = unordered_session._metrics
execs_pre = metrics.execution_count
Expand Down
Loading