diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py
index 9731857ea0..495523d2fc 100644
--- a/bigframes/core/utils.py
+++ b/bigframes/core/utils.py
@@ -41,8 +41,10 @@ def get_axis_number(axis: typing.Union[str, int]) -> typing.Literal[0, 1]:
     raise ValueError(f"Not a valid axis: {axis}")
 
 
-def is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]:
-    return pd.api.types.is_list_like(obj)
+def is_list_like(
+    obj: typing.Any, allow_sets: bool = True
+) -> typing_extensions.TypeGuard[typing.Sequence]:
+    return pd.api.types.is_list_like(obj, allow_sets=allow_sets)
 
 
 def is_dict_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Mapping]:
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index c38d124196..6801937fbe 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -16,6 +16,7 @@
 
 from __future__ import annotations
 
+from collections import abc
 import datetime
 import logging
 import os
@@ -569,7 +570,7 @@ def read_gbq_table(
             columns = col_order
 
         return self._loader.read_gbq_table(
-            query=query,
+            table_id=query,
             index_col=index_col,
             columns=columns,
             max_results=max_results,
@@ -953,14 +954,21 @@ def _read_csv_w_bigquery_engine(
         native CSV loading capabilities, making it suitable for large datasets
         that may not fit into local memory.
         """
-
-        if any(param is not None for param in (dtype, names)):
-            not_supported = ("dtype", "names")
+        if dtype is not None:
             raise NotImplementedError(
-                f"BigQuery engine does not support these arguments: {not_supported}. "
+                f"BigQuery engine does not support the `dtype` argument."
                 f"{constants.FEEDBACK_LINK}"
             )
 
+        if names is not None:
+            if len(names) != len(set(names)):
+                raise ValueError("Duplicated names are not allowed.")
+            if not (
+                bigframes.core.utils.is_list_like(names, allow_sets=False)
+                or isinstance(names, abc.KeysView)
+            ):
+                raise ValueError("Names should be an ordered collection.")
+
         if index_col is True:
             raise ValueError("The value of index_col couldn't be 'True'")
 
@@ -1004,11 +1012,9 @@ def _read_csv_w_bigquery_engine(
         elif header > 0:
             job_config.skip_leading_rows = header + 1
 
-        return self._loader.read_bigquery_load_job(
-            filepath_or_buffer,
-            job_config=job_config,
-            index_col=index_col,
-            columns=columns,
+        table_id = self._loader.load_file(filepath_or_buffer, job_config=job_config)
+        return self._loader.read_gbq_table(
+            table_id, index_col=index_col, columns=columns, names=names
         )
 
     def read_pickle(
@@ -1049,8 +1055,8 @@ def read_parquet(
             job_config = bigquery.LoadJobConfig()
             job_config.source_format = bigquery.SourceFormat.PARQUET
             job_config.labels = {"bigframes-api": "read_parquet"}
-
-            return self._loader.read_bigquery_load_job(path, job_config=job_config)
+            table_id = self._loader.load_file(path, job_config=job_config)
+            return self._loader.read_gbq_table(table_id)
         else:
             if "*" in path:
                 raise ValueError(
@@ -1121,10 +1127,8 @@ def read_json(
             job_config.encoding = encoding
             job_config.labels = {"bigframes-api": "read_json"}
 
-            return self._loader.read_bigquery_load_job(
-                path_or_buf,
-                job_config=job_config,
-            )
+            table_id = self._loader.load_file(path_or_buf, job_config=job_config)
+            return self._loader.read_gbq_table(table_id)
         else:
             if any(arg in kwargs for arg in ("chunksize", "iterator")):
                 raise NotImplementedError(
diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py
index 34183b22bc..8d8f247185 100644
--- a/bigframes/session/_io/bigquery/read_gbq_table.py
+++ b/bigframes/session/_io/bigquery/read_gbq_table.py
@@ -235,6 +235,8 @@ def get_index_cols(
     | Iterable[int]
     | int
     | bigframes.enums.DefaultIndexKind,
+    *,
+    names: Optional[Iterable[str]] = None,
 ) -> List[str]:
     """
     If we can get a total ordering from the table, such as via primary key
@@ -245,6 +247,14 @@ def get_index_cols(
     # Transform index_col -> index_cols so we have a variable that is
     # always a list of column names (possibly empty).
     schema_len = len(table.schema)
+
+    # If the `names` is provided, the index_col provided by the user is the new
+    # name, so we need to rename it to the original name in the table schema.
+    renamed_schema: Optional[Dict[str, str]] = None
+    if names is not None:
+        assert len(list(names)) == schema_len
+        renamed_schema = {name: field.name for name, field in zip(names, table.schema)}
+
     index_cols: List[str] = []
     if isinstance(index_col, bigframes.enums.DefaultIndexKind):
         if index_col == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64:
@@ -261,6 +271,8 @@
                 f"Got unexpected index_col {repr(index_col)}. {constants.FEEDBACK_LINK}"
             )
     elif isinstance(index_col, str):
+        if renamed_schema is not None:
+            index_col = renamed_schema.get(index_col, index_col)
        index_cols = [index_col]
     elif isinstance(index_col, int):
         if not 0 <= index_col < schema_len:
@@ -272,6 +284,8 @@
     elif isinstance(index_col, Iterable):
         for item in index_col:
             if isinstance(item, str):
+                if renamed_schema is not None:
+                    item = renamed_schema.get(item, item)
                 index_cols.append(item)
             elif isinstance(item, int):
                 if not 0 <= item < schema_len:
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 76f12ae438..e6b24e016c 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -348,7 +348,7 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
 
     def read_gbq_table(
         self,
-        query: str,
+        table_id: str,
         *,
         index_col: Iterable[str]
         | str
@@ -356,6 +356,7 @@
         | int
         | bigframes.enums.DefaultIndexKind = (),
         columns: Iterable[str] = (),
+        names: Optional[Iterable[str]] = None,
         max_results: Optional[int] = None,
         api_name: str = "read_gbq_table",
         use_cache: bool = True,
@@ -375,7 +376,7 @@
         )
 
         table_ref = google.cloud.bigquery.table.TableReference.from_string(
-            query, default_project=self._bqclient.project
+            table_id, default_project=self._bqclient.project
         )
 
         columns = list(columns)
@@ -411,12 +412,37 @@
                     f"Column '{key}' of `columns` not found in this table. Did you mean '{possibility}'?"
                 )
 
+        # TODO(b/408499371): check `names` work with `use_cols` for read_csv method.
+        if names is not None:
+            len_names = len(list(names))
+            len_columns = len(table.schema)
+            if len_names > len_columns:
+                raise ValueError(
+                    f"Too many columns specified: expected {len_columns}"
+                    f" and found {len_names}"
+                )
+            elif len_names < len_columns:
+                if (
+                    isinstance(index_col, bigframes.enums.DefaultIndexKind)
+                    or index_col != ()
+                ):
+                    raise KeyError(
+                        "When providing both `index_col` and `names`, ensure the "
+                        "number of `names` matches the number of columns in your "
+                        "data."
+                    )
+                index_col = range(len_columns - len_names)
+                names = [
+                    field.name for field in table.schema[: len_columns - len_names]
+                ] + list(names)
+
         # Converting index_col into a list of column names requires
         # the table metadata because we might use the primary keys
         # when constructing the index.
         index_cols = bf_read_gbq_table.get_index_cols(
             table=table,
             index_col=index_col,
+            names=names,
         )
         _check_column_duplicates(index_cols, columns)
 
@@ -443,7 +469,7 @@
         # TODO(b/338419730): We don't need to fallback to a query for wildcard
         # tables if we allow some non-determinism when time travel isn't supported.
         if max_results is not None or bf_io_bigquery.is_table_with_wildcard_suffix(
-            query
+            table_id
         ):
             # TODO(b/338111344): If we are running a query anyway, we might as
             # well generate ROW_NUMBER() at the same time.
@@ -451,7 +477,7 @@
                 itertools.chain(index_cols, columns) if columns else ()
             )
             query = bf_io_bigquery.to_query(
-                query,
+                table_id,
                 columns=all_columns,
                 sql_predicate=bf_io_bigquery.compile_filters(filters)
                 if filters
@@ -561,6 +587,15 @@
             index_names = [None]
 
         value_columns = [col for col in array_value.column_ids if col not in index_cols]
+        if names is not None:
+            renamed_cols: Dict[str, str] = {
+                col: new_name for col, new_name in zip(array_value.column_ids, names)
+            }
+            index_names = [
+                renamed_cols.get(index_col, index_col) for index_col in index_cols
+            ]
+            value_columns = [renamed_cols.get(col, col) for col in value_columns]
+
         block = blocks.Block(
             array_value,
             index_columns=index_cols,
@@ -576,18 +611,12 @@
             df.sort_index()
         return df
 
-    def read_bigquery_load_job(
+    def load_file(
         self,
         filepath_or_buffer: str | IO["bytes"],
         *,
         job_config: bigquery.LoadJobConfig,
-        index_col: Iterable[str]
-        | str
-        | Iterable[int]
-        | int
-        | bigframes.enums.DefaultIndexKind = (),
-        columns: Iterable[str] = (),
-    ) -> dataframe.DataFrame:
+    ) -> str:
         # Need to create session table beforehand
         table = self._storage_manager.create_temp_table(_PLACEHOLDER_SCHEMA)
         # but, we just overwrite the placeholder schema immediately with the load job
@@ -615,16 +644,7 @@
         self._start_generic_job(load_job)
 
         table_id = f"{table.project}.{table.dataset_id}.{table.table_id}"
-
-        # The BigQuery REST API for tables.get doesn't take a session ID, so we
-        # can't get the schema for a temp table that way.
-
-        return self.read_gbq_table(
-            query=table_id,
-            index_col=index_col,
-            columns=columns,
-            api_name="read_gbq_table",
-        )
+        return table_id
 
     def read_gbq_query(
         self,
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index c7bf5b3f5e..ced01c940f 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -59,7 +59,7 @@ def df_and_local_csv(scalars_df_index):
 
     with tempfile.TemporaryDirectory() as dir:
         # Prepares local CSV file for reading
-        path = dir + "/write_df_to_local_csv_file.csv"
+        path = dir + "/test_read_csv_w_local_csv.csv"
         scalars_df_index.to_csv(path, index=True)
 
         yield scalars_df_index, path
@@ -71,7 +71,19 @@ def df_and_gcs_csv(scalars_df_index, gcs_folder):
     drop_columns = ["bytes_col", "datetime_col", "numeric_col", "geography_col"]
     scalars_df_index = scalars_df_index.drop(columns=drop_columns)
 
-    path = gcs_folder + "test_read_csv_w_write_engine*.csv"
+    path = gcs_folder + "test_read_csv_w_gcs_csv*.csv"
+    read_path = utils.get_first_file_from_wildcard(path)
+    scalars_df_index.to_csv(path, index=True)
+    return scalars_df_index, read_path
+
+
+@pytest.fixture(scope="module")
+def df_and_gcs_csv_for_two_columns(scalars_df_index, gcs_folder):
+    # Some tests require only two columns to be present in the CSV file.
+    selected_cols = ["bool_col", "int64_col"]
+    scalars_df_index = scalars_df_index[selected_cols]
+
+    path = gcs_folder + "df_and_gcs_csv_for_two_columns*.csv"
     read_path = utils.get_first_file_from_wildcard(path)
     scalars_df_index.to_csv(path, index=True)
     return scalars_df_index, read_path
@@ -1260,6 +1272,98 @@ def test_read_csv_raises_error_for_invalid_index_col(
         session.read_csv(path, engine="bigquery", index_col=index_col)
 
 
+def test_read_csv_for_names(session, df_and_gcs_csv_for_two_columns):
+    _, path = df_and_gcs_csv_for_two_columns
+
+    names = ["a", "b", "c"]
+    bf_df = session.read_csv(path, engine="bigquery", names=names)
+
+    # Convert default pandas dtypes to match BigQuery DataFrames dtypes.
+    pd_df = session.read_csv(path, names=names, dtype=bf_df.dtypes.to_dict())
+
+    assert bf_df.shape == pd_df.shape
+    assert bf_df.columns.tolist() == pd_df.columns.tolist()
+
+    # BigFrames requires `sort_index()` because BigQuery doesn't preserve row IDs
+    # (b/280889935) or guarantee row ordering.
+    bf_df = bf_df.set_index(names[0]).sort_index()
+    pd_df = pd_df.set_index(names[0])
+    pd.testing.assert_frame_equal(bf_df.to_pandas(), pd_df.to_pandas())
+
+
+def test_read_csv_for_names_more_than_columns_can_raise_error(
+    session, df_and_gcs_csv_for_two_columns
+):
+    _, path = df_and_gcs_csv_for_two_columns
+    names = ["a", "b", "c", "d"]
+    with pytest.raises(
+        ValueError,
+        match="Too many columns specified: expected 3 and found 4",
+    ):
+        session.read_csv(path, engine="bigquery", names=names)
+
+
+def test_read_csv_for_names_less_than_columns(session, df_and_gcs_csv_for_two_columns):
+    _, path = df_and_gcs_csv_for_two_columns
+
+    names = ["b", "c"]
+    bf_df = session.read_csv(path, engine="bigquery", names=names)
+
+    # Convert default pandas dtypes to match BigQuery DataFrames dtypes.
+    pd_df = session.read_csv(path, names=names, dtype=bf_df.dtypes.to_dict())
+
+    assert bf_df.shape == pd_df.shape
+    assert bf_df.columns.tolist() == pd_df.columns.tolist()
+
+    # BigFrames requires `sort_index()` because BigQuery doesn't preserve row IDs
+    # (b/280889935) or guarantee row ordering.
+    bf_df = bf_df.sort_index()
+
+    # Pandas's index name is None, while BigFrames's index name is "rowindex".
+    pd_df.index.name = "rowindex"
+    pd.testing.assert_frame_equal(bf_df.to_pandas(), pd_df.to_pandas())
+
+
+def test_read_csv_for_names_less_than_columns_raise_error_when_index_col_set(
+    session, df_and_gcs_csv_for_two_columns
+):
+    _, path = df_and_gcs_csv_for_two_columns
+
+    names = ["b", "c"]
+    with pytest.raises(
+        KeyError,
+        match="ensure the number of `names` matches the number of columns in your data.",
+    ):
+        session.read_csv(path, engine="bigquery", names=names, index_col="rowindex")
+
+
+@pytest.mark.parametrize(
+    "index_col",
+    [
+        pytest.param("a", id="single_str"),
+        pytest.param(["a", "b"], id="multi_str"),
+        pytest.param(0, id="single_int"),
+    ],
+)
+def test_read_csv_for_names_and_index_col(
+    session, df_and_gcs_csv_for_two_columns, index_col
+):
+    _, path = df_and_gcs_csv_for_two_columns
+    names = ["a", "b", "c"]
+    bf_df = session.read_csv(path, engine="bigquery", index_col=index_col, names=names)
+
+    # Convert default pandas dtypes to match BigQuery DataFrames dtypes.
+    pd_df = session.read_csv(
+        path, index_col=index_col, names=names, dtype=bf_df.dtypes.to_dict()
+    )
+
+    assert bf_df.shape == pd_df.shape
+    assert bf_df.columns.tolist() == pd_df.columns.tolist()
+    pd.testing.assert_frame_equal(
+        bf_df.to_pandas(), pd_df.to_pandas(), check_index_type=False
+    )
+
+
 @pytest.mark.parametrize(
     ("kwargs", "match"),
     [
diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py
index 22b439a38b..91b6679702 100644
--- a/tests/unit/session/test_session.py
+++ b/tests/unit/session/test_session.py
@@ -108,14 +108,9 @@
 @pytest.mark.parametrize(
     ("kwargs", "match"),
     [
-        pytest.param(
-            {"engine": "bigquery", "names": []},
-            "BigQuery engine does not support these arguments",
-            id="with_names",
-        ),
         pytest.param(
             {"engine": "bigquery", "dtype": {}},
-            "BigQuery engine does not support these arguments",
+            "BigQuery engine does not support the `dtype` argument",
             id="with_dtype",
         ),
         pytest.param(
@@ -203,6 +198,23 @@ def test_read_csv_with_incompatible_write_engine(engine, write_engine):
     )
 
 
+@pytest.mark.parametrize(
+    ("names", "error_message"),
+    (
+        pytest.param("abc", "Names should be an ordered collection."),
+        pytest.param({"a", "b", "c"}, "Names should be an ordered collection."),
+        pytest.param(["a", "a"], "Duplicated names are not allowed."),
+    ),
+)
+def test_read_csv_w_bigquery_engine_raises_error_for_invalid_names(
+    names, error_message
+):
+    session = mocks.create_bigquery_session()
+
+    with pytest.raises(ValueError, match=error_message):
+        session.read_csv("path/to/csv.csv", engine="bigquery", names=names)
+
+
 @pytest.mark.parametrize("missing_parts_table_id", [(""), ("table")])
 def test_read_gbq_missing_parts(missing_parts_table_id):
     session = mocks.create_bigquery_session()
diff --git a/third_party/bigframes_vendored/pandas/io/parsers/readers.py b/third_party/bigframes_vendored/pandas/io/parsers/readers.py
index 2b1e3dd70b..4757f5ed9d 100644
--- a/third_party/bigframes_vendored/pandas/io/parsers/readers.py
+++ b/third_party/bigframes_vendored/pandas/io/parsers/readers.py
@@ -114,7 +114,7 @@ def read_csv(
             names (default None):
                 a list of column names to use. If the file contains a header row and you
                 want to pass this parameter, then `header=0` should be passed as well so the
-                first (header) row is ignored. Only to be used with default engine.
+                first (header) row is ignored.
             index_col (default None):
                 column(s) to use as the row labels of the DataFrame, either given as
                 string name or column index. `index_col=False` can be used with the default
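
For context, a minimal usage sketch of the `names` behavior this change adds to the BigQuery engine. The GCS path and column names below are hypothetical examples (not taken from the patch); the sketch only exercises the cases covered by the new system tests and the updated readers.py docstring.

    import bigframes.pandas as bpd

    # Hypothetical wildcard path to a CSV with three columns and a header row.
    path = "gs://example-bucket/example*.csv"

    # Rename all columns; per the readers.py docstring, pass header=0 so the
    # existing header row is skipped when `names` is supplied.
    df = bpd.read_csv(path, engine="bigquery", names=["idx", "flag", "value"], header=0)

    # Fewer names than columns: the leftmost unnamed column(s) become the index,
    # as exercised by test_read_csv_for_names_less_than_columns.
    df_partial = bpd.read_csv(path, engine="bigquery", names=["flag", "value"])

    # `names` combined with `index_col`, where index_col refers to the new names,
    # as exercised by test_read_csv_for_names_and_index_col.
    df_indexed = bpd.read_csv(
        path, engine="bigquery", names=["idx", "flag", "value"], index_col="idx"
    )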