diff --git a/bigframes/core/array_value.py b/bigframes/core/array_value.py
index 60f5315554..20773fd1b4 100644
--- a/bigframes/core/array_value.py
+++ b/bigframes/core/array_value.py
@@ -133,8 +133,17 @@ def from_table(
             ordering=ordering,
             n_rows=n_rows,
         )
+        return cls.from_bq_data_source(source_def, scan_list, session)
+
+    @classmethod
+    def from_bq_data_source(
+        cls,
+        source: nodes.BigqueryDataSource,
+        scan_list: nodes.ScanList,
+        session: Session,
+    ):
         node = nodes.ReadTableNode(
-            source=source_def,
+            source=source,
             scan_list=scan_list,
             table_session=session,
         )
diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index 0fbfe7bd37..3e4bdb57c4 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -578,6 +578,9 @@ class ScanItem(typing.NamedTuple):
     def with_id(self, id: identifiers.ColumnId) -> ScanItem:
         return ScanItem(id, self.dtype, self.source_id)
 
+    def with_source_id(self, source_id: str) -> ScanItem:
+        return ScanItem(self.id, self.dtype, source_id)
+
 
 @dataclasses.dataclass(frozen=True)
 class ScanList:
@@ -614,6 +617,21 @@ def project(
         result = ScanList((self.items[:1]))
         return result
 
+    def remap_source_ids(
+        self,
+        mapping: Mapping[str, str],
+    ) -> ScanList:
+        items = tuple(
+            item.with_source_id(mapping.get(item.source_id, item.source_id))
+            for item in self.items
+        )
+        return ScanList(items)
+
+    def append(
+        self, source_id: str, dtype: bigframes.dtypes.Dtype, id: identifiers.ColumnId
+    ) -> ScanList:
+        return ScanList((*self.items, ScanItem(id, dtype, source_id)))
+
 
 @dataclasses.dataclass(frozen=True, eq=False)
 class ReadLocalNode(LeafNode):
@@ -621,9 +639,9 @@ class ReadLocalNode(LeafNode):
     local_data_source: local_data.ManagedArrowTable
     # Mapping of local ids to bfet id.
     scan_list: ScanList
+    session: bigframes.session.Session
     # Offsets are generated only if this is non-null
     offsets_col: Optional[identifiers.ColumnId] = None
-    session: typing.Optional[bigframes.session.Session] = None
 
     @property
     def fields(self) -> Sequence[Field]:
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 7260553c14..47aef29326 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -246,13 +246,6 @@ def __init__(
         self._temp_storage_manager = (
             self._session_resource_manager or self._anon_dataset_manager
         )
-        self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor(
-            bqclient=self._clients_provider.bqclient,
-            bqstoragereadclient=self._clients_provider.bqstoragereadclient,
-            storage_manager=self._temp_storage_manager,
-            strictly_ordered=self._strictly_ordered,
-            metrics=self._metrics,
-        )
         self._loader = bigframes.session.loader.GbqDataLoader(
             session=self,
             bqclient=self._clients_provider.bqclient,
@@ -263,6 +256,14 @@
             force_total_order=self._strictly_ordered,
             metrics=self._metrics,
         )
+        self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor(
+            bqclient=self._clients_provider.bqclient,
+            bqstoragereadclient=self._clients_provider.bqstoragereadclient,
+            loader=self._loader,
+            storage_manager=self._temp_storage_manager,
+            strictly_ordered=self._strictly_ordered,
+            metrics=self._metrics,
+        )
 
     def __del__(self):
         """Automatic cleanup of internal resources."""
@@ -929,6 +930,10 @@ def _read_pandas(
             return self._loader.read_pandas(
                 pandas_dataframe, method="write", api_name=api_name
             )
+        elif write_engine == "_deferred":
+            import bigframes.dataframe as dataframe
+
+            return dataframe.DataFrame(blocks.Block.from_local(pandas_dataframe, self))
         else:
             raise ValueError(f"Got unexpected write_engine '{write_engine}'")
ValueError(f"Got unexpected write_engine '{write_engine}'") diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 72f2dfa4b5..2e295b08c7 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -17,6 +17,7 @@ import dataclasses import math import os +import threading from typing import cast, Literal, Mapping, Optional, Sequence, Tuple, Union import warnings import weakref @@ -28,7 +29,7 @@ import google.cloud.bigquery_storage_v1 import bigframes.core -from bigframes.core import compile, rewrite +from bigframes.core import compile, local_data, rewrite import bigframes.core.guid import bigframes.core.nodes as nodes import bigframes.core.ordering as order @@ -36,7 +37,7 @@ import bigframes.dtypes import bigframes.exceptions as bfe import bigframes.features -from bigframes.session import executor, local_scan_executor, read_api_execution +from bigframes.session import executor, loader, local_scan_executor, read_api_execution import bigframes.session._io.bigquery as bq_io import bigframes.session.metrics import bigframes.session.planner @@ -65,12 +66,19 @@ def _get_default_output_spec() -> OutputSpec: ) +SourceIdMapping = Mapping[str, str] + + class ExecutionCache: def __init__(self): # current assumption is only 1 cache of a given node # in future, might have multiple caches, with different layout, localities self._cached_executions: weakref.WeakKeyDictionary[ - nodes.BigFrameNode, nodes.BigFrameNode + nodes.BigFrameNode, nodes.CachedTableNode + ] = weakref.WeakKeyDictionary() + self._uploaded_local_data: weakref.WeakKeyDictionary[ + local_data.ManagedArrowTable, + tuple[nodes.BigqueryDataSource, SourceIdMapping], ] = weakref.WeakKeyDictionary() @property @@ -103,6 +111,17 @@ def cache_results_table( assert original_root.schema == cached_replacement.schema self._cached_executions[original_root] = cached_replacement + def cache_remote_replacement( + self, + local_data: local_data.ManagedArrowTable, + bq_data: nodes.BigqueryDataSource, + ): + mapping = { + local_data.schema.items[i].column: bq_data.table.physical_schema[i].name + for i in range(len(local_data.schema)) + } + self._uploaded_local_data[local_data] = (bq_data, mapping) + class BigQueryCachingExecutor(executor.Executor): """Computes BigFrames values using BigQuery Engine. 
@@ -118,6 +137,7 @@ def __init__(
         bqclient: bigquery.Client,
         storage_manager: bigframes.session.temporary_storage.TemporaryStorageManager,
         bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient,
+        loader: loader.GbqDataLoader,
         *,
         strictly_ordered: bool = True,
         metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
@@ -127,6 +147,7 @@ def __init__(
         self.strictly_ordered: bool = strictly_ordered
         self.cache: ExecutionCache = ExecutionCache()
         self.metrics = metrics
+        self.loader = loader
         self.bqstoragereadclient = bqstoragereadclient
         # Simple left-to-right precedence for now
         self._semi_executors = (
@@ -136,6 +157,7 @@ def __init__(
             ),
             local_scan_executor.LocalScanExecutor(),
         )
+        self._upload_lock = threading.Lock()
 
     def to_sql(
         self,
@@ -147,6 +169,7 @@ def to_sql(
         if offset_column:
             array_value, _ = array_value.promote_offsets()
         node = self.logical_plan(array_value.node) if enable_cache else array_value.node
+        node = self._substitute_large_local_sources(node)
         compiled = compile.compile_sql(compile.CompileRequest(node, sort_rows=ordered))
         return compiled.sql
 
@@ -378,6 +401,7 @@ def _cache_with_cluster_cols(
     ):
         """Executes the query and uses the resulting table to rewrite future executions."""
         plan = self.logical_plan(array_value.node)
+        plan = self._substitute_large_local_sources(plan)
         compiled = compile.compile_sql(
             compile.CompileRequest(
                 plan, sort_rows=False, materialize_all_order_keys=True
@@ -398,7 +422,7 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue):
         w_offsets, offset_column = array_value.promote_offsets()
         compiled = compile.compile_sql(
             compile.CompileRequest(
-                self.logical_plan(w_offsets.node),
+                self.logical_plan(self._substitute_large_local_sources(w_offsets.node)),
                 sort_rows=False,
             )
         )
@@ -509,6 +533,48 @@ def _validate_result_schema(
                 f"This error should only occur while testing. Ibis schema: {ibis_schema} does not match actual schema: {actual_schema}"
             )
 
+    def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode):
+        """
+        Replace large local sources with the uploaded version of those datasources.
+        """
+        # Step 1: Upload all previously un-uploaded data
+        for leaf in original_root.unique_nodes():
+            if isinstance(leaf, nodes.ReadLocalNode):
+                if leaf.local_data_source.metadata.total_bytes > 5000:
+                    self._upload_local_data(leaf.local_data_source)
+
+        # Step 2: Replace local scans with remote scans
+        def map_local_scans(node: nodes.BigFrameNode):
+            if not isinstance(node, nodes.ReadLocalNode):
+                return node
+            if node.local_data_source not in self.cache._uploaded_local_data:
+                return node
+            bq_source, source_mapping = self.cache._uploaded_local_data[
+                node.local_data_source
+            ]
+            scan_list = node.scan_list.remap_source_ids(source_mapping)
+            if node.offsets_col is not None:
+                scan_list = scan_list.append(
+                    bq_source.table.physical_schema[-1].name,
+                    bigframes.dtypes.INT_DTYPE,
+                    node.offsets_col,
+                )
+            return nodes.ReadTableNode(bq_source, scan_list, node.session)
+
+        return original_root.bottom_up(map_local_scans)
+
+    def _upload_local_data(self, local_table: local_data.ManagedArrowTable):
+        if local_table in self.cache._uploaded_local_data:
+            return
+        # Lock prevents concurrent repeated work, but slows things down.
+        # Might be better as a queue and a worker thread
+        with self._upload_lock:
+            if local_table not in self.cache._uploaded_local_data:
+                uploaded = self.loader.load_data(
+                    local_table, bigframes.core.guid.generate_guid()
+                )
+                self.cache.cache_remote_replacement(local_table, uploaded)
+
     def _execute_plan(
         self,
         plan: nodes.BigFrameNode,
@@ -539,6 +605,8 @@ def _execute_plan(
         # Use explicit destination to avoid 10GB limit of temporary table
         if destination_table is not None:
             job_config.destination = destination_table
+
+        plan = self._substitute_large_local_sources(plan)
         compiled = compile.compile_sql(
             compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek)
         )
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index b630dedb7b..fccc4b35de 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -44,7 +44,7 @@
 import pandas
 import pyarrow as pa
 
-from bigframes.core import guid, local_data, utils
+from bigframes.core import guid, identifiers, local_data, nodes, ordering, utils
 import bigframes.core as core
 import bigframes.core.blocks as blocks
 import bigframes.core.schema as schemata
@@ -184,36 +184,59 @@ def read_pandas(
             [*idx_cols, *val_cols], axis="columns"
         )
         managed_data = local_data.ManagedArrowTable.from_pandas(prepared_df)
+        block = blocks.Block(
+            self.read_managed_data(managed_data, method=method, api_name=api_name),
+            index_columns=idx_cols,
+            column_labels=pandas_dataframe.columns,
+            index_labels=pandas_dataframe.index.names,
+        )
+        return dataframe.DataFrame(block)
 
+    def read_managed_data(
+        self,
+        data: local_data.ManagedArrowTable,
+        method: Literal["load", "stream", "write"],
+        api_name: str,
+    ) -> core.ArrayValue:
+        offsets_col = guid.generate_guid("upload_offsets_")
         if method == "load":
-            array_value = self.load_data(managed_data, api_name=api_name)
+            gbq_source = self.load_data(
+                data, offsets_col=offsets_col, api_name=api_name
+            )
         elif method == "stream":
-            array_value = self.stream_data(managed_data)
+            gbq_source = self.stream_data(data, offsets_col=offsets_col)
         elif method == "write":
-            array_value = self.write_data(managed_data)
+            gbq_source = self.write_data(data, offsets_col=offsets_col)
         else:
             raise ValueError(f"Unsupported read method {method}")
 
-        block = blocks.Block(
-            array_value,
-            index_columns=idx_cols,
-            column_labels=pandas_dataframe.columns,
-            index_labels=pandas_dataframe.index.names,
+        return core.ArrayValue.from_bq_data_source(
+            source=gbq_source,
+            scan_list=nodes.ScanList(
+                tuple(
+                    nodes.ScanItem(
+                        identifiers.ColumnId(item.column), item.dtype, item.column
+                    )
+                    for item in data.schema.items
+                )
+            ),
+            session=self._session,
         )
-        return dataframe.DataFrame(block)
 
     def load_data(
-        self, data: local_data.ManagedArrowTable, api_name: Optional[str] = None
-    ) -> core.ArrayValue:
+        self,
+        data: local_data.ManagedArrowTable,
+        offsets_col: str,
+        api_name: Optional[str] = None,
+    ) -> nodes.BigqueryDataSource:
         """Load managed data into bigquery"""
-        ordering_col = guid.generate_guid("load_offsets_")
 
         # JSON support incomplete
         for item in data.schema.items:
             _validate_dtype_can_load(item.column, item.dtype)
 
         schema_w_offsets = data.schema.append(
-            schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE)
+            schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
         )
         bq_schema = schema_w_offsets.to_bigquery(_LOAD_JOB_TYPE_OVERRIDES)
@@ -231,13 +254,13 @@ def load_data(
             job_config.labels = {"bigframes-api": api_name}
 
         load_table_destination = self._storage_manager.create_temp_table(
-            bq_schema, [ordering_col]
+            bq_schema, [offsets_col]
         )
 
         buffer = io.BytesIO()
         data.to_parquet(
             buffer,
-            offsets_col=ordering_col,
+            offsets_col=offsets_col,
             geo_format="wkt",
             duration_type="duration",
             json_type="string",
@@ -249,23 +272,24 @@ def load_data(
         self._start_generic_job(load_job)
         # must get table metadata after load job for accurate metadata
         destination_table = self._bqclient.get_table(load_table_destination)
-        return core.ArrayValue.from_table(
-            table=destination_table,
-            schema=schema_w_offsets,
-            session=self._session,
-            offsets_col=ordering_col,
-            n_rows=data.data.num_rows,
-        ).drop_columns([ordering_col])
+        return nodes.BigqueryDataSource(
+            nodes.GbqTable.from_table(destination_table),
+            ordering=ordering.TotalOrdering.from_offset_col(offsets_col),
+            n_rows=destination_table.num_rows,
+        )
 
-    def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
+    def stream_data(
+        self,
+        data: local_data.ManagedArrowTable,
+        offsets_col: str,
+    ) -> nodes.BigqueryDataSource:
         """Load managed data into bigquery"""
-        ordering_col = guid.generate_guid("stream_offsets_")
         schema_w_offsets = data.schema.append(
-            schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE)
+            schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
         )
         bq_schema = schema_w_offsets.to_bigquery(_STREAM_JOB_TYPE_OVERRIDES)
         load_table_destination = self._storage_manager.create_temp_table(
-            bq_schema, [ordering_col]
+            bq_schema, [offsets_col]
         )
 
         rows = data.itertuples(
@@ -284,24 +308,23 @@ def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
             f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
         )
         destination_table = self._bqclient.get_table(load_table_destination)
-        return core.ArrayValue.from_table(
-            table=destination_table,
-            schema=schema_w_offsets,
-            session=self._session,
-            offsets_col=ordering_col,
-            n_rows=data.data.num_rows,
-        ).drop_columns([ordering_col])
+        return nodes.BigqueryDataSource(
+            nodes.GbqTable.from_table(destination_table),
+            ordering=ordering.TotalOrdering.from_offset_col(offsets_col),
+            n_rows=destination_table.num_rows,
+        )
 
-    def write_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
+    def write_data(
+        self,
+        data: local_data.ManagedArrowTable,
+        offsets_col: str,
+    ) -> nodes.BigqueryDataSource:
         """Load managed data into bigquery"""
-        ordering_col = guid.generate_guid("stream_offsets_")
         schema_w_offsets = data.schema.append(
-            schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE)
+            schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
         )
         bq_schema = schema_w_offsets.to_bigquery(_STREAM_JOB_TYPE_OVERRIDES)
-        bq_table_ref = self._storage_manager.create_temp_table(
-            bq_schema, [ordering_col]
-        )
+        bq_table_ref = self._storage_manager.create_temp_table(bq_schema, [offsets_col])
 
         requested_stream = bq_storage_types.stream.WriteStream()
         requested_stream.type_ = bq_storage_types.stream.WriteStream.Type.COMMITTED  # type: ignore
@@ -313,7 +336,7 @@ def write_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
 
         def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]:
             schema, batches = data.to_arrow(
-                offsets_col=ordering_col, duration_type="int"
+                offsets_col=offsets_col, duration_type="int"
             )
             offset = 0
             for batch in batches:
@@ -339,13 +362,11 @@ def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]:
         assert response.row_count == data.data.num_rows
 
         destination_table = self._bqclient.get_table(bq_table_ref)
-        return core.ArrayValue.from_table(
-            table=destination_table,
-            schema=schema_w_offsets,
-            session=self._session,
-            offsets_col=ordering_col,
-            n_rows=data.data.num_rows,
-        ).drop_columns([ordering_col])
+        return nodes.BigqueryDataSource(
+            nodes.GbqTable.from_table(destination_table),
+            ordering=ordering.TotalOrdering.from_offset_col(offsets_col),
+            n_rows=destination_table.num_rows,
+        )
 
     def _start_generic_job(self, job: formatting_helpers.GenericJob):
         if bigframes.options.display.progress_bar is not None:
diff --git a/tests/system/small/test_large_local_data.py b/tests/system/small/test_large_local_data.py
new file mode 100644
index 0000000000..91794df8b4
--- /dev/null
+++ b/tests/system/small/test_large_local_data.py
@@ -0,0 +1,50 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+import pandas as pd
+
+import bigframes
+from tests.system.utils import assert_pandas_df_equal
+
+large_dataframe = pd.DataFrame(np.random.rand(10000, 10), dtype="Float64")
+large_dataframe.index = large_dataframe.index.astype("Int64")
+
+
+def test_read_pandas_defer_noop(session: bigframes.Session):
+    bf_df = session.read_pandas(large_dataframe, write_engine="_deferred")
+
+    assert_pandas_df_equal(large_dataframe, bf_df.to_pandas())
+
+
+def test_read_pandas_defer_cumsum(session: bigframes.Session):
+    bf_df = session.read_pandas(large_dataframe, write_engine="_deferred")
+    bf_df = bf_df.cumsum()
+
+    assert_pandas_df_equal(large_dataframe.cumsum(), bf_df.to_pandas())
+
+
+def test_read_pandas_defer_cache_cumsum_cumsum(session: bigframes.Session):
+    bf_df = session.read_pandas(large_dataframe, write_engine="_deferred")
+    bf_df = bf_df.cumsum().cache().cumsum()
+
+    assert_pandas_df_equal(large_dataframe.cumsum().cumsum(), bf_df.to_pandas())
+
+
+def test_read_pandas_defer_peek(session: bigframes.Session):
+    bf_df = session.read_pandas(large_dataframe, write_engine="_deferred")
+    bf_result = bf_df.peek(15)
+
+    assert len(bf_result) == 15
+    assert_pandas_df_equal(large_dataframe.loc[bf_result.index], bf_result)
diff --git a/third_party/bigframes_vendored/constants.py b/third_party/bigframes_vendored/constants.py
index af87694cd5..6d55817a27 100644
--- a/third_party/bigframes_vendored/constants.py
+++ b/third_party/bigframes_vendored/constants.py
@@ -52,5 +52,6 @@
     "bigquery_load",
     "bigquery_streaming",
     "bigquery_write",
+    "_deferred",
 ]
 VALID_WRITE_ENGINES = typing.get_args(WriteEngineType)