96 changes: 90 additions & 6 deletions bigframes/session/_io/bigquery/read_gbq_table.py
@@ -28,6 +28,7 @@
 import google.cloud.bigquery as bigquery
 import google.cloud.bigquery.table

+import bigframes.core
 import bigframes.core.events
 import bigframes.exceptions as bfe
 import bigframes.session._io.bigquery
@@ -37,18 +38,79 @@
import bigframes.session


def _convert_information_schema_table_id_to_table_reference(
    table_id: str,
    default_project: Optional[str],
) -> bigquery.TableReference:
    """Squeeze an INFORMATION_SCHEMA reference into a TableReference.

    This is kind of a hack. INFORMATION_SCHEMA is a view that isn't
    available via the tables.get REST API.
    """
    parts = table_id.split(".")
    parts_casefold = [part.casefold() for part in parts]
    dataset_index = parts_casefold.index("INFORMATION_SCHEMA".casefold())

    if dataset_index == 0:
        project = default_project
    else:
        project = ".".join(parts[:dataset_index])

    if project is None:
        message = (
            "Could not determine project ID. "
            "Please provide a project or region in your INFORMATION_SCHEMA "
            "table ID. For example, 'region-REGION_NAME.INFORMATION_SCHEMA.JOBS'."
        )
        raise ValueError(message)

    dataset = "INFORMATION_SCHEMA"
    table_id_short = ".".join(parts[dataset_index + 1 :])
    return bigquery.TableReference(
        bigquery.DatasetReference(project, dataset),
        table_id_short,
    )
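For reference, here is what the helper yields for the common ID shapes (a minimal sketch, not code from this PR; `my-project` is a placeholder and the module path assumes the file above):

```python
from bigframes.session._io.bigquery.read_gbq_table import (
    _convert_information_schema_table_id_to_table_reference,
)

# Region-qualified ID: the "region-US" prefix lands in the project slot,
# which is exactly the "kind of a hack" the docstring admits to.
ref = _convert_information_schema_table_id_to_table_reference(
    "region-US.INFORMATION_SCHEMA.SCHEMATA", default_project="my-project"
)
assert (ref.project, ref.dataset_id, ref.table_id) == (
    "region-US", "INFORMATION_SCHEMA", "SCHEMATA"
)

# Bare ID: falls back to the default project.
ref = _convert_information_schema_table_id_to_table_reference(
    "INFORMATION_SCHEMA.SCHEMATA", default_project="my-project"
)
assert ref.project == "my-project"

# With no project and no default, the ValueError above is raised.
```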


def get_information_schema_metadata(
    bqclient: bigquery.Client,
    table_id: str,
    default_project: Optional[str],
) -> bigquery.Table:
    job_config = bigquery.QueryJobConfig(dry_run=True)
    job = bqclient.query(
        f"SELECT * FROM `{table_id}`",
        job_config=job_config,
    )
    table_ref = _convert_information_schema_table_id_to_table_reference(
        table_id=table_id,
        default_project=default_project,
    )
    table = bigquery.Table.from_api_repr(
        {
            "tableReference": table_ref.to_api_repr(),
            "location": job.location,
            # Prevent ourselves from trying to read the table with the BQ
            # Storage API.
            "type": "VIEW",
        }
    )
    table.schema = job.schema
    return table
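The dry run is what makes this work: tables.get can't serve INFORMATION_SCHEMA, but a no-cost dry-run query reports both the schema and the location. The same trick in isolation, with plain google-cloud-bigquery (a sketch; the project ID is a placeholder, and a reasonably recent client library is assumed for `job.schema` on dry runs):

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project

# dry_run=True validates the query and reports metadata without running
# it, so no bytes are scanned and no job costs accrue.
job = client.query(
    "SELECT * FROM `region-US.INFORMATION_SCHEMA.SCHEMATA`",
    job_config=bigquery.QueryJobConfig(dry_run=True),
)
print(job.location)                          # e.g. "US"
print([field.name for field in job.schema])  # the view's column names
```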


def get_table_metadata(
    bqclient: bigquery.Client,
-    table_ref: google.cloud.bigquery.table.TableReference,
-    bq_time: datetime.datetime,
    *,
-    cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]],
+    table_id: str,
+    default_project: Optional[str],
+    bq_time: datetime.datetime,
+    cache: Dict[str, Tuple[datetime.datetime, bigquery.Table]],
    use_cache: bool = True,
    publisher: bigframes.core.events.Publisher,
) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]:
    """Get the table metadata, either from cache or via REST API."""

-    cached_table = cache.get(table_ref)
+    cached_table = cache.get(table_id)
    if use_cache and cached_table is not None:
        snapshot_timestamp, table = cached_table

@@ -90,18 +152,38 @@ def get_table_metadata(

        return cached_table

-    table = bqclient.get_table(table_ref)
+    if is_information_schema(table_id):
+        table = get_information_schema_metadata(
+            bqclient=bqclient, table_id=table_id, default_project=default_project
+        )
+    else:
+        table_ref = google.cloud.bigquery.table.TableReference.from_string(
+            table_id, default_project=default_project
+        )
+        table = bqclient.get_table(table_ref)

    # Local time will lag a little bit due to network latency, so make sure
    # it is at least the table creation time. This is relevant if the table
    # was created immediately before loading it here.
    if (table.created is not None) and (table.created > bq_time):
        bq_time = table.created

    cached_table = (bq_time, table)
-    cache[table_ref] = cached_table
+    cache[table_id] = cached_table
    return cached_table
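Note the cache is now keyed by the raw `table_id` string rather than a `TableReference`. One illustration of why that matters (not code from this PR): region-qualified INFORMATION_SCHEMA IDs don't round-trip through `TableReference`:

```python
from google.cloud import bigquery

# A region prefix gets parsed into the project slot, so the reference
# no longer identifies what the user actually asked for.
ref = bigquery.TableReference.from_string(
    "region-US.INFORMATION_SCHEMA.SCHEMATA"
)
print(ref.project)  # "region-US" -- a region, not a real project ID
```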


def is_information_schema(table_id: str) -> bool:
    table_id_casefold = table_id.casefold()
    # Include the "."s to ensure we don't get false positives for a
    # user-defined dataset like MY_INFORMATION_SCHEMA or a table called
    # INFORMATION_SCHEMA.
    return (
        ".INFORMATION_SCHEMA.".casefold() in table_id_casefold
        or table_id_casefold.startswith("INFORMATION_SCHEMA.".casefold())
    )
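A few concrete cases to make the guard comment tangible (behavior read directly off the predicate above):

```python
# Matches: a dotted INFORMATION_SCHEMA segment, in any casing.
assert is_information_schema("region-US.INFORMATION_SCHEMA.JOBS")
assert is_information_schema("information_schema.schemata")

# Non-matches the comment warns about: look-alike dataset names and
# tables that merely happen to be called INFORMATION_SCHEMA.
assert not is_information_schema("proj.MY_INFORMATION_SCHEMA.t")
assert not is_information_schema("proj.dataset.INFORMATION_SCHEMA")
```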


def is_time_travel_eligible(
    bqclient: bigquery.Client,
    table: google.cloud.bigquery.table.Table,
@@ -168,6 +250,8 @@ def is_time_travel_eligible(
            msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel
        )
        return False
+    elif table.table_type == "VIEW":
+        return False

    # Table might support time travel; let's do a dry-run query with time travel.
    if should_dry_run:
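The new `VIEW` early exit reflects that views (INFORMATION_SCHEMA included) reject time travel outright. For context, the dry-run probe mentioned above amounts to something like this (a sketch; client, dataset, and table names are placeholders):

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder

# FOR SYSTEM_TIME AS OF pins the read to a snapshot. Running this
# against a view fails, which is why VIEW short-circuits to False
# before any dry run is attempted.
sql = """
SELECT *
FROM `my-project.my_dataset.my_table`
FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
"""
client.query(sql, job_config=bigquery.QueryJobConfig(dry_run=True))
```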
30 changes: 16 additions & 14 deletions bigframes/session/loader.py
@@ -47,6 +47,8 @@
 import pandas
 import pyarrow as pa

+import bigframes._tools
+import bigframes._tools.strings
 from bigframes.core import guid, identifiers, local_data, nodes, ordering, utils
 import bigframes.core as core
 import bigframes.core.blocks as blocks
@@ -272,9 +274,7 @@ def __init__(
        self._default_index_type = default_index_type
        self._scan_index_uniqueness = scan_index_uniqueness
        self._force_total_order = force_total_order
-        self._df_snapshot: Dict[
-            bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]
-        ] = {}
+        self._df_snapshot: Dict[str, Tuple[datetime.datetime, bigquery.Table]] = {}
        self._metrics = metrics
        self._publisher = publisher
        # Unfortunate circular reference, but need to pass reference when constructing objects
@@ -629,10 +629,6 @@ def read_gbq_table(

_check_duplicates("columns", columns)

table_ref = google.cloud.bigquery.table.TableReference.from_string(
table_id, default_project=self._bqclient.project
)

columns = list(columns)
include_all_columns = columns is None or len(columns) == 0
filters = typing.cast(list, list(filters))
@@ -643,7 +639,8 @@

        time_travel_timestamp, table = bf_read_gbq_table.get_table_metadata(
            self._bqclient,
-            table_ref=table_ref,
+            table_id=table_id,
+            default_project=self._bqclient.project,
            bq_time=self._clock.get_time(),
            cache=self._df_snapshot,
            use_cache=use_cache,
@@ -706,18 +703,23 @@ def read_gbq_table(
        # Optionally, execute the query
        # -----------------------------

-        # max_results introduces non-determinism and limits the cost on
-        # clustered tables, so fallback to a query. We do this here so that
-        # the index is consistent with tables that have primary keys, even
-        # when max_results is set.
-        if max_results is not None:
+        if (
+            # max_results introduces non-determinism and limits the cost on
+            # clustered tables, so fall back to a query. We do this here so
+            # that the index is consistent with tables that have primary
+            # keys, even when max_results is set.
+            max_results is not None
+            # Views such as INFORMATION_SCHEMA can introduce non-determinism.
+            # They can update frequently and don't support time travel.
+            or bf_read_gbq_table.is_information_schema(table_id)
+        ):
            # TODO(b/338111344): If we are running a query anyway, we might as
            # well generate ROW_NUMBER() at the same time.
            all_columns: Iterable[str] = (
                itertools.chain(index_cols, columns) if columns else ()
            )
            query = bf_io_bigquery.to_query(
-                table_id,
+                f"{table.project}.{table.dataset_id}.{table.table_id}",
                columns=all_columns,
                sql_predicate=bf_io_bigquery.compile_filters(filters)
                if filters
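With that branch, INFORMATION_SCHEMA reads never touch tables.get or the Read API; they compile to an ordinary query over the fully qualified ID. Roughly what the fallback produces (a sketch of the shape only; the real `to_query` also handles quoting, filters, and time travel):

```python
def sketch_fallback_sql(table, columns=(), predicate=None):
    # Rebuild the fully qualified ID from the resolved table so the
    # default project is included (mirrors the f-string above).
    table_id = f"{table.project}.{table.dataset_id}.{table.table_id}"
    select_list = ", ".join(columns) if columns else "*"
    sql = f"SELECT {select_list} FROM `{table_id}`"
    if predicate:
        sql += f" WHERE {predicate}"
    return sql
```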
3 changes: 3 additions & 0 deletions bigframes/session/read_api_execution.py
@@ -46,6 +46,9 @@ def execute(
        if node.explicitly_ordered and ordered:
            return None

+        if not node.source.table.is_physically_stored:
+            return None
+
        if limit is not None:
            if peek is None or limit < peek:
                peek = limit
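Returning `None` is how this semi-executor declines a plan; the session then falls through to the next execution strategy, ultimately plain SQL. Schematically (hypothetical names, not bigframes' actual classes):

```python
from typing import Optional, Sequence


def run_with_fallback(plan, executors: Sequence) -> object:
    """Sketch of semi-executor dispatch: each executor may decline.

    Fast paths (like the BQ Storage Read API) return None for plans
    they can't serve, e.g. sources that aren't physically stored such
    as INFORMATION_SCHEMA views, and a later executor picks them up.
    """
    for executor in executors:
        result = executor.execute(plan)
        if result is not None:
            return result
    raise RuntimeError("no executor accepted the plan")
```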
50 changes: 50 additions & 0 deletions tests/system/small/pandas/test_read_gbq_information_schema.py
@@ -0,0 +1,50 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest


@pytest.mark.parametrize("include_project", [True, False])
@pytest.mark.parametrize(
"view_id",
[
# https://cloud.google.com/bigquery/docs/information-schema-intro
"region-US.INFORMATION_SCHEMA.SESSIONS_BY_USER",
"region-US.INFORMATION_SCHEMA.SCHEMATA",
],
)
def test_read_gbq_jobs_by_user_returns_schema(
unordered_session, view_id: str, include_project: bool
):
if include_project:
table_id = unordered_session.bqclient.project + "." + view_id
else:
table_id = view_id

df = unordered_session.read_gbq(table_id, max_results=10)
assert df.dtypes is not None


def test_read_gbq_schemata_can_be_peeked(unordered_session):
    df = unordered_session.read_gbq("region-US.INFORMATION_SCHEMA.SCHEMATA")
    result = df.peek()
    assert result is not None


def test_read_gbq_schemata_four_parts_can_be_peeked(unordered_session):
    df = unordered_session.read_gbq(
        f"{unordered_session.bqclient.project}.region-US.INFORMATION_SCHEMA.SCHEMATA"
    )
    result = df.peek()
    assert result is not None
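End to end, the behavior these tests pin down looks like this from the public API (a sketch; assumes an authenticated environment, with a placeholder project):

```python
import bigframes.pandas as bpd

bpd.options.bigquery.project = "my-project"  # placeholder

# Region-qualified INFORMATION_SCHEMA views are now readable directly,
# with or without an explicit project prefix.
df = bpd.read_gbq("region-US.INFORMATION_SCHEMA.SCHEMATA")
print(df.peek())
```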
4 changes: 2 additions & 2 deletions tests/unit/session/test_session.py
@@ -242,7 +242,7 @@ def test_read_gbq_cached_table():
table._properties["numRows"] = "1000000000"
table._properties["location"] = session._location
table._properties["type"] = "TABLE"
session._loader._df_snapshot[table_ref] = (
session._loader._df_snapshot[str(table_ref)] = (
datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
table,
)
@@ -273,7 +273,7 @@ def test_read_gbq_cached_table_doesnt_warn_for_anonymous_tables_and_doesnt_inclu
table._properties["numRows"] = "1000000000"
table._properties["location"] = session._location
table._properties["type"] = "TABLE"
session._loader._df_snapshot[table_ref] = (
session._loader._df_snapshot[str(table_ref)] = (
datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
table,
)