diff --git a/.kokoro/build.sh b/.kokoro/build.sh index 6cc03455da..58eaa7fedf 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -50,6 +50,3 @@ if [[ -n "${NOX_SESSION:-}" ]]; then else python3 -m nox --stop-on-first-error fi - -# Prevent kokoro from trying to collect many mb of artifacts, wasting several minutes -sudo rm -rf "${KOKORO_ARTIFACTS_DIR?}"/* diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 35b6ff4ddf..004b84e5a5 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4199,13 +4199,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): udf_input_dtypes = getattr(func, "input_dtypes") if len(udf_input_dtypes) != len(self.columns): raise ValueError( - f"BigFrames BigQuery function takes {len(udf_input_dtypes)}" - f" arguments but DataFrame has {len(self.columns)} columns." + f"Remote function takes {len(udf_input_dtypes)} arguments but DataFrame has {len(self.columns)} columns." ) if udf_input_dtypes != tuple(self.dtypes.to_list()): raise ValueError( - f"BigFrames BigQuery function takes arguments of types " - f"{udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}." + f"Remote function takes arguments of types {udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}." ) series_list = [self[col] for col in self.columns] diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 2a211ad7c1..673e52a37e 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -914,7 +914,6 @@ def wrapper(func): func = cloudpickle.loads(cloudpickle.dumps(func)) self._try_delattr(func, "bigframes_bigquery_function") - self._try_delattr(func, "bigframes_bigquery_function_output_dtype") self._try_delattr(func, "input_dtypes") self._try_delattr(func, "output_dtype") self._try_delattr(func, "is_row_processor") @@ -974,10 +973,6 @@ def wrapper(func): ibis_signature.output_type ) ) - # Managed function directly supports certain output types which are - # not supported in remote function (e.g. list output). Thus no more - # processing for 'bigframes_bigquery_function_output_dtype'. - func.bigframes_bigquery_function_output_dtype = func.output_dtype func.is_row_processor = is_row_processor func.ibis_node = node diff --git a/bigframes/operations/remote_function_ops.py b/bigframes/operations/remote_function_ops.py index 51cfccbc41..8505fd1607 100644 --- a/bigframes/operations/remote_function_ops.py +++ b/bigframes/operations/remote_function_ops.py @@ -29,12 +29,11 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - # The output dtype should be set to a valid Dtype by @udf decorator, - # @remote_function decorator, or read_gbq_function method. + # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method if hasattr(self.func, "bigframes_bigquery_function_output_dtype"): return self.func.bigframes_bigquery_function_output_dtype - - raise AttributeError("bigframes_bigquery_function_output_dtype not defined") + else: + raise AttributeError("bigframes_bigquery_function_output_dtype not defined") @dataclasses.dataclass(frozen=True) @@ -47,12 +46,11 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - # The output dtype should be set to a valid Dtype by @udf decorator, - # @remote_function decorator, or read_gbq_function method. 
+ # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method if hasattr(self.func, "bigframes_bigquery_function_output_dtype"): return self.func.bigframes_bigquery_function_output_dtype - - raise AttributeError("bigframes_bigquery_function_output_dtype not defined") + else: + raise AttributeError("bigframes_bigquery_function_output_dtype not defined") @dataclasses.dataclass(frozen=True) @@ -65,9 +63,8 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - # The output dtype should be set to a valid Dtype by @udf decorator, - # @remote_function decorator, or read_gbq_function method. + # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method if hasattr(self.func, "bigframes_bigquery_function_output_dtype"): return self.func.bigframes_bigquery_function_output_dtype - - raise AttributeError("bigframes_bigquery_function_output_dtype not defined") + else: + raise AttributeError("bigframes_bigquery_function_output_dtype not defined") diff --git a/noxfile.py b/noxfile.py index 55cb46c69f..ca391a702a 100644 --- a/noxfile.py +++ b/noxfile.py @@ -259,8 +259,7 @@ def mypy(session): set( [ "mypy", - # TODO: update to latest pandas-stubs once we resolve bigframes issues. - "pandas-stubs<=2.2.3.241126", + "pandas-stubs", "types-protobuf", "types-python-dateutil", "types-requests", diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index 0af6810fa9..e91fff9f5a 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -164,60 +164,45 @@ def func(x, y): ], ) def test_managed_function_array_output(session, scalars_dfs, dataset_id, array_dtype): - try: - - @session.udf(dataset=dataset_id) - def featurize(x: int) -> list[array_dtype]: # type: ignore - return [array_dtype(i) for i in [x, x + 1, x + 2]] + @session.udf(dataset=dataset_id) + def featurize(x: int) -> list[array_dtype]: # type: ignore + return [array_dtype(i) for i in [x, x + 1, x + 2]] - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_too"] - bf_result = bf_int64_col.apply(featurize).to_pandas() + bf_int64_col = scalars_df["int64_too"] + bf_result = bf_int64_col.apply(featurize).to_pandas() - pd_int64_col = scalars_pandas_df["int64_too"] - pd_result = pd_int64_col.apply(featurize) + pd_int64_col = scalars_pandas_df["int64_too"] + pd_result = pd_int64_col.apply(featurize) - # Ignore any dtype disparity. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets( - featurize, session.bqclient, session.cloudfunctionsclient - ) + # Ignore any dtype disparity. 
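+    # (The BigQuery result comes back as an Arrow list column, while pandas
+    # .apply() yields an object-dtype series, so only the values can agree.)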
+ pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) def test_managed_function_binop_array_output(session, scalars_dfs, dataset_id): - try: - - def func(x, y): - return [len(x), abs(y % 4)] - - managed_func = session.udf( - input_types=[str, int], - output_type=list[int], - dataset=dataset_id, - )(func) - - scalars_df, scalars_pandas_df = scalars_dfs - - scalars_df = scalars_df.dropna() - scalars_pandas_df = scalars_pandas_df.dropna() - bf_result = ( - scalars_df["string_col"] - .combine(scalars_df["int64_col"], managed_func) - .to_pandas() - ) - pd_result = scalars_pandas_df["string_col"].combine( - scalars_pandas_df["int64_col"], func - ) - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets( - managed_func, session.bqclient, session.cloudfunctionsclient - ) + def func(x, y): + return [len(x), abs(y % 4)] + + managed_func = session.udf( + input_types=[str, int], + output_type=list[int], + dataset=dataset_id, + )(func) + + scalars_df, scalars_pandas_df = scalars_dfs + + scalars_df = scalars_df.dropna() + scalars_pandas_df = scalars_pandas_df.dropna() + bf_result = ( + scalars_df["string_col"] + .combine(scalars_df["int64_col"], managed_func) + .to_pandas() + ) + pd_result = scalars_pandas_df["string_col"].combine( + scalars_pandas_df["int64_col"], func + ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) def test_manage_function_df_apply_axis_1_array_output(session): @@ -238,63 +223,57 @@ def test_manage_function_df_apply_axis_1_array_output(session): # Assert the dataframe dtypes. assert tuple(bf_df.dtypes) == expected_dtypes - try: + @session.udf(input_types=[int, float, str], output_type=list[str]) + def foo(x, y, z): + return [str(x), str(y), z] - @session.udf(input_types=[int, float, str], output_type=list[str]) - def foo(x, y, z): - return [str(x), str(y), z] - - assert getattr(foo, "is_row_processor") is False - assert getattr(foo, "input_dtypes") == expected_dtypes - assert getattr(foo, "output_dtype") == pandas.ArrowDtype( - pyarrow.list_( - bigframes.dtypes.bigframes_dtype_to_arrow_dtype( - bigframes.dtypes.STRING_DTYPE - ) + assert getattr(foo, "is_row_processor") is False + assert getattr(foo, "input_dtypes") == expected_dtypes + assert getattr(foo, "output_dtype") == pandas.ArrowDtype( + pyarrow.list_( + bigframes.dtypes.bigframes_dtype_to_arrow_dtype( + bigframes.dtypes.STRING_DTYPE ) ) - assert getattr(foo, "output_dtype") == getattr( - foo, "bigframes_bigquery_function_output_dtype" - ) - - # Fails to apply on dataframe with incompatible number of columns. - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$", - ): - bf_df[["Id", "Age"]].apply(foo, axis=1) - - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$", - ): - bf_df.assign(Country="lalaland").apply(foo, axis=1) - - # Fails to apply on dataframe with incompatible column datatypes. - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", - ): - bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) - - # Successfully applies to dataframe with matching number of columns. - # and their datatypes. 
-        bf_result = bf_df.apply(foo, axis=1).to_pandas()
-
-        # Since this scenario is not pandas-like, let's handcraft the
-        # expected result.
-        expected_result = pandas.Series(
-            [
-                ["1", "22.5", "alpha"],
-                ["2", "23.0", "beta"],
-                ["3", "23.5", "gamma"],
-            ]
-        )
+    )
+    assert getattr(foo, "output_dtype") == getattr(
+        foo, "bigframes_bigquery_function_output_dtype"
+    )
 
-        pandas.testing.assert_series_equal(
-            expected_result, bf_result, check_dtype=False, check_index_type=False
-        )
+    # Fails to apply on dataframe with incompatible number of columns.
+    with pytest.raises(
+        ValueError,
+        match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$",
+    ):
+        bf_df[["Id", "Age"]].apply(foo, axis=1)
+
+    with pytest.raises(
+        ValueError,
+        match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$",
+    ):
+        bf_df.assign(Country="lalaland").apply(foo, axis=1)
+
+    # Fails to apply on dataframe with incompatible column datatypes.
+    with pytest.raises(
+        ValueError,
+        match="^Remote function takes arguments of types .* but DataFrame dtypes are .*",
+    ):
+        bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)
+
+    # Successfully applies to dataframe with matching number of columns
+    # and their datatypes.
+    bf_result = bf_df.apply(foo, axis=1).to_pandas()
+
+    # Since this scenario is not pandas-like, let's handcraft the
+    # expected result.
+    expected_result = pandas.Series(
+        [
+            ["1", "22.5", "alpha"],
+            ["2", "23.0", "beta"],
+            ["3", "23.5", "gamma"],
+        ]
+    )
 
-    finally:
-        # Clean up the gcp assets created for the managed function.
-        cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient)
+    pandas.testing.assert_series_equal(
+        expected_result, bf_result, check_dtype=False, check_index_type=False
+    )
diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py
index 0d7f888306..904d9eab50 100644
--- a/tests/system/large/functions/test_remote_function.py
+++ b/tests/system/large/functions/test_remote_function.py
@@ -108,49 +108,44 @@ def test_remote_function_multiply_with_ibis(
     dataset_id,
     bq_cf_connection,
 ):
-    try:
-
-        @session.remote_function(
-            [int, int],
-            int,
-            dataset_id,
-            bq_cf_connection,
-            reuse=False,
-        )
-        def multiply(x, y):
-            return x * y
-
-        _, dataset_name, table_name = scalars_table_id.split(".")
-        if not ibis_client.dataset:
-            ibis_client.dataset = dataset_name
-
-        col_name = "int64_col"
-        table = ibis_client.tables[table_name]
-        table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10)
-        sql = table.compile()
-        pandas_df_orig = bigquery_client.query(sql).to_dataframe()
-
-        col = table[col_name]
-        col_2x = multiply(col, 2).name("int64_col_2x")
-        col_square = multiply(col, col).name("int64_col_square")
-        table = table.mutate([col_2x, col_square])
-        sql = table.compile()
-        pandas_df_new = bigquery_client.query(sql).to_dataframe()
-
-        pandas.testing.assert_series_equal(
-            pandas_df_orig[col_name] * 2,
-            pandas_df_new["int64_col_2x"],
-            check_names=False,
-        )
+    @session.remote_function(
+        [int, int],
+        int,
+        dataset_id,
+        bq_cf_connection,
+        reuse=False,
+    )
+    def multiply(x, y):
+        return x * y
+
+    _, dataset_name, table_name = scalars_table_id.split(".")
+    if not ibis_client.dataset:
+        ibis_client.dataset = dataset_name
+
+    col_name = "int64_col"
+    table = ibis_client.tables[table_name]
+    table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10)
+    sql = table.compile()
+    pandas_df_orig = 
bigquery_client.query(sql).to_dataframe() + + col = table[col_name] + col_2x = multiply(col, 2).name("int64_col_2x") + col_square = multiply(col, col).name("int64_col_square") + table = table.mutate([col_2x, col_square]) + sql = table.compile() + pandas_df_new = bigquery_client.query(sql).to_dataframe() + + pandas.testing.assert_series_equal( + pandas_df_orig[col_name] * 2, + pandas_df_new["int64_col_2x"], + check_names=False, + ) - pandas.testing.assert_series_equal( - pandas_df_orig[col_name] * pandas_df_orig[col_name], - pandas_df_new["int64_col_square"], - check_names=False, - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(multiply, bigquery_client, session.cloudfunctionsclient) + pandas.testing.assert_series_equal( + pandas_df_orig[col_name] * pandas_df_orig[col_name], + pandas_df_new["int64_col_square"], + check_names=False, + ) @pytest.mark.flaky(retries=2, delay=120) @@ -162,208 +157,175 @@ def test_remote_function_stringify_with_ibis( dataset_id, bq_cf_connection, ): - try: - - @session.remote_function( - [int], - str, - dataset_id, - bq_cf_connection, - reuse=False, - ) - def stringify(x): - return f"I got {x}" - - # Function should work locally. - assert stringify(42) == "I got 42" - - _, dataset_name, table_name = scalars_table_id.split(".") - if not ibis_client.dataset: - ibis_client.dataset = dataset_name - - col_name = "int64_col" - table = ibis_client.tables[table_name] - table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10) - sql = table.compile() - pandas_df_orig = bigquery_client.query(sql).to_dataframe() - - col = table[col_name] - col_2x = stringify.ibis_node(col).name("int64_str_col") - table = table.mutate([col_2x]) - sql = table.compile() - pandas_df_new = bigquery_client.query(sql).to_dataframe() - - pandas.testing.assert_series_equal( - pandas_df_orig[col_name].apply(lambda x: f"I got {x}"), - pandas_df_new["int64_str_col"], - check_names=False, - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - stringify, bigquery_client, session.cloudfunctionsclient - ) + @session.remote_function( + [int], + str, + dataset_id, + bq_cf_connection, + reuse=False, + ) + def stringify(x): + return f"I got {x}" + + # Function should work locally. 
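+    # (The decorator hands back the original callable, augmented with BigQuery
+    # metadata attributes, so an in-process call just runs the Python body.)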
+ assert stringify(42) == "I got 42" + + _, dataset_name, table_name = scalars_table_id.split(".") + if not ibis_client.dataset: + ibis_client.dataset = dataset_name + + col_name = "int64_col" + table = ibis_client.tables[table_name] + table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10) + sql = table.compile() + pandas_df_orig = bigquery_client.query(sql).to_dataframe() + + col = table[col_name] + col_2x = stringify.ibis_node(col).name("int64_str_col") + table = table.mutate([col_2x]) + sql = table.compile() + pandas_df_new = bigquery_client.query(sql).to_dataframe() + + pandas.testing.assert_series_equal( + pandas_df_orig[col_name].apply(lambda x: f"I got {x}"), + pandas_df_new["int64_str_col"], + check_names=False, + ) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_binop(session, scalars_dfs, dataset_id, bq_cf_connection): - try: - - def func(x, y): - return x * abs(y % 4) + def func(x, y): + return x * abs(y % 4) - remote_func = session.remote_function( - [str, int], - str, - dataset_id, - bq_cf_connection, - reuse=False, - )(func) + remote_func = session.remote_function( + [str, int], + str, + dataset_id, + bq_cf_connection, + reuse=False, + )(func) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - scalars_df = scalars_df.dropna() - scalars_pandas_df = scalars_pandas_df.dropna() - bf_result = ( - scalars_df["string_col"] - .combine(scalars_df["int64_col"], remote_func) - .to_pandas() - ) - pd_result = scalars_pandas_df["string_col"].combine( - scalars_pandas_df["int64_col"], func - ) - pandas.testing.assert_series_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_func, session.bqclient, session.cloudfunctionsclient - ) + scalars_df = scalars_df.dropna() + scalars_pandas_df = scalars_pandas_df.dropna() + bf_result = ( + scalars_df["string_col"] + .combine(scalars_df["int64_col"], remote_func) + .to_pandas() + ) + pd_result = scalars_pandas_df["string_col"].combine( + scalars_pandas_df["int64_col"], func + ) + pandas.testing.assert_series_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_binop_array_output( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: + def func(x, y): + return [len(x), abs(y % 4)] - def func(x, y): - return [len(x), abs(y % 4)] - - remote_func = session.remote_function( - [str, int], - list[int], - dataset_id, - bq_cf_connection, - reuse=False, - )(func) + remote_func = session.remote_function( + [str, int], + list[int], + dataset_id, + bq_cf_connection, + reuse=False, + )(func) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - scalars_df = scalars_df.dropna() - scalars_pandas_df = scalars_pandas_df.dropna() - bf_result = ( - scalars_df["string_col"] - .combine(scalars_df["int64_col"], remote_func) - .to_pandas() - ) - pd_result = scalars_pandas_df["string_col"].combine( - scalars_pandas_df["int64_col"], func - ) - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_func, session.bqclient, session.cloudfunctionsclient - ) + scalars_df = scalars_df.dropna() + scalars_pandas_df = scalars_pandas_df.dropna() + bf_result = ( + scalars_df["string_col"] + .combine(scalars_df["int64_col"], remote_func) + .to_pandas() + ) + pd_result = scalars_pandas_df["string_col"].combine( + 
scalars_pandas_df["int64_col"], func + ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_decorator_with_bigframes_series( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - - @session.remote_function( - [int], - int, - dataset_id, - bq_cf_connection, - reuse=False, - ) - def square(x): - return x * x + @session.remote_function( + [int], + int, + dataset_id, + bq_cf_connection, + reuse=False, + ) + def square(x): + return x * x - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(square) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(square) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) - # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
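+    # (pandas.Int64Dtype() is the nullable integer dtype, which is what
+    # bigframes exposes as bigframes.dtypes.INT_DTYPE for INT64 columns.)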
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_explicit_with_bigframes_series( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - - def add_one(x): - return x + 1 + def add_one(x): + return x + 1 - remote_add_one = session.remote_function( - [int], - int, - dataset_id, - bq_cf_connection, - reuse=False, - )(add_one) + remote_add_one = session.remote_function( + [int], + int, + dataset_id, + bq_cf_connection, + reuse=False, + )(add_one) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(remote_add_one) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(remote_add_one) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(add_one) - # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(add_one) + # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_add_one, session.bqclient, session.cloudfunctionsclient - ) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.parametrize( @@ -375,25 +337,18 @@ def add_one(x): ) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_input_types(session, scalars_dfs, input_types): - try: + def add_one(x): + return x + 1 - def add_one(x): - return x + 1 + remote_add_one = session.remote_function(input_types, int, reuse=False)(add_one) + assert remote_add_one.input_dtypes == (bigframes.dtypes.INT_DTYPE,) - remote_add_one = session.remote_function(input_types, int, reuse=False)(add_one) - assert remote_add_one.input_dtypes == (bigframes.dtypes.INT_DTYPE,) + scalars_df, scalars_pandas_df = scalars_dfs - scalars_df, scalars_pandas_df = scalars_dfs + bf_result = scalars_df.int64_too.map(remote_add_one).to_pandas() + pd_result = scalars_pandas_df.int64_too.map(add_one) - bf_result = scalars_df.int64_too.map(remote_add_one).to_pandas() - pd_result = scalars_pandas_df.int64_too.map(add_one) - - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_add_one, session.bqclient, session.cloudfunctionsclient - ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) @@ -403,192 +358,168 @@ def test_remote_function_explicit_dataset_not_created( dataset_id_not_created, bq_cf_connection, ): - try: - - @session.remote_function( - [int], - int, - dataset_id_not_created, - bq_cf_connection, - reuse=False, - ) - def square(x): - return x * x + @session.remote_function( + [int], + int, + dataset_id_not_created, + bq_cf_connection, + reuse=False, + ) + def square(x): + return x * x - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(square) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(square) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) - # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. 
- pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. + pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_udf_referring_outside_var( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - POSITIVE_SIGN = 1 - NEGATIVE_SIGN = -1 - NO_SIGN = 0 - - def sign(num): - if num > 0: - return POSITIVE_SIGN - elif num < 0: - return NEGATIVE_SIGN - return NO_SIGN - - remote_sign = session.remote_function( - [int], - int, - dataset_id, - bq_cf_connection, - reuse=False, - )(sign) + POSITIVE_SIGN = 1 + NEGATIVE_SIGN = -1 + NO_SIGN = 0 + + def sign(num): + if num > 0: + return POSITIVE_SIGN + elif num < 0: + return NEGATIVE_SIGN + return NO_SIGN + + remote_sign = session.remote_function( + [int], + int, + dataset_id, + bq_cf_connection, + reuse=False, + )(sign) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(remote_sign) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(remote_sign) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(sign) - # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(sign) + # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_sign, session.bqclient, session.cloudfunctionsclient - ) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_udf_referring_outside_import( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - import math as mymath + import math as mymath - def circumference(radius): - return 2 * mymath.pi * radius + def circumference(radius): + return 2 * mymath.pi * radius - remote_circumference = session.remote_function( - [float], - float, - dataset_id, - bq_cf_connection, - reuse=False, - )(circumference) + remote_circumference = session.remote_function( + [float], + float, + dataset_id, + bq_cf_connection, + reuse=False, + )(circumference) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_float64_col = scalars_df["float64_col"] - bf_float64_col_filter = bf_float64_col.notnull() - bf_float64_col_filtered = bf_float64_col[bf_float64_col_filter] - bf_result_col = bf_float64_col_filtered.apply(remote_circumference) - bf_result = ( - bf_float64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_float64_col = scalars_df["float64_col"] + bf_float64_col_filter = bf_float64_col.notnull() + bf_float64_col_filtered = bf_float64_col[bf_float64_col_filter] + bf_result_col = bf_float64_col_filtered.apply(remote_circumference) + bf_result = ( + bf_float64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_float64_col = scalars_pandas_df["float64_col"] - pd_float64_col_filter = pd_float64_col.notnull() - pd_float64_col_filtered = pd_float64_col[pd_float64_col_filter] - pd_result_col = pd_float64_col_filtered.apply(circumference) - # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. - # pd_float64_col_filtered.dtype is Float64Dtype() - # pd_float64_col_filtered.apply(lambda x: x).dtype is float64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Float64Dtype()) - pd_result = pd_float64_col_filtered.to_frame().assign(result=pd_result_col) + pd_float64_col = scalars_pandas_df["float64_col"] + pd_float64_col_filter = pd_float64_col.notnull() + pd_float64_col_filtered = pd_float64_col[pd_float64_col_filter] + pd_result_col = pd_float64_col_filtered.apply(circumference) + # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. + # pd_float64_col_filtered.dtype is Float64Dtype() + # pd_float64_col_filtered.apply(lambda x: x).dtype is float64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
+ pd_result_col = pd_result_col.astype(pandas.Float64Dtype()) + pd_result = pd_float64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_circumference, session.bqclient, session.cloudfunctionsclient - ) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_udf_referring_global_var_and_import( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - - def find_team(num): - boundary = (math.pi + math.e) / 2 - if num >= boundary: - return _team_euler - return _team_pi - - remote_find_team = session.remote_function( - [float], - str, - dataset_id, - bq_cf_connection, - reuse=False, - )(find_team) + def find_team(num): + boundary = (math.pi + math.e) / 2 + if num >= boundary: + return _team_euler + return _team_pi + + remote_find_team = session.remote_function( + [float], + str, + dataset_id, + bq_cf_connection, + reuse=False, + )(find_team) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_float64_col = scalars_df["float64_col"] - bf_float64_col_filter = bf_float64_col.notnull() - bf_float64_col_filtered = bf_float64_col[bf_float64_col_filter] - bf_result_col = bf_float64_col_filtered.apply(remote_find_team) - bf_result = ( - bf_float64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_float64_col = scalars_df["float64_col"] + bf_float64_col_filter = bf_float64_col.notnull() + bf_float64_col_filtered = bf_float64_col[bf_float64_col_filter] + bf_result_col = bf_float64_col_filtered.apply(remote_find_team) + bf_result = ( + bf_float64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_float64_col = scalars_pandas_df["float64_col"] - pd_float64_col_filter = pd_float64_col.notnull() - pd_float64_col_filtered = pd_float64_col[pd_float64_col_filter] - pd_result_col = pd_float64_col_filtered.apply(find_team) - # TODO(shobs): Figure if the dtype mismatch is by design: - # bf_result.dtype: string[pyarrow] - # pd_result.dtype: dtype('O'). - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.StringDtype(storage="pyarrow")) - pd_result = pd_float64_col_filtered.to_frame().assign(result=pd_result_col) + pd_float64_col = scalars_pandas_df["float64_col"] + pd_float64_col_filter = pd_float64_col.notnull() + pd_float64_col_filtered = pd_float64_col[pd_float64_col_filter] + pd_result_col = pd_float64_col_filtered.apply(find_team) + # TODO(shobs): Figure if the dtype mismatch is by design: + # bf_result.dtype: string[pyarrow] + # pd_result.dtype: dtype('O'). + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
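+    # (pandas.StringDtype(storage="pyarrow") is the string[pyarrow] dtype
+    # mentioned in the TODO above, matching the bigframes string dtype.)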
+    pd_result_col = pd_result_col.astype(pandas.StringDtype(storage="pyarrow"))
+    pd_result = pd_float64_col_filtered.to_frame().assign(result=pd_result_col)
 
-        assert_pandas_df_equal(bf_result, pd_result)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            remote_find_team, session.bqclient, session.cloudfunctionsclient
-        )
+    assert_pandas_df_equal(bf_result, pd_result)
 
 
 @pytest.mark.flaky(retries=2, delay=120)
@@ -598,259 +529,230 @@ def test_remote_function_restore_with_bigframes_series(
     dataset_id,
     bq_cf_connection,
 ):
-    try:
+    def add_one(x):
+        return x + 1
 
-        def add_one(x):
-            return x + 1
+    # Make a unique udf
+    add_one_uniq, add_one_uniq_dir = make_uniq_udf(add_one)
 
-        # Make a unique udf
-        add_one_uniq, add_one_uniq_dir = make_uniq_udf(add_one)
+    # Expected cloud function name for the unique udf
+    package_requirements = bff_utils._get_updated_package_requirements()
+    add_one_uniq_hash = bff_utils._get_hash(add_one_uniq, package_requirements)
+    add_one_uniq_cf_name = bff_utils.get_cloud_function_name(
+        add_one_uniq_hash, session.session_id
+    )
 
-        # Expected cloud function name for the unique udf
-        package_requirements = bff_utils._get_updated_package_requirements()
-        add_one_uniq_hash = bff_utils._get_hash(add_one_uniq, package_requirements)
-        add_one_uniq_cf_name = bff_utils.get_cloud_function_name(
-            add_one_uniq_hash, session.session_id
+    # There should be no cloud function yet for the unique udf
+    cloud_functions = list(
+        get_cloud_functions(
+            session.cloudfunctionsclient,
+            session.bqclient.project,
+            session.bqclient.location,
+            name=add_one_uniq_cf_name,
         )
+    )
+    assert len(cloud_functions) == 0
 
-        # There should be no cloud function yet for the unique udf
-        cloud_functions = list(
-            get_cloud_functions(
-                session.cloudfunctionsclient,
-                session.bqclient.project,
-                session.bqclient.location,
-                name=add_one_uniq_cf_name,
-            )
+    # The first time both the cloud function and the bq remote function don't
+    # exist and would be created
+    remote_add_one = session.remote_function(
+        [int],
+        int,
+        dataset_id,
+        bq_cf_connection,
+        reuse=True,
+    )(add_one_uniq)
+
+    # There should have been exactly one cloud function created at this point
+    cloud_functions = list(
+        get_cloud_functions(
+            session.cloudfunctionsclient,
+            session.bqclient.project,
+            session.bqclient.location,
+            name=add_one_uniq_cf_name,
        )
-        assert len(cloud_functions) == 0
+    )
+    assert len(cloud_functions) == 1
 
-        # The first time both the cloud function and the bq remote function don't
-        # exist and would be created
-        remote_add_one = session.remote_function(
-            [int],
-            int,
-            dataset_id,
-            bq_cf_connection,
-            reuse=True,
-        )(add_one_uniq)
-
-        # There should have been excactly one cloud function created at this point
-        cloud_functions = list(
-            get_cloud_functions(
-                session.cloudfunctionsclient,
-                session.bqclient.project,
-                session.bqclient.location,
-                name=add_one_uniq_cf_name,
-            )
-        )
-        assert len(cloud_functions) == 1
+    # We will test this twice
+    def inner_test():
+        scalars_df, scalars_pandas_df = scalars_dfs
 
-        # We will test this twice
-        def inner_test():
-            scalars_df, scalars_pandas_df = scalars_dfs
+        bf_int64_col = scalars_df["int64_col"]
+        bf_int64_col_filter = bf_int64_col.notnull()
+        bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
+        bf_result_col = bf_int64_col_filtered.apply(remote_add_one)
+        bf_result = (
+            bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
+        )
 
-            bf_int64_col = scalars_df["int64_col"]
-            bf_int64_col_filter = 
bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(remote_add_one) - bf_result = ( - bf_int64_col_filtered.to_frame() - .assign(result=bf_result_col) - .to_pandas() - ) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(add_one_uniq) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. + pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(add_one_uniq) - # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + assert_pandas_df_equal(bf_result, pd_result) - assert_pandas_df_equal(bf_result, pd_result) + # Test that the remote function works as expected + inner_test() - # Test that the remote function works as expected - inner_test() + # Let's delete the cloud function while not touching the bq remote function + delete_operation = delete_cloud_function( + session.cloudfunctionsclient, cloud_functions[0].name + ) + delete_operation.result() + assert delete_operation.done() - # Let's delete the cloud function while not touching the bq remote function - delete_operation = delete_cloud_function( - session.cloudfunctionsclient, cloud_functions[0].name - ) - delete_operation.result() - assert delete_operation.done() - - # There should be no cloud functions at this point for the uniq udf - cloud_functions = list( - get_cloud_functions( - session.cloudfunctionsclient, - session.bqclient.project, - session.bqclient.location, - name=add_one_uniq_cf_name, - ) + # There should be no cloud functions at this point for the uniq udf + cloud_functions = list( + get_cloud_functions( + session.cloudfunctionsclient, + session.bqclient.project, + session.bqclient.location, + name=add_one_uniq_cf_name, ) - assert len(cloud_functions) == 0 + ) + assert len(cloud_functions) == 0 - # The second time bigframes detects that the required cloud function doesn't - # exist even though the remote function exists, and goes ahead and recreates - # the cloud function - remote_add_one = session.remote_function( - [int], - int, - dataset_id, - bq_cf_connection, - reuse=True, - )(add_one_uniq) - - # There should be excactly one cloud function again - cloud_functions = list( - get_cloud_functions( - session.cloudfunctionsclient, - session.bqclient.project, - session.bqclient.location, - name=add_one_uniq_cf_name, - ) + # The second time bigframes detects that the required cloud function doesn't + # exist even though the remote function exists, and goes ahead and recreates + # the cloud function + remote_add_one = session.remote_function( + [int], + int, + dataset_id, + bq_cf_connection, + 
reuse=True,
+    )(add_one_uniq)
+
+    # There should be exactly one cloud function again
+    cloud_functions = list(
+        get_cloud_functions(
+            session.cloudfunctionsclient,
+            session.bqclient.project,
+            session.bqclient.location,
+            name=add_one_uniq_cf_name,
        )
+    )
+    assert len(cloud_functions) == 1
 
-        # Test again after the cloud function is restored that the remote function
-        # works as expected
-        inner_test()
+    # Test again after the cloud function is restored that the remote function
+    # works as expected
+    inner_test()
 
-        # clean up the temp code
-        shutil.rmtree(add_one_uniq_dir)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            remote_add_one, session.bqclient, session.cloudfunctionsclient
-        )
+    # clean up the temp code
+    shutil.rmtree(add_one_uniq_dir)
 
 
 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_mask_default_value(
     session, scalars_dfs, dataset_id, bq_cf_connection
 ):
-    try:
-
-        def is_odd(num):
-            flag = False
-            try:
-                flag = num % 2 == 1
-            except TypeError:
-                pass
-            return flag
+    def is_odd(num):
+        flag = False
+        try:
+            flag = num % 2 == 1
+        except TypeError:
+            pass
+        return flag
 
-        is_odd_remote = session.remote_function(
-            [int],
-            bool,
-            dataset_id,
-            bq_cf_connection,
-            reuse=False,
-        )(is_odd)
+    is_odd_remote = session.remote_function(
+        [int],
+        bool,
+        dataset_id,
+        bq_cf_connection,
+        reuse=False,
+    )(is_odd)
 
-        scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df, scalars_pandas_df = scalars_dfs
 
-        bf_int64_col = scalars_df["int64_col"]
-        bf_result_col = bf_int64_col.mask(is_odd_remote)
-        bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()
+    bf_int64_col = scalars_df["int64_col"]
+    bf_result_col = bf_int64_col.mask(is_odd_remote)
+    bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()
 
-        pd_int64_col = scalars_pandas_df["int64_col"]
-        pd_result_col = pd_int64_col.mask(is_odd)
-        pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)
+    pd_int64_col = scalars_pandas_df["int64_col"]
+    pd_result_col = pd_int64_col.mask(is_odd)
+    pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)
 
-        assert_pandas_df_equal(bf_result, pd_result)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            is_odd_remote, session.bqclient, session.cloudfunctionsclient
-        )
+    assert_pandas_df_equal(bf_result, pd_result)
 
 
 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_mask_custom_value(
     session, scalars_dfs, dataset_id, bq_cf_connection
 ):
-    try:
-
-        def is_odd(num):
-            flag = False
-            try:
-                flag = num % 2 == 1
-            except TypeError:
-                pass
-            return flag
+    def is_odd(num):
+        flag = False
+        try:
+            flag = num % 2 == 1
+        except TypeError:
+            pass
+        return flag
 
-        is_odd_remote = session.remote_function(
-            [int],
-            bool,
-            dataset_id,
-            bq_cf_connection,
-            reuse=False,
-        )(is_odd)
+    is_odd_remote = session.remote_function(
+        [int],
+        bool,
+        dataset_id,
+        bq_cf_connection,
+        reuse=False,
+    )(is_odd)
 
-        scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df, scalars_pandas_df = scalars_dfs
 
-        # TODO(shobs): Revisit this test when NA handling of pandas' Series.mask is
-        # fixed https://github.com/pandas-dev/pandas/issues/52955,
-        # for now filter out the nulls and test the rest
-        bf_int64_col = scalars_df["int64_col"]
-        bf_result_col = bf_int64_col[bf_int64_col.notnull()].mask(is_odd_remote, -1)
-        bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()
+    # 
TODO(shobs): Revisit this test when NA handling of pandas' Series.mask is + # fixed https://github.com/pandas-dev/pandas/issues/52955, + # for now filter out the nulls and test the rest + bf_int64_col = scalars_df["int64_col"] + bf_result_col = bf_int64_col[bf_int64_col.notnull()].mask(is_odd_remote, -1) + bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas() - pd_int64_col = scalars_pandas_df["int64_col"] - pd_result_col = pd_int64_col[pd_int64_col.notnull()].mask(is_odd, -1) - pd_result = pd_int64_col.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_result_col = pd_int64_col[pd_int64_col.notnull()].mask(is_odd, -1) + pd_result = pd_int64_col.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - is_odd_remote, session.bqclient, session.cloudfunctionsclient - ) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_udf_lambda(session, scalars_dfs, dataset_id, bq_cf_connection): - try: - add_one_lambda = lambda x: x + 1 # noqa: E731 + add_one_lambda = lambda x: x + 1 # noqa: E731 - add_one_lambda_remote = session.remote_function( - [int], - int, - dataset_id, - bq_cf_connection, - reuse=False, - )(add_one_lambda) + add_one_lambda_remote = session.remote_function( + [int], + int, + dataset_id, + bq_cf_connection, + reuse=False, + )(add_one_lambda) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(add_one_lambda_remote) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(add_one_lambda_remote) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(add_one_lambda) - # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(add_one_lambda) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - add_one_lambda_remote, session.bqclient, session.cloudfunctionsclient - ) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) @@ -914,45 +816,38 @@ def square(x): def test_remote_function_with_external_package_dependencies( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: + def pd_np_foo(x): + import numpy as mynp + import pandas as mypd - def pd_np_foo(x): - import numpy as mynp - import pandas as mypd + return mypd.Series([x, mynp.sqrt(mynp.abs(x))]).sum() - return mypd.Series([x, mynp.sqrt(mynp.abs(x))]).sum() - - # Create the remote function with the name provided explicitly - pd_np_foo_remote = session.remote_function( - [int], - float, - dataset_id, - bq_cf_connection, - reuse=False, - packages=["numpy", "pandas >= 2.0.0"], - )(pd_np_foo) + # Create the remote function with the name provided explicitly + pd_np_foo_remote = session.remote_function( + [int], + float, + dataset_id, + bq_cf_connection, + reuse=False, + packages=["numpy", "pandas >= 2.0.0"], + )(pd_np_foo) - # The behavior of the created remote function should be as expected - scalars_df, scalars_pandas_df = scalars_dfs + # The behavior of the created remote function should be as expected + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_too"] - bf_result_col = bf_int64_col.apply(pd_np_foo_remote) - bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas() + bf_int64_col = scalars_df["int64_too"] + bf_result_col = bf_int64_col.apply(pd_np_foo_remote) + bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas() - pd_int64_col = scalars_pandas_df["int64_too"] - pd_result_col = pd_int64_col.apply(pd_np_foo) - pd_result = pd_int64_col.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_too"] + pd_result_col = pd_int64_col.apply(pd_np_foo) + pd_result = pd_int64_col.to_frame().assign(result=pd_result_col) - # pandas result is non-nullable type float64, make it Float64 before - # comparing for the purpose of this test - pd_result.result = pd_result.result.astype(pandas.Float64Dtype()) + # pandas result is non-nullable type float64, make it Float64 before + # comparing for the purpose of this test + pd_result.result = pd_result.result.astype(pandas.Float64Dtype()) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - pd_np_foo_remote, session.bqclient, session.cloudfunctionsclient - ) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) @@ -1126,143 +1021,125 @@ def test_remote_function_via_session_context_connection_setter( context.bq_connection = bq_cf_connection session = bigframes.connect(context) - try: - # Without an explicit bigquery connection, the one present in Session, - # set via context setter would be used. Without an explicit `reuse` the - # default behavior of reuse=True will take effect. Please note that the - # udf is same as the one used in other tests in this file so the underlying - # cloud function would be common with reuse=True. 
Since we are using a - # unique dataset_id, even though the cloud function would be reused, the bq - # remote function would still be created, making use of the bq connection - # set in the BigQueryOptions above. - @session.remote_function([int], int, dataset=dataset_id, reuse=False) - def square(x): - return x * x + # Without an explicit bigquery connection, the one present in Session, + # set via context setter would be used. Without an explicit `reuse` the + # default behavior of reuse=True will take effect. Please note that the + # udf is same as the one used in other tests in this file so the underlying + # cloud function would be common with reuse=True. Since we are using a + # unique dataset_id, even though the cloud function would be reused, the bq + # remote function would still be created, making use of the bq connection + # set in the BigQueryOptions above. + @session.remote_function([int], int, dataset=dataset_id, reuse=False) + def square(x): + return x * x - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(square) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(square) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) - # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_default_connection(session, scalars_dfs, dataset_id): - try: - - @session.remote_function([int], int, dataset=dataset_id, reuse=False) - def square(x): - return x * x + @session.remote_function([int], int, dataset=dataset_id, reuse=False) + def square(x): + return x * x - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(square) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(square) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) - # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_runtime_error(session, scalars_dfs, dataset_id): - try: - - @session.remote_function([int], int, dataset=dataset_id, reuse=False) - def square(x): - return x * x + @session.remote_function([int], int, dataset=dataset_id, reuse=False) + def square(x): + return x * x - scalars_df, _ = scalars_dfs + scalars_df, _ = scalars_dfs - with pytest.raises( - google.api_core.exceptions.BadRequest, - match="400.*errorMessage.*unsupported operand type", - ): - # int64_col has nulls which should cause error in square - scalars_df["int64_col"].apply(square).to_pandas() - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + with pytest.raises( + google.api_core.exceptions.BadRequest, + match="400.*errorMessage.*unsupported operand type", + ): + # int64_col has nulls which should cause error in square + scalars_df["int64_col"].apply(square).to_pandas() @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_anonymous_dataset(session, scalars_dfs): - try: - # This usage of remote_function is expected to create the remote - # function in the bigframes session's anonymous dataset. Use reuse=False - # param to make sure parallel instances of the test don't step over each - # other due to the common anonymous dataset. - @session.remote_function([int], int, reuse=False) - def square(x): - return x * x - - assert ( - bigquery.Routine(square.bigframes_remote_function).dataset_id - == session._anonymous_dataset.dataset_id - ) + # This usage of remote_function is expected to create the remote + # function in the bigframes session's anonymous dataset. Use reuse=False + # param to make sure parallel instances of the test don't step over each + # other due to the common anonymous dataset. + @session.remote_function([int], int, reuse=False) + def square(x): + return x * x + + assert ( + bigquery.Routine(square.bigframes_remote_function).dataset_id + == session._anonymous_dataset.dataset_id + ) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(square) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(square) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) - # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. 
- # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. + pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) @@ -1282,9 +1159,10 @@ def test_remote_function_via_session_custom_sa(scalars_dfs): "bigframes-dev-perf-1@bigframes-dev-perf.iam.gserviceaccount.com" ) - rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=project)) - - try: + # Use context manager so that temp artifacts are automatically cleaned up + with bigframes.Session( + context=bigframes.BigQueryOptions(project=project) + ) as rf_session: @rf_session.remote_function( [int], int, reuse=False, cloud_function_service_account=gcf_service_account @@ -1311,11 +1189,6 @@ def square_num(x): name=square_num.bigframes_cloud_function ) assert gcf.service_config.service_account_email == gcf_service_account - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_num, rf_session.bqclient, rf_session.cloudfunctionsclient - ) @pytest.mark.parametrize( @@ -1370,8 +1243,9 @@ def test_remote_function_with_gcf_cmek(): "projects/bigframes-dev-perf/locations/us-central1/repositories/rf-artifacts" ) - session = bigframes.Session(context=bigframes.BigQueryOptions(project=project)) - try: + with bigframes.Session( + context=bigframes.BigQueryOptions(project=project) + ) as session: @session.remote_function( [int], @@ -1410,12 +1284,6 @@ def square_num(x): blob = bucket.get_blob(gcf.build_config.source.storage_source.object_) assert blob.kms_key_name.startswith(cmek) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_num, session.bqclient, session.cloudfunctionsclient - ) - @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_via_session_vpc(scalars_dfs): @@ -1439,9 +1307,9 @@ def test_remote_function_via_session_vpc(scalars_dfs): project = "bigframes-dev-perf" gcf_vpc_connector = "bigframes-vpc" - rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=project)) - - try: + with bigframes.Session( + context=bigframes.BigQueryOptions(project=project) + ) as rf_session: def square_num(x): if x is None: @@ -1469,11 +1337,6 @@ def square_num(x): name=square_num_remote.bigframes_cloud_function ) assert gcf.service_config.vpc_connector == gcf_vpc_connector - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_num_remote, rf_session.bqclient, rf_session.cloudfunctionsclient - ) @pytest.mark.parametrize( @@ -1485,31 +1348,22 @@ def square_num(x): ) 
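# max_batching_rows caps how many rows BigQuery sends to the underlying cloud
# function per HTTP request; a rough standalone sketch, assuming a configured
# GCP project (the value 50 is illustrative):
#
#     import bigframes.pandas as bpd
#
#     @bpd.remote_function([int], int, reuse=False, max_batching_rows=50)
#     def square(x):
#         return x * x
#
# The option surfaces on the BQ routine as
# remote_function_options.max_batching_rows, which the test below asserts.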
@pytest.mark.flaky(retries=2, delay=120) def test_remote_function_max_batching_rows(session, scalars_dfs, max_batching_rows): - try: - - def square(x): - return x * x + def square(x): + return x * x - square_remote = session.remote_function( - [int], int, reuse=False, max_batching_rows=max_batching_rows - )(square) + square_remote = session.remote_function( + [int], int, reuse=False, max_batching_rows=max_batching_rows + )(square) - bq_routine = session.bqclient.get_routine( - square_remote.bigframes_remote_function - ) - assert bq_routine.remote_function_options.max_batching_rows == max_batching_rows + bq_routine = session.bqclient.get_routine(square_remote.bigframes_remote_function) + assert bq_routine.remote_function_options.max_batching_rows == max_batching_rows - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() - pd_result = scalars_pandas_df["int64_too"].apply(square) + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_remote, session.bqclient, session.cloudfunctionsclient - ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) @pytest.mark.parametrize( @@ -1524,32 +1378,25 @@ def square(x): def test_remote_function_gcf_timeout( session, scalars_dfs, timeout_args, effective_gcf_timeout ): - try: + def square(x): + return x * x - def square(x): - return x * x - - square_remote = session.remote_function( - [int], int, reuse=False, **timeout_args - )(square) + square_remote = session.remote_function([int], int, reuse=False, **timeout_args)( + square + ) - # Assert that the GCF is created with the intended maximum timeout - gcf = session.cloudfunctionsclient.get_function( - name=square_remote.bigframes_cloud_function - ) - assert gcf.service_config.timeout_seconds == effective_gcf_timeout + # Assert that the GCF is created with the intended maximum timeout + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.timeout_seconds == effective_gcf_timeout - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() - pd_result = scalars_pandas_df["int64_too"].apply(square) + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_remote, session.bqclient, session.cloudfunctionsclient - ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) @@ -1573,84 +1420,71 @@ def square(x): def test_remote_function_max_instances( session, scalars_dfs, max_instances_args, expected_max_instances ): - try: - - def square(x): - return x * x - - square_remote = session.remote_function( - [int], int, reuse=False, **max_instances_args - )(square) + def square(x): + return x * x - # Assert that the GCF is created with the intended max instance count - gcf = session.cloudfunctionsclient.get_function( - 
name=square_remote.bigframes_cloud_function - ) - assert gcf.service_config.max_instance_count == expected_max_instances - - scalars_df, scalars_pandas_df = scalars_dfs + square_remote = session.remote_function( + [int], int, reuse=False, **max_instances_args + )(square) - bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() - pd_result = scalars_pandas_df["int64_too"].apply(square) + # Assert that the GCF is created with the intended max instance count + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.max_instance_count == expected_max_instances - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_remote, session.bqclient, session.cloudfunctionsclient - ) + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) def test_df_apply_axis_1(session, scalars_dfs): columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] scalars_df, scalars_pandas_df = scalars_dfs - try: - def serialize_row(row): - custom = { - "name": row.name, - "index": [idx for idx in row.index], - "values": [ - val.item() if hasattr(val, "item") else val for val in row.values - ], - } + def serialize_row(row): + custom = { + "name": row.name, + "index": [idx for idx in row.index], + "values": [ + val.item() if hasattr(val, "item") else val for val in row.values + ], + } - return str( - { - "default": row.to_json(), - "split": row.to_json(orient="split"), - "records": row.to_json(orient="records"), - "index": row.to_json(orient="index"), - "table": row.to_json(orient="table"), - "custom": custom, - } - ) + return str( + { + "default": row.to_json(), + "split": row.to_json(orient="split"), + "records": row.to_json(orient="records"), + "index": row.to_json(orient="index"), + "table": row.to_json(orient="table"), + "custom": custom, + } + ) - serialize_row_remote = session.remote_function( - bigframes.series.Series, str, reuse=False - )(serialize_row) + serialize_row_remote = session.remote_function( + bigframes.series.Series, str, reuse=False + )(serialize_row) - assert getattr(serialize_row_remote, "is_row_processor") + assert getattr(serialize_row_remote, "is_row_processor") - bf_result = scalars_df[columns].apply(serialize_row_remote, axis=1).to_pandas() - pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1) + bf_result = scalars_df[columns].apply(serialize_row_remote, axis=1).to_pandas() + pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1) - # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' - # , ignore this mismatch by using check_dtype=False. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. 
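# When the function's single parameter is bigframes.series.Series, bigframes
# flags it as a row processor and apply(axis=1) hands each DataFrame row over
# as a pandas Series. A plain-pandas sketch of the same row-serialization
# shape (column names and data are illustrative):
import pandas

df = pandas.DataFrame({"a": [1, 2], "b": ["x", "y"]})
as_json = df.apply(lambda row: row.to_json(orient="split"), axis=1)
assert as_json.dtype == "object"  # plain pandas returns untyped strings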
+ pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)

- # Let's make sure the read_gbq_function path works for this function
- serialize_row_reuse = session.read_gbq_function(
- serialize_row_remote.bigframes_remote_function, is_row_processor=True
- )
- bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas()
- pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
- finally:
- # clean up the gcp assets created for the remote function
- cleanup_function_assets(
- serialize_row_remote, session.bqclient, session.cloudfunctionsclient
- )
+ # Let's make sure the read_gbq_function path works for this function
+ serialize_row_reuse = session.read_gbq_function(
+ serialize_row_remote.bigframes_remote_function, is_row_processor=True
+ )
+ bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas()
+ pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)


@pytest.mark.flaky(retries=2, delay=120)
@@ -1658,40 +1492,31 @@ def test_df_apply_axis_1_aggregates(session, scalars_dfs):
 columns = ["int64_col", "int64_too", "float64_col"]
 scalars_df, scalars_pandas_df = scalars_dfs

- try:
-
- def analyze(row):
- return str(
- {
- "dtype": row.dtype,
- "count": row.count(),
- "min": row.max(),
- "max": row.max(),
- "mean": row.mean(),
- "std": row.std(),
- "var": row.var(),
- }
- )
+ def analyze(row):
+ return str(
+ {
+ "dtype": row.dtype,
+ "count": row.count(),
+ "min": row.min(),
+ "max": row.max(),
+ "mean": row.mean(),
+ "std": row.std(),
+ "var": row.var(),
+ }
+ )

- analyze_remote = session.remote_function(
- bigframes.series.Series, str, reuse=False
- )(analyze)
+ analyze_remote = session.remote_function(bigframes.series.Series, str, reuse=False)(
+ analyze
+ )

- assert getattr(analyze_remote, "is_row_processor")
+ assert getattr(analyze_remote, "is_row_processor")

- bf_result = (
- scalars_df[columns].dropna().apply(analyze_remote, axis=1).to_pandas()
- )
- pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1)
+ bf_result = scalars_df[columns].dropna().apply(analyze_remote, axis=1).to_pandas()
+ pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1)

- # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object'
- # , ignore this mismatch by using check_dtype=False.
- pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
- finally:
- # clean up the gcp assets created for the remote function
- cleanup_function_assets(
- analyze_remote, session.bqclient, session.cloudfunctionsclient
- )
+ # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object'
+ # , ignore this mismatch by using check_dtype=False.
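# A plain-pandas sketch of the row-wise aggregation shape exercised above,
# with no remote function involved (data is illustrative):
import pandas

df = pandas.DataFrame({"a": [1.0, 2.0], "b": [3.0, 4.0]})
stats = df.apply(lambda row: {"count": int(row.count()), "min": float(row.min())}, axis=1)
assert stats.tolist() == [{"count": 2, "min": 1.0}, {"count": 2, "min": 2.0}]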
+ pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.parametrize( @@ -1774,44 +1599,37 @@ def analyze(row): def test_df_apply_axis_1_complex(session, pd_df): bf_df = session.read_pandas(pd_df) - try: - - def serialize_row(row): - custom = { - "name": row.name, - "index": [idx for idx in row.index], - "values": [ - val.item() if hasattr(val, "item") else val for val in row.values - ], + def serialize_row(row): + custom = { + "name": row.name, + "index": [idx for idx in row.index], + "values": [ + val.item() if hasattr(val, "item") else val for val in row.values + ], + } + return str( + { + "default": row.to_json(), + "split": row.to_json(orient="split"), + "records": row.to_json(orient="records"), + "index": row.to_json(orient="index"), + "custom": custom, } - return str( - { - "default": row.to_json(), - "split": row.to_json(orient="split"), - "records": row.to_json(orient="records"), - "index": row.to_json(orient="index"), - "custom": custom, - } - ) + ) - serialize_row_remote = session.remote_function( - bigframes.series.Series, str, reuse=False - )(serialize_row) + serialize_row_remote = session.remote_function( + bigframes.series.Series, str, reuse=False + )(serialize_row) - assert getattr(serialize_row_remote, "is_row_processor") + assert getattr(serialize_row_remote, "is_row_processor") - bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() - pd_result = pd_df.apply(serialize_row, axis=1) + bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() + pd_result = pd_df.apply(serialize_row, axis=1) - # ignore known dtype difference between pandas and bigframes - pandas.testing.assert_series_equal( - pd_result, bf_result, check_dtype=False, check_index_type=False - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - serialize_row_remote, session.bqclient, session.cloudfunctionsclient - ) + # ignore known dtype difference between pandas and bigframes + pandas.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) @pytest.mark.flaky(retries=2, delay=120) @@ -1839,42 +1657,35 @@ def test_df_apply_axis_1_na_nan_inf(session): pd_df = bf_df.to_pandas() - try: - - def float_parser(row): - import numpy as mynp - import pandas as mypd + def float_parser(row): + import numpy as mynp + import pandas as mypd - if row["text"] == "pandas na": - return mypd.NA - if row["text"] == "numpy nan": - return mynp.nan - return float(row["text"]) + if row["text"] == "pandas na": + return mypd.NA + if row["text"] == "numpy nan": + return mynp.nan + return float(row["text"]) - float_parser_remote = session.remote_function( - bigframes.series.Series, float, reuse=False - )(float_parser) + float_parser_remote = session.remote_function( + bigframes.series.Series, float, reuse=False + )(float_parser) - assert getattr(float_parser_remote, "is_row_processor") + assert getattr(float_parser_remote, "is_row_processor") - pd_result = pd_df.apply(float_parser, axis=1) - bf_result = bf_df.apply(float_parser_remote, axis=1).to_pandas() + pd_result = pd_df.apply(float_parser, axis=1) + bf_result = bf_df.apply(float_parser_remote, axis=1).to_pandas() - # bf_result.dtype is 'Float64' while pd_result.dtype is 'object' - # , ignore this mismatch by using check_dtype=False. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # bf_result.dtype is 'Float64' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. 
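# Both pandas.NA and numpy.nan surface as SQL NULL once the column round-trips
# through BigQuery, which is what makes the Float64 comparison possible here.
# A small sketch of that equivalence on the pandas side, assuming nullable
# Float64 support:
import numpy
import pandas

s = pandas.Series([pandas.NA, numpy.nan, 1.0], dtype="Float64")
assert s.isna().tolist() == [True, True, False]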
+ pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - # Let's also assert that the data is consistent in this round trip - # (BQ -> BigFrames -> BQ -> GCF -> BQ -> BigFrames) w.r.t. their - # expected values in BQ - bq_result = bf_df["num"].to_pandas() - bq_result.name = None - pandas.testing.assert_series_equal(bq_result, bf_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - float_parser_remote, session.bqclient, session.cloudfunctionsclient - ) + # Let's also assert that the data is consistent in this round trip + # (BQ -> BigFrames -> BQ -> GCF -> BQ -> BigFrames) w.r.t. their + # expected values in BQ + bq_result = bf_df["num"].to_pandas() + bq_result.name = None + pandas.testing.assert_series_equal(bq_result, bf_result) @pytest.mark.parametrize( @@ -1892,30 +1703,23 @@ def float_parser(row): def test_remote_function_gcf_memory( session, scalars_dfs, memory_mib_args, expected_memory ): - try: + def square(x: int) -> int: + return x * x - def square(x: int) -> int: - return x * x + square_remote = session.remote_function(reuse=False, **memory_mib_args)(square) - square_remote = session.remote_function(reuse=False, **memory_mib_args)(square) - - # Assert that the GCF is created with the intended memory - gcf = session.cloudfunctionsclient.get_function( - name=square_remote.bigframes_cloud_function - ) - assert gcf.service_config.available_memory == expected_memory + # Assert that the GCF is created with the intended memory + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.available_memory == expected_memory - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() - pd_result = scalars_pandas_df["int64_too"].apply(square) + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_remote, session.bqclient, session.cloudfunctionsclient - ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) @pytest.mark.parametrize( @@ -2092,61 +1896,56 @@ def test_df_apply_axis_1_multiple_params(session): # Assert the dataframe dtypes assert tuple(bf_df.dtypes) == expected_dtypes - try: + @session.remote_function([int, float, str], str, reuse=False) + def foo(x, y, z): + return f"I got {x}, {y} and {z}" - @session.remote_function([int, float, str], str, reuse=False) - def foo(x, y, z): - return f"I got {x}, {y} and {z}" - - assert getattr(foo, "is_row_processor") is False - assert getattr(foo, "input_dtypes") == expected_dtypes - - # Fails to apply on dataframe with incompatible number of columns - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$", - ): - bf_df[["Id", "Age"]].apply(foo, axis=1) - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$", - ): - bf_df.assign(Country="lalaland").apply(foo, axis=1) - - # Fails to apply on dataframe with incompatible column datatypes - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", - ): - 
bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) - - # Successfully applies to dataframe with matching number of columns - # and their datatypes - bf_result = bf_df.apply(foo, axis=1).to_pandas() - - # Since this scenario is not pandas-like, let's handcraft the - # expected result - expected_result = pandas.Series( - [ - "I got 1, 22.5 and alpha", - "I got 2, 23 and beta", - "I got 3, 23.5 and gamma", - ] - ) + assert getattr(foo, "is_row_processor") is False + assert getattr(foo, "input_dtypes") == expected_dtypes - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) + # Fails to apply on dataframe with incompatible number of columns + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", + ): + bf_df[["Id", "Age"]].apply(foo, axis=1) + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) - # Let's make sure the read_gbq_function path works for this function - foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) - bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) + # Fails to apply on dataframe with incompatible column datatypes + with pytest.raises( + ValueError, + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + ): + bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) + + # Successfully applies to dataframe with matching number of columns + # and their datatypes + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result + expected_result = pandas.Series( + [ + "I got 1, 22.5 and alpha", + "I got 2, 23 and beta", + "I got 3, 23.5 and gamma", + ] + ) + + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) + + # Let's make sure the read_gbq_function path works for this function + foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) + bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) def test_df_apply_axis_1_multiple_params_array_output(session): @@ -2167,72 +1966,67 @@ def test_df_apply_axis_1_multiple_params_array_output(session): # Assert the dataframe dtypes assert tuple(bf_df.dtypes) == expected_dtypes - try: - - @session.remote_function([int, float, str], list[str], reuse=False) - def foo(x, y, z): - return [str(x), str(y), z] + @session.remote_function([int, float, str], list[str], reuse=False) + def foo(x, y, z): + return [str(x), str(y), z] - assert getattr(foo, "is_row_processor") is False - assert getattr(foo, "input_dtypes") == expected_dtypes - assert getattr(foo, "output_dtype") == pandas.ArrowDtype( - pyarrow.list_( - bigframes.dtypes.bigframes_dtype_to_arrow_dtype( - bigframes.dtypes.STRING_DTYPE - ) + assert getattr(foo, "is_row_processor") is False + assert getattr(foo, "input_dtypes") == expected_dtypes + assert getattr(foo, "output_dtype") == pandas.ArrowDtype( + pyarrow.list_( + bigframes.dtypes.bigframes_dtype_to_arrow_dtype( 
+ bigframes.dtypes.STRING_DTYPE ) ) - assert ( - getattr(foo, "bigframes_bigquery_function_output_dtype") - == bigframes.dtypes.STRING_DTYPE - ) + ) + assert ( + getattr(foo, "bigframes_bigquery_function_output_dtype") + == bigframes.dtypes.STRING_DTYPE + ) - # Fails to apply on dataframe with incompatible number of columns - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$", - ): - bf_df[["Id", "Age"]].apply(foo, axis=1) - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$", - ): - bf_df.assign(Country="lalaland").apply(foo, axis=1) - - # Fails to apply on dataframe with incompatible column datatypes - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", - ): - bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) - - # Successfully applies to dataframe with matching number of columns - # and their datatypes - bf_result = bf_df.apply(foo, axis=1).to_pandas() - - # Since this scenario is not pandas-like, let's handcraft the - # expected result - expected_result = pandas.Series( - [ - ["1", "22.5", "alpha"], - ["2", "23", "beta"], - ["3", "23.5", "gamma"], - ] - ) + # Fails to apply on dataframe with incompatible number of columns + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", + ): + bf_df[["Id", "Age"]].apply(foo, axis=1) + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) + # Fails to apply on dataframe with incompatible column datatypes + with pytest.raises( + ValueError, + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + ): + bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) - # Let's make sure the read_gbq_function path works for this function - foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) - bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) + # Successfully applies to dataframe with matching number of columns + # and their datatypes + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result + expected_result = pandas.Series( + [ + ["1", "22.5", "alpha"], + ["2", "23", "beta"], + ["3", "23.5", "gamma"], + ] + ) + + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) + + # Let's make sure the read_gbq_function path works for this function + foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) + bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) def test_df_apply_axis_1_single_param_non_series(session): @@ -2247,94 +2041,83 @@ def test_df_apply_axis_1_single_param_non_series(session): # Assert the dataframe dtypes assert tuple(bf_df.dtypes) == expected_dtypes 
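# With scalar parameter types (no Series annotation), apply(axis=1) matches
# DataFrame columns to the function's arguments positionally. A plain-pandas
# sketch of the row-wise call shape tested below:
import pandas

df = pandas.DataFrame({"Id": [1, 2, 3]})
result = df.apply(lambda row: f"I got {row['Id']}", axis=1)
assert result.tolist() == ["I got 1", "I got 2", "I got 3"]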
- try: + @session.remote_function([int], str, reuse=False) + def foo(x): + return f"I got {x}" - @session.remote_function([int], str, reuse=False) - def foo(x): - return f"I got {x}" - - assert getattr(foo, "is_row_processor") is False - assert getattr(foo, "input_dtypes") == expected_dtypes - - # Fails to apply on dataframe with incompatible number of columns - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes 1 arguments but DataFrame has 0 columns\\.$", - ): - bf_df[[]].apply(foo, axis=1) - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes 1 arguments but DataFrame has 2 columns\\.$", - ): - bf_df.assign(Country="lalaland").apply(foo, axis=1) - - # Fails to apply on dataframe with incompatible column datatypes - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", - ): - bf_df.assign(Id=bf_df["Id"].astype("Float64")).apply(foo, axis=1) - - # Successfully applies to dataframe with matching number of columns - # and their datatypes - bf_result = bf_df.apply(foo, axis=1).to_pandas() - - # Since this scenario is not pandas-like, let's handcraft the - # expected result - expected_result = pandas.Series( - [ - "I got 1", - "I got 2", - "I got 3", - ] - ) + assert getattr(foo, "is_row_processor") is False + assert getattr(foo, "input_dtypes") == expected_dtypes - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) + # Fails to apply on dataframe with incompatible number of columns + with pytest.raises( + ValueError, + match="^Remote function takes 1 arguments but DataFrame has 0 columns\\.$", + ): + bf_df[[]].apply(foo, axis=1) + with pytest.raises( + ValueError, + match="^Remote function takes 1 arguments but DataFrame has 2 columns\\.$", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) + + # Fails to apply on dataframe with incompatible column datatypes + with pytest.raises( + ValueError, + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + ): + bf_df.assign(Id=bf_df["Id"].astype("Float64")).apply(foo, axis=1) + + # Successfully applies to dataframe with matching number of columns + # and their datatypes + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result + expected_result = pandas.Series( + [ + "I got 1", + "I got 2", + "I got 3", + ] + ) + + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) @pytest.mark.flaky(retries=2, delay=120) def test_df_apply_axis_1_array_output(session, scalars_dfs): columns = ["int64_col", "int64_too"] scalars_df, scalars_pandas_df = scalars_dfs - try: - @session.remote_function(reuse=False) - def generate_stats(row: pandas.Series) -> list[int]: - import pandas as pd + @session.remote_function(reuse=False) + def generate_stats(row: pandas.Series) -> list[int]: + import pandas as pd - sum = row["int64_too"] - avg = row["int64_too"] - if pd.notna(row["int64_col"]): - sum += row["int64_col"] - avg = round((avg + row["int64_col"]) / 2) - return [sum, avg] + sum = row["int64_too"] + avg = row["int64_too"] + if pd.notna(row["int64_col"]): + sum += row["int64_col"] + avg = round((avg + row["int64_col"]) / 2) + return [sum, avg] - assert 
getattr(generate_stats, "is_row_processor") + assert getattr(generate_stats, "is_row_processor") - bf_result = scalars_df[columns].apply(generate_stats, axis=1).to_pandas() - pd_result = scalars_pandas_df[columns].apply(generate_stats, axis=1) + bf_result = scalars_df[columns].apply(generate_stats, axis=1).to_pandas() + pd_result = scalars_pandas_df[columns].apply(generate_stats, axis=1) - # bf_result.dtype is 'list[pyarrow]' while pd_result.dtype - # is 'object', ignore this mismatch by using check_dtype=False. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # bf_result.dtype is 'list[pyarrow]' while pd_result.dtype + # is 'object', ignore this mismatch by using check_dtype=False. + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - # Let's make sure the read_gbq_function path works for this function - generate_stats_reuse = session.read_gbq_function( - generate_stats.bigframes_remote_function, - is_row_processor=True, - ) - bf_result = scalars_df[columns].apply(generate_stats_reuse, axis=1).to_pandas() - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - generate_stats, session.bqclient, session.cloudfunctionsclient - ) + # Let's make sure the read_gbq_function path works for this function + generate_stats_reuse = session.read_gbq_function( + generate_stats.bigframes_remote_function, + is_row_processor=True, + ) + bf_result = scalars_df[columns].apply(generate_stats_reuse, axis=1).to_pandas() + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.parametrize( @@ -2376,7 +2159,6 @@ def generate_stats(row: pandas.Series) -> list[int]: ), ], ) -@pytest.mark.flaky(retries=2, delay=120) def test_remote_function_ingress_settings( session, scalars_dfs, @@ -2384,16 +2166,15 @@ def test_remote_function_ingress_settings( effective_ingress_settings, expect_default_ingress_setting_warning, ): - try: - # Verify the function raises the expected security warning message. - with warnings.catch_warnings(record=True) as record: + # Verify the function raises the expected security warning message. 
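# A minimal sketch of the stdlib warnings-capture pattern relied on below
# (the warning message text is illustrative):
import warnings

with warnings.catch_warnings(record=True) as record:
    warnings.simplefilter("always")
    warnings.warn("no explicit ingress settings were specified")
assert any("ingress" in str(w.message) for w in record)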
+ with warnings.catch_warnings(record=True) as record:

- def square(x: int) -> int:
- return x * x
+ def square(x: int) -> int:
+ return x * x

- square_remote = session.remote_function(
- reuse=False, **ingress_settings_args
- )(square)
+ square_remote = session.remote_function(reuse=False, **ingress_settings_args)(
+ square
+ )

 default_ingress_setting_warnings = [
 warn
@@ -2408,23 +2189,18 @@ def square(x: int) -> int:
 1 if expect_default_ingress_setting_warning else 0
 )

- # Assert that the GCF is created with the intended maximum timeout
- gcf = session.cloudfunctionsclient.get_function(
- name=square_remote.bigframes_cloud_function
- )
- assert gcf.service_config.ingress_settings == effective_ingress_settings
+ # Assert that the GCF is created with the intended ingress settings
+ gcf = session.cloudfunctionsclient.get_function(
+ name=square_remote.bigframes_cloud_function
+ )
+ assert gcf.service_config.ingress_settings == effective_ingress_settings

- scalars_df, scalars_pandas_df = scalars_dfs
+ scalars_df, scalars_pandas_df = scalars_dfs

- bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
- pd_result = scalars_pandas_df["int64_too"].apply(square)
+ bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
+ pd_result = scalars_pandas_df["int64_too"].apply(square)

- pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
- finally:
- # clean up the gcp assets created for the remote function
- cleanup_function_assets(
- square_remote, session.bqclient, session.cloudfunctionsclient
- )
+ pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)


@pytest.mark.flaky(retries=2, delay=120)
@@ -2458,59 +2234,48 @@ def add_one(x: int) -> int:
 temporary_bigquery_remote_function = None
 temporary_cloud_run_function = None

- try:
- with session_creator() as session:
- # create a temporary remote function
- add_one_remote_temp = session.remote_function(
- dataset=dataset_id,
- bigquery_connection=bq_cf_connection,
- reuse=False,
- )(add_one)
+ with session_creator() as session:
+ # create a temporary remote function
+ add_one_remote_temp = session.remote_function(
+ dataset=dataset_id,
+ bigquery_connection=bq_cf_connection,
+ reuse=False,
+ )(add_one)

- temporary_bigquery_remote_function = (
- add_one_remote_temp.bigframes_remote_function
- )
- assert temporary_bigquery_remote_function is not None
- assert (
- session.bqclient.get_routine(temporary_bigquery_remote_function)
- is not None
- )
+ temporary_bigquery_remote_function = (
+ add_one_remote_temp.bigframes_remote_function
+ )
+ assert temporary_bigquery_remote_function is not None
+ assert (
+ session.bqclient.get_routine(temporary_bigquery_remote_function) is not None
+ )

- temporary_cloud_run_function = add_one_remote_temp.bigframes_cloud_function
- assert temporary_cloud_run_function is not None
- assert (
- session.cloudfunctionsclient.get_function(
- name=temporary_cloud_run_function
- )
- is not None
- )
+ temporary_cloud_run_function = add_one_remote_temp.bigframes_cloud_function
+ assert temporary_cloud_run_function is not None
+ assert (
+ session.cloudfunctionsclient.get_function(name=temporary_cloud_run_function)
+ is not None
+ )

- bf_result = scalars_df["int64_too"].apply(add_one_remote_temp).to_pandas()
- pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+ bf_result = scalars_df["int64_too"].apply(add_one_remote_temp).to_pandas()
+ pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)

- # outside the with
statement context manager the temporary BQ remote - # function and the underlying cloud run function should have been - # cleaned up - assert temporary_bigquery_remote_function is not None - with pytest.raises(google.api_core.exceptions.NotFound): - session.bqclient.get_routine(temporary_bigquery_remote_function) - # the deletion of cloud function happens in a non-blocking way, ensure that - # it either exists in a being-deleted state, or is already deleted - assert temporary_cloud_run_function is not None - try: - gcf = session.cloudfunctionsclient.get_function( - name=temporary_cloud_run_function - ) - assert gcf.state is functions_v2.Function.State.DELETING - except google.cloud.exceptions.NotFound: - pass - finally: - # clean up the gcp assets created for the temporary remote function, - # just in case it was not explicitly cleaned up in the try clause due - # to assertion failure or exception earlier than that - cleanup_function_assets( - add_one_remote_temp, session.bqclient, session.cloudfunctionsclient + # outside the with statement context manager the temporary BQ remote + # function and the underlying cloud run function should have been + # cleaned up + assert temporary_bigquery_remote_function is not None + with pytest.raises(google.api_core.exceptions.NotFound): + session.bqclient.get_routine(temporary_bigquery_remote_function) + # the deletion of cloud function happens in a non-blocking way, ensure that + # it either exists in a being-deleted state, or is already deleted + assert temporary_cloud_run_function is not None + try: + gcf = session.cloudfunctionsclient.get_function( + name=temporary_cloud_run_function ) + assert gcf.state is functions_v2.Function.State.DELETING + except google.cloud.exceptions.NotFound: + pass @pytest.mark.parametrize( @@ -2603,110 +2368,87 @@ def add_one(x: int) -> int: def test_remote_function_array_output( session, scalars_dfs, dataset_id, bq_cf_connection, array_dtype ): - try: - - @session.remote_function( - dataset=dataset_id, - bigquery_connection=bq_cf_connection, - reuse=False, - ) - def featurize(x: int) -> list[array_dtype]: # type: ignore - return [array_dtype(i) for i in [x, x + 1, x + 2]] + @session.remote_function( + dataset=dataset_id, + bigquery_connection=bq_cf_connection, + reuse=False, + ) + def featurize(x: int) -> list[array_dtype]: # type: ignore + return [array_dtype(i) for i in [x, x + 1, x + 2]] - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_too"] - bf_result = bf_int64_col.apply(featurize).to_pandas() + bf_int64_col = scalars_df["int64_too"] + bf_result = bf_int64_col.apply(featurize).to_pandas() - pd_int64_col = scalars_pandas_df["int64_too"] - pd_result = pd_int64_col.apply(featurize) + pd_int64_col = scalars_pandas_df["int64_too"] + pd_result = pd_int64_col.apply(featurize) - # ignore any dtype disparity - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # ignore any dtype disparity + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - # Let's make sure the read_gbq_function path works for this function - featurize_reuse = session.read_gbq_function( - featurize.bigframes_remote_function # type: ignore - ) - bf_result = scalars_df["int64_too"].apply(featurize_reuse).to_pandas() - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - featurize, session.bqclient, 
session.cloudfunctionsclient - ) + # Let's make sure the read_gbq_function path works for this function + featurize_reuse = session.read_gbq_function( + featurize.bigframes_remote_function # type: ignore + ) + bf_result = scalars_df["int64_too"].apply(featurize_reuse).to_pandas() + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_array_output_partial_ordering_mode( unordered_session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - - @unordered_session.remote_function( - dataset=dataset_id, - bigquery_connection=bq_cf_connection, - reuse=False, - ) - def featurize(x: float) -> list[float]: # type: ignore - return [x, x + 1, x + 2] + @unordered_session.remote_function( + dataset=dataset_id, + bigquery_connection=bq_cf_connection, + reuse=False, + ) + def featurize(x: float) -> list[float]: # type: ignore + return [x, x + 1, x + 2] - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["float64_col"].dropna() - bf_result = bf_int64_col.apply(featurize).to_pandas() + bf_int64_col = scalars_df["float64_col"].dropna() + bf_result = bf_int64_col.apply(featurize).to_pandas() - pd_int64_col = scalars_pandas_df["float64_col"].dropna() - pd_result = pd_int64_col.apply(featurize) + pd_int64_col = scalars_pandas_df["float64_col"].dropna() + pd_result = pd_int64_col.apply(featurize) - # ignore any dtype disparity - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # ignore any dtype disparity + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - # Let's make sure the read_gbq_function path works for this function - featurize_reuse = unordered_session.read_gbq_function( - featurize.bigframes_remote_function # type: ignore - ) - bf_int64_col = scalars_df["float64_col"].dropna() - bf_result = bf_int64_col.apply(featurize_reuse).to_pandas() - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - featurize, - unordered_session.bqclient, - unordered_session.cloudfunctionsclient, - ) + # Let's make sure the read_gbq_function path works for this function + featurize_reuse = unordered_session.read_gbq_function( + featurize.bigframes_remote_function # type: ignore + ) + bf_int64_col = scalars_df["float64_col"].dropna() + bf_result = bf_int64_col.apply(featurize_reuse).to_pandas() + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_array_output_multiindex( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - - @session.remote_function( - dataset=dataset_id, - bigquery_connection=bq_cf_connection, - reuse=False, - ) - def featurize(x: int) -> list[float]: - return [x, x + 0.5, x + 0.33] + @session.remote_function( + dataset=dataset_id, + bigquery_connection=bq_cf_connection, + reuse=False, + ) + def featurize(x: int) -> list[float]: + return [x, x + 0.5, x + 0.33] - scalars_df, scalars_pandas_df = scalars_dfs - multiindex_cols = ["rowindex", "string_col"] - scalars_df = scalars_df.reset_index().set_index(multiindex_cols) - scalars_pandas_df = scalars_pandas_df.reset_index().set_index(multiindex_cols) + scalars_df, scalars_pandas_df = scalars_dfs + multiindex_cols = ["rowindex", "string_col"] + scalars_df = scalars_df.reset_index().set_index(multiindex_cols) + scalars_pandas_df = 
scalars_pandas_df.reset_index().set_index(multiindex_cols) - bf_int64_col = scalars_df["int64_too"] - bf_result = bf_int64_col.apply(featurize).to_pandas() + bf_int64_col = scalars_df["int64_too"] + bf_result = bf_int64_col.apply(featurize).to_pandas() - pd_int64_col = scalars_pandas_df["int64_too"] - pd_result = pd_int64_col.apply(featurize) + pd_int64_col = scalars_pandas_df["int64_too"] + pd_result = pd_int64_col.apply(featurize) - # ignore any dtype disparity - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - featurize, session.bqclient, session.cloudfunctionsclient - ) + # ignore any dtype disparity + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) diff --git a/tests/system/small/functions/test_managed_function.py b/tests/system/small/functions/test_managed_function.py index b0d89b4cd4..44bc78cee6 100644 --- a/tests/system/small/functions/test_managed_function.py +++ b/tests/system/small/functions/test_managed_function.py @@ -64,9 +64,6 @@ def foo(x): assert hasattr(foo, "bigframes_bigquery_function") assert hasattr(foo, "ibis_node") - assert hasattr(foo, "input_dtypes") - assert hasattr(foo, "output_dtype") - assert hasattr(foo, "bigframes_bigquery_function_output_dtype") scalars_df, scalars_pandas_df = scalars_dfs