diff --git a/pandas/_libs/window.pyx b/pandas/_libs/window.pyx index 0a986942d2a09..8de593ce36c86 100644 --- a/pandas/_libs/window.pyx +++ b/pandas/_libs/window.pyx @@ -1675,9 +1675,22 @@ def roll_generic(object obj, return output -def roll_window(ndarray[float64_t, ndim=1, cast=True] values, - ndarray[float64_t, ndim=1, cast=True] weights, - int minp, bint avg=True): +# ---------------------------------------------------------------------- +# Rolling sum and mean for weighted window + + +def roll_weighted_sum(float64_t[:] values, float64_t[:] weights, + int minp): + return _roll_weighted_sum_mean(values, weights, minp, avg=0) + + +def roll_weighted_mean(float64_t[:] values, float64_t[:] weights, + int minp): + return _roll_weighted_sum_mean(values, weights, minp, avg=1) + + +def _roll_weighted_sum_mean(float64_t[:] values, float64_t[:] weights, + int minp, bint avg): """ Assume len(weights) << len(values) """ @@ -1688,6 +1701,7 @@ def roll_window(ndarray[float64_t, ndim=1, cast=True] values, in_n = len(values) win_n = len(weights) + output = np.zeros(in_n, dtype=float) counts = np.zeros(in_n, dtype=float) if avg: @@ -1739,6 +1753,7 @@ def roll_window(ndarray[float64_t, ndim=1, cast=True] values, return output + # ---------------------------------------------------------------------- # Exponentially weighted moving average diff --git a/pandas/core/window.py b/pandas/core/window.py index 4721d6cfc6dda..13e1c8ec34275 100644 --- a/pandas/core/window.py +++ b/pandas/core/window.py @@ -5,7 +5,7 @@ from collections import defaultdict from datetime import timedelta from textwrap import dedent -from typing import List, Optional, Set +from typing import Callable, List, Optional, Set, Union import warnings import numpy as np @@ -35,7 +35,7 @@ ABCTimedeltaIndex, ) -from pandas._typing import Axis, FrameOrSeries +from pandas._typing import Axis, FrameOrSeries, Scalar from pandas.core.base import DataError, PandasObject, SelectionMixin import pandas.core.common as com from pandas.core.generic import _shared_docs @@ -173,7 +173,19 @@ def __getattr__(self, attr): def _dir_additions(self): return self.obj._dir_additions() - def _get_window(self, other=None): + def _get_window(self, other=None, **kwargs) -> int: + """ + Returns window lenght + + Parameters + ---------- + other: + ignored, exists for compatibility + + Returns + ------- + window : int + """ return self.window @property @@ -200,7 +212,7 @@ def __iter__(self): def _get_index(self) -> Optional[np.ndarray]: """ - Return index as an ndarray. + Return integer representations as an ndarray if index is frequency. Returns ------- @@ -341,6 +353,138 @@ def _center_window(self, result, window) -> np.ndarray: result = np.copy(result[tuple(lead_indexer)]) return result + def _get_roll_func( + self, cfunc: Callable, check_minp: Callable, index: np.ndarray, **kwargs + ) -> Callable: + """ + Wrap rolling function to check values passed. + + Parameters + ---------- + cfunc : callable + Cython function used to calculate rolling statistics + check_minp : callable + function to check minimum period parameter + index : ndarray + used for variable window + + Returns + ------- + func : callable + """ + + def func(arg, window, min_periods=None, closed=None): + minp = check_minp(min_periods, window) + return cfunc(arg, window, minp, index, closed, **kwargs) + + return func + + def _apply( + self, + func: Union[str, Callable], + name: Optional[str] = None, + window: Optional[Union[int, str]] = None, + center: Optional[bool] = None, + check_minp: Optional[Callable] = None, + **kwargs + ): + """ + Rolling statistical measure using supplied function. + + Designed to be used with passed-in Cython array-based functions. + + Parameters + ---------- + func : str/callable to apply + name : str, optional + name of this function + window : int/str, default to _get_window() + window lenght or offset + center : bool, default to self.center + check_minp : function, default to _use_window + **kwargs + additional arguments for rolling function and window function + + Returns + ------- + y : type of input + """ + if center is None: + center = self.center + + if check_minp is None: + check_minp = _use_window + + if window is None: + window = self._get_window(**kwargs) + + blocks, obj = self._create_blocks() + block_list = list(blocks) + index_as_array = self._get_index() + + results = [] + exclude = [] # type: List[Scalar] + for i, b in enumerate(blocks): + try: + values = self._prep_values(b.values) + + except (TypeError, NotImplementedError): + if isinstance(obj, ABCDataFrame): + exclude.extend(b.columns) + del block_list[i] + continue + else: + raise DataError("No numeric types to aggregate") + + if values.size == 0: + results.append(values.copy()) + continue + + # if we have a string function name, wrap it + if isinstance(func, str): + cfunc = getattr(libwindow, func, None) + if cfunc is None: + raise ValueError( + "we do not support this function " + "in libwindow.{func}".format(func=func) + ) + + func = self._get_roll_func(cfunc, check_minp, index_as_array, **kwargs) + + # calculation function + if center: + offset = _offset(window, center) + additional_nans = np.array([np.NaN] * offset) + + def calc(x): + return func( + np.concatenate((x, additional_nans)), + window, + min_periods=self.min_periods, + closed=self.closed, + ) + + else: + + def calc(x): + return func( + x, window, min_periods=self.min_periods, closed=self.closed + ) + + with np.errstate(all="ignore"): + if values.ndim > 1: + result = np.apply_along_axis(calc, self.axis, values) + else: + result = calc(values) + result = np.asarray(result) + + if center: + result = self._center_window(result, window) + + results.append(result) + + return self._wrap_results(results, block_list, obj, exclude) + def aggregate(self, func, *args, **kwargs): result, how = self._aggregate(func, *args, **kwargs) if result is None: @@ -645,13 +789,23 @@ def validate(self): else: raise ValueError("Invalid window {0}".format(window)) - def _prep_window(self, **kwargs): + def _get_window(self, other=None, **kwargs) -> np.ndarray: """ - Provide validation for our window type, return the window - we have already been validated. + Provide validation for the window type, return the window + which has already been validated. + + Parameters + ---------- + other: + ignored, exists for compatibility + + Returns + ------- + window : ndarray + the window, weights """ - window = self._get_window() + window = self.window if isinstance(window, (list, tuple, np.ndarray)): return com.asarray_tuplesafe(window).astype(float) elif is_integer(window): @@ -691,63 +845,14 @@ def _pop_args(win_type, arg_names, kwargs): # GH #15662. `False` makes symmetric window, rather than periodic. return sig.get_window(win_type, window, False).astype(float) - def _apply_window(self, mean=True, **kwargs): - """ - Applies a moving window of type ``window_type`` on the data. - - Parameters - ---------- - mean : bool, default True - If True computes weighted mean, else weighted sum - - Returns - ------- - y : same type as input argument - - """ - window = self._prep_window(**kwargs) - center = self.center - - blocks, obj = self._create_blocks() - block_list = list(blocks) - - results = [] - exclude = [] - for i, b in enumerate(blocks): - try: - values = self._prep_values(b.values) - - except (TypeError, NotImplementedError): - if isinstance(obj, ABCDataFrame): - exclude.extend(b.columns) - del block_list[i] - continue - else: - raise DataError("No numeric types to aggregate") - - if values.size == 0: - results.append(values.copy()) - continue - - offset = _offset(window, center) - additional_nans = np.array([np.NaN] * offset) - - def f(arg, *args, **kwargs): - minp = _use_window(self.min_periods, len(window)) - return libwindow.roll_window( - np.concatenate((arg, additional_nans)) if center else arg, - window, - minp, - avg=mean, - ) - - result = np.apply_along_axis(f, self.axis, values) - - if center: - result = self._center_window(result, window) - results.append(result) + def _get_roll_func( + self, cfunc: Callable, check_minp: Callable, index: np.ndarray, **kwargs + ) -> Callable: + def func(arg, window, min_periods=None, closed=None): + minp = check_minp(min_periods, len(window)) + return cfunc(arg, window, minp) - return self._wrap_results(results, block_list, obj, exclude) + return func _agg_see_also_doc = dedent( """ @@ -815,13 +920,13 @@ def aggregate(self, arg, *args, **kwargs): @Appender(_shared_docs["sum"]) def sum(self, *args, **kwargs): nv.validate_window_func("sum", args, kwargs) - return self._apply_window(mean=False, **kwargs) + return self._apply("roll_weighted_sum", **kwargs) @Substitution(name="window") @Appender(_shared_docs["mean"]) def mean(self, *args, **kwargs): nv.validate_window_func("mean", args, kwargs) - return self._apply_window(mean=True, **kwargs) + return self._apply("roll_weighted_mean", **kwargs) class _GroupByMixin(GroupByMixin): @@ -867,105 +972,6 @@ class _Rolling(_Window): def _constructor(self): return Rolling - def _apply( - self, func, name=None, window=None, center=None, check_minp=None, **kwargs - ): - """ - Rolling statistical measure using supplied function. - - Designed to be used with passed-in Cython array-based functions. - - Parameters - ---------- - func : str/callable to apply - name : str, optional - name of this function - window : int/array, default to _get_window() - center : bool, default to self.center - check_minp : function, default to _use_window - - Returns - ------- - y : type of input - """ - if center is None: - center = self.center - if window is None: - window = self._get_window() - - if check_minp is None: - check_minp = _use_window - - blocks, obj = self._create_blocks() - block_list = list(blocks) - index_as_array = self._get_index() - - results = [] - exclude = [] - for i, b in enumerate(blocks): - try: - values = self._prep_values(b.values) - - except (TypeError, NotImplementedError): - if isinstance(obj, ABCDataFrame): - exclude.extend(b.columns) - del block_list[i] - continue - else: - raise DataError("No numeric types to aggregate") - - if values.size == 0: - results.append(values.copy()) - continue - - # if we have a string function name, wrap it - if isinstance(func, str): - cfunc = getattr(libwindow, func, None) - if cfunc is None: - raise ValueError( - "we do not support this function " - "in libwindow.{func}".format(func=func) - ) - - def func(arg, window, min_periods=None, closed=None): - minp = check_minp(min_periods, window) - # ensure we are only rolling on floats - arg = ensure_float64(arg) - return cfunc(arg, window, minp, index_as_array, closed, **kwargs) - - # calculation function - if center: - offset = _offset(window, center) - additional_nans = np.array([np.NaN] * offset) - - def calc(x): - return func( - np.concatenate((x, additional_nans)), - window, - min_periods=self.min_periods, - closed=self.closed, - ) - - else: - - def calc(x): - return func( - x, window, min_periods=self.min_periods, closed=self.closed - ) - - with np.errstate(all="ignore"): - if values.ndim > 1: - result = np.apply_along_axis(calc, self.axis, values) - else: - result = calc(values) - - if center: - result = self._center_window(result, window) - - results.append(result) - - return self._wrap_results(results, block_list, obj, exclude) - class _Rolling_and_Expanding(_Rolling): @@ -2028,7 +2034,7 @@ def __init__(self, obj, min_periods=1, center=False, axis=0, **kwargs): def _constructor(self): return Expanding - def _get_window(self, other=None): + def _get_window(self, other=None, **kwargs): """ Get the window length over which to perform some operation.