
Commit 5f87995

Merge branch 'sycai_rolling_window' of https://github.com/googleapis/python-bigquery-dataframes into sycai_rolling_window
2 parents 75de6ad + 91d349e commit 5f87995

6 files changed (+476 -354 lines)

bigframes/core/blocks.py

Lines changed: 5 additions & 1 deletion
@@ -590,6 +590,7 @@ def to_pandas_batches(
         page_size: Optional[int] = None,
         max_results: Optional[int] = None,
         allow_large_results: Optional[bool] = None,
+        squeeze: Optional[bool] = False,
     ):
         """Download results one message at a time.
@@ -605,7 +606,10 @@ def to_pandas_batches(
         for record_batch in execute_result.arrow_batches():
            df = io_pandas.arrow_to_pandas(record_batch, self.expr.schema)
            self._copy_index_to_pandas(df)
-            yield df
+            if squeeze:
+                yield df.squeeze(axis=1)
+            else:
+                yield df

     def _copy_index_to_pandas(self, df: pd.DataFrame):
         """Set the index on pandas DataFrame to match this block.

bigframes/series.py

Lines changed: 76 additions & 1 deletion
@@ -23,7 +23,18 @@
 import numbers
 import textwrap
 import typing
-from typing import Any, cast, List, Literal, Mapping, Optional, Sequence, Tuple, Union
+from typing import (
+    Any,
+    cast,
+    Iterable,
+    List,
+    Literal,
+    Mapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+)

 import bigframes_vendored.constants as constants
 import bigframes_vendored.pandas.core.series as vendored_pandas_series
@@ -479,6 +490,70 @@ def to_pandas(
         series.name = self._name
         return series

+    def to_pandas_batches(
+        self,
+        page_size: Optional[int] = None,
+        max_results: Optional[int] = None,
+        *,
+        allow_large_results: Optional[bool] = None,
+    ) -> Iterable[pandas.Series]:
+        """Stream Series results to an iterable of pandas Series.
+
+        page_size and max_results determine the size and number of batches,
+        see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result
+
+        **Examples:**
+
+            >>> import bigframes.pandas as bpd
+            >>> bpd.options.display.progress_bar = None
+            >>> s = bpd.Series([4, 3, 2, 2, 3])
+
+        Iterate through the results in batches, limiting the total rows yielded
+        across all batches via `max_results`:
+
+            >>> for s_batch in s.to_pandas_batches(max_results=3):
+            ...     print(s_batch)
+            0    4
+            1    3
+            2    2
+            dtype: Int64
+
+        Alternatively, control the approximate size of each batch using `page_size`
+        and fetch batches manually using `next()`:
+
+            >>> it = s.to_pandas_batches(page_size=2)
+            >>> next(it)
+            0    4
+            1    3
+            dtype: Int64
+            >>> next(it)
+            2    2
+            3    2
+            dtype: Int64
+
+        Args:
+            page_size (int, default None):
+                The maximum number of rows of each batch. Non-positive values are ignored.
+            max_results (int, default None):
+                The maximum total number of rows of all batches.
+            allow_large_results (bool, default None):
+                If not None, overrides the global setting to allow or disallow large query results
+                over the default size limit of 10 GB.
+
+        Returns:
+            Iterable[pandas.Series]:
+                An iterable of smaller Series which combine to
+                form the original Series. Results stream from bigquery,
+                see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable
+        """
+        df = self._block.to_pandas_batches(
+            page_size=page_size,
+            max_results=max_results,
+            allow_large_results=allow_large_results,
+            squeeze=True,
+        )
+        return df
+
     def _compute_dry_run(self) -> bigquery.QueryJob:
         _, query_job = self._block._compute_dry_run((self._value_column,))
         return query_job
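
Beyond the doctests in the docstring, the new iterator supports incremental consumption, which is the main reason to prefer it over to_pandas() for large results. A usage sketch (assumes an authenticated BigQuery session; the data and batch size are illustrative):

    import bigframes.pandas as bpd

    bpd.options.display.progress_bar = None
    s = bpd.Series([4, 3, 2, 2, 3])

    # Aggregate batch by batch instead of materializing the whole Series locally;
    # each yielded object is a pandas.Series because the block is asked to squeeze.
    total = 0
    for batch in s.to_pandas_batches(page_size=2):
        total += int(batch.sum())
    print(total)  # 14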

bigframes/session/__init__.py

Lines changed: 162 additions & 91 deletions
@@ -910,112 +910,183 @@ def read_csv(
             engine=engine,
             write_engine=write_engine,
         )
-        if engine is not None and engine == "bigquery":
-            if any(param is not None for param in (dtype, names)):
-                not_supported = ("dtype", "names")
-                raise NotImplementedError(
-                    f"BigQuery engine does not support these arguments: {not_supported}. "
-                    f"{constants.FEEDBACK_LINK}"
-                )

-            # TODO(b/338089659): Looks like we can relax this 1 column
-            # restriction if we check the contents of an iterable are strings
-            # not integers.
-            if (
-                # Empty tuples, None, and False are allowed and falsey.
-                index_col
-                and not isinstance(index_col, bigframes.enums.DefaultIndexKind)
-                and not isinstance(index_col, str)
-            ):
-                raise NotImplementedError(
-                    "BigQuery engine only supports a single column name for `index_col`, "
-                    f"got: {repr(index_col)}. {constants.FEEDBACK_LINK}"
-                )
+        if engine != "bigquery":
+            # Using pandas.read_csv by default and warning about potential issues with
+            # large files.
+            return self._read_csv_w_pandas_engines(
+                filepath_or_buffer,
+                sep=sep,
+                header=header,
+                names=names,
+                index_col=index_col,
+                usecols=usecols,  # type: ignore
+                dtype=dtype,
+                engine=engine,
+                encoding=encoding,
+                write_engine=write_engine,
+                **kwargs,
+            )
+        else:
+            return self._read_csv_w_bigquery_engine(
+                filepath_or_buffer,
+                sep=sep,
+                header=header,
+                names=names,
+                index_col=index_col,
+                usecols=usecols,  # type: ignore
+                dtype=dtype,
+                encoding=encoding,
+            )

-        # None and False cannot be passed to read_gbq.
-        # TODO(b/338400133): When index_col is None, we should be using the
-        # first column of the CSV as the index to be compatible with the
-        # pandas engine. According to the pandas docs, only "False"
-        # indicates a default sequential index.
-        if not index_col:
-            index_col = ()
+    def _read_csv_w_pandas_engines(
+        self,
+        filepath_or_buffer,
+        *,
+        sep,
+        header,
+        names,
+        index_col,
+        usecols,
+        dtype,
+        engine,
+        encoding,
+        write_engine,
+        **kwargs,
+    ) -> dataframe.DataFrame:
+        """Reads a CSV file using pandas engines into a BigQuery DataFrames.

-            index_col = typing.cast(
-                Union[
-                    Sequence[str],  # Falsey values
-                    bigframes.enums.DefaultIndexKind,
-                    str,
-                ],
-                index_col,
+        This method serves as the implementation backend for read_csv when the
+        specified engine is one supported directly by pandas ('c', 'python',
+        'pyarrow').
+        """
+        if isinstance(index_col, bigframes.enums.DefaultIndexKind):
+            raise NotImplementedError(
+                f"With index_col={repr(index_col)}, only engine='bigquery' is supported. "
+                f"{constants.FEEDBACK_LINK}"
             )
+        if any(arg in kwargs for arg in ("chunksize", "iterator")):
+            raise NotImplementedError(
+                "'chunksize' and 'iterator' arguments are not supported. "
+                f"{constants.FEEDBACK_LINK}"
+            )
+        if isinstance(filepath_or_buffer, str):
+            self._check_file_size(filepath_or_buffer)

-            # usecols should only be an iterable of strings (column names) for use as columns in read_gbq.
-            columns: Tuple[Any, ...] = tuple()
-            if usecols is not None:
-                if isinstance(usecols, Iterable) and all(
-                    isinstance(col, str) for col in usecols
-                ):
-                    columns = tuple(col for col in usecols)
-                else:
-                    raise NotImplementedError(
-                        "BigQuery engine only supports an iterable of strings for `usecols`. "
-                        f"{constants.FEEDBACK_LINK}"
-                    )
+        pandas_df = pandas.read_csv(
+            filepath_or_buffer,
+            sep=sep,
+            header=header,
+            names=names,
+            index_col=index_col,
+            usecols=usecols,  # type: ignore
+            dtype=dtype,
+            engine=engine,
+            encoding=encoding,
+            **kwargs,
+        )
+        return self._read_pandas(pandas_df, api_name="read_csv", write_engine=write_engine)  # type: ignore

-            if encoding is not None and encoding not in _VALID_ENCODINGS:
-                raise NotImplementedError(
-                    f"BigQuery engine only supports the following encodings: {_VALID_ENCODINGS}. "
-                    f"{constants.FEEDBACK_LINK}"
-                )
+    def _read_csv_w_bigquery_engine(
+        self,
+        filepath_or_buffer,
+        *,
+        sep,
+        header,
+        names,
+        index_col,
+        usecols,
+        dtype,
+        encoding,
+    ) -> dataframe.DataFrame:
+        """Reads a CSV file using the BigQuery engine into a BigQuery DataFrames.

-            job_config = bigquery.LoadJobConfig()
-            job_config.source_format = bigquery.SourceFormat.CSV
-            job_config.autodetect = True
-            job_config.field_delimiter = sep
-            job_config.encoding = encoding
-            job_config.labels = {"bigframes-api": "read_csv"}
+        This method serves as the implementation backend for read_csv when the
+        'bigquery' engine is specified or inferred. It leverages BigQuery's
+        native CSV loading capabilities, making it suitable for large datasets
+        that may not fit into local memory.
+        """

-            # We want to match pandas behavior. If header is 0, no rows should be skipped, so we
-            # do not need to set `skip_leading_rows`. If header is None, then there is no header.
-            # Setting skip_leading_rows to 0 does that. If header=N and N>0, we want to skip N rows.
-            if header is None:
-                job_config.skip_leading_rows = 0
-            elif header > 0:
-                job_config.skip_leading_rows = header
+        if any(param is not None for param in (dtype, names)):
+            not_supported = ("dtype", "names")
+            raise NotImplementedError(
+                f"BigQuery engine does not support these arguments: {not_supported}. "
+                f"{constants.FEEDBACK_LINK}"
+            )

-            return self._loader.read_bigquery_load_job(
-                filepath_or_buffer,
-                job_config=job_config,
-                index_col=index_col,
-                columns=columns,
+        # TODO(b/338089659): Looks like we can relax this 1 column
+        # restriction if we check the contents of an iterable are strings
+        # not integers.
+        if (
+            # Empty tuples, None, and False are allowed and falsey.
+            index_col
+            and not isinstance(index_col, bigframes.enums.DefaultIndexKind)
+            and not isinstance(index_col, str)
+        ):
+            raise NotImplementedError(
+                "BigQuery engine only supports a single column name for `index_col`, "
+                f"got: {repr(index_col)}. {constants.FEEDBACK_LINK}"
             )
-        else:
-            if isinstance(index_col, bigframes.enums.DefaultIndexKind):
-                raise NotImplementedError(
-                    f"With index_col={repr(index_col)}, only engine='bigquery' is supported. "
-                    f"{constants.FEEDBACK_LINK}"
-                )
-            if any(arg in kwargs for arg in ("chunksize", "iterator")):
+
+        # None and False cannot be passed to read_gbq.
+        # TODO(b/338400133): When index_col is None, we should be using the
+        # first column of the CSV as the index to be compatible with the
+        # pandas engine. According to the pandas docs, only "False"
+        # indicates a default sequential index.
+        if not index_col:
+            index_col = ()
+
+        index_col = typing.cast(
+            Union[
+                Sequence[str],  # Falsey values
+                bigframes.enums.DefaultIndexKind,
+                str,
+            ],
+            index_col,
+        )
+
+        # usecols should only be an iterable of strings (column names) for use as columns in read_gbq.
+        columns: Tuple[Any, ...] = tuple()
+        if usecols is not None:
+            if isinstance(usecols, Iterable) and all(
+                isinstance(col, str) for col in usecols
+            ):
+                columns = tuple(col for col in usecols)
+            else:
                 raise NotImplementedError(
-                    "'chunksize' and 'iterator' arguments are not supported. "
+                    "BigQuery engine only supports an iterable of strings for `usecols`. "
                     f"{constants.FEEDBACK_LINK}"
                 )

-            if isinstance(filepath_or_buffer, str):
-                self._check_file_size(filepath_or_buffer)
-            pandas_df = pandas.read_csv(
-                filepath_or_buffer,
-                sep=sep,
-                header=header,
-                names=names,
-                index_col=index_col,
-                usecols=usecols,  # type: ignore
-                dtype=dtype,
-                engine=engine,
-                encoding=encoding,
-                **kwargs,
+        if encoding is not None and encoding not in _VALID_ENCODINGS:
+            raise NotImplementedError(
+                f"BigQuery engine only supports the following encodings: {_VALID_ENCODINGS}. "
+                f"{constants.FEEDBACK_LINK}"
             )
-            return self._read_pandas(pandas_df, api_name="read_csv", write_engine=write_engine)  # type: ignore
+
+        job_config = bigquery.LoadJobConfig()
+        job_config.source_format = bigquery.SourceFormat.CSV
+        job_config.autodetect = True
+        job_config.field_delimiter = sep
+        job_config.encoding = encoding
+        job_config.labels = {"bigframes-api": "read_csv"}
+
+        # b/409070192: When header > 0, pandas and BigFrames returns different column naming.
+
+        # We want to match pandas behavior. If header is 0, no rows should be skipped, so we
+        # do not need to set `skip_leading_rows`. If header is None, then there is no header.
+        # Setting skip_leading_rows to 0 does that. If header=N and N>0, we want to skip N rows.
+        if header is None:
+            job_config.skip_leading_rows = 0
+        elif header > 0:
+            job_config.skip_leading_rows = header + 1
+
+        return self._loader.read_bigquery_load_job(
+            filepath_or_buffer,
+            job_config=job_config,
+            index_col=index_col,
+            columns=columns,
+        )

     def read_pickle(
         self,
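
The `skip_leading_rows = header + 1` change aligns the load job with how pandas interprets `header=N`: row N becomes the header and all earlier rows are discarded. A plain-pandas illustration of that semantics, using synthetic CSV text (illustrative only, not from the repository):

    import io
    import pandas as pd

    csv_text = "junk,junk\ncol_a,col_b\n1,2\n3,4\n"

    # header=1: pandas drops row 0, uses row 1 as the header, and starts data at row 2.
    # The BigQuery load job is configured to skip header + 1 = 2 leading rows to match.
    df = pd.read_csv(io.StringIO(csv_text), header=1)
    print(df)
    #    col_a  col_b
    # 0      1      2
    # 1      3      4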

0 commit comments
