-
-
Notifications
You must be signed in to change notification settings - Fork 18.5k
POC: New UDF methods option #43678
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC: New UDF methods option #43678
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,10 @@ | |
|
||
import numpy as np | ||
|
||
from pandas._config import option_context | ||
from pandas._config import ( | ||
get_option, | ||
option_context, | ||
) | ||
|
||
from pandas._libs import lib | ||
from pandas._typing import ( | ||
|
@@ -82,6 +85,7 @@ def frame_apply( | |
result_type: str | None = None, | ||
args=None, | ||
kwargs=None, | ||
renamer=None, | ||
) -> FrameApply: | ||
"""construct and return a row or column based frame apply object""" | ||
axis = obj._get_axis_number(axis) | ||
|
@@ -98,6 +102,7 @@ def frame_apply( | |
result_type=result_type, | ||
args=args, | ||
kwargs=kwargs, | ||
renamer=renamer, | ||
) | ||
|
||
|
||
|
@@ -112,6 +117,7 @@ def __init__( | |
result_type: str | None, | ||
args, | ||
kwargs, | ||
renamer=None, | ||
): | ||
self.obj = obj | ||
self.raw = raw | ||
|
@@ -141,6 +147,7 @@ def f(x): | |
|
||
self.orig_f: AggFuncType = func | ||
self.f: AggFuncType = f | ||
self.renamer = renamer | ||
|
||
@abc.abstractmethod | ||
def apply(self) -> DataFrame | Series: | ||
|
@@ -164,10 +171,16 @@ def agg(self) -> DataFrame | Series | None: | |
return self.apply_str() | ||
|
||
if is_dict_like(arg): | ||
return self.agg_dict_like() | ||
if get_option("new_udf_methods"): | ||
return self.new_dict_like("agg") | ||
else: | ||
return self.agg_dict_like() | ||
elif is_list_like(arg): | ||
# we require a list, but not a 'str' | ||
return self.agg_list_like() | ||
if get_option("new_udf_methods"): | ||
return self.new_list_like("agg") | ||
else: | ||
return self.agg_list_like() | ||
|
||
if callable(arg): | ||
f = com.get_cython_func(arg) | ||
|
@@ -408,6 +421,70 @@ def agg_list_like(self) -> DataFrame | Series: | |
) | ||
return concatenated.reindex(full_ordered_index, copy=False) | ||
|
||
def new_list_like(self, method: str) -> DataFrame | Series: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this be something other than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed - splitting this up, some discussion in the first part: #43736. "future" has been suggested as a replacement for "new". |
||
""" | ||
Compute aggregation in the case of a list-like argument. | ||
|
||
Returns | ||
------- | ||
Result of aggregation. | ||
""" | ||
from pandas.core.reshape.concat import concat | ||
|
||
obj = self.obj | ||
arg = cast(List[AggFuncTypeBase], self.f) | ||
|
||
results = [] | ||
keys = [] | ||
result_dim = None | ||
|
||
for a in arg: | ||
name = None | ||
try: | ||
if isinstance(a, (tuple, list)): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. prob worth having a function to compute a single arg (and then just call in the list) |
||
# Handle (name, value) pairs | ||
name, a = a | ||
new_res = getattr(obj, method)(a) | ||
if result_dim is None: | ||
result_dim = getattr(new_res, "ndim", 0) | ||
elif getattr(new_res, "ndim", 0) != result_dim: | ||
raise ValueError( | ||
"cannot combine transform and aggregation operations" | ||
) | ||
except TypeError: | ||
pass | ||
else: | ||
results.append(new_res) | ||
|
||
# make sure we find a good name | ||
if name is None: | ||
name = com.get_callable_name(a) or a | ||
keys.append(name) | ||
|
||
# if we are empty | ||
if not len(results): | ||
raise ValueError("no results") | ||
|
||
try: | ||
concatenated = concat(results, keys=keys, axis=1, sort=False) | ||
except TypeError: | ||
# we are concatting non-NDFrame objects, | ||
# e.g. a list of scalars | ||
from pandas import Series | ||
|
||
result = Series(results, index=keys, name=obj.name) | ||
return result | ||
else: | ||
# Concat uses the first index to determine the final indexing order. | ||
# The union of a shorter first index with the other indices causes | ||
# the index sorting to be different from the order of the aggregating | ||
# functions. Reindex if this is the case. | ||
index_size = concatenated.index.size | ||
full_ordered_index = next( | ||
result.index for result in results if result.index.size == index_size | ||
) | ||
return concatenated.reindex(full_ordered_index, copy=False) | ||
|
||
def agg_dict_like(self) -> DataFrame | Series: | ||
""" | ||
Compute aggregation in the case of a dict-like argument. | ||
|
@@ -486,6 +563,86 @@ def agg_dict_like(self) -> DataFrame | Series: | |
|
||
return result | ||
|
||
def new_dict_like(self, method: str) -> DataFrame | Series: | ||
""" | ||
Compute aggregation in the case of a dict-like argument. | ||
|
||
Returns | ||
------- | ||
Result of aggregation. | ||
""" | ||
from pandas import Index | ||
from pandas.core.reshape.concat import concat | ||
|
||
obj = self.obj | ||
arg = cast(AggFuncTypeDict, self.f) | ||
|
||
if not isinstance(obj, SelectionMixin): | ||
# i.e. obj is Series or DataFrame | ||
selected_obj = obj | ||
selection = None | ||
else: | ||
selected_obj = obj._selected_obj | ||
selection = obj._selection | ||
|
||
arg = self.normalize_dictlike_arg("agg", selected_obj, arg) | ||
|
||
if selected_obj.ndim == 1: | ||
# key only used for output | ||
colg = obj._gotitem(selection, ndim=1) | ||
results = {key: getattr(colg, method)(how) for key, how in arg.items()} | ||
|
||
else: | ||
# key used for column selection and output | ||
results = { | ||
key: getattr(obj._gotitem(key, ndim=1), method)(how) | ||
for key, how in arg.items() | ||
} | ||
if self.renamer is not None: | ||
for key, columns in self.renamer.items(): | ||
results[key].columns = columns | ||
|
||
# Avoid making two isinstance calls in all and any below | ||
if isinstance(results, dict): | ||
is_ndframe = [isinstance(r, ABCNDFrame) for r in results.values()] | ||
else: | ||
is_ndframe = [isinstance(r, ABCNDFrame) for r in results] | ||
|
||
# combine results | ||
result: DataFrame | Series | ||
if all(is_ndframe): | ||
keys_to_use: Iterable[Hashable] | ||
keys_to_use = [k for k in arg.keys() if not results[k].empty] | ||
keys_to_use = keys_to_use if keys_to_use != [] else arg.keys() | ||
if selected_obj.ndim == 2: | ||
# keys are columns, so we can preserve names | ||
ktu = Index(keys_to_use) | ||
ktu._set_names(selected_obj.columns.names) | ||
keys_to_use = ktu | ||
keys = None if selected_obj.ndim == 1 else keys_to_use | ||
result = concat({k: results[k] for k in keys_to_use}, keys=keys, axis=1) | ||
elif any(is_ndframe): | ||
# There is a mix of NDFrames and scalars | ||
raise ValueError( | ||
"cannot perform both aggregation " | ||
"and transformation operations " | ||
"simultaneously" | ||
) | ||
else: | ||
from pandas import Series | ||
|
||
# we have a dict of scalars | ||
# GH 36212 use name only if obj is a series | ||
if obj.ndim == 1: | ||
obj = cast("Series", obj) | ||
name = obj.name | ||
else: | ||
name = None | ||
|
||
result = Series(results, index=arg.keys(), name=name) | ||
|
||
return result | ||
|
||
def apply_str(self) -> DataFrame | Series: | ||
""" | ||
Compute apply in case of a string. | ||
|
@@ -522,6 +679,35 @@ def apply_multiple(self) -> DataFrame | Series: | |
""" | ||
return self.obj.aggregate(self.f, self.axis, *self.args, **self.kwargs) | ||
|
||
def new_apply_multiple(self) -> DataFrame | Series: | ||
""" | ||
Compute apply in case of a list-like or dict-like. | ||
|
||
Returns | ||
------- | ||
result: Series, DataFrame, or None | ||
Result when self.f is a list-like or dict-like, None otherwise. | ||
""" | ||
obj = self.obj | ||
axis = self.axis | ||
|
||
self.obj = obj if axis == 0 else obj.T | ||
self.axis = 0 | ||
|
||
try: | ||
if is_dict_like(self.f): | ||
result = self.new_dict_like("apply") | ||
else: | ||
result = self.new_list_like("apply") | ||
finally: | ||
self.obj = obj | ||
self.axis = axis | ||
|
||
if axis == 1: | ||
result = result.T if result is not None else result | ||
|
||
return result | ||
|
||
def normalize_dictlike_arg( | ||
self, how: str, obj: DataFrame | Series, func: AggFuncTypeDict | ||
) -> AggFuncTypeDict: | ||
|
@@ -661,7 +847,10 @@ def apply(self) -> DataFrame | Series: | |
"""compute the results""" | ||
# dispatch to agg | ||
if is_list_like(self.f): | ||
return self.apply_multiple() | ||
if get_option("new_udf_methods"): | ||
return self.new_apply_multiple() | ||
else: | ||
return self.apply_multiple() | ||
|
||
# all empty | ||
if len(self.columns) == 0 and len(self.index) == 0: | ||
|
@@ -1039,7 +1228,10 @@ def apply(self) -> DataFrame | Series: | |
|
||
# dispatch to agg | ||
if is_list_like(self.f): | ||
return self.apply_multiple() | ||
if get_option("new_udf_methods"): | ||
return self.new_apply_multiple() | ||
else: | ||
return self.apply_multiple() | ||
|
||
if isinstance(self.f, str): | ||
# if we are a string, try to dispatch | ||
|
@@ -1172,7 +1364,13 @@ def transform(self): | |
|
||
def reconstruct_func( | ||
func: AggFuncType | None, **kwargs | ||
) -> tuple[bool, AggFuncType | None, list[str] | None, list[int] | None]: | ||
) -> tuple[ | ||
bool, | ||
AggFuncType | None, | ||
list[str] | None, | ||
list[int] | None, | ||
dict[str, list[str]] | None, | ||
]: | ||
""" | ||
This is the internal function to reconstruct func given if there is relabeling | ||
or not and also normalize the keyword to get new order of columns. | ||
|
@@ -1204,14 +1402,16 @@ def reconstruct_func( | |
Examples | ||
-------- | ||
>>> reconstruct_func(None, **{"foo": ("col", "min")}) | ||
(True, defaultdict(<class 'list'>, {'col': ['min']}), ('foo',), array([0])) | ||
(True, defaultdict(<class 'list'>, {'col': ['min']}), ('foo',), array([0]), | ||
defaultdict(<class 'list'>, {'col': ['foo']})) | ||
|
||
>>> reconstruct_func("min") | ||
(False, 'min', None, None) | ||
(False, 'min', None, None, None) | ||
""" | ||
relabeling = func is None and is_multi_agg_with_relabel(**kwargs) | ||
columns: list[str] | None = None | ||
order: list[int] | None = None | ||
renamer: dict[str, list[str]] | None = None | ||
|
||
if not relabeling: | ||
if isinstance(func, list) and len(func) > len(set(func)): | ||
|
@@ -1227,9 +1427,9 @@ def reconstruct_func( | |
raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).") | ||
|
||
if relabeling: | ||
func, columns, order = normalize_keyword_aggregation(kwargs) | ||
func, columns, order, renamer = normalize_keyword_aggregation(kwargs) | ||
|
||
return relabeling, func, columns, order | ||
return relabeling, func, columns, order, renamer | ||
|
||
|
||
def is_multi_agg_with_relabel(**kwargs) -> bool: | ||
|
@@ -1258,7 +1458,9 @@ def is_multi_agg_with_relabel(**kwargs) -> bool: | |
) | ||
|
||
|
||
def normalize_keyword_aggregation(kwargs: dict) -> tuple[dict, list[str], list[int]]: | ||
def normalize_keyword_aggregation( | ||
kwargs: dict, | ||
) -> tuple[dict, list[str], list[int], dict[str, list]]: | ||
""" | ||
Normalize user-provided "named aggregation" kwargs. | ||
Transforms from the new ``Mapping[str, NamedAgg]`` style kwargs | ||
|
@@ -1280,7 +1482,8 @@ def normalize_keyword_aggregation(kwargs: dict) -> tuple[dict, list[str], list[i | |
Examples | ||
-------- | ||
>>> normalize_keyword_aggregation({"output": ("input", "sum")}) | ||
(defaultdict(<class 'list'>, {'input': ['sum']}), ('output',), array([0])) | ||
(defaultdict(<class 'list'>, {'input': ['sum']}), ('output',), array([0]), | ||
defaultdict(<class 'list'>, {'input': ['output']})) | ||
""" | ||
from pandas.core.indexes.base import Index | ||
|
||
|
@@ -1290,11 +1493,13 @@ def normalize_keyword_aggregation(kwargs: dict) -> tuple[dict, list[str], list[i | |
# May be hitting https://github.com/python/mypy/issues/5958 | ||
# saying it doesn't have an attribute __name__ | ||
aggspec: DefaultDict = defaultdict(list) | ||
renamer: DefaultDict = defaultdict(list) | ||
order = [] | ||
columns, pairs = list(zip(*kwargs.items())) | ||
|
||
for column, aggfunc in pairs: | ||
for name, (column, aggfunc) in zip(kwargs, pairs): | ||
aggspec[column].append(aggfunc) | ||
renamer[column].append(name) | ||
order.append((column, com.get_callable_name(aggfunc) or aggfunc)) | ||
|
||
# uniquify aggfunc name if duplicated in order list | ||
|
@@ -1314,7 +1519,7 @@ def normalize_keyword_aggregation(kwargs: dict) -> tuple[dict, list[str], list[i | |
col_idx_order = Index(uniquified_aggspec).get_indexer(uniquified_order) | ||
# error: Incompatible return value type (got "Tuple[defaultdict[Any, Any], | ||
# Any, ndarray]", expected "Tuple[Dict[Any, Any], List[str], List[int]]") | ||
return aggspec, columns, col_idx_order # type: ignore[return-value] | ||
return aggspec, columns, col_idx_order, renamer # type: ignore[return-value] | ||
|
||
|
||
def _make_unique_kwarg_list( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would move before args