Skip to content

Commit 8287f97

Browse files
feat: Experimental transpilation of unannotated python callables
1 parent 384724c commit 8287f97

6 files changed

Lines changed: 410 additions & 24 deletions

File tree

packages/bigframes/bigframes/_config/experiment_options.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def __init__(self):
2828
self._semantic_operators: bool = False
2929
self._ai_operators: bool = False
3030
self._sql_compiler: Literal["legacy", "stable", "experimental"] = "stable"
31+
self._enable_python_transpiler: bool = False
3132

3233
@property
3334
def semantic_operators(self) -> bool:
@@ -166,3 +167,17 @@ def blob_display_height(self, value: Optional[int]):
166167
warnings.warn(msg, category=bfe.ApiDeprecationWarning)
167168

168169
bigframes.options.display.blob_display_height = value
170+
171+
@property
def enable_python_transpiler(self) -> bool:
    """Whether the experimental Python transpiler has been switched on."""
    return self._enable_python_transpiler

@enable_python_transpiler.setter
def enable_python_transpiler(self, value: bool):
    """Store the flag, emitting a ``PreviewWarning`` when a truthy value is set."""
    # Only opting in warns; assigning a falsy value is silent.
    if value:
        warnings.warn(
            bfe.format_message(
                "Python transpiler is an unstable, experimental feature, and not yet fully "
                "validated, use at your own risk."
            ),
            category=bfe.PreviewWarning,
        )
    self._enable_python_transpiler = value

