From c1d6af11deff4540f780f2c8689f1232f259a82e Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 27 Sep 2020 12:09:06 -0700 Subject: [PATCH 01/14] Add groupby expanding indexer --- pandas/core/window/indexers.py | 60 ++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index a21521f4ce8bb..f8a7a321a832b 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -334,3 +334,63 @@ def get_window_bounds( start = np.concatenate([start, np.array([end[-1]] * offset)]) end = np.concatenate([end, np.array([end[-1]] * offset)]) return start, end + + +class GroupbyExpandingIndexer(BaseIndexer): + """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()""" + + def __init__( + self, + index_array: Optional[np.ndarray], + groupby_indicies: Dict, + **kwargs, + ): + """ + Parameters + ---------- + **kwargs : + keyword arguments that will be available when get_window_bounds is called + """ + self.groupby_indicies = groupby_indicies + super().__init__(index_array, **kwargs) + + @Appender(get_window_bounds_doc) + def get_window_bounds( + self, + num_values: int = 0, + min_periods: Optional[int] = None, + center: Optional[bool] = None, + closed: Optional[str] = None, + ) -> Tuple[np.ndarray, np.ndarray]: + # 1) For each group, get the indices that belong to the group + # 2) Use the indices to calculate the start & end bounds of the window + # 3) Append the window bounds in group order + start_arrays = [] + end_arrays = [] + window_indicies_start = 0 + for key, indices in self.groupby_indicies.items(): + start, end = ExpandingIndexer().get_window_bounds( + len(indices), min_periods, center, closed + ) + + # Cannot use groupby_indicies as they might not be monotonic with the object + # we're rolling over + window_indicies = np.arange( + window_indicies_start, window_indicies_start + len(indices) + ) + window_indicies_start += len(indices) + # 
Extend as we'll be slicing window like [start, end) + window_indicies = np.append( + window_indicies, [window_indicies[-1] + 1] + ).astype(np.int64) + start_arrays.append(window_indicies.take(ensure_platform_int(start))) + end_arrays.append(window_indicies.take(ensure_platform_int(end))) + start = np.concatenate(start_arrays) + end = np.concatenate(end_arrays) + # GH 35552: Need to adjust start and end based on the nans appended to values + # when center=True + if num_values > len(start): + offset = num_values - len(start) + start = np.concatenate([start, np.array([end[-1]] * offset)]) + end = np.concatenate([end, np.array([end[-1]] * offset)]) + return start, end From 37cefba215e2ad180188971a7d55df75cdcecccd Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 27 Sep 2020 19:00:16 -0700 Subject: [PATCH 02/14] Implement apply for ExpandingGroupby --- pandas/core/window/expanding.py | 70 ++++++++++++++++++++++++++++++++- pandas/core/window/indexers.py | 6 --- 2 files changed, 69 insertions(+), 7 deletions(-) diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index 319944fd48eae..7ef03275a7d5c 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -1,10 +1,13 @@ from textwrap import dedent -from typing import Dict, Optional +from typing import Callable, Dict, Optional from pandas.compat.numpy import function as nv from pandas.util._decorators import Appender, Substitution, doc +import pandas.core.common as com +from pandas.core.indexes.api import MultiIndex from pandas.core.window.common import WindowGroupByMixin, _doc_template, _shared_docs +from pandas.core.window.indexers import GroupbyExpandingIndexer from pandas.core.window.rolling import RollingAndExpandingMixin @@ -256,3 +259,68 @@ class ExpandingGroupby(WindowGroupByMixin, Expanding): @property def _constructor(self): return Expanding + + def _apply( + self, + func: Callable, + center: bool, + require_min_periods: int = 0, + floor: int = 1, +
is_weighted: bool = False, + name: Optional[str] = None, + use_numba_cache: bool = False, + **kwargs, + ): + result = Expanding._apply( + self, + func, + center, + require_min_periods, + floor, + is_weighted, + name, + use_numba_cache, + **kwargs, + ) + # Cannot use _wrap_outputs because we calculate the result all at once + # Compose MultiIndex result from grouping levels then rolling level + # Aggregate the MultiIndex data as tuples then the level names + grouped_object_index = self.obj.index + grouped_index_name = [*grouped_object_index.names] + groupby_keys = [grouping.name for grouping in self._groupby.grouper._groupings] + result_index_names = groupby_keys + grouped_index_name + + result_index_data = [] + for key, values in self._groupby.grouper.indices.items(): + for value in values: + data = [ + *com.maybe_make_list(key), + *com.maybe_make_list(grouped_object_index[value]), + ] + result_index_data.append(tuple(data)) + + result_index = MultiIndex.from_tuples( + result_index_data, names=result_index_names + ) + result.index = result_index + return result + + def _get_window_indexer(self, window: int) -> GroupbyExpandingIndexer: + """ + Return an indexer class that will compute the window start and end bounds + + Parameters + ---------- + window : int + window size for FixedWindowIndexer + + Returns + ------- + GroupbyRollingIndexer + """ + index_array = self.obj.index.asi8 + window_indexer = GroupbyExpandingIndexer( + index_array=index_array, + groupby_indicies=self._groupby.indices, + ) + return window_indexer diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index f8a7a321a832b..1f65bb7d5c24c 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -387,10 +387,4 @@ def get_window_bounds( end_arrays.append(window_indicies.take(ensure_platform_int(end))) start = np.concatenate(start_arrays) end = np.concatenate(end_arrays) - # GH 35552: Need to adjust start and end based on the nans appended to values - 
# when center=True - if num_values > len(start): - offset = num_values - len(start) - start = np.concatenate([start, np.array([end[-1]] * offset)]) - end = np.concatenate([end, np.array([end[-1]] * offset)]) return start, end From 77deb508832f9d4cf77e9354dd2d6ac5508d0ef8 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 27 Sep 2020 21:01:53 -0700 Subject: [PATCH 03/14] Get most tests to work except numerical precision test --- pandas/core/window/expanding.py | 27 +++++++++++++++++++++++++++ pandas/core/window/indexers.py | 1 - 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index 7ef03275a7d5c..51938cf689512 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -1,6 +1,9 @@ from textwrap import dedent from typing import Callable, Dict, Optional +import numpy as np + +from pandas._typing import FrameOrSeries from pandas.compat.numpy import function as nv from pandas.util._decorators import Appender, Substitution, doc @@ -305,6 +308,20 @@ def _apply( result.index = result_index return result + def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: + """ + Split data into blocks & return conformed data. + """ + # Ensure the object we're rolling over is monotonically sorted relative + # to the groups + # GH 36197 + if not obj.empty: + groupby_order = np.concatenate( + list(self._groupby.grouper.indices.values()) + ).astype(np.int64) + obj = obj.take(groupby_order) + return super()._create_data(obj) + def _get_window_indexer(self, window: int) -> GroupbyExpandingIndexer: """ Return an indexer class that will compute the window start and end bounds @@ -324,3 +341,13 @@ def _get_window_indexer(self, window: int) -> GroupbyExpandingIndexer: groupby_indicies=self._groupby.indices, ) return window_indexer + + def _get_cython_func_type(self, func: str) -> Callable: + """ + Return the cython function type. 
+ + RollingGroupby needs to always use "variable" algorithms since processing + the data in group order may not be monotonic with the data which + "fixed" algorithms assume + """ + return self._get_roll_func(f"{func}_variable") diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index 1f65bb7d5c24c..400beb87d6a78 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -372,7 +372,6 @@ def get_window_bounds( start, end = ExpandingIndexer().get_window_bounds( len(indices), min_periods, center, closed ) - # Cannot use groupby_indicies as they might not be monotonic with the object # we're rolling over window_indicies = np.arange( From 43d26822b7a3bdde37603facee4b2eabc9e5ba71 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 27 Sep 2020 21:05:29 -0700 Subject: [PATCH 04/14] Make signature of GroupbyExpanding indexer match base class --- pandas/core/window/expanding.py | 2 -- pandas/core/window/indexers.py | 9 +++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index 51938cf689512..03c717fda696a 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -335,9 +335,7 @@ def _get_window_indexer(self, window: int) -> GroupbyExpandingIndexer: ------- GroupbyRollingIndexer """ - index_array = self.obj.index.asi8 window_indexer = GroupbyExpandingIndexer( - index_array=index_array, groupby_indicies=self._groupby.indices, ) return window_indexer diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index 400beb87d6a78..eeaad9f3c7ca3 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -341,8 +341,9 @@ class GroupbyExpandingIndexer(BaseIndexer): def __init__( self, - index_array: Optional[np.ndarray], - groupby_indicies: Dict, + index_array: Optional[np.ndarray] = None, + window_size: int = 0, + groupby_indicies: Optional[Dict] = None, **kwargs, ): """ @@ 
-351,8 +352,8 @@ def __init__( **kwargs : keyword arguments that will be available when get_window_bounds is called """ - self.groupby_indicies = groupby_indicies - super().__init__(index_array, **kwargs) + self.groupby_indicies = groupby_indicies or {} + super().__init__(index_array, window_size, **kwargs) @Appender(get_window_bounds_doc) def get_window_bounds( From 409bda8824771bd58baa23bdd770790773373e0d Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 27 Sep 2020 21:41:34 -0700 Subject: [PATCH 05/14] Share groupbyindexer implementation --- pandas/core/window/expanding.py | 11 +++-- pandas/core/window/indexers.py | 84 +++++++++------------------------ pandas/core/window/rolling.py | 10 ++-- 3 files changed, 32 insertions(+), 73 deletions(-) diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index 03c717fda696a..599809b6b346e 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -10,7 +10,7 @@ import pandas.core.common as com from pandas.core.indexes.api import MultiIndex from pandas.core.window.common import WindowGroupByMixin, _doc_template, _shared_docs -from pandas.core.window.indexers import GroupbyExpandingIndexer +from pandas.core.window.indexers import ExpandingIndexer, GroupbyIndexer from pandas.core.window.rolling import RollingAndExpandingMixin @@ -322,21 +322,22 @@ def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: obj = obj.take(groupby_order) return super()._create_data(obj) - def _get_window_indexer(self, window: int) -> GroupbyExpandingIndexer: + def _get_window_indexer(self, window: int) -> GroupbyIndexer: """ Return an indexer class that will compute the window start and end bounds Parameters ---------- window : int - window size for FixedWindowIndexer + window size for FixedWindowIndexer (unused) Returns ------- - GroupbyRollingIndexer + GroupbyIndexer """ - window_indexer = GroupbyExpandingIndexer( + window_indexer = GroupbyIndexer( 
groupby_indicies=self._groupby.indices, + window_indexer=ExpandingIndexer, ) return window_indexer diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index eeaad9f3c7ca3..62e953c09327c 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -259,26 +259,38 @@ def get_window_bounds( return start, end -class GroupbyRollingIndexer(BaseIndexer): +class GroupbyIndexer(BaseIndexer): """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()""" def __init__( self, - index_array: Optional[np.ndarray], - window_size: int, - groupby_indicies: Dict, - rolling_indexer: Type[BaseIndexer], - indexer_kwargs: Optional[Dict], + index_array: Optional[np.ndarray] = None, + window_size: int = 0, + groupby_indicies: Optional[Dict] = None, + window_indexer: Type[BaseIndexer] = BaseIndexer, + indexer_kwargs: Optional[Dict] = None, **kwargs, ): """ Parameters ---------- + index_array : np.ndarray or None + np.ndarray of the index of the original object that we are performing + a chained groupby operation over. 
This index has been pre-sorted relative to + the groups + window_size : int + window size during the windowing operation + groupby_indicies : dict or None + dict of {group label: [positional index of rows belonging to the group]} + window_indexer : BaseIndexer + BaseIndexer class determining the start and end bounds of each group + indexer_kwargs : dict or None + Custom kwargs to be passed to window_indexer **kwargs : keyword arguments that will be available when get_window_bounds is called """ - self.groupby_indicies = groupby_indicies - self.rolling_indexer = rolling_indexer + self.groupby_indicies = groupby_indicies or {} + self.window_indexer = window_indexer self.indexer_kwargs = indexer_kwargs or {} super().__init__( index_array, self.indexer_kwargs.pop("window_size", window_size), **kwargs @@ -303,7 +315,7 @@ def get_window_bounds( index_array = self.index_array.take(ensure_platform_int(indices)) else: index_array = self.index_array - indexer = self.rolling_indexer( + indexer = self.window_indexer( index_array=index_array, window_size=self.window_size, **self.indexer_kwargs, @@ -334,57 +346,3 @@ def get_window_bounds( start = np.concatenate([start, np.array([end[-1]] * offset)]) end = np.concatenate([end, np.array([end[-1]] * offset)]) return start, end - - -class GroupbyExpandingIndexer(BaseIndexer): - """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()""" - - def __init__( - self, - index_array: Optional[np.ndarray] = None, - window_size: int = 0, - groupby_indicies: Optional[Dict] = None, - **kwargs, - ): - """ - Parameters - ---------- - **kwargs : - keyword arguments that will be available when get_window_bounds is called - """ - self.groupby_indicies = groupby_indicies or {} - super().__init__(index_array, window_size, **kwargs) - - @Appender(get_window_bounds_doc) - def get_window_bounds( - self, - num_values: int = 0, - min_periods: Optional[int] = None, - center: Optional[bool] = None, - closed: Optional[str] = None, - ) 
-> Tuple[np.ndarray, np.ndarray]: - # 1) For each group, get the indices that belong to the group - # 2) Use the indices to calculate the start & end bounds of the window - # 3) Append the window bounds in group order - start_arrays = [] - end_arrays = [] - window_indicies_start = 0 - for key, indices in self.groupby_indicies.items(): - start, end = ExpandingIndexer().get_window_bounds( - len(indices), min_periods, center, closed - ) - # Cannot use groupby_indicies as they might not be monotonic with the object - # we're rolling over - window_indicies = np.arange( - window_indicies_start, window_indicies_start + len(indices) - ) - window_indicies_start += len(indices) - # Extend as we'll be slicing window like [start, end) - window_indicies = np.append( - window_indicies, [window_indicies[-1] + 1] - ).astype(np.int64) - start_arrays.append(window_indicies.take(ensure_platform_int(start))) - end_arrays.append(window_indicies.take(ensure_platform_int(end))) - start = np.concatenate(start_arrays) - end = np.concatenate(end_arrays) - return start, end diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 335fc3db5cd86..4aab8bdeabe22 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -62,7 +62,7 @@ from pandas.core.window.indexers import ( BaseIndexer, FixedWindowIndexer, - GroupbyRollingIndexer, + GroupbyIndexer, VariableWindowIndexer, ) from pandas.core.window.numba_ import generate_numba_apply_func @@ -2253,7 +2253,7 @@ def _get_cython_func_type(self, func: str) -> Callable: """ return self._get_roll_func(f"{func}_variable") - def _get_window_indexer(self, window: int) -> GroupbyRollingIndexer: + def _get_window_indexer(self, window: int) -> GroupbyIndexer: """ Return an indexer class that will compute the window start and end bounds @@ -2264,7 +2264,7 @@ def _get_window_indexer(self, window: int) -> GroupbyRollingIndexer: Returns ------- - GroupbyRollingIndexer + GroupbyIndexer """ rolling_indexer: 
Type[BaseIndexer] indexer_kwargs: Optional[Dict] = None @@ -2280,11 +2280,11 @@ def _get_window_indexer(self, window: int) -> GroupbyRollingIndexer: else: rolling_indexer = FixedWindowIndexer index_array = None - window_indexer = GroupbyRollingIndexer( + window_indexer = GroupbyIndexer( index_array=index_array, window_size=window, groupby_indicies=self._groupby.indices, - rolling_indexer=rolling_indexer, + window_indexer=rolling_indexer, indexer_kwargs=indexer_kwargs, ) return window_indexer From 0dacaa5a7896d5c63cb9cb3cbd027e9c97f40ea0 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Fri, 9 Oct 2020 16:52:52 -0700 Subject: [PATCH 06/14] Remove need to select variable algorithm --- pandas/core/window/expanding.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index 599809b6b346e..b9afd1da6eb99 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -340,13 +340,3 @@ def _get_window_indexer(self, window: int) -> GroupbyIndexer: window_indexer=ExpandingIndexer, ) return window_indexer - - def _get_cython_func_type(self, func: str) -> Callable: - """ - Return the cython function type. 
- - RollingGroupby needs to always use "variable" algorithms since processing - the data in group order may not be monotonic with the data which - "fixed" algorithms assume - """ - return self._get_roll_func(f"{func}_variable") From ed211e7da04aa67fc795a915f075e7297f370cde Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 11 Oct 2020 00:39:26 -0700 Subject: [PATCH 07/14] Create new subclass for shared window groupby algorithms --- pandas/_libs/window/aggregations.pyx | 2 +- pandas/core/window/common.py | 65 --------- pandas/core/window/expanding.py | 74 +---------- pandas/core/window/rolling.py | 192 +++++++++++++++------------ 4 files changed, 115 insertions(+), 218 deletions(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 937f7d8df7728..bba8b4b2432a9 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -302,7 +302,7 @@ cdef inline float64_t calc_var(int64_t minp, int ddof, float64_t nobs, result = ssqdm_x / (nobs - ddof) # Fix for numerical imprecision. # Can be result < 0 once Kahan Summation is implemented - if result < 1e-15: + if result < 1e-14: result = 0 else: result = NaN diff --git a/pandas/core/window/common.py b/pandas/core/window/common.py index aa71c44f75ead..938f1846230cb 100644 --- a/pandas/core/window/common.py +++ b/pandas/core/window/common.py @@ -1,13 +1,11 @@ """Common utility functions for rolling operations""" from collections import defaultdict -from typing import Callable, Optional import warnings import numpy as np from pandas.core.dtypes.generic import ABCDataFrame, ABCSeries -from pandas.core.groupby.base import GotItemMixin from pandas.core.indexes.api import MultiIndex from pandas.core.shared_docs import _shared_docs @@ -27,69 +25,6 @@ """ -def _dispatch(name: str, *args, **kwargs): - """ - Dispatch to apply. 
- """ - - def outer(self, *args, **kwargs): - def f(x): - x = self._shallow_copy(x, groupby=self._groupby) - return getattr(x, name)(*args, **kwargs) - - return self._groupby.apply(f) - - outer.__name__ = name - return outer - - -class WindowGroupByMixin(GotItemMixin): - """ - Provide the groupby facilities. - """ - - def __init__(self, obj, *args, **kwargs): - kwargs.pop("parent", None) - groupby = kwargs.pop("groupby", None) - if groupby is None: - groupby, obj = obj, obj._selected_obj - self._groupby = groupby - self._groupby.mutated = True - self._groupby.grouper.mutated = True - super().__init__(obj, *args, **kwargs) - - corr = _dispatch("corr", other=None, pairwise=None) - cov = _dispatch("cov", other=None, pairwise=None) - - def _apply( - self, - func: Callable, - require_min_periods: int = 0, - floor: int = 1, - is_weighted: bool = False, - name: Optional[str] = None, - use_numba_cache: bool = False, - **kwargs, - ): - """ - Dispatch to apply; we are stripping all of the _apply kwargs and - performing the original function call on the grouped object. - """ - kwargs.pop("floor", None) - kwargs.pop("original_func", None) - - # TODO: can we de-duplicate with _dispatch? 
- def f(x, name=name, *args): - x = self._shallow_copy(x) - - if isinstance(name, str): - return getattr(x, name)(*args, **kwargs) - - return x.apply(name, *args, **kwargs) - - return self._groupby.apply(f) - - def flex_binary_moment(arg1, arg2, f, pairwise=False): if not ( diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index b9afd1da6eb99..f5e909190f4ff 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -1,17 +1,14 @@ from textwrap import dedent -from typing import Callable, Dict, Optional +from typing import Dict, Optional import numpy as np -from pandas._typing import FrameOrSeries from pandas.compat.numpy import function as nv from pandas.util._decorators import Appender, Substitution, doc -import pandas.core.common as com -from pandas.core.indexes.api import MultiIndex -from pandas.core.window.common import WindowGroupByMixin, _doc_template, _shared_docs +from pandas.core.window.common import _doc_template, _shared_docs from pandas.core.window.indexers import ExpandingIndexer, GroupbyIndexer -from pandas.core.window.rolling import RollingAndExpandingMixin +from pandas.core.window.rolling import BaseWindowGroupby, RollingAndExpandingMixin class Expanding(RollingAndExpandingMixin): @@ -254,74 +251,11 @@ def corr(self, other=None, pairwise=None, **kwargs): return super().corr(other=other, pairwise=pairwise, **kwargs) -class ExpandingGroupby(WindowGroupByMixin, Expanding): +class ExpandingGroupby(BaseWindowGroupby, Expanding): """ Provide a expanding groupby implementation. 
""" - @property - def _constructor(self): - return Expanding - - def _apply( - self, - func: Callable, - center: bool, - require_min_periods: int = 0, - floor: int = 1, - is_weighted: bool = False, - name: Optional[str] = None, - use_numba_cache: bool = False, - **kwargs, - ): - result = Expanding._apply( - self, - func, - center, - require_min_periods, - floor, - is_weighted, - name, - use_numba_cache, - **kwargs, - ) - # Cannot use _wrap_outputs because we calculate the result all at once - # Compose MultiIndex result from grouping levels then rolling level - # Aggregate the MultiIndex data as tuples then the level names - grouped_object_index = self.obj.index - grouped_index_name = [*grouped_object_index.names] - groupby_keys = [grouping.name for grouping in self._groupby.grouper._groupings] - result_index_names = groupby_keys + grouped_index_name - - result_index_data = [] - for key, values in self._groupby.grouper.indices.items(): - for value in values: - data = [ - *com.maybe_make_list(key), - *com.maybe_make_list(grouped_object_index[value]), - ] - result_index_data.append(tuple(data)) - - result_index = MultiIndex.from_tuples( - result_index_data, names=result_index_names - ) - result.index = result_index - return result - - def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: - """ - Split data into blocks & return conformed data. 
- """ - # Ensure the object we're rolling over is monotonically sorted relative - # to the groups - # GH 36197 - if not obj.empty: - groupby_order = np.concatenate( - list(self._groupby.grouper.indices.values()) - ).astype(np.int64) - obj = obj.take(groupby_order) - return super()._create_data(obj) - def _get_window_indexer(self, window: int) -> GroupbyIndexer: """ Return an indexer class that will compute the window start and end bounds diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 7c0e2042a1dc7..42da59eda8106 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -51,11 +51,10 @@ from pandas.core.base import DataError, SelectionMixin import pandas.core.common as com from pandas.core.construction import extract_array -from pandas.core.groupby.base import ShallowMixin +from pandas.core.groupby.base import GotItemMixin, ShallowMixin from pandas.core.indexes.api import Index, MultiIndex from pandas.core.util.numba_ import NUMBA_FUNC_CACHE, maybe_use_numba from pandas.core.window.common import ( - WindowGroupByMixin, _doc_template, _shared_docs, flex_binary_moment, @@ -855,6 +854,114 @@ def aggregate(self, func, *args, **kwargs): ) +def _dispatch(name: str, *args, **kwargs): + """ + Dispatch to groupby apply. + """ + + def outer(self, *args, **kwargs): + def f(x): + x = self._shallow_copy(x, groupby=self._groupby) + return getattr(x, name)(*args, **kwargs) + + return self._groupby.apply(f) + + outer.__name__ = name + return outer + + +class BaseWindowGroupby(GotItemMixin, BaseWindow): + """ + Provide the groupby windowing facilies. 
+ """ + + def __init__(self, obj, *args, **kwargs): + kwargs.pop("parent", None) + groupby = kwargs.pop("groupby", None) + if groupby is None: + groupby, obj = obj, obj._selected_obj + self._groupby = groupby + self._groupby.mutated = True + self._groupby.grouper.mutated = True + super().__init__(obj, *args, **kwargs) + + corr = _dispatch("corr", other=None, pairwise=None) + cov = _dispatch("cov", other=None, pairwise=None) + + def _apply( + self, + func: Callable, + require_min_periods: int = 0, + floor: int = 1, + is_weighted: bool = False, + name: Optional[str] = None, + use_numba_cache: bool = False, + **kwargs, + ): + result = super()._apply( + func, + require_min_periods, + floor, + is_weighted, + name, + use_numba_cache, + **kwargs, + ) + # Compose MultiIndex result from grouping levels then rolling level + # Aggregate the MultiIndex data as tuples then the level names + grouped_object_index = self.obj.index + grouped_index_name = [*grouped_object_index.names] + groupby_keys = [grouping.name for grouping in self._groupby.grouper._groupings] + result_index_names = groupby_keys + grouped_index_name + + result_index_data = [] + for key, values in self._groupby.grouper.indices.items(): + for value in values: + data = [ + *com.maybe_make_list(key), + *com.maybe_make_list(grouped_object_index[value]), + ] + result_index_data.append(tuple(data)) + + result_index = MultiIndex.from_tuples( + result_index_data, names=result_index_names + ) + result.index = result_index + return result + + def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: + """ + Split data into blocks & return conformed data. 
+ """ + # Ensure the object we're rolling over is monotonically sorted relative + # to the groups + # GH 36197 + if not obj.empty: + groupby_order = np.concatenate( + list(self._groupby.grouper.indices.values()) + ).astype(np.int64) + obj = obj.take(groupby_order) + return super()._create_data(obj) + + def _gotitem(self, key, ndim, subset=None): + # we are setting the index on the actual object + # here so our index is carried through to the selected obj + # when we do the splitting for the groupby + if self.on is not None: + self.obj = self.obj.set_index(self._on) + self.on = None + return super()._gotitem(key, ndim, subset=subset) + + def _validate_monotonic(self): + """ + Validate that on is monotonic; + we don't care for groupby.rolling + because we have already validated at a higher + level. + """ + pass + + class Window(BaseWindow): """ Provide rolling window calculations. @@ -2134,72 +2241,11 @@ def corr(self, other=None, pairwise=None, **kwargs): Rolling.__doc__ = Window.__doc__ -class RollingGroupby(WindowGroupByMixin, Rolling): +class RollingGroupby(BaseWindowGroupby, Rolling): """ Provide a rolling groupby implementation. 
""" - def _apply( - self, - func: Callable, - require_min_periods: int = 0, - floor: int = 1, - is_weighted: bool = False, - name: Optional[str] = None, - use_numba_cache: bool = False, - **kwargs, - ): - result = Rolling._apply( - self, - func, - require_min_periods, - floor, - is_weighted, - name, - use_numba_cache, - **kwargs, - ) - # Cannot use _wrap_outputs because we calculate the result all at once - # Compose MultiIndex result from grouping levels then rolling level - # Aggregate the MultiIndex data as tuples then the level names - grouped_object_index = self.obj.index - grouped_index_name = [*grouped_object_index.names] - groupby_keys = [grouping.name for grouping in self._groupby.grouper._groupings] - result_index_names = groupby_keys + grouped_index_name - - result_index_data = [] - for key, values in self._groupby.grouper.indices.items(): - for value in values: - data = [ - *com.maybe_make_list(key), - *com.maybe_make_list(grouped_object_index[value]), - ] - result_index_data.append(tuple(data)) - - result_index = MultiIndex.from_tuples( - result_index_data, names=result_index_names - ) - result.index = result_index - return result - - @property - def _constructor(self): - return Rolling - - def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: - """ - Split data into blocks & return conformed data. 
- """ - # Ensure the object we're rolling over is monotonically sorted relative - # to the groups - # GH 36197 - if not obj.empty: - groupby_order = np.concatenate( - list(self._groupby.grouper.indices.values()) - ).astype(np.int64) - obj = obj.take(groupby_order) - return super()._create_data(obj) - def _get_window_indexer(self, window: int) -> GroupbyIndexer: """ Return an indexer class that will compute the window start and end bounds @@ -2235,21 +2281,3 @@ def _get_window_indexer(self, window: int) -> GroupbyIndexer: indexer_kwargs=indexer_kwargs, ) return window_indexer - - def _gotitem(self, key, ndim, subset=None): - # we are setting the index on the actual object - # here so our index is carried thru to the selected obj - # when we do the splitting for the groupby - if self.on is not None: - self.obj = self.obj.set_index(self._on) - self.on = None - return super()._gotitem(key, ndim, subset=subset) - - def _validate_monotonic(self): - """ - Validate that on is monotonic; - we don't care for groupby.rolling - because we have already validated at a higher - level. 
- """ - pass From 57471bea82a011ab2cda39a8817c1842ab5e5039 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 11 Oct 2020 00:43:10 -0700 Subject: [PATCH 08/14] Add expanding groupby benchmark --- asv_bench/benchmarks/rolling.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/asv_bench/benchmarks/rolling.py b/asv_bench/benchmarks/rolling.py index f0dd908f81043..ec917d2c027c4 100644 --- a/asv_bench/benchmarks/rolling.py +++ b/asv_bench/benchmarks/rolling.py @@ -78,10 +78,16 @@ def setup(self, constructor, dtype, method): N = 10 ** 5 arr = (100 * np.random.random(N)).astype(dtype) self.expanding = getattr(pd, constructor)(arr).expanding() + self.expanding_groupby = ( + getattr(pd, constructor)({"A": arr, "B": range(N)}).groupby("B").expanding() + ) def time_expanding(self, constructor, dtype, method): getattr(self.expanding, method)() + def time_expanding_groupby(self, constructor, dtype, method): + getattr(self.expanding_groupby, method)() + class EWMMethods: From 608e7d6735b5fd6c150c983fc4eff15bf1ee3115 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 11 Oct 2020 16:44:28 -0700 Subject: [PATCH 09/14] Add benchmarks --- asv_bench/benchmarks/rolling.py | 5 ++++- pandas/core/window/rolling.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/asv_bench/benchmarks/rolling.py b/asv_bench/benchmarks/rolling.py index ec917d2c027c4..226b225b47591 100644 --- a/asv_bench/benchmarks/rolling.py +++ b/asv_bench/benchmarks/rolling.py @@ -76,10 +76,13 @@ class ExpandingMethods: def setup(self, constructor, dtype, method): N = 10 ** 5 + N_groupby = 100 arr = (100 * np.random.random(N)).astype(dtype) self.expanding = getattr(pd, constructor)(arr).expanding() self.expanding_groupby = ( - getattr(pd, constructor)({"A": arr, "B": range(N)}).groupby("B").expanding() + pd.DataFrame({"A": arr[:N_groupby], "B": range(N_groupby)}) + .groupby("B") + .expanding() ) def time_expanding(self, constructor, dtype, method): diff --git 
a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 42da59eda8106..47507547e33b7 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -872,7 +872,7 @@ def f(x): class BaseWindowGroupby(GotItemMixin, BaseWindow): """ - Provide the groupby windowing facilies. + Provide the groupby windowing facilities. """ def __init__(self, obj, *args, **kwargs): From 859541e596df72ad790ace5916f930d684c0444d Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 11 Oct 2020 17:07:39 -0700 Subject: [PATCH 10/14] Remove unneeded args passed to _apply --- pandas/core/window/rolling.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 47507547e33b7..3b8bb11372076 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -954,10 +954,7 @@ def _gotitem(self, key, ndim, subset=None): def _validate_monotonic(self): """ - Validate that on is monotonic; - we don't care for groupby.rolling - because we have already validated at a higher - level. + Validate that "on" is monotonic; already validated at a higher level. 
""" pass @@ -1455,13 +1452,10 @@ def apply( else: raise ValueError("engine must be either 'numba' or 'cython'") - # name=func & raw=raw for WindowGroupByMixin._apply return self._apply( apply_func, floor=0, - name=func, use_numba_cache=maybe_use_numba(engine), - raw=raw, original_func=func, args=args, kwargs=kwargs, @@ -1605,12 +1599,10 @@ def std(self, ddof=1, *args, **kwargs): def zsqrt_func(values, begin, end, min_periods): return zsqrt(window_func(values, begin, end, min_periods, ddof=ddof)) - # ddof passed again for compat with groupby.rolling return self._apply( zsqrt_func, require_min_periods=1, name="std", - ddof=ddof, **kwargs, ) @@ -1618,12 +1610,10 @@ def var(self, ddof=1, *args, **kwargs): nv.validate_window_func("var", args, kwargs) kwargs.pop("require_min_periods", None) window_func = partial(self._get_roll_func("roll_var"), ddof=ddof) - # ddof passed again for compat with groupby.rolling return self._apply( window_func, require_min_periods=1, name="var", - ddof=ddof, **kwargs, ) From fc43e23b6a10287ea491d373033e0e0fcc9dd546 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 11 Oct 2020 17:40:32 -0700 Subject: [PATCH 11/14] Remove unnecessary poping of kwargs --- pandas/core/window/rolling.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 3b8bb11372076..5a3996c94957a 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -1436,8 +1436,7 @@ def apply( args = () if kwargs is None: kwargs = {} - kwargs.pop("_level", None) - kwargs.pop("floor", None) + if not is_bool(raw): raise ValueError("raw parameter must be `True` or `False`") @@ -1482,7 +1481,6 @@ def apply_func(values, begin, end, min_periods, raw=raw): def sum(self, *args, **kwargs): nv.validate_window_func("sum", args, kwargs) window_func = self._get_roll_func("roll_sum") - kwargs.pop("floor", None) return self._apply(window_func, floor=0, name="sum", **kwargs) 
_shared_docs["max"] = dedent( @@ -1593,7 +1591,6 @@ def median(self, **kwargs): def std(self, ddof=1, *args, **kwargs): nv.validate_window_func("std", args, kwargs) - kwargs.pop("require_min_periods", None) window_func = self._get_roll_func("roll_var") def zsqrt_func(values, begin, end, min_periods): @@ -1608,7 +1605,6 @@ def zsqrt_func(values, begin, end, min_periods): def var(self, ddof=1, *args, **kwargs): nv.validate_window_func("var", args, kwargs) - kwargs.pop("require_min_periods", None) window_func = partial(self._get_roll_func("roll_var"), ddof=ddof) return self._apply( window_func, @@ -1630,7 +1626,6 @@ def var(self, ddof=1, *args, **kwargs): def skew(self, **kwargs): window_func = self._get_roll_func("roll_skew") - kwargs.pop("require_min_periods", None) return self._apply( window_func, require_min_periods=3, @@ -1672,7 +1667,6 @@ def skew(self, **kwargs): def kurt(self, **kwargs): window_func = self._get_roll_func("roll_kurt") - kwargs.pop("require_min_periods", None) return self._apply( window_func, require_min_periods=4, From bab0a1d394a02bd9f404d4dab1c0e5a9b221f236 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 11 Oct 2020 17:41:33 -0700 Subject: [PATCH 12/14] Add whatsnew --- doc/source/whatsnew/v1.2.0.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index bd3112403b31b..8b92751828090 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -313,6 +313,7 @@ Performance improvements avoiding creating these again, if created on either. This can speed up operations that depend on creating copies of existing indexes (:issue:`36840`) - Performance improvement in :meth:`RollingGroupby.count` (:issue:`35625`) - Small performance decrease to :meth:`Rolling.min` and :meth:`Rolling.max` for fixed windows (:issue:`36567`) +- Performance improvement in :class:`ExpandingGroupby` (:issue:``) .. 
--------------------------------------------------------------------------- From 4babf565bd72bf8d46210d8848b6a5f7b0427fb9 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 11 Oct 2020 17:52:14 -0700 Subject: [PATCH 13/14] Add whatsnew number --- doc/source/whatsnew/v1.2.0.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 8b92751828090..961dc1187ca61 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -313,7 +313,7 @@ Performance improvements avoiding creating these again, if created on either. This can speed up operations that depend on creating copies of existing indexes (:issue:`36840`) - Performance improvement in :meth:`RollingGroupby.count` (:issue:`35625`) - Small performance decrease to :meth:`Rolling.min` and :meth:`Rolling.max` for fixed windows (:issue:`36567`) -- Performance improvement in :class:`ExpandingGroupby` (:issue:``) +- Performance improvement in :class:`ExpandingGroupby` (:issue:`37064`) .. --------------------------------------------------------------------------- From 5788af9bc25f8448ee90899ce4f888cd5cf524ab Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 11 Oct 2020 18:46:07 -0700 Subject: [PATCH 14/14] Lint --- pandas/core/window/expanding.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index f5e909190f4ff..c75e81fc8335a 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -1,8 +1,6 @@ from textwrap import dedent from typing import Dict, Optional -import numpy as np - from pandas.compat.numpy import function as nv from pandas.util._decorators import Appender, Substitution, doc