From 84e02a730345a35025c06bfc7377422f82403096 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Sat, 8 Mar 2025 02:17:58 +0000 Subject: [PATCH 1/5] test: simplify function cleanup during testing --- .../large/functions/test_remote_function.py | 2339 ++++++++--------- 1 file changed, 1041 insertions(+), 1298 deletions(-) diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 350eae3783..4de560f872 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -113,49 +113,44 @@ def test_remote_function_multiply_with_ibis( dataset_id, bq_cf_connection, ): - try: - - @session.remote_function( - [int, int], - int, - dataset_id, - bq_cf_connection, - reuse=False, - ) - def multiply(x, y): - return x * y - - _, dataset_name, table_name = scalars_table_id.split(".") - if not ibis_client.dataset: - ibis_client.dataset = dataset_name - - col_name = "int64_col" - table = ibis_client.tables[table_name] - table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10) - sql = table.compile() - pandas_df_orig = bigquery_client.query(sql).to_dataframe() - - col = table[col_name] - col_2x = multiply(col, 2).name("int64_col_2x") - col_square = multiply(col, col).name("int64_col_square") - table = table.mutate([col_2x, col_square]) - sql = table.compile() - pandas_df_new = bigquery_client.query(sql).to_dataframe() - - pandas.testing.assert_series_equal( - pandas_df_orig[col_name] * 2, - pandas_df_new["int64_col_2x"], - check_names=False, - ) + @session.remote_function( + [int, int], + int, + dataset_id, + bq_cf_connection, + reuse=False, + ) + def multiply(x, y): + return x * y + + _, dataset_name, table_name = scalars_table_id.split(".") + if not ibis_client.dataset: + ibis_client.dataset = dataset_name + + col_name = "int64_col" + table = ibis_client.tables[table_name] + table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10) + sql = table.compile() + pandas_df_orig = bigquery_client.query(sql).to_dataframe() + + col = table[col_name] + col_2x = multiply(col, 2).name("int64_col_2x") + col_square = multiply(col, col).name("int64_col_square") + table = table.mutate([col_2x, col_square]) + sql = table.compile() + pandas_df_new = bigquery_client.query(sql).to_dataframe() + + pandas.testing.assert_series_equal( + pandas_df_orig[col_name] * 2, + pandas_df_new["int64_col_2x"], + check_names=False, + ) - pandas.testing.assert_series_equal( - pandas_df_orig[col_name] * pandas_df_orig[col_name], - pandas_df_new["int64_col_square"], - check_names=False, - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(multiply, bigquery_client, session.cloudfunctionsclient) + pandas.testing.assert_series_equal( + pandas_df_orig[col_name] * pandas_df_orig[col_name], + pandas_df_new["int64_col_square"], + check_names=False, + ) @pytest.mark.flaky(retries=2, delay=120) @@ -167,208 +162,175 @@ def test_remote_function_stringify_with_ibis( dataset_id, bq_cf_connection, ): - try: - - @session.remote_function( - [int], - str, - dataset_id, - bq_cf_connection, - reuse=False, - ) - def stringify(x): - return f"I got {x}" - - # Function should work locally. 
- assert stringify(42) == "I got 42" - - _, dataset_name, table_name = scalars_table_id.split(".") - if not ibis_client.dataset: - ibis_client.dataset = dataset_name - - col_name = "int64_col" - table = ibis_client.tables[table_name] - table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10) - sql = table.compile() - pandas_df_orig = bigquery_client.query(sql).to_dataframe() - - col = table[col_name] - col_2x = stringify.ibis_node(col).name("int64_str_col") - table = table.mutate([col_2x]) - sql = table.compile() - pandas_df_new = bigquery_client.query(sql).to_dataframe() - - pandas.testing.assert_series_equal( - pandas_df_orig[col_name].apply(lambda x: f"I got {x}"), - pandas_df_new["int64_str_col"], - check_names=False, - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - stringify, bigquery_client, session.cloudfunctionsclient - ) + @session.remote_function( + [int], + str, + dataset_id, + bq_cf_connection, + reuse=False, + ) + def stringify(x): + return f"I got {x}" + + # Function should work locally. + assert stringify(42) == "I got 42" + + _, dataset_name, table_name = scalars_table_id.split(".") + if not ibis_client.dataset: + ibis_client.dataset = dataset_name + + col_name = "int64_col" + table = ibis_client.tables[table_name] + table = table.filter(table[col_name].notnull()).order_by("rowindex").head(10) + sql = table.compile() + pandas_df_orig = bigquery_client.query(sql).to_dataframe() + + col = table[col_name] + col_2x = stringify.ibis_node(col).name("int64_str_col") + table = table.mutate([col_2x]) + sql = table.compile() + pandas_df_new = bigquery_client.query(sql).to_dataframe() + + pandas.testing.assert_series_equal( + pandas_df_orig[col_name].apply(lambda x: f"I got {x}"), + pandas_df_new["int64_str_col"], + check_names=False, + ) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_binop(session, scalars_dfs, dataset_id, bq_cf_connection): - try: - - def func(x, y): - return x * abs(y % 4) + def func(x, y): + return x * abs(y % 4) - remote_func = session.remote_function( - [str, int], - str, - dataset_id, - bq_cf_connection, - reuse=False, - )(func) + remote_func = session.remote_function( + [str, int], + str, + dataset_id, + bq_cf_connection, + reuse=False, + )(func) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - scalars_df = scalars_df.dropna() - scalars_pandas_df = scalars_pandas_df.dropna() - bf_result = ( - scalars_df["string_col"] - .combine(scalars_df["int64_col"], remote_func) - .to_pandas() - ) - pd_result = scalars_pandas_df["string_col"].combine( - scalars_pandas_df["int64_col"], func - ) - pandas.testing.assert_series_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_func, session.bqclient, session.cloudfunctionsclient - ) + scalars_df = scalars_df.dropna() + scalars_pandas_df = scalars_pandas_df.dropna() + bf_result = ( + scalars_df["string_col"] + .combine(scalars_df["int64_col"], remote_func) + .to_pandas() + ) + pd_result = scalars_pandas_df["string_col"].combine( + scalars_pandas_df["int64_col"], func + ) + pandas.testing.assert_series_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_binop_array_output( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: + def func(x, y): + return [len(x), abs(y % 4)] - def func(x, y): - return [len(x), abs(y % 4)] + remote_func = session.remote_function( 
+ [str, int], + list[int], + dataset_id, + bq_cf_connection, + reuse=False, + )(func) - remote_func = session.remote_function( - [str, int], - list[int], - dataset_id, - bq_cf_connection, - reuse=False, - )(func) - - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - scalars_df = scalars_df.dropna() - scalars_pandas_df = scalars_pandas_df.dropna() - bf_result = ( - scalars_df["string_col"] - .combine(scalars_df["int64_col"], remote_func) - .to_pandas() - ) - pd_result = scalars_pandas_df["string_col"].combine( - scalars_pandas_df["int64_col"], func - ) - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_func, session.bqclient, session.cloudfunctionsclient - ) + scalars_df = scalars_df.dropna() + scalars_pandas_df = scalars_pandas_df.dropna() + bf_result = ( + scalars_df["string_col"] + .combine(scalars_df["int64_col"], remote_func) + .to_pandas() + ) + pd_result = scalars_pandas_df["string_col"].combine( + scalars_pandas_df["int64_col"], func + ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_decorator_with_bigframes_series( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - - @session.remote_function( - [int], - int, - dataset_id, - bq_cf_connection, - reuse=False, - ) - def square(x): - return x * x + @session.remote_function( + [int], + int, + dataset_id, + bq_cf_connection, + reuse=False, + ) + def square(x): + return x * x - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(square) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(square) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) - # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_explicit_with_bigframes_series( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - - def add_one(x): - return x + 1 + def add_one(x): + return x + 1 - remote_add_one = session.remote_function( - [int], - int, - dataset_id, - bq_cf_connection, - reuse=False, - )(add_one) + remote_add_one = session.remote_function( + [int], + int, + dataset_id, + bq_cf_connection, + reuse=False, + )(add_one) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(remote_add_one) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(remote_add_one) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(add_one) - # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(add_one) + # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
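+    # (A hedged explanation, not verified against pandas internals: .apply()
+    # collects plain Python objects and re-infers a NumPy dtype from them, so
+    # the nullable Int64 extension dtype is not carried over to the result.)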
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_add_one, session.bqclient, session.cloudfunctionsclient - ) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.parametrize( @@ -380,25 +342,18 @@ def add_one(x): ) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_input_types(session, scalars_dfs, input_types): - try: + def add_one(x): + return x + 1 - def add_one(x): - return x + 1 + remote_add_one = session.remote_function(input_types, int, reuse=False)(add_one) + assert remote_add_one.input_dtypes == (bigframes.dtypes.INT_DTYPE,) - remote_add_one = session.remote_function(input_types, int, reuse=False)(add_one) - assert remote_add_one.input_dtypes == (bigframes.dtypes.INT_DTYPE,) + scalars_df, scalars_pandas_df = scalars_dfs - scalars_df, scalars_pandas_df = scalars_dfs + bf_result = scalars_df.int64_too.map(remote_add_one).to_pandas() + pd_result = scalars_pandas_df.int64_too.map(add_one) - bf_result = scalars_df.int64_too.map(remote_add_one).to_pandas() - pd_result = scalars_pandas_df.int64_too.map(add_one) - - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_add_one, session.bqclient, session.cloudfunctionsclient - ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) @@ -408,192 +363,168 @@ def test_remote_function_explicit_dataset_not_created( dataset_id_not_created, bq_cf_connection, ): - try: + @session.remote_function( + [int], + int, + dataset_id_not_created, + bq_cf_connection, + reuse=False, + ) + def square(x): + return x * x - @session.remote_function( - [int], - int, - dataset_id_not_created, - bq_cf_connection, - reuse=False, - ) - def square(x): - return x * x + scalars_df, scalars_pandas_df = scalars_dfs - scalars_df, scalars_pandas_df = scalars_dfs + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(square) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(square) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) - # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_udf_referring_outside_var( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - POSITIVE_SIGN = 1 - NEGATIVE_SIGN = -1 - NO_SIGN = 0 - - def sign(num): - if num > 0: - return POSITIVE_SIGN - elif num < 0: - return NEGATIVE_SIGN - return NO_SIGN - - remote_sign = session.remote_function( - [int], - int, - dataset_id, - bq_cf_connection, - reuse=False, - )(sign) + POSITIVE_SIGN = 1 + NEGATIVE_SIGN = -1 + NO_SIGN = 0 + + def sign(num): + if num > 0: + return POSITIVE_SIGN + elif num < 0: + return NEGATIVE_SIGN + return NO_SIGN + + remote_sign = session.remote_function( + [int], + int, + dataset_id, + bq_cf_connection, + reuse=False, + )(sign) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(remote_sign) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(remote_sign) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(sign) - # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(sign) + # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_sign, session.bqclient, session.cloudfunctionsclient - ) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_udf_referring_outside_import( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - import math as mymath + import math as mymath - def circumference(radius): - return 2 * mymath.pi * radius + def circumference(radius): + return 2 * mymath.pi * radius - remote_circumference = session.remote_function( - [float], - float, - dataset_id, - bq_cf_connection, - reuse=False, - )(circumference) + remote_circumference = session.remote_function( + [float], + float, + dataset_id, + bq_cf_connection, + reuse=False, + )(circumference) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_float64_col = scalars_df["float64_col"] - bf_float64_col_filter = bf_float64_col.notnull() - bf_float64_col_filtered = bf_float64_col[bf_float64_col_filter] - bf_result_col = bf_float64_col_filtered.apply(remote_circumference) - bf_result = ( - bf_float64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_float64_col = scalars_df["float64_col"] + bf_float64_col_filter = bf_float64_col.notnull() + bf_float64_col_filtered = bf_float64_col[bf_float64_col_filter] + bf_result_col = bf_float64_col_filtered.apply(remote_circumference) + bf_result = ( + bf_float64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_float64_col = scalars_pandas_df["float64_col"] - pd_float64_col_filter = pd_float64_col.notnull() - pd_float64_col_filtered = pd_float64_col[pd_float64_col_filter] - pd_result_col = pd_float64_col_filtered.apply(circumference) - # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. - # pd_float64_col_filtered.dtype is Float64Dtype() - # pd_float64_col_filtered.apply(lambda x: x).dtype is float64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Float64Dtype()) - pd_result = pd_float64_col_filtered.to_frame().assign(result=pd_result_col) + pd_float64_col = scalars_pandas_df["float64_col"] + pd_float64_col_filter = pd_float64_col.notnull() + pd_float64_col_filtered = pd_float64_col[pd_float64_col_filter] + pd_result_col = pd_float64_col_filtered.apply(circumference) + # TODO(shobs): Figure why pandas .apply() changes the dtype, e.g. + # pd_float64_col_filtered.dtype is Float64Dtype() + # pd_float64_col_filtered.apply(lambda x: x).dtype is float64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
+ pd_result_col = pd_result_col.astype(pandas.Float64Dtype()) + pd_result = pd_float64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - remote_circumference, session.bqclient, session.cloudfunctionsclient - ) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_udf_referring_global_var_and_import( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: + def find_team(num): + boundary = (math.pi + math.e) / 2 + if num >= boundary: + return _team_euler + return _team_pi + + remote_find_team = session.remote_function( + [float], + str, + dataset_id, + bq_cf_connection, + reuse=False, + )(find_team) - def find_team(num): - boundary = (math.pi + math.e) / 2 - if num >= boundary: - return _team_euler - return _team_pi + scalars_df, scalars_pandas_df = scalars_dfs - remote_find_team = session.remote_function( - [float], - str, - dataset_id, - bq_cf_connection, - reuse=False, - )(find_team) + bf_float64_col = scalars_df["float64_col"] + bf_float64_col_filter = bf_float64_col.notnull() + bf_float64_col_filtered = bf_float64_col[bf_float64_col_filter] + bf_result_col = bf_float64_col_filtered.apply(remote_find_team) + bf_result = ( + bf_float64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - scalars_df, scalars_pandas_df = scalars_dfs + pd_float64_col = scalars_pandas_df["float64_col"] + pd_float64_col_filter = pd_float64_col.notnull() + pd_float64_col_filtered = pd_float64_col[pd_float64_col_filter] + pd_result_col = pd_float64_col_filtered.apply(find_team) + # TODO(shobs): Figure if the dtype mismatch is by design: + # bf_result.dtype: string[pyarrow] + # pd_result.dtype: dtype('O'). + # For this test let's force the pandas dtype to be same as bigframes' dtype. + pd_result_col = pd_result_col.astype(pandas.StringDtype(storage="pyarrow")) + pd_result = pd_float64_col_filtered.to_frame().assign(result=pd_result_col) - bf_float64_col = scalars_df["float64_col"] - bf_float64_col_filter = bf_float64_col.notnull() - bf_float64_col_filtered = bf_float64_col[bf_float64_col_filter] - bf_result_col = bf_float64_col_filtered.apply(remote_find_team) - bf_result = ( - bf_float64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) - - pd_float64_col = scalars_pandas_df["float64_col"] - pd_float64_col_filter = pd_float64_col.notnull() - pd_float64_col_filtered = pd_float64_col[pd_float64_col_filter] - pd_result_col = pd_float64_col_filtered.apply(find_team) - # TODO(shobs): Figure if the dtype mismatch is by design: - # bf_result.dtype: string[pyarrow] - # pd_result.dtype: dtype('O'). - # For this test let's force the pandas dtype to be same as bigframes' dtype. 
-        pd_result_col = pd_result_col.astype(pandas.StringDtype(storage="pyarrow"))
-        pd_result = pd_float64_col_filtered.to_frame().assign(result=pd_result_col)
-
-        assert_pandas_df_equal(bf_result, pd_result)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            remote_find_team, session.bqclient, session.cloudfunctionsclient
-        )
+    assert_pandas_df_equal(bf_result, pd_result)


 @pytest.mark.flaky(retries=2, delay=120)
@@ -603,259 +534,230 @@ def test_remote_function_restore_with_bigframes_series(
     session,
     scalars_dfs,
     dataset_id,
     bq_cf_connection,
 ):
-    try:
-
-        def add_one(x):
-            return x + 1
+    def add_one(x):
+        return x + 1

-        # Make a unique udf
-        add_one_uniq, add_one_uniq_dir = make_uniq_udf(add_one)
+    # Make a unique udf
+    add_one_uniq, add_one_uniq_dir = make_uniq_udf(add_one)

-        # Expected cloud function name for the unique udf
-        package_requirements = bff_utils._get_updated_package_requirements()
-        add_one_uniq_hash = bff_utils._get_hash(add_one_uniq, package_requirements)
-        add_one_uniq_cf_name = bff_utils.get_cloud_function_name(
-            add_one_uniq_hash, session.session_id
-        )
+    # Expected cloud function name for the unique udf
+    package_requirements = bff_utils._get_updated_package_requirements()
+    add_one_uniq_hash = bff_utils._get_hash(add_one_uniq, package_requirements)
+    add_one_uniq_cf_name = bff_utils.get_cloud_function_name(
+        add_one_uniq_hash, session.session_id
+    )

-        # There should be no cloud function yet for the unique udf
-        cloud_functions = list(
-            get_cloud_functions(
-                session.cloudfunctionsclient,
-                session.bqclient.project,
-                session.bqclient.location,
-                name=add_one_uniq_cf_name,
-            )
-        )
-        assert len(cloud_functions) == 0
+    # There should be no cloud function yet for the unique udf
+    cloud_functions = list(
+        get_cloud_functions(
+            session.cloudfunctionsclient,
+            session.bqclient.project,
+            session.bqclient.location,
+            name=add_one_uniq_cf_name,
+        )
+    )
+    assert len(cloud_functions) == 0

-        # The first time both the cloud function and the bq remote function don't
-        # exist and would be created
-        remote_add_one = session.remote_function(
-            [int],
-            int,
-            dataset_id,
-            bq_cf_connection,
-            reuse=True,
-        )(add_one_uniq)
+    # The first time both the cloud function and the bq remote function don't
+    # exist and would be created
+    remote_add_one = session.remote_function(
+        [int],
+        int,
+        dataset_id,
+        bq_cf_connection,
+        reuse=True,
+    )(add_one_uniq)

-        # There should have been exactly one cloud function created at this point
-        cloud_functions = list(
-            get_cloud_functions(
-                session.cloudfunctionsclient,
-                session.bqclient.project,
-                session.bqclient.location,
-                name=add_one_uniq_cf_name,
-            )
-        )
-        assert len(cloud_functions) == 1
+    # There should have been exactly one cloud function created at this point
+    cloud_functions = list(
+        get_cloud_functions(
+            session.cloudfunctionsclient,
+            session.bqclient.project,
+            session.bqclient.location,
+            name=add_one_uniq_cf_name,
+        )
+    )
+    assert len(cloud_functions) == 1

-        # We will test this twice
-        def inner_test():
-            scalars_df, scalars_pandas_df = scalars_dfs
+    # We will test this twice
+    def inner_test():
+        scalars_df, scalars_pandas_df = scalars_dfs

-            bf_int64_col = scalars_df["int64_col"]
-            bf_int64_col_filter = bf_int64_col.notnull()
-            bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
-            bf_result_col = bf_int64_col_filtered.apply(remote_add_one)
-            bf_result = (
-                bf_int64_col_filtered.to_frame()
-                .assign(result=bf_result_col)
-                .to_pandas()
-            )
+        bf_int64_col = scalars_df["int64_col"]
+        bf_int64_col_filter = bf_int64_col.notnull()
+        bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
+        bf_result_col = bf_int64_col_filtered.apply(remote_add_one)
+        bf_result = (
+            bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
+        )

-            pd_int64_col = scalars_pandas_df["int64_col"]
-            pd_int64_col_filter = pd_int64_col.notnull()
-            pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
-            pd_result_col = pd_int64_col_filtered.apply(add_one_uniq)
-            # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e.
-            # pd_int64_col_filtered.dtype is Int64Dtype()
-            # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
-            # For this test let's force the pandas dtype to be same as bigframes' dtype.
-            pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
-            pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)
+        pd_int64_col = scalars_pandas_df["int64_col"]
+        pd_int64_col_filter = pd_int64_col.notnull()
+        pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
+        pd_result_col = pd_int64_col_filtered.apply(add_one_uniq)
+        # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e.
+        # pd_int64_col_filtered.dtype is Int64Dtype()
+        # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
+        # For this test let's force the pandas dtype to be same as bigframes' dtype.
+        pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
+        pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

-            assert_pandas_df_equal(bf_result, pd_result)
+        assert_pandas_df_equal(bf_result, pd_result)

-        # Test that the remote function works as expected
-        inner_test()
+    # Test that the remote function works as expected
+    inner_test()

-        # Let's delete the cloud function while not touching the bq remote function
-        delete_operation = delete_cloud_function(
-            session.cloudfunctionsclient, cloud_functions[0].name
-        )
-        delete_operation.result()
-        assert delete_operation.done()
+    # Let's delete the cloud function while not touching the bq remote function
+    delete_operation = delete_cloud_function(
+        session.cloudfunctionsclient, cloud_functions[0].name
+    )
+    delete_operation.result()
+    assert delete_operation.done()

-        # There should be no cloud functions at this point for the uniq udf
-        cloud_functions = list(
-            get_cloud_functions(
-                session.cloudfunctionsclient,
-                session.bqclient.project,
-                session.bqclient.location,
-                name=add_one_uniq_cf_name,
-            )
-        )
-        assert len(cloud_functions) == 0
+    # There should be no cloud functions at this point for the uniq udf
+    cloud_functions = list(
+        get_cloud_functions(
+            session.cloudfunctionsclient,
+            session.bqclient.project,
+            session.bqclient.location,
+            name=add_one_uniq_cf_name,
+        )
+    )
+    assert len(cloud_functions) == 0

-        # The second time bigframes detects that the required cloud function doesn't
-        # exist even though the remote function exists, and goes ahead and recreates
-        # the cloud function
-        remote_add_one = session.remote_function(
-            [int],
-            int,
-            dataset_id,
-            bq_cf_connection,
-            reuse=True,
-        )(add_one_uniq)
+    # The second time bigframes detects that the required cloud function doesn't
+    # exist even though the remote function exists, and goes ahead and recreates
+    # the cloud function
+    remote_add_one = session.remote_function(
+        [int],
+        int,
+        dataset_id,
+        bq_cf_connection,
+        reuse=True,
+    )(add_one_uniq)

-        # There should be exactly one cloud function again
-        cloud_functions = list(
-            get_cloud_functions(
-                session.cloudfunctionsclient,
-                session.bqclient.project,
-                session.bqclient.location,
-                name=add_one_uniq_cf_name,
-            )
-        )
-        assert len(cloud_functions) == 1
+    # There should be exactly one cloud function again
+    cloud_functions = list(
+        get_cloud_functions(
+            session.cloudfunctionsclient,
+            session.bqclient.project,
+            session.bqclient.location,
+            name=add_one_uniq_cf_name,
+        )
+    )
+    assert len(cloud_functions) == 1

-        # Test again after the cloud function is restored that the remote function
-        # works as expected
-        inner_test()
+    # Test again after the cloud function is restored that the remote function
+    # works as expected
+    inner_test()

-        # clean up the temp code
-        shutil.rmtree(add_one_uniq_dir)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            remote_add_one, session.bqclient, session.cloudfunctionsclient
-        )
+    # clean up the temp code
+    shutil.rmtree(add_one_uniq_dir)


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_mask_default_value(
     session, scalars_dfs, dataset_id, bq_cf_connection
 ):
-    try:
-
-        def is_odd(num):
-            flag = False
-            try:
-                flag = num % 2 == 1
-            except TypeError:
-                pass
-            return flag
+    def is_odd(num):
+        flag = False
+        try:
+            flag = num % 2 == 1
+        except TypeError:
+            pass
+        return flag

-        is_odd_remote = session.remote_function(
-            [int],
-            bool,
-            dataset_id,
-            bq_cf_connection,
-            reuse=False,
-        )(is_odd)
+    is_odd_remote = session.remote_function(
+        [int],
+        bool,
+        dataset_id,
+        bq_cf_connection,
+        reuse=False,
+    )(is_odd)

-        scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df, scalars_pandas_df = scalars_dfs

-        bf_int64_col = scalars_df["int64_col"]
-        bf_result_col = bf_int64_col.mask(is_odd_remote)
-        bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()
+    bf_int64_col = scalars_df["int64_col"]
+    bf_result_col = bf_int64_col.mask(is_odd_remote)
+    bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()

-        pd_int64_col = scalars_pandas_df["int64_col"]
-        pd_result_col = pd_int64_col.mask(is_odd)
-        pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)
+    pd_int64_col = scalars_pandas_df["int64_col"]
+    pd_result_col = pd_int64_col.mask(is_odd)
+    pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

-        assert_pandas_df_equal(bf_result, pd_result)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            is_odd_remote, session.bqclient, session.cloudfunctionsclient
-        )
+    assert_pandas_df_equal(bf_result, pd_result)


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_mask_custom_value(
     session, scalars_dfs, dataset_id, bq_cf_connection
 ):
-    try:
-
-        def is_odd(num):
-            flag = False
-            try:
-                flag = num % 2 == 1
-            except TypeError:
-                pass
-            return flag
+    def is_odd(num):
+        flag = False
+        try:
+            flag = num % 2 == 1
+        except TypeError:
+            pass
+        return flag

-        is_odd_remote = session.remote_function(
-            [int],
-            bool,
-            dataset_id,
-            bq_cf_connection,
-            reuse=False,
-        )(is_odd)
+    is_odd_remote = session.remote_function(
+        [int],
+        bool,
+        dataset_id,
+        bq_cf_connection,
+        reuse=False,
+    )(is_odd)

-        scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df, scalars_pandas_df = scalars_dfs

-        # TODO(shobs): Revisit this test when NA handling of pandas' Series.mask is
-        # fixed https://github.com/pandas-dev/pandas/issues/52955,
-        # for now filter out the nulls and test the rest
-        bf_int64_col = scalars_df["int64_col"]
-        bf_result_col = bf_int64_col[bf_int64_col.notnull()].mask(is_odd_remote, -1)
-        bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()
+    # TODO(shobs): Revisit this test when NA handling of pandas' Series.mask is
+    # fixed https://github.com/pandas-dev/pandas/issues/52955,
+    # for now filter out the nulls and test the rest
+    bf_int64_col = scalars_df["int64_col"]
+    bf_result_col = bf_int64_col[bf_int64_col.notnull()].mask(is_odd_remote, -1)
+    bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()

-        pd_int64_col = scalars_pandas_df["int64_col"]
-        pd_result_col = pd_int64_col[pd_int64_col.notnull()].mask(is_odd, -1)
-        pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)
+    pd_int64_col = scalars_pandas_df["int64_col"]
+    pd_result_col = pd_int64_col[pd_int64_col.notnull()].mask(is_odd, -1)
+    pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

-        assert_pandas_df_equal(bf_result, pd_result)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            is_odd_remote, session.bqclient, session.cloudfunctionsclient
-        )
+    assert_pandas_df_equal(bf_result, pd_result)


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_lambda(session, scalars_dfs, dataset_id, bq_cf_connection):
-    try:
-        add_one_lambda = lambda x: x + 1  # noqa: E731
+    add_one_lambda = lambda x: x + 1  # noqa: E731

-        add_one_lambda_remote = session.remote_function(
-            [int],
-            int,
-            dataset_id,
-            bq_cf_connection,
-            reuse=False,
-        )(add_one_lambda)
+    add_one_lambda_remote = session.remote_function(
+        [int],
+        int,
+        dataset_id,
+        bq_cf_connection,
+        reuse=False,
+    )(add_one_lambda)

-        scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df, scalars_pandas_df = scalars_dfs

-        bf_int64_col = scalars_df["int64_col"]
-        bf_int64_col_filter = bf_int64_col.notnull()
-        bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
-        bf_result_col = bf_int64_col_filtered.apply(add_one_lambda_remote)
-        bf_result = (
-            bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
-        )
+    bf_int64_col = scalars_df["int64_col"]
+    bf_int64_col_filter = bf_int64_col.notnull()
+    bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
+    bf_result_col = bf_int64_col_filtered.apply(add_one_lambda_remote)
+    bf_result = (
+        bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
+    )

-        pd_int64_col = scalars_pandas_df["int64_col"]
-        pd_int64_col_filter = pd_int64_col.notnull()
-        pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
-        pd_result_col = pd_int64_col_filtered.apply(add_one_lambda)
-        # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e.
-        # pd_int64_col_filtered.dtype is Int64Dtype()
-        # pd_int64_col_filtered.apply(lambda x: x).dtype is int64.
-        # For this test let's force the pandas dtype to be same as bigframes' dtype.
-        pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
-        pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)
+    pd_int64_col = scalars_pandas_df["int64_col"]
+    pd_int64_col_filter = pd_int64_col.notnull()
+    pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
+    pd_result_col = pd_int64_col_filtered.apply(add_one_lambda)
+    # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e.
+    # pd_int64_col_filtered.dtype is Int64Dtype()
+    # pd_int64_col_filtered.apply(lambda x: x).dtype is int64.
+    # For this test let's force the pandas dtype to be same as bigframes' dtype.
+    pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
+    pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

-        assert_pandas_df_equal(bf_result, pd_result)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            add_one_lambda_remote, session.bqclient, session.cloudfunctionsclient
-        )
+    assert_pandas_df_equal(bf_result, pd_result)


 @pytest.mark.flaky(retries=2, delay=120)
@@ -919,45 +821,38 @@ def square(x):
 def test_remote_function_with_external_package_dependencies(
     session, scalars_dfs, dataset_id, bq_cf_connection
 ):
-    try:
-
-        def pd_np_foo(x):
-            import numpy as mynp
-            import pandas as mypd
+    def pd_np_foo(x):
+        import numpy as mynp
+        import pandas as mypd

-            return mypd.Series([x, mynp.sqrt(mynp.abs(x))]).sum()
+        return mypd.Series([x, mynp.sqrt(mynp.abs(x))]).sum()

-        # Create the remote function with the name provided explicitly
-        pd_np_foo_remote = session.remote_function(
-            [int],
-            float,
-            dataset_id,
-            bq_cf_connection,
-            reuse=False,
-            packages=["numpy", "pandas >= 2.0.0"],
-        )(pd_np_foo)
+    # Create the remote function with the name provided explicitly
+    pd_np_foo_remote = session.remote_function(
+        [int],
+        float,
+        dataset_id,
+        bq_cf_connection,
+        reuse=False,
+        packages=["numpy", "pandas >= 2.0.0"],
+    )(pd_np_foo)

-        # The behavior of the created remote function should be as expected
-        scalars_df, scalars_pandas_df = scalars_dfs
+    # The behavior of the created remote function should be as expected
+    scalars_df, scalars_pandas_df = scalars_dfs

-        bf_int64_col = scalars_df["int64_too"]
-        bf_result_col = bf_int64_col.apply(pd_np_foo_remote)
-        bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()
+    bf_int64_col = scalars_df["int64_too"]
+    bf_result_col = bf_int64_col.apply(pd_np_foo_remote)
+    bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()

-        pd_int64_col = scalars_pandas_df["int64_too"]
-        pd_result_col = pd_int64_col.apply(pd_np_foo)
-        pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)
+    pd_int64_col = scalars_pandas_df["int64_too"]
+    pd_result_col = pd_int64_col.apply(pd_np_foo)
+    pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

-        # pandas result is non-nullable type float64, make it Float64 before
-        # comparing for the purpose of this test
-        pd_result.result = pd_result.result.astype(pandas.Float64Dtype())
+    # pandas result is non-nullable type float64, make it Float64 before
+    # comparing for the purpose of this test
+    pd_result.result = pd_result.result.astype(pandas.Float64Dtype())

-        assert_pandas_df_equal(bf_result, pd_result)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            pd_np_foo_remote, session.bqclient, session.cloudfunctionsclient
-        )
+    assert_pandas_df_equal(bf_result, pd_result)


 @pytest.mark.flaky(retries=2, delay=120)
@@ -1131,143 +1026,125 @@ def test_remote_function_via_session_context_connection_setter(
     context.bq_connection = bq_cf_connection
     session = bigframes.connect(context)

-    try:
-        # Without an explicit bigquery connection, the one present in Session,
-        # set via context setter would be used. Without an explicit `reuse` the
-        # default behavior of reuse=True will take effect. Please note that the
-        # udf is same as the one used in other tests in this file so the underlying
-        # cloud function would be common with reuse=True. Since we are using a
-        # unique dataset_id, even though the cloud function would be reused, the bq
-        # remote function would still be created, making use of the bq connection
-        # set in the BigQueryOptions above.
-        @session.remote_function([int], int, dataset=dataset_id, reuse=False)
-        def square(x):
-            return x * x
+    # Without an explicit bigquery connection, the one present in Session,
+    # set via context setter would be used. Without an explicit `reuse` the
+    # default behavior of reuse=True will take effect. Please note that the
+    # udf is same as the one used in other tests in this file so the underlying
+    # cloud function would be common with reuse=True. Since we are using a
+    # unique dataset_id, even though the cloud function would be reused, the bq
+    # remote function would still be created, making use of the bq connection
+    # set in the BigQueryOptions above.
+    @session.remote_function([int], int, dataset=dataset_id, reuse=False)
+    def square(x):
+        return x * x

-        scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df, scalars_pandas_df = scalars_dfs

-        bf_int64_col = scalars_df["int64_col"]
-        bf_int64_col_filter = bf_int64_col.notnull()
-        bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
-        bf_result_col = bf_int64_col_filtered.apply(square)
-        bf_result = (
-            bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
-        )
+    bf_int64_col = scalars_df["int64_col"]
+    bf_int64_col_filter = bf_int64_col.notnull()
+    bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
+    bf_result_col = bf_int64_col_filtered.apply(square)
+    bf_result = (
+        bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
+    )

-        pd_int64_col = scalars_pandas_df["int64_col"]
-        pd_int64_col_filter = pd_int64_col.notnull()
-        pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
-        pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x)
-        # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e.
-        # pd_int64_col_filtered.dtype is Int64Dtype()
-        # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
-        # For this test let's force the pandas dtype to be same as bigframes' dtype.
-        pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
-        pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)
+    pd_int64_col = scalars_pandas_df["int64_col"]
+    pd_int64_col_filter = pd_int64_col.notnull()
+    pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
+    pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x)
+    # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e.
+    # pd_int64_col_filtered.dtype is Int64Dtype()
+    # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
+    # For this test let's force the pandas dtype to be same as bigframes' dtype.
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_default_connection(session, scalars_dfs, dataset_id): - try: - - @session.remote_function([int], int, dataset=dataset_id, reuse=False) - def square(x): - return x * x + @session.remote_function([int], int, dataset=dataset_id, reuse=False) + def square(x): + return x * x - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(square) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(square) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) - # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. - # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. 
+ pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_runtime_error(session, scalars_dfs, dataset_id): - try: + @session.remote_function([int], int, dataset=dataset_id, reuse=False) + def square(x): + return x * x - @session.remote_function([int], int, dataset=dataset_id, reuse=False) - def square(x): - return x * x + scalars_df, _ = scalars_dfs - scalars_df, _ = scalars_dfs - - with pytest.raises( - google.api_core.exceptions.BadRequest, - match="400.*errorMessage.*unsupported operand type", - ): - # int64_col has nulls which should cause error in square - scalars_df["int64_col"].apply(square).to_pandas() - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + with pytest.raises( + google.api_core.exceptions.BadRequest, + match="400.*errorMessage.*unsupported operand type", + ): + # int64_col has nulls which should cause error in square + scalars_df["int64_col"].apply(square).to_pandas() @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_anonymous_dataset(session, scalars_dfs): - try: - # This usage of remote_function is expected to create the remote - # function in the bigframes session's anonymous dataset. Use reuse=False - # param to make sure parallel instances of the test don't step over each - # other due to the common anonymous dataset. - @session.remote_function([int], int, reuse=False) - def square(x): - return x * x - - assert ( - bigquery.Routine(square.bigframes_remote_function).dataset_id - == session._anonymous_dataset.dataset_id - ) + # This usage of remote_function is expected to create the remote + # function in the bigframes session's anonymous dataset. Use reuse=False + # param to make sure parallel instances of the test don't step over each + # other due to the common anonymous dataset. + @session.remote_function([int], int, reuse=False) + def square(x): + return x * x + + assert ( + bigquery.Routine(square.bigframes_remote_function).dataset_id + == session._anonymous_dataset.dataset_id + ) - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_col"] - bf_int64_col_filter = bf_int64_col.notnull() - bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] - bf_result_col = bf_int64_col_filtered.apply(square) - bf_result = ( - bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() - ) + bf_int64_col = scalars_df["int64_col"] + bf_int64_col_filter = bf_int64_col.notnull() + bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter] + bf_result_col = bf_int64_col_filtered.apply(square) + bf_result = ( + bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas() + ) - pd_int64_col = scalars_pandas_df["int64_col"] - pd_int64_col_filter = pd_int64_col.notnull() - pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] - pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) - # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. - # pd_int64_col_filtered.dtype is Int64Dtype() - # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. 
- # For this test let's force the pandas dtype to be same as bigframes' dtype. - pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) - pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) + pd_int64_col = scalars_pandas_df["int64_col"] + pd_int64_col_filter = pd_int64_col.notnull() + pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter] + pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x) + # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e. + # pd_int64_col_filtered.dtype is Int64Dtype() + # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64. + # For this test let's force the pandas dtype to be same as bigframes' dtype. + pd_result_col = pd_result_col.astype(pandas.Int64Dtype()) + pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col) - assert_pandas_df_equal(bf_result, pd_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(square, session.bqclient, session.cloudfunctionsclient) + assert_pandas_df_equal(bf_result, pd_result) @pytest.mark.flaky(retries=2, delay=120) @@ -1287,9 +1164,10 @@ def test_remote_function_via_session_custom_sa(scalars_dfs): "bigframes-dev-perf-1@bigframes-dev-perf.iam.gserviceaccount.com" ) - rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=project)) - - try: + # Use context manager so that temp artifacts are automatically cleaned up + with bigframes.Session( + context=bigframes.BigQueryOptions(project=project) + ) as rf_session: @rf_session.remote_function( [int], int, reuse=False, cloud_function_service_account=gcf_service_account @@ -1316,11 +1194,6 @@ def square_num(x): name=square_num.bigframes_cloud_function ) assert gcf.service_config.service_account_email == gcf_service_account - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_num, rf_session.bqclient, rf_session.cloudfunctionsclient - ) def test_remote_function_warns_default_cloud_function_service_account(): @@ -1351,8 +1224,9 @@ def test_remote_function_with_gcf_cmek(): "projects/bigframes-dev-perf/locations/us-central1/repositories/rf-artifacts" ) - session = bigframes.Session(context=bigframes.BigQueryOptions(project=project)) - try: + with bigframes.Session( + context=bigframes.BigQueryOptions(project=project) + ) as session: @session.remote_function( [int], @@ -1391,12 +1265,6 @@ def square_num(x): blob = bucket.get_blob(gcf.build_config.source.storage_source.object_) assert blob.kms_key_name.startswith(cmek) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_num, session.bqclient, session.cloudfunctionsclient - ) - @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_via_session_vpc(scalars_dfs): @@ -1420,9 +1288,9 @@ def test_remote_function_via_session_vpc(scalars_dfs): project = "bigframes-dev-perf" gcf_vpc_connector = "bigframes-vpc" - rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=project)) - - try: + with bigframes.Session( + context=bigframes.BigQueryOptions(project=project) + ) as rf_session: def square_num(x): if x is None: @@ -1450,11 +1318,6 @@ def square_num(x): name=square_num_remote.bigframes_cloud_function ) assert gcf.service_config.vpc_connector == gcf_vpc_connector - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_num_remote, rf_session.bqclient, rf_session.cloudfunctionsclient - ) @pytest.mark.parametrize( @@ -1466,31 
+1329,22 @@ def square_num(x):
 )
 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_max_batching_rows(session, scalars_dfs, max_batching_rows):
-    try:
+    def square(x):
+        return x * x

-        def square(x):
-            return x * x
-
-        square_remote = session.remote_function(
-            [int], int, reuse=False, max_batching_rows=max_batching_rows
-        )(square)
+    square_remote = session.remote_function(
+        [int], int, reuse=False, max_batching_rows=max_batching_rows
+    )(square)

-        bq_routine = session.bqclient.get_routine(
-            square_remote.bigframes_remote_function
-        )
-        assert bq_routine.remote_function_options.max_batching_rows == max_batching_rows
+    bq_routine = session.bqclient.get_routine(square_remote.bigframes_remote_function)
+    assert bq_routine.remote_function_options.max_batching_rows == max_batching_rows

-        scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df, scalars_pandas_df = scalars_dfs

-        bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
-        pd_result = scalars_pandas_df["int64_too"].apply(square)
+    bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
+    pd_result = scalars_pandas_df["int64_too"].apply(square)

-        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            square_remote, session.bqclient, session.cloudfunctionsclient
-        )
+    pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)


 @pytest.mark.parametrize(
@@ -1505,32 +1359,25 @@
 def test_remote_function_gcf_timeout(
     session, scalars_dfs, timeout_args, effective_gcf_timeout
 ):
-    try:
-
-        def square(x):
-            return x * x
+    def square(x):
+        return x * x

-        square_remote = session.remote_function(
-            [int], int, reuse=False, **timeout_args
-        )(square)
+    square_remote = session.remote_function([int], int, reuse=False, **timeout_args)(
+        square
+    )

-        # Assert that the GCF is created with the intended maximum timeout
-        gcf = session.cloudfunctionsclient.get_function(
-            name=square_remote.bigframes_cloud_function
-        )
-        assert gcf.service_config.timeout_seconds == effective_gcf_timeout
+    # Assert that the GCF is created with the intended maximum timeout
+    gcf = session.cloudfunctionsclient.get_function(
+        name=square_remote.bigframes_cloud_function
+    )
+    assert gcf.service_config.timeout_seconds == effective_gcf_timeout

-        scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df, scalars_pandas_df = scalars_dfs

-        bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
-        pd_result = scalars_pandas_df["int64_too"].apply(square)
+    bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
+    pd_result = scalars_pandas_df["int64_too"].apply(square)

-        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            square_remote, session.bqclient, session.cloudfunctionsclient
-        )
+    pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)


 @pytest.mark.flaky(retries=2, delay=120)
@@ -1554,84 +1401,71 @@ def square(x):
 def test_remote_function_max_instances(
     session, scalars_dfs, max_instances_args, expected_max_instances
 ):
-    try:
-
-        def square(x):
-            return x * x
+    def square(x):
+        return x * x

-        square_remote = session.remote_function(
-            [int], int, reuse=False, **max_instances_args
-        )(square)
+    square_remote = session.remote_function(
+        [int], int, reuse=False, **max_instances_args
+    )(square)

-        # Assert that the GCF is created with the intended max instance count
-        gcf = session.cloudfunctionsclient.get_function(
-            name=square_remote.bigframes_cloud_function
-        )
-        assert gcf.service_config.max_instance_count == expected_max_instances
+    # Assert that the GCF is created with the intended max instance count
+    gcf = session.cloudfunctionsclient.get_function(
+        name=square_remote.bigframes_cloud_function
+    )
+    assert gcf.service_config.max_instance_count == expected_max_instances

-        scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df, scalars_pandas_df = scalars_dfs

-        bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
-        pd_result = scalars_pandas_df["int64_too"].apply(square)
+    bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
+    pd_result = scalars_pandas_df["int64_too"].apply(square)

-        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            square_remote, session.bqclient, session.cloudfunctionsclient
-        )
+    pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)


 @pytest.mark.flaky(retries=2, delay=120)
 def test_df_apply_axis_1(session, scalars_dfs):
     columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"]
     scalars_df, scalars_pandas_df = scalars_dfs
-    try:

-        def serialize_row(row):
-            custom = {
-                "name": row.name,
-                "index": [idx for idx in row.index],
-                "values": [
-                    val.item() if hasattr(val, "item") else val for val in row.values
-                ],
-            }
+    def serialize_row(row):
+        custom = {
+            "name": row.name,
+            "index": [idx for idx in row.index],
+            "values": [
+                val.item() if hasattr(val, "item") else val for val in row.values
+            ],
+        }

-            return str(
-                {
-                    "default": row.to_json(),
-                    "split": row.to_json(orient="split"),
-                    "records": row.to_json(orient="records"),
-                    "index": row.to_json(orient="index"),
-                    "table": row.to_json(orient="table"),
-                    "custom": custom,
-                }
-            )
+        return str(
+            {
+                "default": row.to_json(),
+                "split": row.to_json(orient="split"),
+                "records": row.to_json(orient="records"),
+                "index": row.to_json(orient="index"),
+                "table": row.to_json(orient="table"),
+                "custom": custom,
+            }
+        )

-        serialize_row_remote = session.remote_function(
-            bigframes.series.Series, str, reuse=False
-        )(serialize_row)
+    serialize_row_remote = session.remote_function(
+        bigframes.series.Series, str, reuse=False
+    )(serialize_row)

-        assert getattr(serialize_row_remote, "is_row_processor")
+    assert getattr(serialize_row_remote, "is_row_processor")

-        bf_result = scalars_df[columns].apply(serialize_row_remote, axis=1).to_pandas()
-        pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1)
+    bf_result = scalars_df[columns].apply(serialize_row_remote, axis=1).to_pandas()
+    pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1)

-        # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object'
-        # , ignore this mismatch by using check_dtype=False.
-        pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
+    # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object'
+    # , ignore this mismatch by using check_dtype=False.
+    pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
 
-        # Let's make sure the read_gbq_function path works for this function
-        serialize_row_reuse = session.read_gbq_function(
-            serialize_row_remote.bigframes_remote_function, is_row_processor=True
-        )
-        bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas()
-        pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            serialize_row_remote, session.bqclient, session.cloudfunctionsclient
-        )
+    # Let's make sure the read_gbq_function path works for this function
+    serialize_row_reuse = session.read_gbq_function(
+        serialize_row_remote.bigframes_remote_function, is_row_processor=True
+    )
+    bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas()
+    pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
 
 
 @pytest.mark.flaky(retries=2, delay=120)
@@ -1639,40 +1473,31 @@ def test_df_apply_axis_1_aggregates(session, scalars_dfs):
     columns = ["int64_col", "int64_too", "float64_col"]
     scalars_df, scalars_pandas_df = scalars_dfs
 
-    try:
-
-        def analyze(row):
-            return str(
-                {
-                    "dtype": row.dtype,
-                    "count": row.count(),
-                    "min": row.max(),
-                    "max": row.max(),
-                    "mean": row.mean(),
-                    "std": row.std(),
-                    "var": row.var(),
-                }
-            )
+    def analyze(row):
+        return str(
+            {
+                "dtype": row.dtype,
+                "count": row.count(),
+                "min": row.min(),
+                "max": row.max(),
+                "mean": row.mean(),
+                "std": row.std(),
+                "var": row.var(),
+            }
+        )
 
-        analyze_remote = session.remote_function(
-            bigframes.series.Series, str, reuse=False
-        )(analyze)
+    analyze_remote = session.remote_function(bigframes.series.Series, str, reuse=False)(
+        analyze
+    )
 
-        assert getattr(analyze_remote, "is_row_processor")
+    assert getattr(analyze_remote, "is_row_processor")
 
-        bf_result = (
-            scalars_df[columns].dropna().apply(analyze_remote, axis=1).to_pandas()
-        )
-        pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1)
+    bf_result = scalars_df[columns].dropna().apply(analyze_remote, axis=1).to_pandas()
+    pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1)
 
-        # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object'
-        # , ignore this mismatch by using check_dtype=False.
-        pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
-    finally:
-        # clean up the gcp assets created for the remote function
-        cleanup_function_assets(
-            analyze_remote, session.bqclient, session.cloudfunctionsclient
-        )
+    # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object'
+    # , ignore this mismatch by using check_dtype=False.
+ pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.parametrize( @@ -1755,44 +1580,37 @@ def analyze(row): def test_df_apply_axis_1_complex(session, pd_df): bf_df = session.read_pandas(pd_df) - try: - - def serialize_row(row): - custom = { - "name": row.name, - "index": [idx for idx in row.index], - "values": [ - val.item() if hasattr(val, "item") else val for val in row.values - ], + def serialize_row(row): + custom = { + "name": row.name, + "index": [idx for idx in row.index], + "values": [ + val.item() if hasattr(val, "item") else val for val in row.values + ], + } + return str( + { + "default": row.to_json(), + "split": row.to_json(orient="split"), + "records": row.to_json(orient="records"), + "index": row.to_json(orient="index"), + "custom": custom, } - return str( - { - "default": row.to_json(), - "split": row.to_json(orient="split"), - "records": row.to_json(orient="records"), - "index": row.to_json(orient="index"), - "custom": custom, - } - ) + ) - serialize_row_remote = session.remote_function( - bigframes.series.Series, str, reuse=False - )(serialize_row) + serialize_row_remote = session.remote_function( + bigframes.series.Series, str, reuse=False + )(serialize_row) - assert getattr(serialize_row_remote, "is_row_processor") + assert getattr(serialize_row_remote, "is_row_processor") - bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() - pd_result = pd_df.apply(serialize_row, axis=1) + bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() + pd_result = pd_df.apply(serialize_row, axis=1) - # ignore known dtype difference between pandas and bigframes - pandas.testing.assert_series_equal( - pd_result, bf_result, check_dtype=False, check_index_type=False - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - serialize_row_remote, session.bqclient, session.cloudfunctionsclient - ) + # ignore known dtype difference between pandas and bigframes + pandas.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) @pytest.mark.flaky(retries=2, delay=120) @@ -1820,42 +1638,35 @@ def test_df_apply_axis_1_na_nan_inf(session): pd_df = bf_df.to_pandas() - try: - - def float_parser(row): - import numpy as mynp - import pandas as mypd + def float_parser(row): + import numpy as mynp + import pandas as mypd - if row["text"] == "pandas na": - return mypd.NA - if row["text"] == "numpy nan": - return mynp.nan - return float(row["text"]) + if row["text"] == "pandas na": + return mypd.NA + if row["text"] == "numpy nan": + return mynp.nan + return float(row["text"]) - float_parser_remote = session.remote_function( - bigframes.series.Series, float, reuse=False - )(float_parser) + float_parser_remote = session.remote_function( + bigframes.series.Series, float, reuse=False + )(float_parser) - assert getattr(float_parser_remote, "is_row_processor") + assert getattr(float_parser_remote, "is_row_processor") - pd_result = pd_df.apply(float_parser, axis=1) - bf_result = bf_df.apply(float_parser_remote, axis=1).to_pandas() + pd_result = pd_df.apply(float_parser, axis=1) + bf_result = bf_df.apply(float_parser_remote, axis=1).to_pandas() - # bf_result.dtype is 'Float64' while pd_result.dtype is 'object' - # , ignore this mismatch by using check_dtype=False. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # bf_result.dtype is 'Float64' while pd_result.dtype is 'object' + # , ignore this mismatch by using check_dtype=False. 
+ pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - # Let's also assert that the data is consistent in this round trip - # (BQ -> BigFrames -> BQ -> GCF -> BQ -> BigFrames) w.r.t. their - # expected values in BQ - bq_result = bf_df["num"].to_pandas() - bq_result.name = None - pandas.testing.assert_series_equal(bq_result, bf_result) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - float_parser_remote, session.bqclient, session.cloudfunctionsclient - ) + # Let's also assert that the data is consistent in this round trip + # (BQ -> BigFrames -> BQ -> GCF -> BQ -> BigFrames) w.r.t. their + # expected values in BQ + bq_result = bf_df["num"].to_pandas() + bq_result.name = None + pandas.testing.assert_series_equal(bq_result, bf_result) @pytest.mark.parametrize( @@ -1873,30 +1684,23 @@ def float_parser(row): def test_remote_function_gcf_memory( session, scalars_dfs, memory_mib_args, expected_memory ): - try: - - def square(x: int) -> int: - return x * x + def square(x: int) -> int: + return x * x - square_remote = session.remote_function(reuse=False, **memory_mib_args)(square) + square_remote = session.remote_function(reuse=False, **memory_mib_args)(square) - # Assert that the GCF is created with the intended memory - gcf = session.cloudfunctionsclient.get_function( - name=square_remote.bigframes_cloud_function - ) - assert gcf.service_config.available_memory == expected_memory + # Assert that the GCF is created with the intended memory + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.available_memory == expected_memory - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() - pd_result = scalars_pandas_df["int64_too"].apply(square) + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_remote, session.bqclient, session.cloudfunctionsclient - ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) @pytest.mark.parametrize( @@ -2073,61 +1877,56 @@ def test_df_apply_axis_1_multiple_params(session): # Assert the dataframe dtypes assert tuple(bf_df.dtypes) == expected_dtypes - try: + @session.remote_function([int, float, str], str, reuse=False) + def foo(x, y, z): + return f"I got {x}, {y} and {z}" - @session.remote_function([int, float, str], str, reuse=False) - def foo(x, y, z): - return f"I got {x}, {y} and {z}" - - assert getattr(foo, "is_row_processor") is False - assert getattr(foo, "input_dtypes") == expected_dtypes - - # Fails to apply on dataframe with incompatible number of columns - with pytest.raises( - ValueError, - match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", - ): - bf_df[["Id", "Age"]].apply(foo, axis=1) - with pytest.raises( - ValueError, - match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", - ): - bf_df.assign(Country="lalaland").apply(foo, axis=1) - - # Fails to apply on dataframe with incompatible column datatypes - with pytest.raises( - ValueError, - match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", - ): - 
bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) - - # Successfully applies to dataframe with matching number of columns - # and their datatypes - bf_result = bf_df.apply(foo, axis=1).to_pandas() - - # Since this scenario is not pandas-like, let's handcraft the - # expected result - expected_result = pandas.Series( - [ - "I got 1, 22.5 and alpha", - "I got 2, 23 and beta", - "I got 3, 23.5 and gamma", - ] - ) + assert getattr(foo, "is_row_processor") is False + assert getattr(foo, "input_dtypes") == expected_dtypes - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) + # Fails to apply on dataframe with incompatible number of columns + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", + ): + bf_df[["Id", "Age"]].apply(foo, axis=1) + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) - # Let's make sure the read_gbq_function path works for this function - foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) - bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) + # Fails to apply on dataframe with incompatible column datatypes + with pytest.raises( + ValueError, + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + ): + bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) + + # Successfully applies to dataframe with matching number of columns + # and their datatypes + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result + expected_result = pandas.Series( + [ + "I got 1, 22.5 and alpha", + "I got 2, 23 and beta", + "I got 3, 23.5 and gamma", + ] + ) + + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) + + # Let's make sure the read_gbq_function path works for this function + foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) + bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) def test_df_apply_axis_1_multiple_params_array_output(session): @@ -2148,72 +1947,67 @@ def test_df_apply_axis_1_multiple_params_array_output(session): # Assert the dataframe dtypes assert tuple(bf_df.dtypes) == expected_dtypes - try: - - @session.remote_function([int, float, str], list[str], reuse=False) - def foo(x, y, z): - return [str(x), str(y), z] + @session.remote_function([int, float, str], list[str], reuse=False) + def foo(x, y, z): + return [str(x), str(y), z] - assert getattr(foo, "is_row_processor") is False - assert getattr(foo, "input_dtypes") == expected_dtypes - assert getattr(foo, "output_dtype") == pandas.ArrowDtype( - pyarrow.list_( - bigframes.dtypes.bigframes_dtype_to_arrow_dtype( - bigframes.dtypes.STRING_DTYPE - ) + assert getattr(foo, "is_row_processor") is False + assert getattr(foo, "input_dtypes") == expected_dtypes + assert getattr(foo, "output_dtype") == pandas.ArrowDtype( + pyarrow.list_( + bigframes.dtypes.bigframes_dtype_to_arrow_dtype( 
+ bigframes.dtypes.STRING_DTYPE ) ) - assert ( - getattr(foo, "bigframes_bigquery_function_output_dtype") - == bigframes.dtypes.STRING_DTYPE - ) + ) + assert ( + getattr(foo, "bigframes_bigquery_function_output_dtype") + == bigframes.dtypes.STRING_DTYPE + ) - # Fails to apply on dataframe with incompatible number of columns - with pytest.raises( - ValueError, - match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", - ): - bf_df[["Id", "Age"]].apply(foo, axis=1) - with pytest.raises( - ValueError, - match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", - ): - bf_df.assign(Country="lalaland").apply(foo, axis=1) - - # Fails to apply on dataframe with incompatible column datatypes - with pytest.raises( - ValueError, - match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", - ): - bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) - - # Successfully applies to dataframe with matching number of columns - # and their datatypes - bf_result = bf_df.apply(foo, axis=1).to_pandas() - - # Since this scenario is not pandas-like, let's handcraft the - # expected result - expected_result = pandas.Series( - [ - ["1", "22.5", "alpha"], - ["2", "23", "beta"], - ["3", "23.5", "gamma"], - ] - ) + # Fails to apply on dataframe with incompatible number of columns + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", + ): + bf_df[["Id", "Age"]].apply(foo, axis=1) + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) + # Fails to apply on dataframe with incompatible column datatypes + with pytest.raises( + ValueError, + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + ): + bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) + + # Successfully applies to dataframe with matching number of columns + # and their datatypes + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result + expected_result = pandas.Series( + [ + ["1", "22.5", "alpha"], + ["2", "23", "beta"], + ["3", "23.5", "gamma"], + ] + ) - # Let's make sure the read_gbq_function path works for this function - foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) - bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) + + # Let's make sure the read_gbq_function path works for this function + foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) + bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) def test_df_apply_axis_1_single_param_non_series(session): @@ -2228,94 +2022,83 @@ def test_df_apply_axis_1_single_param_non_series(session): # Assert the dataframe dtypes assert tuple(bf_df.dtypes) == expected_dtypes - try: + 
@session.remote_function([int], str, reuse=False) + def foo(x): + return f"I got {x}" - @session.remote_function([int], str, reuse=False) - def foo(x): - return f"I got {x}" - - assert getattr(foo, "is_row_processor") is False - assert getattr(foo, "input_dtypes") == expected_dtypes - - # Fails to apply on dataframe with incompatible number of columns - with pytest.raises( - ValueError, - match="^Remote function takes 1 arguments but DataFrame has 0 columns\\.$", - ): - bf_df[[]].apply(foo, axis=1) - with pytest.raises( - ValueError, - match="^Remote function takes 1 arguments but DataFrame has 2 columns\\.$", - ): - bf_df.assign(Country="lalaland").apply(foo, axis=1) - - # Fails to apply on dataframe with incompatible column datatypes - with pytest.raises( - ValueError, - match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", - ): - bf_df.assign(Id=bf_df["Id"].astype("Float64")).apply(foo, axis=1) - - # Successfully applies to dataframe with matching number of columns - # and their datatypes - bf_result = bf_df.apply(foo, axis=1).to_pandas() - - # Since this scenario is not pandas-like, let's handcraft the - # expected result - expected_result = pandas.Series( - [ - "I got 1", - "I got 2", - "I got 3", - ] - ) + assert getattr(foo, "is_row_processor") is False + assert getattr(foo, "input_dtypes") == expected_dtypes - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) + # Fails to apply on dataframe with incompatible number of columns + with pytest.raises( + ValueError, + match="^Remote function takes 1 arguments but DataFrame has 0 columns\\.$", + ): + bf_df[[]].apply(foo, axis=1) + with pytest.raises( + ValueError, + match="^Remote function takes 1 arguments but DataFrame has 2 columns\\.$", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) + + # Fails to apply on dataframe with incompatible column datatypes + with pytest.raises( + ValueError, + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + ): + bf_df.assign(Id=bf_df["Id"].astype("Float64")).apply(foo, axis=1) + + # Successfully applies to dataframe with matching number of columns + # and their datatypes + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result + expected_result = pandas.Series( + [ + "I got 1", + "I got 2", + "I got 3", + ] + ) + + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) @pytest.mark.flaky(retries=2, delay=120) def test_df_apply_axis_1_array_output(session, scalars_dfs): columns = ["int64_col", "int64_too"] scalars_df, scalars_pandas_df = scalars_dfs - try: - @session.remote_function(reuse=False) - def generate_stats(row: pandas.Series) -> list[int]: - import pandas as pd + @session.remote_function(reuse=False) + def generate_stats(row: pandas.Series) -> list[int]: + import pandas as pd - sum = row["int64_too"] - avg = row["int64_too"] - if pd.notna(row["int64_col"]): - sum += row["int64_col"] - avg = round((avg + row["int64_col"]) / 2) - return [sum, avg] + sum = row["int64_too"] + avg = row["int64_too"] + if pd.notna(row["int64_col"]): + sum += row["int64_col"] + avg = round((avg + row["int64_col"]) / 2) + return [sum, avg] - assert getattr(generate_stats, "is_row_processor") + assert 
getattr(generate_stats, "is_row_processor") - bf_result = scalars_df[columns].apply(generate_stats, axis=1).to_pandas() - pd_result = scalars_pandas_df[columns].apply(generate_stats, axis=1) + bf_result = scalars_df[columns].apply(generate_stats, axis=1).to_pandas() + pd_result = scalars_pandas_df[columns].apply(generate_stats, axis=1) - # bf_result.dtype is 'list[pyarrow]' while pd_result.dtype - # is 'object', ignore this mismatch by using check_dtype=False. - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # bf_result.dtype is 'list[pyarrow]' while pd_result.dtype + # is 'object', ignore this mismatch by using check_dtype=False. + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - # Let's make sure the read_gbq_function path works for this function - generate_stats_reuse = session.read_gbq_function( - generate_stats.bigframes_remote_function, - is_row_processor=True, - ) - bf_result = scalars_df[columns].apply(generate_stats_reuse, axis=1).to_pandas() - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - generate_stats, session.bqclient, session.cloudfunctionsclient - ) + # Let's make sure the read_gbq_function path works for this function + generate_stats_reuse = session.read_gbq_function( + generate_stats.bigframes_remote_function, + is_row_processor=True, + ) + bf_result = scalars_df[columns].apply(generate_stats_reuse, axis=1).to_pandas() + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.parametrize( @@ -2361,40 +2144,34 @@ def test_remote_function_ingress_settings( effective_ingress_settings, expected_warning, ): - try: - # Verify the function raises the expected security warning message. - with warnings.catch_warnings(record=True) as w: + # Verify the function raises the expected security warning message. + with warnings.catch_warnings(record=True) as w: - def square(x: int) -> int: - return x * x + def square(x: int) -> int: + return x * x - square_remote = session.remote_function( - reuse=False, **ingress_settings_args - )(square) + square_remote = session.remote_function(reuse=False, **ingress_settings_args)( + square + ) - if expected_warning is not None: - assert issubclass(w[0].category, FutureWarning) - assert "Consider using 'internal-only' for enhanced security." in str( - w[0].message - ) + if expected_warning is not None: + assert issubclass(w[0].category, FutureWarning) + assert "Consider using 'internal-only' for enhanced security." 
in str( + w[0].message + ) - # Assert that the GCF is created with the intended maximum timeout - gcf = session.cloudfunctionsclient.get_function( - name=square_remote.bigframes_cloud_function - ) - assert gcf.service_config.ingress_settings == effective_ingress_settings + # Assert that the GCF is created with the intended maximum timeout + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.ingress_settings == effective_ingress_settings - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() - pd_result = scalars_pandas_df["int64_too"].apply(square) + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - square_remote, session.bqclient, session.cloudfunctionsclient - ) + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) @@ -2428,59 +2205,48 @@ def add_one(x: int) -> int: temporary_bigquery_remote_function = None temporary_cloud_run_function = None - try: - with session_creator() as session: - # create a temporary remote function - add_one_remote_temp = session.remote_function( - dataset=dataset_id, - bigquery_connection=bq_cf_connection, - reuse=False, - )(add_one) + with session_creator() as session: + # create a temporary remote function + add_one_remote_temp = session.remote_function( + dataset=dataset_id, + bigquery_connection=bq_cf_connection, + reuse=False, + )(add_one) - temporary_bigquery_remote_function = ( - add_one_remote_temp.bigframes_remote_function - ) - assert temporary_bigquery_remote_function is not None - assert ( - session.bqclient.get_routine(temporary_bigquery_remote_function) - is not None - ) + temporary_bigquery_remote_function = ( + add_one_remote_temp.bigframes_remote_function + ) + assert temporary_bigquery_remote_function is not None + assert ( + session.bqclient.get_routine(temporary_bigquery_remote_function) is not None + ) - temporary_cloud_run_function = add_one_remote_temp.bigframes_cloud_function - assert temporary_cloud_run_function is not None - assert ( - session.cloudfunctionsclient.get_function( - name=temporary_cloud_run_function - ) - is not None - ) + temporary_cloud_run_function = add_one_remote_temp.bigframes_cloud_function + assert temporary_cloud_run_function is not None + assert ( + session.cloudfunctionsclient.get_function(name=temporary_cloud_run_function) + is not None + ) - bf_result = scalars_df["int64_too"].apply(add_one_remote_temp).to_pandas() - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + bf_result = scalars_df["int64_too"].apply(add_one_remote_temp).to_pandas() + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - # outside the with statement context manager the temporary BQ remote - # function and the underlying cloud run function should have been - # cleaned up - assert temporary_bigquery_remote_function is not None - with pytest.raises(google.api_core.exceptions.NotFound): - session.bqclient.get_routine(temporary_bigquery_remote_function) - # the deletion of cloud function happens in a non-blocking way, ensure that - # it either exists in a being-deleted state, or is already 
deleted - assert temporary_cloud_run_function is not None - try: - gcf = session.cloudfunctionsclient.get_function( - name=temporary_cloud_run_function - ) - assert gcf.state is functions_v2.Function.State.DELETING - except google.cloud.exceptions.NotFound: - pass - finally: - # clean up the gcp assets created for the temporary remote function, - # just in case it was not explicitly cleaned up in the try clause due - # to assertion failure or exception earlier than that - cleanup_function_assets( - add_one_remote_temp, session.bqclient, session.cloudfunctionsclient + # outside the with statement context manager the temporary BQ remote + # function and the underlying cloud run function should have been + # cleaned up + assert temporary_bigquery_remote_function is not None + with pytest.raises(google.api_core.exceptions.NotFound): + session.bqclient.get_routine(temporary_bigquery_remote_function) + # the deletion of cloud function happens in a non-blocking way, ensure that + # it either exists in a being-deleted state, or is already deleted + assert temporary_cloud_run_function is not None + try: + gcf = session.cloudfunctionsclient.get_function( + name=temporary_cloud_run_function ) + assert gcf.state is functions_v2.Function.State.DELETING + except google.cloud.exceptions.NotFound: + pass @pytest.mark.parametrize( @@ -2573,110 +2339,87 @@ def add_one(x: int) -> int: def test_remote_function_array_output( session, scalars_dfs, dataset_id, bq_cf_connection, array_dtype ): - try: - - @session.remote_function( - dataset=dataset_id, - bigquery_connection=bq_cf_connection, - reuse=False, - ) - def featurize(x: int) -> list[array_dtype]: # type: ignore - return [array_dtype(i) for i in [x, x + 1, x + 2]] + @session.remote_function( + dataset=dataset_id, + bigquery_connection=bq_cf_connection, + reuse=False, + ) + def featurize(x: int) -> list[array_dtype]: # type: ignore + return [array_dtype(i) for i in [x, x + 1, x + 2]] - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["int64_too"] - bf_result = bf_int64_col.apply(featurize).to_pandas() + bf_int64_col = scalars_df["int64_too"] + bf_result = bf_int64_col.apply(featurize).to_pandas() - pd_int64_col = scalars_pandas_df["int64_too"] - pd_result = pd_int64_col.apply(featurize) + pd_int64_col = scalars_pandas_df["int64_too"] + pd_result = pd_int64_col.apply(featurize) - # ignore any dtype disparity - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # ignore any dtype disparity + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - # Let's make sure the read_gbq_function path works for this function - featurize_reuse = session.read_gbq_function( - featurize.bigframes_remote_function # type: ignore - ) - bf_result = scalars_df["int64_too"].apply(featurize_reuse).to_pandas() - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - featurize, session.bqclient, session.cloudfunctionsclient - ) + # Let's make sure the read_gbq_function path works for this function + featurize_reuse = session.read_gbq_function( + featurize.bigframes_remote_function # type: ignore + ) + bf_result = scalars_df["int64_too"].apply(featurize_reuse).to_pandas() + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_array_output_partial_ordering_mode( 
unordered_session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - - @unordered_session.remote_function( - dataset=dataset_id, - bigquery_connection=bq_cf_connection, - reuse=False, - ) - def featurize(x: float) -> list[float]: # type: ignore - return [x, x + 1, x + 2] + @unordered_session.remote_function( + dataset=dataset_id, + bigquery_connection=bq_cf_connection, + reuse=False, + ) + def featurize(x: float) -> list[float]: # type: ignore + return [x, x + 1, x + 2] - scalars_df, scalars_pandas_df = scalars_dfs + scalars_df, scalars_pandas_df = scalars_dfs - bf_int64_col = scalars_df["float64_col"].dropna() - bf_result = bf_int64_col.apply(featurize).to_pandas() + bf_int64_col = scalars_df["float64_col"].dropna() + bf_result = bf_int64_col.apply(featurize).to_pandas() - pd_int64_col = scalars_pandas_df["float64_col"].dropna() - pd_result = pd_int64_col.apply(featurize) + pd_int64_col = scalars_pandas_df["float64_col"].dropna() + pd_result = pd_int64_col.apply(featurize) - # ignore any dtype disparity - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + # ignore any dtype disparity + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - # Let's make sure the read_gbq_function path works for this function - featurize_reuse = unordered_session.read_gbq_function( - featurize.bigframes_remote_function # type: ignore - ) - bf_int64_col = scalars_df["float64_col"].dropna() - bf_result = bf_int64_col.apply(featurize_reuse).to_pandas() - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - featurize, - unordered_session.bqclient, - unordered_session.cloudfunctionsclient, - ) + # Let's make sure the read_gbq_function path works for this function + featurize_reuse = unordered_session.read_gbq_function( + featurize.bigframes_remote_function # type: ignore + ) + bf_int64_col = scalars_df["float64_col"].dropna() + bf_result = bf_int64_col.apply(featurize_reuse).to_pandas() + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_array_output_multiindex( session, scalars_dfs, dataset_id, bq_cf_connection ): - try: - - @session.remote_function( - dataset=dataset_id, - bigquery_connection=bq_cf_connection, - reuse=False, - ) - def featurize(x: int) -> list[float]: - return [x, x + 0.5, x + 0.33] + @session.remote_function( + dataset=dataset_id, + bigquery_connection=bq_cf_connection, + reuse=False, + ) + def featurize(x: int) -> list[float]: + return [x, x + 0.5, x + 0.33] - scalars_df, scalars_pandas_df = scalars_dfs - multiindex_cols = ["rowindex", "string_col"] - scalars_df = scalars_df.reset_index().set_index(multiindex_cols) - scalars_pandas_df = scalars_pandas_df.reset_index().set_index(multiindex_cols) + scalars_df, scalars_pandas_df = scalars_dfs + multiindex_cols = ["rowindex", "string_col"] + scalars_df = scalars_df.reset_index().set_index(multiindex_cols) + scalars_pandas_df = scalars_pandas_df.reset_index().set_index(multiindex_cols) - bf_int64_col = scalars_df["int64_too"] - bf_result = bf_int64_col.apply(featurize).to_pandas() + bf_int64_col = scalars_df["int64_too"] + bf_result = bf_int64_col.apply(featurize).to_pandas() - pd_int64_col = scalars_pandas_df["int64_too"] - pd_result = pd_int64_col.apply(featurize) + pd_int64_col = scalars_pandas_df["int64_too"] + pd_result = pd_int64_col.apply(featurize) - # ignore any dtype 
disparity - pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - finally: - # clean up the gcp assets created for the remote function - cleanup_function_assets( - featurize, session.bqclient, session.cloudfunctionsclient - ) + # ignore any dtype disparity + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) From 9ebb3a2bc150834279230a5920cd6acd71e75537 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 10 Mar 2025 23:46:29 +0000 Subject: [PATCH 2/5] adjust the expect warnings in the ingress settings and service account --- bigframes/functions/_function_session.py | 2 +- .../large/functions/test_remote_function.py | 76 ++++++++++++++----- 2 files changed, 57 insertions(+), 21 deletions(-) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 15c8cb979e..0d3b66c503 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -458,7 +458,7 @@ def remote_function( msg = bfe.format_message( "You have not explicitly set a user-managed `cloud_function_service_account`. " "Using the default Compute Engine service account. " - "To use Bigframes 2.0, please explicitly set `cloud_function_service_account` " + "In BigFrames 2.0, you would have to explicitly set `cloud_function_service_account` " 'either to a user-managed service account (preferred) or to `"default"` ' "to use the Compute Engine service account (discouraged). " "See, https://cloud.google.com/functions/docs/securing/function-identity." diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 4de560f872..dfda3eec1b 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -17,6 +17,7 @@ import inspect import math # must keep this at top level to test udf referring global import import os.path +import re import shutil import sys import tempfile @@ -1196,14 +1197,38 @@ def square_num(x): assert gcf.service_config.service_account_email == gcf_service_account -def test_remote_function_warns_default_cloud_function_service_account(): - project = "bigframes-dev-perf" - rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=project)) +@pytest.mark.parametrize( + ("remote_function_args"), + [ + pytest.param( + {}, + id="no-set", + ), + pytest.param( + {"cloud_function_service_account": None}, + id="set-none", + ), + ], +) +def test_remote_function_warns_default_cloud_function_service_account( + session, remote_function_args +): + with pytest.warns(FutureWarning) as record: + session.remote_function(**remote_function_args) - with pytest.warns(FutureWarning, match="You have not explicitly set a"): - rf_session.remote_function( - cloud_function_service_account=None, # Explicitly omit service account. 
-        )
+    assert len(
+        [
+            warn
+            for warn in record
+            if re.search(
+                (
+                    "You have not explicitly set a user-managed.*Using the default Compute Engine.*service account"
+                ),
+                warn.message.args[0],
+                re.DOTALL,
+            )
+        ]
+    ) == 1
 
 
 @pytest.mark.flaky(retries=2, delay=120)
@@ -2102,36 +2127,40 @@ def generate_stats(row: pandas.Series) -> list[int]:
 
 
 @pytest.mark.parametrize(
-    ("ingress_settings_args", "effective_ingress_settings", "expected_warning"),
+    (
+        "ingress_settings_args",
+        "effective_ingress_settings",
+        "expect_default_ingress_setting_warning",
+    ),
     [
         pytest.param(
             {},
             functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL,
-            FutureWarning,
+            True,
             id="no-set",
         ),
         pytest.param(
             {"cloud_function_ingress_settings": None},
             functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL,
-            FutureWarning,
+            True,
             id="set-none",
         ),
         pytest.param(
            {"cloud_function_ingress_settings": "all"},
             functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL,
-            None,
+            False,
             id="set-all",
         ),
         pytest.param(
             {"cloud_function_ingress_settings": "internal-only"},
             functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_ONLY,
-            None,
+            False,
             id="set-internal-only",
         ),
         pytest.param(
             {"cloud_function_ingress_settings": "internal-and-gclb"},
             functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_AND_GCLB,
-            None,
+            False,
             id="set-internal-and-gclb",
         ),
     ],
 )
@@ -2142,10 +2171,10 @@ def test_remote_function_ingress_settings(
     session,
     scalars_dfs,
     ingress_settings_args,
     effective_ingress_settings,
-    expected_warning,
+    expect_default_ingress_setting_warning,
 ):
     # Verify the function raises the expected security warning message.
-    with warnings.catch_warnings(record=True) as w:
+    with warnings.catch_warnings(record=True) as record:
 
         def square(x: int) -> int:
             return x * x
 
@@ -2154,11 +2183,18 @@ def square(x: int) -> int:
             square
         )
 
-    if expected_warning is not None:
-        assert issubclass(w[0].category, FutureWarning)
-        assert "Consider using 'internal-only' for enhanced security." in str(
-            w[0].message
-        )
+    default_ingress_setting_warnings = [
+        warn
+        for warn in record
+        if isinstance(warn.message, FutureWarning)
+        and "`cloud_function_ingress_settings` are set to 'all' by default"
+        in warn.message.args[0]
+        and "will change to 'internal-only' for enhanced security in future"
+        in warn.message.args[0]
+    ]
+    assert len(default_ingress_setting_warnings) == (
+        1 if expect_default_ingress_setting_warning else 0
+    )
 
     # Assert that the GCF is created with the intended maximum timeout
     gcf = session.cloudfunctionsclient.get_function(

From 31a70c506486d286d1f1c32a079eecac5e2a3ee1 Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Tue, 11 Mar 2025 00:26:05 +0000
Subject: [PATCH 3/5] Revert "Merge remote-tracking branch
 'refs/remotes/github/main' into simplify-rf-cleanup-code"

This reverts commit 4c271ac3cb74ff1bcaca10785db3798fec307ff7, reversing
changes made to 9ebb3a2bc150834279230a5920cd6acd71e75537.
--- .kokoro/build.sh | 3 - bigframes/dataframe.py | 6 +- bigframes/functions/_function_session.py | 5 - bigframes/operations/remote_function_ops.py | 21 +-- noxfile.py | 3 +- .../large/functions/test_managed_function.py | 160 ------------------ .../large/functions/test_remote_function.py | 18 +- .../small/functions/test_managed_function.py | 155 ----------------- 8 files changed, 21 insertions(+), 350 deletions(-) diff --git a/.kokoro/build.sh b/.kokoro/build.sh index 6cc03455da..58eaa7fedf 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -50,6 +50,3 @@ if [[ -n "${NOX_SESSION:-}" ]]; then else python3 -m nox --stop-on-first-error fi - -# Prevent kokoro from trying to collect many mb of artifacts, wasting several minutes -sudo rm -rf "${KOKORO_ARTIFACTS_DIR?}"/* diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 2349e469ab..151da51792 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4199,13 +4199,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): udf_input_dtypes = getattr(func, "input_dtypes") if len(udf_input_dtypes) != len(self.columns): raise ValueError( - f"BigFrames BigQuery function takes {len(udf_input_dtypes)}" - f" arguments but DataFrame has {len(self.columns)} columns." + f"Remote function takes {len(udf_input_dtypes)} arguments but DataFrame has {len(self.columns)} columns." ) if udf_input_dtypes != tuple(self.dtypes.to_list()): raise ValueError( - f"BigFrames BigQuery function takes arguments of types " - f"{udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}." + f"Remote function takes arguments of types {udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}." ) series_list = [self[col] for col in self.columns] diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index ac170448c1..0d3b66c503 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -892,7 +892,6 @@ def wrapper(func): func = cloudpickle.loads(cloudpickle.dumps(func)) self._try_delattr(func, "bigframes_bigquery_function") - self._try_delattr(func, "bigframes_bigquery_function_output_dtype") self._try_delattr(func, "input_dtypes") self._try_delattr(func, "output_dtype") self._try_delattr(func, "is_row_processor") @@ -952,10 +951,6 @@ def wrapper(func): ibis_signature.output_type ) ) - # Managed function directly supports certain output types which are - # not supported in remote function (e.g. list output). Thus no more - # processing for 'bigframes_bigquery_function_output_dtype'. - func.bigframes_bigquery_function_output_dtype = func.output_dtype func.is_row_processor = is_row_processor func.ibis_node = node diff --git a/bigframes/operations/remote_function_ops.py b/bigframes/operations/remote_function_ops.py index 51cfccbc41..8505fd1607 100644 --- a/bigframes/operations/remote_function_ops.py +++ b/bigframes/operations/remote_function_ops.py @@ -29,12 +29,11 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - # The output dtype should be set to a valid Dtype by @udf decorator, - # @remote_function decorator, or read_gbq_function method. 
+ # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method if hasattr(self.func, "bigframes_bigquery_function_output_dtype"): return self.func.bigframes_bigquery_function_output_dtype - - raise AttributeError("bigframes_bigquery_function_output_dtype not defined") + else: + raise AttributeError("bigframes_bigquery_function_output_dtype not defined") @dataclasses.dataclass(frozen=True) @@ -47,12 +46,11 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - # The output dtype should be set to a valid Dtype by @udf decorator, - # @remote_function decorator, or read_gbq_function method. + # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method if hasattr(self.func, "bigframes_bigquery_function_output_dtype"): return self.func.bigframes_bigquery_function_output_dtype - - raise AttributeError("bigframes_bigquery_function_output_dtype not defined") + else: + raise AttributeError("bigframes_bigquery_function_output_dtype not defined") @dataclasses.dataclass(frozen=True) @@ -65,9 +63,8 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - # The output dtype should be set to a valid Dtype by @udf decorator, - # @remote_function decorator, or read_gbq_function method. + # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method if hasattr(self.func, "bigframes_bigquery_function_output_dtype"): return self.func.bigframes_bigquery_function_output_dtype - - raise AttributeError("bigframes_bigquery_function_output_dtype not defined") + else: + raise AttributeError("bigframes_bigquery_function_output_dtype not defined") diff --git a/noxfile.py b/noxfile.py index b95e58f4ef..ca147e171d 100644 --- a/noxfile.py +++ b/noxfile.py @@ -256,8 +256,7 @@ def mypy(session): set( [ "mypy", - # TODO: update to latest pandas-stubs once we resolve bigframes issues. - "pandas-stubs<=2.2.3.241126", + "pandas-stubs", "types-protobuf", "types-python-dateutil", "types-requests", diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index 503720edcc..4db7a1c47c 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -13,10 +13,8 @@ # limitations under the License. import pandas -import pyarrow import pytest -import bigframes from bigframes.functions import _function_session as bff_session from bigframes.functions._utils import get_python_version import bigframes.pandas as bpd @@ -166,161 +164,3 @@ def func(x, y): cleanup_function_assets( session.bqclient, session.cloudfunctionsclient, managed_func ) - - -@pytest.mark.parametrize( - "array_dtype", - [ - bool, - int, - float, - str, - ], -) -@pytest.mark.skipif( - get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, - reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", -) -def test_managed_function_array_output(session, scalars_dfs, dataset_id, array_dtype): - try: - - @session.udf(dataset=dataset_id) - def featurize(x: int) -> list[array_dtype]: # type: ignore - return [array_dtype(i) for i in [x, x + 1, x + 2]] - - scalars_df, scalars_pandas_df = scalars_dfs - - bf_int64_col = scalars_df["int64_too"] - bf_result = bf_int64_col.apply(featurize).to_pandas() - - pd_int64_col = scalars_pandas_df["int64_too"] - pd_result = pd_int64_col.apply(featurize) - - # Ignore any dtype disparity. 
- pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) - - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets( - featurize, session.bqclient, session.cloudfunctionsclient - ) - - -@pytest.mark.skipif( - get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, - reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", -) -def test_managed_function_binop_array_output(session, scalars_dfs, dataset_id): - try: - - def func(x, y): - return [len(x), abs(y % 4)] - - managed_func = session.udf( - input_types=[str, int], - output_type=list[int], - dataset=dataset_id, - )(func) - - scalars_df, scalars_pandas_df = scalars_dfs - - scalars_df = scalars_df.dropna() - scalars_pandas_df = scalars_pandas_df.dropna() - bf_result = ( - scalars_df["string_col"] - .combine(scalars_df["int64_col"], managed_func) - .to_pandas() - ) - pd_result = scalars_pandas_df["string_col"].combine( - scalars_pandas_df["int64_col"], func - ) - pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets( - managed_func, session.bqclient, session.cloudfunctionsclient - ) - - -@pytest.mark.skipif( - get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, - reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", -) -def test_manage_function_df_apply_axis_1_array_output(session): - bf_df = bigframes.dataframe.DataFrame( - { - "Id": [1, 2, 3], - "Age": [22.5, 23, 23.5], - "Name": ["alpha", "beta", "gamma"], - } - ) - - expected_dtypes = ( - bigframes.dtypes.INT_DTYPE, - bigframes.dtypes.FLOAT_DTYPE, - bigframes.dtypes.STRING_DTYPE, - ) - - # Assert the dataframe dtypes. - assert tuple(bf_df.dtypes) == expected_dtypes - - try: - - @session.udf(input_types=[int, float, str], output_type=list[str]) - def foo(x, y, z): - return [str(x), str(y), z] - - assert getattr(foo, "is_row_processor") is False - assert getattr(foo, "input_dtypes") == expected_dtypes - assert getattr(foo, "output_dtype") == pandas.ArrowDtype( - pyarrow.list_( - bigframes.dtypes.bigframes_dtype_to_arrow_dtype( - bigframes.dtypes.STRING_DTYPE - ) - ) - ) - assert getattr(foo, "output_dtype") == getattr( - foo, "bigframes_bigquery_function_output_dtype" - ) - - # Fails to apply on dataframe with incompatible number of columns. - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$", - ): - bf_df[["Id", "Age"]].apply(foo, axis=1) - - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$", - ): - bf_df.assign(Country="lalaland").apply(foo, axis=1) - - # Fails to apply on dataframe with incompatible column datatypes. - with pytest.raises( - ValueError, - match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", - ): - bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) - - # Successfully applies to dataframe with matching number of columns. - # and their datatypes. - bf_result = bf_df.apply(foo, axis=1).to_pandas() - - # Since this scenario is not pandas-like, let's handcraft the - # expected result. 
- expected_result = pandas.Series( - [ - ["1", "22.5", "alpha"], - ["2", "23.0", "beta"], - ["3", "23.5", "gamma"], - ] - ) - - pandas.testing.assert_series_equal( - expected_result, bf_result, check_dtype=False, check_index_type=False - ) - - finally: - # Clean up the gcp assets created for the managed function. - cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient) diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index fb2c8c36e0..dfda3eec1b 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -1912,19 +1912,19 @@ def foo(x, y, z): # Fails to apply on dataframe with incompatible number of columns with pytest.raises( ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$", + match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", ): bf_df[["Id", "Age"]].apply(foo, axis=1) with pytest.raises( ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$", + match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", ): bf_df.assign(Country="lalaland").apply(foo, axis=1) # Fails to apply on dataframe with incompatible column datatypes with pytest.raises( ValueError, - match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", ): bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) @@ -1993,19 +1993,19 @@ def foo(x, y, z): # Fails to apply on dataframe with incompatible number of columns with pytest.raises( ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$", + match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", ): bf_df[["Id", "Age"]].apply(foo, axis=1) with pytest.raises( ValueError, - match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$", + match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", ): bf_df.assign(Country="lalaland").apply(foo, axis=1) # Fails to apply on dataframe with incompatible column datatypes with pytest.raises( ValueError, - match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", ): bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) @@ -2057,19 +2057,19 @@ def foo(x): # Fails to apply on dataframe with incompatible number of columns with pytest.raises( ValueError, - match="^BigFrames BigQuery function takes 1 arguments but DataFrame has 0 columns\\.$", + match="^Remote function takes 1 arguments but DataFrame has 0 columns\\.$", ): bf_df[[]].apply(foo, axis=1) with pytest.raises( ValueError, - match="^BigFrames BigQuery function takes 1 arguments but DataFrame has 2 columns\\.$", + match="^Remote function takes 1 arguments but DataFrame has 2 columns\\.$", ): bf_df.assign(Country="lalaland").apply(foo, axis=1) # Fails to apply on dataframe with incompatible column datatypes with pytest.raises( ValueError, - match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*", + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", ): bf_df.assign(Id=bf_df["Id"].astype("Float64")).apply(foo, axis=1) diff --git 
a/tests/system/small/functions/test_managed_function.py b/tests/system/small/functions/test_managed_function.py index e1af68512a..41a5785d01 100644 --- a/tests/system/small/functions/test_managed_function.py +++ b/tests/system/small/functions/test_managed_function.py @@ -62,9 +62,6 @@ def foo(x): assert hasattr(foo, "bigframes_bigquery_function") assert hasattr(foo, "ibis_node") - assert hasattr(foo, "input_dtypes") - assert hasattr(foo, "output_dtype") - assert hasattr(foo, "bigframes_bigquery_function_output_dtype") scalars_df, scalars_pandas_df = scalars_dfs @@ -127,88 +124,6 @@ def add(x: int, y: int) -> int: pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) -@pytest.mark.skipif( - get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, - reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", -) -@pytest.mark.parametrize( - ("typ",), - [ - pytest.param(int), - pytest.param(float), - pytest.param(bool), - pytest.param(str), - ], -) -def test_managed_function_series_apply_list_output( - typ, - scalars_dfs, - dataset_id_permanent, -): - def foo_list(x): - # The bytes() constructor expects a non-negative interger as its arg. - return [typ(abs(x)), typ(abs(x) + 1)] - - foo_list = udf( - input_types=int, - output_type=list[typ], # type: ignore - dataset=dataset_id_permanent, - name=get_function_name(foo_list), - )(foo_list) - - scalars_df, scalars_pandas_df = scalars_dfs - - bf_result_col = scalars_df["int64_too"].apply(foo_list) - bf_result = ( - scalars_df["int64_too"].to_frame().assign(result=bf_result_col).to_pandas() - ) - - pd_result_col = scalars_pandas_df["int64_too"].apply(foo_list) - pd_result = scalars_pandas_df["int64_too"].to_frame().assign(result=pd_result_col) - - # Ignore any dtype difference. - assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) - - -@pytest.mark.skipif( - get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS, - reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}", -) -def test_managed_function_series_combine_list_output(dataset_id_permanent, scalars_dfs): - def add_list(x: int, y: int) -> list[int]: - return [x, y] - - scalars_df, scalars_pandas_df = scalars_dfs - int_col_name_with_nulls = "int64_col" - int_col_name_no_nulls = "int64_too" - bf_df = scalars_df[[int_col_name_with_nulls, int_col_name_no_nulls]] - pd_df = scalars_pandas_df[[int_col_name_with_nulls, int_col_name_no_nulls]] - - # Make sure there are NA values in the test column. - assert any([pd.isna(val) for val in bf_df[int_col_name_with_nulls]]) - - add_list_managed_func = udf( - dataset=dataset_id_permanent, - name=get_function_name(add_list), - )(add_list) - - # After filtering out nulls the managed function application should work - # similar to pandas. - pd_filter = pd_df[int_col_name_with_nulls].notnull() - pd_result = pd_df[pd_filter][int_col_name_with_nulls].combine( - pd_df[pd_filter][int_col_name_no_nulls], add_list - ) - bf_filter = bf_df[int_col_name_with_nulls].notnull() - bf_result = ( - bf_df[bf_filter][int_col_name_with_nulls] - .combine(bf_df[bf_filter][int_col_name_no_nulls], add_list_managed_func) - .to_pandas() - ) - - # Ignore any dtype difference. 
-    pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
-
-
 @pytest.mark.skipif(
     get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS,
     reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}",
 )
@@ -282,73 +197,3 @@ def add_ints(x, y):
     pd.testing.assert_series_equal(
         pd_result, bf_result, check_dtype=False, check_exact=True
     )
-
-
-@pytest.mark.skipif(
-    get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS,
-    reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}",
-)
-def test_managed_function_dataframe_map_list_output(scalars_dfs, dataset_id_permanent):
-    def add_one_list(x):
-        return [x + 1] * 3
-
-    mf_add_one_list = udf(
-        input_types=[int],
-        output_type=list[int],
-        dataset=dataset_id_permanent,
-        name=get_function_name(add_one_list),
-    )(add_one_list)
-
-    scalars_df, scalars_pandas_df = scalars_dfs
-    int64_cols = ["int64_col", "int64_too"]
-
-    bf_int64_df = scalars_df[int64_cols]
-    bf_int64_df_filtered = bf_int64_df.dropna()
-    bf_result = bf_int64_df_filtered.map(mf_add_one_list).to_pandas()
-
-    pd_int64_df = scalars_pandas_df[int64_cols]
-    pd_int64_df_filtered = pd_int64_df.dropna()
-    pd_result = pd_int64_df_filtered.map(add_one_list)
-
-    # Ignore any dtype difference.
-    assert_pandas_df_equal(bf_result, pd_result, check_dtype=False)
-
-
-@pytest.mark.skipif(
-    get_python_version() not in bff_session._MANAGED_FUNC_PYTHON_VERSIONS,
-    reason=f"Supported version: {bff_session._MANAGED_FUNC_PYTHON_VERSIONS}",
-)
-def test_managed_function_dataframe_apply_axis_1_list_output(
-    session, scalars_dfs, dataset_id_permanent
-):
-    scalars_df, scalars_pandas_df = scalars_dfs
-    series = scalars_df["int64_too"]
-    series_pandas = scalars_pandas_df["int64_too"]
-
-    def add_ints_list(x, y):
-        return [x + y] * 2
-
-    add_ints_list_mf = session.udf(
-        input_types=[int, int],
-        output_type=list[int],
-        dataset=dataset_id_permanent,
-        name=get_function_name(add_ints_list, is_row_processor=True),
-    )(add_ints_list)
-    assert add_ints_list_mf.bigframes_bigquery_function  # type: ignore
-
-    with pytest.warns(
-        bigframes.exceptions.PreviewWarning,
-        match="axis=1 scenario is in preview.",
-    ):
-        bf_result = (
-            bpd.DataFrame({"x": series, "y": series})
-            .apply(add_ints_list_mf, axis=1)
-            .to_pandas()
-        )
-
-    pd_result = pd.DataFrame({"x": series_pandas, "y": series_pandas}).apply(
-        lambda row: add_ints_list(row["x"], row["y"]), axis=1
-    )
-
-    # Ignore any dtype difference.
-    pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)

From 9f5bb977fe18d433addc639f6aa3fb0f561c4919 Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Tue, 11 Mar 2025 00:26:22 +0000
Subject: [PATCH 4/5] Revert "adjust the expect warnings in the ingress settings and service account"

This reverts commit 9ebb3a2bc150834279230a5920cd6acd71e75537.
---
 bigframes/functions/_function_session.py    |  2 +-
 .../large/functions/test_remote_function.py | 76 +++++--------------
 2 files changed, 21 insertions(+), 57 deletions(-)

diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py
index 0d3b66c503..15c8cb979e 100644
--- a/bigframes/functions/_function_session.py
+++ b/bigframes/functions/_function_session.py
@@ -458,7 +458,7 @@ def remote_function(
             msg = bfe.format_message(
                 "You have not explicitly set a user-managed `cloud_function_service_account`. "
                 "Using the default Compute Engine service account. "
" - "In BigFrames 2.0, you would have to explicitly set `cloud_function_service_account` " + "To use Bigframes 2.0, please explicitly set `cloud_function_service_account` " 'either to a user-managed service account (preferred) or to `"default"` ' "to use the Compute Engine service account (discouraged). " "See, https://cloud.google.com/functions/docs/securing/function-identity." diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index dfda3eec1b..4de560f872 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -17,7 +17,6 @@ import inspect import math # must keep this at top level to test udf referring global import import os.path -import re import shutil import sys import tempfile @@ -1197,38 +1196,14 @@ def square_num(x): assert gcf.service_config.service_account_email == gcf_service_account -@pytest.mark.parametrize( - ("remote_function_args"), - [ - pytest.param( - {}, - id="no-set", - ), - pytest.param( - {"cloud_function_service_account": None}, - id="set-none", - ), - ], -) -def test_remote_function_warns_default_cloud_function_service_account( - session, remote_function_args -): - with pytest.warns(FutureWarning) as record: - session.remote_function(**remote_function_args) +def test_remote_function_warns_default_cloud_function_service_account(): + project = "bigframes-dev-perf" + rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=project)) - len( - [ - warn - for warn in record - if re.search( - ( - "You have not explicitly set a user-managed.*Using the default Compute Engine.*service account" - ), - warn.message.args[0], - re.DOTALL, - ) - ] - ) == 1 + with pytest.warns(FutureWarning, match="You have not explicitly set a"): + rf_session.remote_function( + cloud_function_service_account=None, # Explicitly omit service account. + ) @pytest.mark.flaky(retries=2, delay=120) @@ -2127,40 +2102,36 @@ def generate_stats(row: pandas.Series) -> list[int]: @pytest.mark.parametrize( - ( - "ingress_settings_args", - "effective_ingress_settings", - "expect_default_ingress_setting_warning", - ), + ("ingress_settings_args", "effective_ingress_settings", "expected_warning"), [ pytest.param( {}, functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, - True, + FutureWarning, id="no-set", ), pytest.param( {"cloud_function_ingress_settings": None}, functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, - True, + FutureWarning, id="set-none", ), pytest.param( {"cloud_function_ingress_settings": "all"}, functions_v2.ServiceConfig.IngressSettings.ALLOW_ALL, - False, + None, id="set-all", ), pytest.param( {"cloud_function_ingress_settings": "internal-only"}, functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_ONLY, - False, + None, id="set-internal-only", ), pytest.param( {"cloud_function_ingress_settings": "internal-and-gclb"}, functions_v2.ServiceConfig.IngressSettings.ALLOW_INTERNAL_AND_GCLB, - False, + None, id="set-internal-and-gclb", ), ], @@ -2171,10 +2142,10 @@ def test_remote_function_ingress_settings( scalars_dfs, ingress_settings_args, effective_ingress_settings, - expect_default_ingress_setting_warning, + expected_warning, ): # Verify the function raises the expected security warning message. 
-    with warnings.catch_warnings(record=True) as record:
+    with warnings.catch_warnings(record=True) as w:

         def square(x: int) -> int:
             return x * x

@@ -2183,18 +2154,11 @@ def square(x: int) -> int:
             square
         )

-    default_ingress_setting_warnings = [
-        warn
-        for warn in record
-        if isinstance(warn.message, FutureWarning)
-        and "`cloud_function_ingress_settings` are set to 'all' by default"
-        in warn.message.args[0]
-        and "will change to 'internal-only' for enhanced security in future"
-        in warn.message.args[0]
-    ]
-    assert len(default_ingress_setting_warnings) == (
-        1 if expect_default_ingress_setting_warning else 0
-    )
+    if expected_warning is not None:
+        assert issubclass(w[0].category, FutureWarning)
+        assert "Consider using 'internal-only' for enhanced security." in str(
+            w[0].message
+        )

     # Assert that the GCF is created with the intended maximum timeout
     gcf = session.cloudfunctionsclient.get_function(

From 726911864efbf4df9b04eb5cc9b5d06dab3b884c Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Mon, 17 Mar 2025 19:43:46 +0000
Subject: [PATCH 5/5] remove explicit cleanup from managed udf tests

---
 .../large/functions/test_managed_function.py | 183 ++++++++----------
 1 file changed, 81 insertions(+), 102 deletions(-)

diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py
index 0af6810fa9..e91fff9f5a 100644
--- a/tests/system/large/functions/test_managed_function.py
+++ b/tests/system/large/functions/test_managed_function.py
@@ -164,60 +164,45 @@ def func(x, y):
     ],
 )
 def test_managed_function_array_output(session, scalars_dfs, dataset_id, array_dtype):
-    try:
-
-        @session.udf(dataset=dataset_id)
-        def featurize(x: int) -> list[array_dtype]:  # type: ignore
-            return [array_dtype(i) for i in [x, x + 1, x + 2]]
+    @session.udf(dataset=dataset_id)
+    def featurize(x: int) -> list[array_dtype]:  # type: ignore
+        return [array_dtype(i) for i in [x, x + 1, x + 2]]

-        scalars_df, scalars_pandas_df = scalars_dfs
+    scalars_df, scalars_pandas_df = scalars_dfs

-        bf_int64_col = scalars_df["int64_too"]
-        bf_result = bf_int64_col.apply(featurize).to_pandas()
+    bf_int64_col = scalars_df["int64_too"]
+    bf_result = bf_int64_col.apply(featurize).to_pandas()

-        pd_int64_col = scalars_pandas_df["int64_too"]
-        pd_result = pd_int64_col.apply(featurize)
+    pd_int64_col = scalars_pandas_df["int64_too"]
+    pd_result = pd_int64_col.apply(featurize)

-        # Ignore any dtype disparity.
-        pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
-
-    finally:
-        # Clean up the gcp assets created for the managed function.
-        cleanup_function_assets(
-            featurize, session.bqclient, session.cloudfunctionsclient
-        )
+    # Ignore any dtype disparity.
+    pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)


 def test_managed_function_binop_array_output(session, scalars_dfs, dataset_id):
-    try:
-
-        def func(x, y):
-            return [len(x), abs(y % 4)]
-
-        managed_func = session.udf(
-            input_types=[str, int],
-            output_type=list[int],
-            dataset=dataset_id,
-        )(func)
-
-        scalars_df, scalars_pandas_df = scalars_dfs
-
-        scalars_df = scalars_df.dropna()
-        scalars_pandas_df = scalars_pandas_df.dropna()
-        bf_result = (
-            scalars_df["string_col"]
-            .combine(scalars_df["int64_col"], managed_func)
-            .to_pandas()
-        )
-        pd_result = scalars_pandas_df["string_col"].combine(
-            scalars_pandas_df["int64_col"], func
-        )
-        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
-    finally:
-        # Clean up the gcp assets created for the managed function.
-        cleanup_function_assets(
-            managed_func, session.bqclient, session.cloudfunctionsclient
-        )
+    def func(x, y):
+        return [len(x), abs(y % 4)]
+
+    managed_func = session.udf(
+        input_types=[str, int],
+        output_type=list[int],
+        dataset=dataset_id,
+    )(func)
+
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    scalars_df = scalars_df.dropna()
+    scalars_pandas_df = scalars_pandas_df.dropna()
+    bf_result = (
+        scalars_df["string_col"]
+        .combine(scalars_df["int64_col"], managed_func)
+        .to_pandas()
+    )
+    pd_result = scalars_pandas_df["string_col"].combine(
+        scalars_pandas_df["int64_col"], func
+    )
+    pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)


 def test_manage_function_df_apply_axis_1_array_output(session):
@@ -238,63 +223,57 @@ def test_manage_function_df_apply_axis_1_array_output(session):
     # Assert the dataframe dtypes.
     assert tuple(bf_df.dtypes) == expected_dtypes

-    try:
+    @session.udf(input_types=[int, float, str], output_type=list[str])
+    def foo(x, y, z):
+        return [str(x), str(y), z]

-        @session.udf(input_types=[int, float, str], output_type=list[str])
-        def foo(x, y, z):
-            return [str(x), str(y), z]
-
-        assert getattr(foo, "is_row_processor") is False
-        assert getattr(foo, "input_dtypes") == expected_dtypes
-        assert getattr(foo, "output_dtype") == pandas.ArrowDtype(
-            pyarrow.list_(
-                bigframes.dtypes.bigframes_dtype_to_arrow_dtype(
-                    bigframes.dtypes.STRING_DTYPE
-                )
-            )
-        )
-        assert getattr(foo, "output_dtype") == getattr(
-            foo, "bigframes_bigquery_function_output_dtype"
-        )
-
-        # Fails to apply on dataframe with incompatible number of columns.
-        with pytest.raises(
-            ValueError,
-            match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$",
-        ):
-            bf_df[["Id", "Age"]].apply(foo, axis=1)
-
-        with pytest.raises(
-            ValueError,
-            match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$",
-        ):
-            bf_df.assign(Country="lalaland").apply(foo, axis=1)
-
-        # Fails to apply on dataframe with incompatible column datatypes.
-        with pytest.raises(
-            ValueError,
-            match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
-        ):
-            bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)
-
-        # Successfully applies to dataframe with matching number of columns.
-        # and their datatypes.
-        bf_result = bf_df.apply(foo, axis=1).to_pandas()
-
-        # Since this scenario is not pandas-like, let's handcraft the
-        # expected result.
-        expected_result = pandas.Series(
-            [
-                ["1", "22.5", "alpha"],
-                ["2", "23.0", "beta"],
-                ["3", "23.5", "gamma"],
-            ]
-        )
-
-        pandas.testing.assert_series_equal(
-            expected_result, bf_result, check_dtype=False, check_index_type=False
-        )
-
-    finally:
-        # Clean up the gcp assets created for the managed function.
-        cleanup_function_assets(foo, session.bqclient, session.cloudfunctionsclient)
+    assert getattr(foo, "is_row_processor") is False
+    assert getattr(foo, "input_dtypes") == expected_dtypes
+    assert getattr(foo, "output_dtype") == pandas.ArrowDtype(
+        pyarrow.list_(
+            bigframes.dtypes.bigframes_dtype_to_arrow_dtype(
+                bigframes.dtypes.STRING_DTYPE
+            )
+        )
+    )
+    assert getattr(foo, "output_dtype") == getattr(
+        foo, "bigframes_bigquery_function_output_dtype"
+    )
+
+    # Fails to apply on dataframe with incompatible number of columns.
+    with pytest.raises(
+        ValueError,
+        match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 2 columns\\.$",
+    ):
+        bf_df[["Id", "Age"]].apply(foo, axis=1)
+
+    with pytest.raises(
+        ValueError,
+        match="^BigFrames BigQuery function takes 3 arguments but DataFrame has 4 columns\\.$",
+    ):
+        bf_df.assign(Country="lalaland").apply(foo, axis=1)
+
+    # Fails to apply on dataframe with incompatible column datatypes.
+    with pytest.raises(
+        ValueError,
+        match="^BigFrames BigQuery function takes arguments of types .* but DataFrame dtypes are .*",
+    ):
+        bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1)
+
+    # Successfully applies to dataframe with matching number of columns
+    # and their datatypes.
+    bf_result = bf_df.apply(foo, axis=1).to_pandas()
+
+    # Since this scenario is not pandas-like, let's handcraft the
+    # expected result.
+    expected_result = pandas.Series(
+        [
+            ["1", "22.5", "alpha"],
+            ["2", "23.0", "beta"],
+            ["3", "23.5", "gamma"],
+        ]
+    )
+
+    pandas.testing.assert_series_equal(
+        expected_result, bf_result, check_dtype=False, check_index_type=False
+    )