Skip to content

Commit 4c271ac

Browse files
committed
Merge remote-tracking branch 'refs/remotes/github/main' into simplify-rf-cleanup-code
2 parents 9ebb3a2 + 0ddee99 commit 4c271ac

File tree

8 files changed

+350
-21
lines changed

8 files changed

+350
-21
lines changed

.kokoro/build.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,6 @@ if [[ -n "${NOX_SESSION:-}" ]]; then
5050
else
5151
python3 -m nox --stop-on-first-error
5252
fi
53+
54+
# Prevent Kokoro from trying to collect many MB of artifacts, which wastes several minutes.
55+
sudo rm -rf "${KOKORO_ARTIFACTS_DIR?}"/*

bigframes/dataframe.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4199,11 +4199,13 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
41994199
udf_input_dtypes = getattr(func, "input_dtypes")
42004200
if len(udf_input_dtypes) != len(self.columns):
42014201
raise ValueError(
4202-
f"Remote function takes {len(udf_input_dtypes)} arguments but DataFrame has {len(self.columns)} columns."
4202+
f"BigFrames BigQuery function takes {len(udf_input_dtypes)}"
4203+
f" arguments but DataFrame has {len(self.columns)} columns."
42034204
)
42044205
if udf_input_dtypes != tuple(self.dtypes.to_list()):
42054206
raise ValueError(
4206-
f"Remote function takes arguments of types {udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}."
4207+
f"BigFrames BigQuery function takes arguments of types "
4208+
f"{udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}."
42074209
)
42084210

42094211
series_list = [self[col] for col in self.columns]

bigframes/functions/_function_session.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,7 @@ def wrapper(func):
892892
func = cloudpickle.loads(cloudpickle.dumps(func))
893893

894894
self._try_delattr(func, "bigframes_bigquery_function")
895+
self._try_delattr(func, "bigframes_bigquery_function_output_dtype")
895896
self._try_delattr(func, "input_dtypes")
896897
self._try_delattr(func, "output_dtype")
897898
self._try_delattr(func, "is_row_processor")
@@ -951,6 +952,10 @@ def wrapper(func):
951952
ibis_signature.output_type
952953
)
953954
)
955+
# Managed functions directly support certain output types that remote
956+
# functions do not (e.g. list output), so no further processing is
957+
# needed for 'bigframes_bigquery_function_output_dtype'.
958+
func.bigframes_bigquery_function_output_dtype = func.output_dtype
954959
func.is_row_processor = is_row_processor
955960
func.ibis_node = node
956961

bigframes/operations/remote_function_ops.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ def expensive(self) -> bool:
2929
return True
3030

3131
def output_type(self, *input_types):
32-
# This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
32+
# The output dtype should be set to a valid Dtype by @udf decorator,
33+
# @remote_function decorator, or read_gbq_function method.
3334
if hasattr(self.func, "bigframes_bigquery_function_output_dtype"):
3435
return self.func.bigframes_bigquery_function_output_dtype
35-
else:
36-
raise AttributeError("bigframes_bigquery_function_output_dtype not defined")
36+
37+
raise AttributeError("bigframes_bigquery_function_output_dtype not defined")
3738

3839

3940
@dataclasses.dataclass(frozen=True)
@@ -46,11 +47,12 @@ def expensive(self) -> bool:
4647
return True
4748

4849
def output_type(self, *input_types):
49-
# This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
50+
# The output dtype should be set to a valid Dtype by @udf decorator,
51+
# @remote_function decorator, or read_gbq_function method.
5052
if hasattr(self.func, "bigframes_bigquery_function_output_dtype"):
5153
return self.func.bigframes_bigquery_function_output_dtype
52-
else:
53-
raise AttributeError("bigframes_bigquery_function_output_dtype not defined")
54+
55+
raise AttributeError("bigframes_bigquery_function_output_dtype not defined")
5456

5557

5658
@dataclasses.dataclass(frozen=True)
@@ -63,8 +65,9 @@ def expensive(self) -> bool:
6365
return True
6466

6567
def output_type(self, *input_types):
66-
# This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
68+
# The output dtype should be set to a valid Dtype by @udf decorator,
69+
# @remote_function decorator, or read_gbq_function method.
6770
if hasattr(self.func, "bigframes_bigquery_function_output_dtype"):
6871
return self.func.bigframes_bigquery_function_output_dtype
69-
else:
70-
raise AttributeError("bigframes_bigquery_function_output_dtype not defined")
72+
73+
raise AttributeError("bigframes_bigquery_function_output_dtype not defined")

noxfile.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,8 @@ def mypy(session):
256256
set(
257257
[
258258
"mypy",
259-
"pandas-stubs",
259+
# TODO: update to latest pandas-stubs once we resolve bigframes issues.
260+
"pandas-stubs<=2.2.3.241126",
260261
"types-protobuf",
261262
"types-python-dateutil",
262263
"types-requests",

tests/system/large/functions/test_managed_function.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
# limitations under the License.
1414

1515
import pandas
16+
import pyarrow
1617
import pytest
1718

19+
import bigframes
1820
from bigframes.functions import _function_session as bff_session
1921
from bigframes.functions._utils import get_python_version
2022
import bigframes.pandas as bpd
@@ -164,3 +166,161 @@ def func(x, y):
164166
cleanup_function_assets(
165167
session.bqclient, session.cloudfunctionsclient, managed_func
166168
)
169+
170+
171+
@pytest.mark.parametrize(
172+
"array_dtype",
173+
[
174+
bool,
175+
int,
176+
float,
177+
str,
178+
],
179+
)
180+
@pytest.mark.skipif(
181+
get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS,
182+
reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}",
183+
)
184+
def test_managed_function_array_output(session, scalars_dfs, dataset_id, array_dtype):
185+
try:
186+
187+
@session.udf(dataset=dataset_id)
188+
def featurize(x: int) -> list[array_dtype]: # type: ignore
189+
return [array_dtype(i) for i in [x, x + 1, x + 2]]
190+
191+
scalars_df, scalars_pandas_df = scalars_dfs
192+
193+
bf_int64_col = scalars_df["int64_too"]
194+
bf_result = bf_int64_col.apply(featurize).to_pandas()
195+
196+
pd_int64_col = scalars_pandas_df["int64_too"]
197+
pd_result = pd_int64_col.apply(featurize)
198+
199+
# Ignore any dtype disparity.
200+
pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
201+
202+
finally:
203+
# Clean up the gcp assets created for the managed function.
204+
cleanup_function_assets(
205+
featurize, session.bqclient, session.cloudfunctionsclient
206+
)
207+
208+
209+
@pytest.mark.skipif(
210+
get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS,
211+
reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}",
212+
)
213+
def test_managed_function_binop_array_output(session, scalars_dfs, dataset_id):
214+
try:
215+
216+
def func(x, y):
217+
return [len(x), abs(y % 4)]
218+
219+
managed_func = session.udf(
220+
input_types=[str, int],
221+
output_type=list[int],
222+
dataset=dataset_id,
223+
)(func)
224+
225+
scalars_df, scalars_pandas_df = scalars_dfs
226+
227+
scalars_df = scalars_df.dropna()
228+
scalars_pandas_df = scalars_pandas_df.dropna()
229+
bf_result = (
230+
scalars_df["string_col"]
231+
.combine(scalars_df["int64_col"], managed_func)
232+
.to_pandas()
233+
)
234+
pd_result = scalars_pandas_df["string_col"].combine(
235+
scalars_pandas_df["int64_col"], func
236+
)
237+
pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
238+
finally:
239+
# Clean up the gcp assets created for the managed function.
240+
cleanup_function_assets(
241+
managed_func, session.bqclient, session.cloudfunctionsclient
242+
)
243+
244+
245+
@pytest.mark.skipif(
246+
get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS,
247+
reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}",
248+
)
249+
def test_manage_function_df_apply_axis_1_array_output(session):
250+
bf_df = bigframes.dataframe.DataFrame(
251+
{
252+
"Id": [1, 2, 3],
253+
"Age": [22.5, 23, 23.5],
254+
"Name": ["alpha", "beta", "gamma"],
255+
}
256+
)
257+
258+
expected_dtypes = (
259+
bigframes.dtypes.INT_DTYPE,
260+
bigframes.dtypes.FLOAT_DTYPE,
261+
bigframes.dtypes.STRING_DTYPE,
262+
)
263+
264+
# Assert the dataframe dtypes.
265+
assert tuple(bf_df.dtypes) == expected_dtypes
266+
267+
try:
268+
269+
@session.udf(input_types=[int, float, str], output_type=list[str])
270+
def foo(x, y, z):
271+
return [str(x), str(y), z]
272+
273+
assert getattr(foo, "is_row_processor") is False
274+
assert getattr(foo, "input_dtypes") == expected_dtypes
275+
assert getattr(foo, "output_dtype") == pandas.ArrowDtype(
276+
pyarrow.list_(
277+
bigframes.dtypes.bigframes_dtype_to_arrow_dtype(
278+
bigframes.dtypes.STRING_DTYPE
279+
)
280+
)
281+
)
282+
assert getattr(foo, "output_dtype") == getattr(
283+
foo, "bigframes_bigquery_function_output_dtype"
284+
)
285+
286+
# Fails to apply on dataframe with incompatible number of columns.
287+
with pytest.raises(
288+
ValueError,
289+
match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$",
290+
):
291+
bf_df[["Id", "Age"]].apply(foo, axis=1)
292+
293+
with pytest.raises(
294+
ValueError,
295+
match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$",
296+
):
297+
bf_df.assign(Country="lalaland").apply(foo, axis=1)
298+
299+
# Fails to apply on dataframe with incompatible column datatypes.
300+
with pytest.raises(
301+
ValueError,
302+
match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
303+
):
304+
bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)
305+
306+
# Successfully applies to a dataframe with a matching number of columns
307+
# and matching datatypes.
308+
bf_result = bf_df.apply(foo, axis=1).to_pandas()
309+
310+
# Since this scenario is not pandas-like, let's handcraft the
311+
# expected result.
312+
expected_result = pandas.Series(
313+
[
314+
["1", "22.5", "alpha"],
315+
["2", "23.0", "beta"],
316+
["3", "23.5", "gamma"],
317+
]
318+
)
319+
320+
pandas.testing.assert_series_equal(
321+
expected_result, bf_result, check_dtype=False, check_index_type=False
322+
)
323+
324+
finally:
325+
# Clean up the gcp assets created for the managed function.
326+
cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient)

tests/system/large/functions/test_remote_function.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1912,19 +1912,19 @@ def foo(x, y, z):
19121912
# Fails to apply on dataframe with incompatible number of columns
19131913
with pytest.raises(
19141914
ValueError,
1915-
match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$",
1915+
match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$",
19161916
):
19171917
bf_df[["Id", "Age"]].apply(foo, axis=1)
19181918
with pytest.raises(
19191919
ValueError,
1920-
match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$",
1920+
match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$",
19211921
):
19221922
bf_df.assign(Country="lalaland").apply(foo, axis=1)
19231923

19241924
# Fails to apply on dataframe with incompatible column datatypes
19251925
with pytest.raises(
19261926
ValueError,
1927-
match="^Remote function takes arguments of types .* but DataFrame dtypes are .*",
1927+
match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
19281928
):
19291929
bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)
19301930

@@ -1993,19 +1993,19 @@ def foo(x, y, z):
19931993
# Fails to apply on dataframe with incompatible number of columns
19941994
with pytest.raises(
19951995
ValueError,
1996-
match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$",
1996+
match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$",
19971997
):
19981998
bf_df[["Id", "Age"]].apply(foo, axis=1)
19991999
with pytest.raises(
20002000
ValueError,
2001-
match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$",
2001+
match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$",
20022002
):
20032003
bf_df.assign(Country="lalaland").apply(foo, axis=1)
20042004

20052005
# Fails to apply on dataframe with incompatible column datatypes
20062006
with pytest.raises(
20072007
ValueError,
2008-
match="^Remote function takes arguments of types .* but DataFrame dtypes are .*",
2008+
match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
20092009
):
20102010
bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)
20112011

@@ -2057,19 +2057,19 @@ def foo(x):
20572057
# Fails to apply on dataframe with incompatible number of columns
20582058
with pytest.raises(
20592059
ValueError,
2060-
match="^Remote function takes 1 arguments but DataFrame has 0 columns\\.$",
2060+
match="^BigFrames BigQuery function takes 1 arguments but DataFrame has 0 columns\\.$",
20612061
):
20622062
bf_df[[]].apply(foo, axis=1)
20632063
with pytest.raises(
20642064
ValueError,
2065-
match="^Remote function takes 1 arguments but DataFrame has 2 columns\\.$",
2065+
match="^BigFrames BigQuery function takes 1 arguments but DataFrame has 2 columns\\.$",
20662066
):
20672067
bf_df.assign(Country="lalaland").apply(foo, axis=1)
20682068

20692069
# Fails to apply on dataframe with incompatible column datatypes
20702070
with pytest.raises(
20712071
ValueError,
2072-
match="^Remote function takes arguments of types .* but DataFrame dtypes are .*",
2072+
match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
20732073
):
20742074
bf_df.assign(Id=bf_df["Id"].astype("Float64")).apply(foo, axis=1)
20752075

0 commit comments

Comments
 (0)