test: simplify function cleanup during testing #1475

Draft. Wants to merge 8 commits into base: main.
3 changes: 0 additions & 3 deletions .kokoro/build.sh
@@ -50,6 +50,3 @@ if [[ -n "${NOX_SESSION:-}" ]]; then
 else
     python3 -m nox --stop-on-first-error
 fi
-
-# Prevent kokoro from trying to collect many mb of artifacts, wasting several minutes
-sudo rm -rf "${KOKORO_ARTIFACTS_DIR?}"/*
6 changes: 2 additions & 4 deletions bigframes/dataframe.py
@@ -4199,13 +4199,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
         udf_input_dtypes = getattr(func, "input_dtypes")
         if len(udf_input_dtypes) != len(self.columns):
             raise ValueError(
-                f"BigFrames BigQuery function takes {len(udf_input_dtypes)}"
-                f" arguments but DataFrame has {len(self.columns)} columns."
+                f"Remote function takes {len(udf_input_dtypes)} arguments but DataFrame has {len(self.columns)} columns."
             )
         if udf_input_dtypes != tuple(self.dtypes.to_list()):
             raise ValueError(
-                f"BigFrames BigQuery function takes arguments of types "
-                f"{udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}."
+                f"Remote function takes arguments of types {udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}."
             )

         series_list = [self[col] for col in self.columns]
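
For context, the reworded messages guard the argument validation in DataFrame.apply(func, axis=1): the dtypes recorded on the decorated function must match the frame's columns in count and in order. A minimal sketch of the same two checks, using stand-in dtype strings rather than real BigFrames dtype objects:

# Sketch of the validation behind the reworded errors; the dtype
# strings below are illustrative stand-ins, not real BigFrames dtypes.
udf_input_dtypes = ("Int64", "Float64", "string")  # recorded by the decorator
frame_dtypes = ("Int64", "Float64")  # what the DataFrame actually has

if len(udf_input_dtypes) != len(frame_dtypes):
    raise ValueError(
        f"Remote function takes {len(udf_input_dtypes)} arguments"
        f" but DataFrame has {len(frame_dtypes)} columns."
    )
if udf_input_dtypes != frame_dtypes:
    raise ValueError(
        f"Remote function takes arguments of types {udf_input_dtypes}"
        f" but DataFrame dtypes are {frame_dtypes}."
    )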
5 changes: 0 additions & 5 deletions bigframes/functions/_function_session.py
@@ -914,7 +914,6 @@ def wrapper(func):
             func = cloudpickle.loads(cloudpickle.dumps(func))

             self._try_delattr(func, "bigframes_bigquery_function")
-            self._try_delattr(func, "bigframes_bigquery_function_output_dtype")
             self._try_delattr(func, "input_dtypes")
             self._try_delattr(func, "output_dtype")
             self._try_delattr(func, "is_row_processor")
@@ -974,10 +973,6 @@ def wrapper(func):
                     ibis_signature.output_type
                 )
             )
-            # Managed function directly supports certain output types which are
-            # not supported in remote function (e.g. list output). Thus no more
-            # processing for 'bigframes_bigquery_function_output_dtype'.
-            func.bigframes_bigquery_function_output_dtype = func.output_dtype
             func.is_row_processor = is_row_processor
             func.ibis_node = node
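
A note on the _try_delattr calls that remain above: wrapper first round-trips the user's function through cloudpickle, which yields an independent copy for functions defined in __main__, so bookkeeping attributes can be attached and stripped without touching the caller's original object. A minimal sketch of that pattern, assuming _try_delattr is simply a best-effort delattr:

import cloudpickle

def _try_delattr(obj, attr):
    # Best-effort delete: ignore attributes that were never set.
    try:
        delattr(obj, attr)
    except AttributeError:
        pass

def user_func(x: int) -> int:
    return x + 1

# Round-trip through cloudpickle so decorator bookkeeping lands on a
# private copy (functions defined in __main__ are pickled by value).
func = cloudpickle.loads(cloudpickle.dumps(user_func))
_try_delattr(func, "input_dtypes")  # no-op if the attribute is absent
func.input_dtypes = ("Int64",)  # attach bookkeeping to the copy only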
21 changes: 9 additions & 12 deletions bigframes/operations/remote_function_ops.py
@@ -29,12 +29,11 @@ def expensive(self) -> bool:
         return True

     def output_type(self, *input_types):
-        # The output dtype should be set to a valid Dtype by @udf decorator,
-        # @remote_function decorator, or read_gbq_function method.
+        # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
         if hasattr(self.func, "bigframes_bigquery_function_output_dtype"):
             return self.func.bigframes_bigquery_function_output_dtype
-
-        raise AttributeError("bigframes_bigquery_function_output_dtype not defined")
+        else:
+            raise AttributeError("bigframes_bigquery_function_output_dtype not defined")


 @dataclasses.dataclass(frozen=True)
@@ -47,12 +46,11 @@ def expensive(self) -> bool:
         return True

     def output_type(self, *input_types):
-        # The output dtype should be set to a valid Dtype by @udf decorator,
-        # @remote_function decorator, or read_gbq_function method.
+        # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
         if hasattr(self.func, "bigframes_bigquery_function_output_dtype"):
             return self.func.bigframes_bigquery_function_output_dtype
-
-        raise AttributeError("bigframes_bigquery_function_output_dtype not defined")
+        else:
+            raise AttributeError("bigframes_bigquery_function_output_dtype not defined")


 @dataclasses.dataclass(frozen=True)
@@ -65,9 +63,8 @@ def expensive(self) -> bool:
         return True

     def output_type(self, *input_types):
-        # The output dtype should be set to a valid Dtype by @udf decorator,
-        # @remote_function decorator, or read_gbq_function method.
+        # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
         if hasattr(self.func, "bigframes_bigquery_function_output_dtype"):
             return self.func.bigframes_bigquery_function_output_dtype
-
-        raise AttributeError("bigframes_bigquery_function_output_dtype not defined")
+        else:
+            raise AttributeError("bigframes_bigquery_function_output_dtype not defined")
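
The rewritten comment in all three ops describes an attribute contract rather than a computation: output_type trusts that whichever decorator (or read_gbq_function) built self.func stamped the result dtype onto it. An illustrative reduction of that contract; the dataclass below is a stand-in, not the real op class:

import dataclasses
import typing

@dataclasses.dataclass(frozen=True)
class RemoteFunctionLikeOp:
    func: typing.Callable

    def output_type(self, *input_types):
        # Trust the decorator to have recorded the output dtype.
        if hasattr(self.func, "bigframes_bigquery_function_output_dtype"):
            return self.func.bigframes_bigquery_function_output_dtype
        else:
            raise AttributeError(
                "bigframes_bigquery_function_output_dtype not defined"
            )

def f(x):
    return x

f.bigframes_bigquery_function_output_dtype = "Float64"  # stand-in dtype
assert RemoteFunctionLikeOp(func=f).output_type() == "Float64"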
3 changes: 1 addition & 2 deletions noxfile.py
@@ -259,8 +259,7 @@ def mypy(session):
         set(
             [
                 "mypy",
-                # TODO: update to latest pandas-stubs once we resolve bigframes issues.
-                "pandas-stubs<=2.2.3.241126",
+                "pandas-stubs",
                 "types-protobuf",
                 "types-python-dateutil",
                 "types-requests",
183 changes: 81 additions & 102 deletions tests/system/large/functions/test_managed_function.py
@@ -164,60 +164,45 @@ def func(x, y):
     ],
 )
 def test_managed_function_array_output(session, scalars_dfs, dataset_id, array_dtype):
-    try:
-
-        @session.udf(dataset=dataset_id)
-        def featurize(x: int) -> list[array_dtype]:  # type: ignore
-            return [array_dtype(i) for i in [x, x + 1, x + 2]]
+    @session.udf(dataset=dataset_id)
+    def featurize(x: int) -> list[array_dtype]:  # type: ignore
+        return [array_dtype(i) for i in [x, x + 1, x + 2]]

-        scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df, scalars_pandas_df = scalars_dfs

-        bf_int64_col = scalars_df["int64_too"]
-        bf_result = bf_int64_col.apply(featurize).to_pandas()
+    bf_int64_col = scalars_df["int64_too"]
+    bf_result = bf_int64_col.apply(featurize).to_pandas()

-        pd_int64_col = scalars_pandas_df["int64_too"]
-        pd_result = pd_int64_col.apply(featurize)
+    pd_int64_col = scalars_pandas_df["int64_too"]
+    pd_result = pd_int64_col.apply(featurize)

-        # Ignore any dtype disparity.
-        pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
-
-    finally:
-        # Clean up the gcp assets created for the managed function.
-        cleanup_function_assets(
-            featurize, session.bqclient, session.cloudfunctionsclient
-        )
+    # Ignore any dtype disparity.
+    pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)


 def test_managed_function_binop_array_output(session, scalars_dfs, dataset_id):
-    try:
-
-        def func(x, y):
-            return [len(x), abs(y % 4)]
-
-        managed_func = session.udf(
-            input_types=[str, int],
-            output_type=list[int],
-            dataset=dataset_id,
-        )(func)
-
-        scalars_df, scalars_pandas_df = scalars_dfs
-
-        scalars_df = scalars_df.dropna()
-        scalars_pandas_df = scalars_pandas_df.dropna()
-        bf_result = (
-            scalars_df["string_col"]
-            .combine(scalars_df["int64_col"], managed_func)
-            .to_pandas()
-        )
-        pd_result = scalars_pandas_df["string_col"].combine(
-            scalars_pandas_df["int64_col"], func
-        )
-        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
-    finally:
-        # Clean up the gcp assets created for the managed function.
-        cleanup_function_assets(
-            managed_func, session.bqclient, session.cloudfunctionsclient
-        )
+    def func(x, y):
+        return [len(x), abs(y % 4)]
+
+    managed_func = session.udf(
+        input_types=[str, int],
+        output_type=list[int],
+        dataset=dataset_id,
+    )(func)
+
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    scalars_df = scalars_df.dropna()
+    scalars_pandas_df = scalars_pandas_df.dropna()
+    bf_result = (
+        scalars_df["string_col"]
+        .combine(scalars_df["int64_col"], managed_func)
+        .to_pandas()
+    )
+    pd_result = scalars_pandas_df["string_col"].combine(
+        scalars_pandas_df["int64_col"], func
+    )
+    pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)


 def test_manage_function_df_apply_axis_1_array_output(session):
@@ -238,63 +223,57 @@ def test_manage_function_df_apply_axis_1_array_output(session):
     # Assert the dataframe dtypes.
     assert tuple(bf_df.dtypes) == expected_dtypes

-    try:
-
-        @session.udf(input_types=[int, float, str], output_type=list[str])
-        def foo(x, y, z):
-            return [str(x), str(y), z]
-
-        assert getattr(foo, "is_row_processor") is False
-        assert getattr(foo, "input_dtypes") == expected_dtypes
-        assert getattr(foo, "output_dtype") == pandas.ArrowDtype(
-            pyarrow.list_(
-                bigframes.dtypes.bigframes_dtype_to_arrow_dtype(
-                    bigframes.dtypes.STRING_DTYPE
-                )
-            )
-        )
-        assert getattr(foo, "output_dtype") == getattr(
-            foo, "bigframes_bigquery_function_output_dtype"
-        )
-
-        # Fails to apply on dataframe with incompatible number of columns.
-        with pytest.raises(
-            ValueError,
-            match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$",
-        ):
-            bf_df[["Id", "Age"]].apply(foo, axis=1)
-
-        with pytest.raises(
-            ValueError,
-            match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$",
-        ):
-            bf_df.assign(Country="lalaland").apply(foo, axis=1)
-
-        # Fails to apply on dataframe with incompatible column datatypes.
-        with pytest.raises(
-            ValueError,
-            match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
-        ):
-            bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)
-
-        # Successfully applies to dataframe with matching number of columns
-        # and their datatypes.
-        bf_result = bf_df.apply(foo, axis=1).to_pandas()
-
-        # Since this scenario is not pandas-like, let's handcraft the
-        # expected result.
-        expected_result = pandas.Series(
-            [
-                ["1", "22.5", "alpha"],
-                ["2", "23.0", "beta"],
-                ["3", "23.5", "gamma"],
-            ]
-        )
-
-        pandas.testing.assert_series_equal(
-            expected_result, bf_result, check_dtype=False, check_index_type=False
-        )
-
-    finally:
-        # Clean up the gcp assets created for the managed function.
-        cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient)
+    @session.udf(input_types=[int, float, str], output_type=list[str])
+    def foo(x, y, z):
+        return [str(x), str(y), z]
+
+    assert getattr(foo, "is_row_processor") is False
+    assert getattr(foo, "input_dtypes") == expected_dtypes
+    assert getattr(foo, "output_dtype") == pandas.ArrowDtype(
+        pyarrow.list_(
+            bigframes.dtypes.bigframes_dtype_to_arrow_dtype(
+                bigframes.dtypes.STRING_DTYPE
+            )
+        )
+    )
+    assert getattr(foo, "output_dtype") == getattr(
+        foo, "bigframes_bigquery_function_output_dtype"
+    )
+
+    # Fails to apply on dataframe with incompatible number of columns.
+    with pytest.raises(
+        ValueError,
+        match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$",
+    ):
+        bf_df[["Id", "Age"]].apply(foo, axis=1)
+
+    with pytest.raises(
+        ValueError,
+        match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$",
+    ):
+        bf_df.assign(Country="lalaland").apply(foo, axis=1)
+
+    # Fails to apply on dataframe with incompatible column datatypes.
+    with pytest.raises(
+        ValueError,
+        match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
+    ):
+        bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)
+
+    # Successfully applies to dataframe with matching number of columns
+    # and their datatypes.
+    bf_result = bf_df.apply(foo, axis=1).to_pandas()
+
+    # Since this scenario is not pandas-like, let's handcraft the
+    # expected result.
+    expected_result = pandas.Series(
+        [
+            ["1", "22.5", "alpha"],
+            ["2", "23.0", "beta"],
+            ["3", "23.5", "gamma"],
+        ]
+    )
+
+    pandas.testing.assert_series_equal(
+        expected_result, bf_result, check_dtype=False, check_index_type=False
+    )
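
The hunks above delete every per-test try/finally and its cleanup_function_assets call, but do not show where cleanup moved; presumably it is now centralized, for example in a pytest fixture that tears down whatever a test created. A hypothetical sketch of such a fixture (the fixture name and the list-based tracking are assumptions, not code from this PR):

import pytest

@pytest.fixture
def function_assets_cleanup(session):
    # Collect functions created during a test and delete their GCP
    # assets once on teardown, replacing per-test try/finally blocks.
    created = []
    yield created
    for func in created:
        cleanup_function_assets(
            func, session.bqclient, session.cloudfunctionsclient
        )

def test_example(session, function_assets_cleanup):
    @session.udf(dataset="my_dataset")  # hypothetical dataset id
    def plus_one(x: int) -> int:
        return x + 1

    function_assets_cleanup.append(plus_one)
    # ... exercise plus_one and assert on the results ...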