packages/bigframes/bigframes/dataframe.py

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4692,13 +4692,17 @@ def _prepare_export(
46924692
return array_value, id_overrides
46934693

46944694
def map(self, func, na_action: Optional[str] = None) -> DataFrame:
4695-
if not isinstance(func, bigframes.functions.Udf):
4695+
from bigframes._config import options
4696+
4697+
if not isinstance(func, bigframes.functions.Udf) and not (
4698+
options.experiments.enable_python_transpiler and callable(func)
4699+
):
46964700
raise TypeError("the first argument must be callable")
46974701

46984702
if na_action not in {None, "ignore"}:
46994703
raise ValueError(f"na_action={na_action} not supported")
47004704

4701-
expr = ops.func_to_op(func).as_expr(ex.free_var("input"))
4705+
expr = ops.func_to_expr(func).apply(ex.free_var("input"))
47024706
if na_action == "ignore":
47034707
# True case, predicate, False case
47044708
expr = ops.where_op.as_expr(
@@ -4718,11 +4722,74 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
47184722
)
47194723
warnings.warn(msg, category=bfe.FunctionAxisOnePreviewWarning)
47204724

4721-
if not isinstance(func, bigframes.functions.Udf):
4725+
from bigframes._config import options
4726+
4727+
if not isinstance(func, bigframes.functions.Udf) and not (
4728+
options.experiments.enable_python_transpiler and callable(func)
4729+
):
47224730
raise ValueError(
47234731
"For axis=1 a BigFrames BigQuery function must be used."
47244732
)
47254733

4734+
if (
4735+
not isinstance(func, bigframes.functions.Udf)
4736+
and options.experiments.enable_python_transpiler
4737+
and callable(func)
4738+
):
4739+
from bigframes.operations.to_op import CallableExpression
4740+
4741+
callable_expr = CallableExpression.from_callable(
4742+
func, unpack_mode=False
4743+
)
4744+
4745+
# Bind the extra arguments (args and kwargs) starting from parameter 1
4746+
bindings = {}
4747+
# Positional arguments:
4748+
for idx, val in enumerate(args):
4749+
param_name = callable_expr.arg_specs[idx + 1].name
4750+
bindings[param_name] = val
4751+
# Keyword arguments:
4752+
for key, val in kwargs.items():
4753+
bindings[key] = val
4754+
4755+
# Bind defaults for other parameters (excluding the first 'row' parameter)
4756+
for spec in callable_expr.arg_specs[1:]:
4757+
if (
4758+
spec.name not in bindings
4759+
and spec.default_value is not inspect.Parameter.empty
4760+
):
4761+
bindings[spec.name] = spec.default_value
4762+
4763+
# Wrap all values in bindings as expressions
4764+
def to_expr(val):
4765+
if isinstance(val, ex.Expression):
4766+
return val
4767+
return ex.const(val)
4768+
4769+
bindings = {k: to_expr(v) for k, v in bindings.items()}
4770+
4771+
# Now bind these variables in the expression!
4772+
expr = callable_expr.expr.bind_variables(
4773+
bindings, allow_partial_bindings=True
4774+
)
4775+
4776+
# Now bind the remaining free variables to the DataFrame columns:
4777+
col_bindings = {}
4778+
block = self._get_block()
4779+
for col in self.columns:
4780+
if col in expr.free_variables:
4781+
col_id = block.resolve_label_exact(col)
4782+
if col_id is not None:
4783+
col_bindings[col] = ex.deref(col_id)
4784+
4785+
expr = expr.bind_variables(col_bindings)
4786+
4787+
# Project the expression on the DataFrame block to get a new Series!
4788+
block, result_id = self._get_block().project_expr(expr)
4789+
from bigframes.series import Series
4790+
4791+
return Series(block.select_column(result_id))
4792+
47264793
if func.udf_def.signature.is_row_processor:
47274794
# Early check whether the dataframe dtypes are currently supported
47284795
# in the bigquery function
@@ -4777,7 +4844,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
47774844

47784845
# Apply the function
47794846
result_series = rows_as_json_series._apply_nary_op(
4780-
ops.func_to_op(func),
4847+
ops.func_to_expr(func).expr.op,
47814848
list(args),
47824849
)
47834850

@@ -4837,8 +4904,8 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
48374904

48384905
series_list = [self[col] for col in self.columns]
48394906
op_list = series_list[1:] + list(args)
4840-
result_series = series_list[0]._apply_nary_op(
4841-
ops.func_to_op(func), op_list
4907+
result_series = series_list[0]._apply_callable_expr(
4908+
ops.func_to_expr(func), op_list
48424909
)
48434910
result_series.name = None
48444911

packages/bigframes/bigframes/operations/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@
229229
timestamp_add_op,
230230
timestamp_sub_op,
231231
)
232-
from bigframes.operations.to_op import func_to_op
232+
from bigframes.operations.to_op import func_to_expr
233233

234234
__all__ = [
235235
# Base ops
@@ -437,7 +437,7 @@
437437
"AIScore",
438438
"AISimilarity",
439439
# Helper functions
440-
"func_to_op",
440+
"func_to_expr",
441441
# Numpy ops mapping
442442
"NUMPY_TO_BINOP",
443443
"NUMPY_TO_OP",

packages/bigframes/bigframes/operations/to_op.py

Lines changed: 166 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,31 +11,185 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
from __future__ import annotations
1415

16+
import dataclasses
17+
import inspect
18+
import typing
19+
20+
import bigframes.core.expression as ex
21+
import bigframes.core.identifiers as ids
22+
import bigframes.dtypes as dtypes
23+
from bigframes._config import options
1524
from bigframes.functions import Udf
1625
from bigframes.functions.udf_def import BigqueryUdf, PythonUdf
17-
from bigframes.operations import base_ops, remote_function_ops
26+
from bigframes.operations import remote_function_ops
27+
28+
29+
@dataclasses.dataclass(frozen=True)
class ArgumentSpec:
    """Immutable description of one parameter in a callable's signature."""

    # Parameter name; also the key under which the argument is bound.
    name: str
    # Declared default. ``inspect.Parameter.empty`` is the "no default" marker
    # used by the code in this module.
    default_value: typing.Any
    # True for a ``*args``-style (VAR_POSITIONAL) parameter.
    is_varargs: bool

1939

20-
def func_to_op(op) -> base_ops.NaryOp:
40+
@dataclasses.dataclass(frozen=True)
class CallableExpression(ex.Expression):
    """
    Encodes a calling convention and an expression to bind arguments to.

    ``apply`` binds the free variables of ``expr``, keyed by the parameter
    names listed in ``arg_specs``, and returns the resulting expression.
    """

    expr: ex.Expression
    # One entry per parameter, in declaration order. Prefer a tuple: the
    # dataclass-generated ``__hash__`` hashes every field, so a list here
    # makes the instance unhashable.
    arg_specs: typing.Sequence[ArgumentSpec]

    @classmethod
    def from_callable(
        cls, func: typing.Callable, unpack_mode: bool = False
    ) -> CallableExpression:
        """
        Build a CallableExpression from a plain Python callable.

        Args:
            func: The callable to convert. Its signature is read with
                ``inspect.signature``.
            unpack_mode: Forwarded unchanged to ``dis_to_expr``.

        Returns:
            A CallableExpression whose ``arg_specs`` mirror ``func``'s
            parameters in declaration order.
        """
        # Stored as a tuple so the frozen dataclass stays hashable.
        arg_specs = tuple(
            ArgumentSpec(
                name=name,
                default_value=param.default,
                is_varargs=param.kind == inspect.Parameter.VAR_POSITIONAL,
            )
            for name, param in inspect.signature(func).parameters.items()
        )

        # Function-scope import kept from the original
        # (presumably to avoid an import cycle -- TODO confirm).
        from bigframes.core.bytecode import dis_to_expr

        expr = dis_to_expr(func, unpack_mode=unpack_mode)
        return cls(expr=expr, arg_specs=arg_specs)

    def apply(self, *args, **kwargs) -> ex.Expression:
        """
        Apply the arguments to the expression.

        All args are expected to be column references, or scalars. Values
        that are not already expressions are wrapped with ``ex.const``.
        Parameters are filled from positional arguments first, then keyword
        arguments, then declared defaults.

        Raises:
            NotImplementedError: If a ``*args``-style parameter is reached.
            TypeError: If a required argument is missing, too many
                positional arguments are given, a parameter receives both a
                positional and a keyword value, or a keyword matches no
                parameter.
        """

        def to_expr(val):
            return val if isinstance(val, ex.Expression) else ex.const(val)

        bindings = {}
        pos_idx = 0

        for spec in self.arg_specs:
            if spec.is_varargs:
                raise NotImplementedError(
                    "varargs in compiled python functions is not supported"
                )

            if pos_idx < len(args):
                # Mirror Python call semantics instead of silently dropping
                # the keyword value.
                if spec.name in kwargs:
                    raise TypeError(
                        f"got multiple values for argument: '{spec.name}'"
                    )
                bindings[spec.name] = to_expr(args[pos_idx])
                pos_idx += 1
            elif spec.name in kwargs:
                bindings[spec.name] = to_expr(kwargs[spec.name])
            elif spec.default_value is not inspect.Parameter.empty:
                bindings[spec.name] = to_expr(spec.default_value)
            else:
                raise TypeError(f"missing required argument: '{spec.name}'")

        if pos_idx < len(args):
            raise TypeError(
                f"too many positional arguments: expected {len(self.arg_specs)}, got {len(args)}"
            )

        # Every parameter is bound by now, so a leftover keyword cannot match
        # any parameter; reject it rather than ignore it.
        for key in kwargs:
            if key not in bindings:
                raise TypeError(f"got an unexpected keyword argument: '{key}'")

        return self.expr.bind_variables(bindings)

    @property
    def column_references(self) -> typing.Tuple[ids.ColumnId, ...]:
        """Column references of the wrapped expression."""
        return self.expr.column_references

    @property
    def free_variables(self) -> typing.Tuple[typing.Hashable, ...]:
        """Free variables of the wrapped expression."""
        return self.expr.free_variables

    @property
    def is_const(self) -> bool:
        """Whether the wrapped expression reports itself as constant."""
        return self.expr.is_const

    @property
    def is_resolved(self) -> bool:
        """Always False for a CallableExpression."""
        return False

    @property
    def output_type(self) -> dtypes.ExpressionType:
        """Not available; always raises ``ValueError``."""
        raise ValueError(
            "CallableExpression does not have a fixed output type until arguments are applied."
        )

    def bind_refs(
        self,
        bindings: typing.Mapping[ids.ColumnId, ex.Expression],
        allow_partial_bindings: bool = False,
    ) -> CallableExpression:
        """Return a copy whose wrapped expression has ``bindings`` applied to its column refs."""
        return dataclasses.replace(
            self,
            expr=self.expr.bind_refs(
                bindings, allow_partial_bindings=allow_partial_bindings
            ),
        )

    def bind_variables(
        self,
        bindings: typing.Mapping[typing.Hashable, ex.Expression],
        allow_partial_bindings: bool = False,
    ) -> CallableExpression:
        """
        Return a copy with free variables bound, leaving parameters untouched.

        Bindings whose key equals one of this callable's parameter names are
        dropped before delegating to the wrapped expression.
        """
        arg_names = {spec.name for spec in self.arg_specs}
        filtered_bindings = {k: v for k, v in bindings.items() if k not in arg_names}
        return dataclasses.replace(
            self,
            expr=self.expr.bind_variables(
                filtered_bindings, allow_partial_bindings=allow_partial_bindings
            ),
        )

    def transform_children(
        self, t: typing.Callable[[ex.Expression], ex.Expression]
    ) -> ex.Expression:
        """Apply ``t`` to the wrapped expression; return ``self`` when nothing changed."""
        new_expr = t(self.expr)
        if new_expr != self.expr:
            return dataclasses.replace(self, expr=new_expr)
        return self
162+
163+
164+
def func_to_expr(op, unpack_mode: bool = False) -> CallableExpression:
    """
    Convert various bigframes, python functions into bigframes CallableExpression.

    Args:
        op: A bigframes ``Udf``, or any Python callable when
            ``options.experiments.enable_python_transpiler`` is set.
        unpack_mode: Forwarded to ``CallableExpression.from_callable`` for
            plain callables; not used for ``Udf`` inputs.

    Returns:
        A CallableExpression. For a ``Udf`` it wraps a single op expression
        whose inputs are free variables named after the UDF's signature inputs.

    Raises:
        TypeError: If ``op`` is a ``Udf`` with an unsupported definition, or
            is neither a ``Udf`` nor an accepted callable.
    """
    if isinstance(op, Udf):
        udf_def = op.udf_def
        if isinstance(udf_def, BigqueryUdf):
            bq_op = remote_function_ops.RemoteFunctionOp(function_def=udf_def)
        elif isinstance(udf_def, PythonUdf):
            bq_op = remote_function_ops.PythonUdfOp(function_def=udf_def)
        else:
            raise TypeError(f"Unsupported UDF definition: {op.udf_def}")

        input_names = [arg.name for arg in udf_def.signature.inputs]
        expr = ex.OpExpression(
            bq_op, tuple(ex.free_var(name) for name in input_names)
        )

        # A tuple, not a list: CallableExpression is a frozen dataclass and
        # its generated __hash__ fails on list fields.
        arg_specs = tuple(
            ArgumentSpec(
                name=name,
                default_value=inspect.Parameter.empty,
                is_varargs=False,
            )
            for name in input_names
        )
        return CallableExpression(expr=expr, arg_specs=arg_specs)

    if options.experiments.enable_python_transpiler and callable(op):
        return CallableExpression.from_callable(op, unpack_mode=unpack_mode)

    raise TypeError(f"Unsupported function type: {op}")

0 commit comments

Comments
 (0)