diff --git a/asv_bench/benchmarks/rolling.py b/asv_bench/benchmarks/rolling.py
index 97294fc02834b..3d2273b6d7324 100644
--- a/asv_bench/benchmarks/rolling.py
+++ b/asv_bench/benchmarks/rolling.py
@@ -180,6 +180,33 @@ def time_quantile(self, constructor, window, dtype, percentile, interpolation):
         self.roll.quantile(percentile, interpolation=interpolation)
 
 
+class Rank:
+    params = (
+        ["DataFrame", "Series"],
+        [10, 1000],
+        ["int", "float"],
+        [True, False],
+        [True, False],
+        ["min", "max", "average"],
+    )
+    param_names = [
+        "constructor",
+        "window",
+        "dtype",
+        "percentile",
+        "ascending",
+        "method",
+    ]
+
+    def setup(self, constructor, window, dtype, percentile, ascending, method):
+        N = 10 ** 5
+        arr = np.random.random(N).astype(dtype)
+        self.roll = getattr(pd, constructor)(arr).rolling(window)
+
+    def time_rank(self, constructor, window, dtype, percentile, ascending, method):
+        self.roll.rank(pct=percentile, ascending=ascending, method=method)
+
+
 class PeakMemFixedWindowMinMax:
 
     params = ["min", "max"]
diff --git a/doc/source/reference/window.rst b/doc/source/reference/window.rst
index a255b3ae8081e..5e230a533625f 100644
--- a/doc/source/reference/window.rst
+++ b/doc/source/reference/window.rst
@@ -35,6 +35,7 @@ Rolling window functions
    Rolling.aggregate
    Rolling.quantile
    Rolling.sem
+   Rolling.rank
 
 .. _api.functions_window:
 
@@ -75,6 +76,7 @@ Expanding window functions
    Expanding.aggregate
    Expanding.quantile
    Expanding.sem
+   Expanding.rank
 
 .. _api.functions_ewm:
 
diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst
index 8755ae851d474..6046d4bf6ec4b 100644
--- a/doc/source/whatsnew/v1.4.0.rst
+++ b/doc/source/whatsnew/v1.4.0.rst
@@ -94,6 +94,21 @@ Multithreaded CSV reading with a new CSV Engine based on pyarrow
 :func:`pandas.read_csv` now accepts ``engine="pyarrow"`` (requires at least ``pyarrow`` 0.17.0) as an argument, allowing for faster csv parsing on multicore machines
 with pyarrow installed. See the :doc:`I/O docs ` for more info. (:issue:`23697`)
 
+.. _whatsnew_140.enhancements.window_rank:
+
+Rank function for rolling and expanding windows
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Added ``rank`` function to :class:`Rolling` and :class:`Expanding`. The new function supports the ``method``, ``ascending``, and ``pct`` flags of :meth:`DataFrame.rank`. The ``method`` argument supports ``min``, ``max``, and ``average`` ranking methods.
+Example:
+
+.. ipython:: python
+
+    s = pd.Series([1, 4, 2, 3, 5, 3])
+    s.rolling(3).rank()
+
+    s.rolling(3).rank(method="max")
+
 .. _whatsnew_140.enhancements.other:
 
 Other enhancements
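The ipython block in the whatsnew entry above does not show its output here; for reference, the two calls evaluate as follows (values taken from the ``Rolling.rank`` docstring examples added later in this patch):

    >>> import pandas as pd
    >>> s = pd.Series([1, 4, 2, 3, 5, 3])
    >>> s.rolling(3).rank()
    0    NaN
    1    NaN
    2    2.0
    3    2.0
    4    3.0
    5    1.5
    dtype: float64
    >>> s.rolling(3).rank(method="max")
    0    NaN
    1    NaN
    2    2.0
    3    2.0
    4    3.0
    5    2.0
    dtype: float64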
diff --git a/pandas/_libs/algos.pxd b/pandas/_libs/algos.pxd
index 7e87f4767c86d..4f7cc9345ed30 100644
--- a/pandas/_libs/algos.pxd
+++ b/pandas/_libs/algos.pxd
@@ -2,3 +2,11 @@ from pandas._libs.util cimport numeric
 
 
 cdef numeric kth_smallest_c(numeric* arr, Py_ssize_t k, Py_ssize_t n) nogil
+
+cdef enum TiebreakEnumType:
+    TIEBREAK_AVERAGE
+    TIEBREAK_MIN,
+    TIEBREAK_MAX
+    TIEBREAK_FIRST
+    TIEBREAK_FIRST_DESCENDING
+    TIEBREAK_DENSE
diff --git a/pandas/_libs/algos.pyx b/pandas/_libs/algos.pyx
index 99929c36c0929..a18cfc41d1e2e 100644
--- a/pandas/_libs/algos.pyx
+++ b/pandas/_libs/algos.pyx
@@ -66,13 +66,6 @@ cdef:
     float64_t NaN = np.NaN
     int64_t NPY_NAT = get_nat()
 
-cdef enum TiebreakEnumType:
-    TIEBREAK_AVERAGE
-    TIEBREAK_MIN,
-    TIEBREAK_MAX
-    TIEBREAK_FIRST
-    TIEBREAK_FIRST_DESCENDING
-    TIEBREAK_DENSE
 
 tiebreakers = {
     "average": TIEBREAK_AVERAGE,
diff --git a/pandas/_libs/src/skiplist.h b/pandas/_libs/src/skiplist.h
index 1679ced174f29..5d0b144a1fe61 100644
--- a/pandas/_libs/src/skiplist.h
+++ b/pandas/_libs/src/skiplist.h
@@ -180,10 +180,30 @@ PANDAS_INLINE double skiplist_get(skiplist_t *skp, int i, int *ret) {
     return node->value;
 }
 
+// Returns the lowest rank of all elements with value `value`, as opposed to the
+// highest rank returned by `skiplist_insert`.
+PANDAS_INLINE int skiplist_min_rank(skiplist_t *skp, double value) {
+    node_t *node;
+    int level, rank = 0;
+
+    node = skp->head;
+    for (level = skp->maxlevels - 1; level >= 0; --level) {
+        while (_node_cmp(node->next[level], value) > 0) {
+            rank += node->width[level];
+            node = node->next[level];
+        }
+    }
+
+    return rank + 1;
+}
+
+// Returns the rank of the inserted element. When there are duplicates,
+// `rank` is the highest of the group, i.e. the 'max' method of
+// https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.rank.html
 PANDAS_INLINE int skiplist_insert(skiplist_t *skp, double value) {
     node_t *node, *prevnode, *newnode, *next_at_level;
     int *steps_at_level;
-    int size, steps, level;
+    int size, steps, level, rank = 0;
     node_t **chain;
 
     chain = skp->tmp_chain;
@@ -197,6 +217,7 @@ PANDAS_INLINE int skiplist_insert(skiplist_t *skp, double value) {
         next_at_level = node->next[level];
         while (_node_cmp(next_at_level, value) >= 0) {
             steps_at_level[level] += node->width[level];
+            rank += node->width[level];
             node = next_at_level;
             next_at_level = node->next[level];
         }
@@ -230,7 +251,7 @@ PANDAS_INLINE int skiplist_insert(skiplist_t *skp, double value) {
 
     ++(skp->size);
 
-    return 1;
+    return rank + 1;
 }
 
 PANDAS_INLINE int skiplist_remove(skiplist_t *skp, double value) {
diff --git a/pandas/_libs/window/aggregations.pyi b/pandas/_libs/window/aggregations.pyi
index fe083fe415e4b..879809a259266 100644
--- a/pandas/_libs/window/aggregations.pyi
+++ b/pandas/_libs/window/aggregations.pyi
@@ -6,6 +6,8 @@ from typing import (
 
 import numpy as np
 
+from pandas._typing import WindowingRankType
+
 def roll_sum(
     values: np.ndarray,  # const float64_t[:]
     start: np.ndarray,  # np.ndarray[np.int64]
@@ -63,6 +65,15 @@ def roll_quantile(
     quantile: float,  # float64_t
     interpolation: Literal["linear", "lower", "higher", "nearest", "midpoint"],
 ) -> np.ndarray: ...  # np.ndarray[float]
+def roll_rank(
+    values: np.ndarray,
+    start: np.ndarray,
+    end: np.ndarray,
+    minp: int,
+    percentile: bool,
+    method: WindowingRankType,
+    ascending: bool,
+) -> np.ndarray: ...  # np.ndarray[float]
 def roll_apply(
     obj: object,
     start: np.ndarray,  # np.ndarray[np.int64]
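To make the return-value contract described in the ``skiplist.h`` comments concrete, here is a rough Python model of the two helpers (illustration only, not part of the patch, and not the actual C implementation): for a window kept in ascending sorted order, ``skiplist_insert`` inserts a value and reports the 1-based position of the last element equal to it (the 'max' rank), while ``skiplist_min_rank`` reports the position of the first such element (the 'min' rank).

    import bisect

    def insert_and_max_rank(sorted_window: list, value: float) -> int:
        # models skiplist_insert: insert, then return the highest 1-based rank
        bisect.insort_right(sorted_window, value)
        return bisect.bisect_right(sorted_window, value)

    def min_rank(sorted_window: list, value: float) -> int:
        # models skiplist_min_rank: lowest 1-based rank of `value`
        return bisect.bisect_left(sorted_window, value) + 1

    window = []
    print(insert_and_max_rank(window, 3.0))  # 1
    print(insert_and_max_rank(window, 3.0))  # 2: a duplicate takes the highest rank
    print(min_rank(window, 3.0))             # 1: lowest rank of the tied group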
diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx
index f792b653eb07b..ea52bd24a3689 100644
--- a/pandas/_libs/window/aggregations.pyx
+++ b/pandas/_libs/window/aggregations.pyx
@@ -5,6 +5,8 @@ import cython
 from libc.math cimport round
 from libcpp.deque cimport deque
 
+from pandas._libs.algos cimport TiebreakEnumType
+
 import numpy as np
 
 cimport numpy as cnp
@@ -50,6 +52,8 @@ cdef extern from "../src/skiplist.h":
     double skiplist_get(skiplist_t*, int, int*) nogil
     int skiplist_insert(skiplist_t*, double) nogil
     int skiplist_remove(skiplist_t*, double) nogil
+    int skiplist_rank(skiplist_t*, double) nogil
+    int skiplist_min_rank(skiplist_t*, double) nogil
 
 cdef:
     float32_t MINfloat32 = np.NINF
@@ -795,7 +799,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
                 val = values[j]
                 if notnan(val):
                     nobs += 1
-                    err = skiplist_insert(sl, val) != 1
+                    err = skiplist_insert(sl, val) == -1
                     if err:
                         break
 
@@ -806,7 +810,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
                     val = values[j]
                     if notnan(val):
                         nobs += 1
-                        err = skiplist_insert(sl, val) != 1
+                        err = skiplist_insert(sl, val) == -1
                         if err:
                             break
 
@@ -1139,6 +1143,122 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start,
     return output
 
 
+rolling_rank_tiebreakers = {
+    "average": TiebreakEnumType.TIEBREAK_AVERAGE,
+    "min": TiebreakEnumType.TIEBREAK_MIN,
+    "max": TiebreakEnumType.TIEBREAK_MAX,
+}
+
+
+def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
+              ndarray[int64_t] end, int64_t minp, bint percentile,
+              str method, bint ascending) -> np.ndarray:
+    """
+    O(N log(window)) implementation using skip list
+
+    derived from roll_quantile
+    """
+    cdef:
+        Py_ssize_t i, j, s, e, N = len(values), idx
+        float64_t rank_min = 0, rank = 0
+        int64_t nobs = 0, win
+        float64_t val
+        skiplist_t *skiplist
+        float64_t[::1] output
+        TiebreakEnumType rank_type
+
+    try:
+        rank_type = rolling_rank_tiebreakers[method]
+    except KeyError:
+        raise ValueError(f"Method '{method}' is not supported")
+
+    is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
+        start, end
+    )
+    # we use the Fixed/Variable Indexer here as the
+    # actual skiplist ops outweigh any window computation costs
+    output = np.empty(N, dtype=np.float64)
+
+    win = (end - start).max()
+    if win == 0:
+        output[:] = NaN
+        return np.asarray(output)
+    skiplist = skiplist_init(win)
+    if skiplist == NULL:
+        raise MemoryError("skiplist_init failed")
+
+    with nogil:
+        for i in range(N):
+            s = start[i]
+            e = end[i]
+
+            if i == 0 or not is_monotonic_increasing_bounds:
+                if not is_monotonic_increasing_bounds:
+                    nobs = 0
+                    skiplist_destroy(skiplist)
+                    skiplist = skiplist_init(win)
+
+                # setup
+                for j in range(s, e):
+                    val = values[j] if ascending else -values[j]
+                    if notnan(val):
+                        nobs += 1
+                        rank = skiplist_insert(skiplist, val)
+                        if rank == -1:
+                            raise MemoryError("skiplist_insert failed")
+                        if rank_type == TiebreakEnumType.TIEBREAK_AVERAGE:
+                            # The average rank of `val` is the sum of the ranks of all
+                            # instances of `val` in the skip list divided by the number
+                            # of instances. The sum of consecutive integers from 1 to N
+                            # is N * (N + 1) / 2.
+                            # The sum of the ranks is the sum of integers from the
+                            # lowest rank to the highest rank, which is the sum of
+                            # integers from 1 to the highest rank minus the sum of
+                            # integers from 1 to one less than the lowest rank.
+                            rank_min = skiplist_min_rank(skiplist, val)
+                            rank = (((rank * (rank + 1) / 2)
+                                    - ((rank_min - 1) * rank_min / 2))
+                                    / (rank - rank_min + 1))
+                        elif rank_type == TiebreakEnumType.TIEBREAK_MIN:
+                            rank = skiplist_min_rank(skiplist, val)
+                    else:
+                        rank = NaN
+
+            else:
+                # calculate deletes
+                for j in range(start[i - 1], s):
+                    val = values[j] if ascending else -values[j]
+                    if notnan(val):
+                        skiplist_remove(skiplist, val)
+                        nobs -= 1
+
+                # calculate adds
+                for j in range(end[i - 1], e):
+                    val = values[j] if ascending else -values[j]
+                    if notnan(val):
+                        nobs += 1
+                        rank = skiplist_insert(skiplist, val)
+                        if rank == -1:
+                            raise MemoryError("skiplist_insert failed")
+                        if rank_type == TiebreakEnumType.TIEBREAK_AVERAGE:
+                            rank_min = skiplist_min_rank(skiplist, val)
+                            rank = (((rank * (rank + 1) / 2)
+                                    - ((rank_min - 1) * rank_min / 2))
+                                    / (rank - rank_min + 1))
+                        elif rank_type == TiebreakEnumType.TIEBREAK_MIN:
+                            rank = skiplist_min_rank(skiplist, val)
+                    else:
+                        rank = NaN
+            if nobs >= minp:
+                output[i] = rank / nobs if percentile else rank
+            else:
+                output[i] = NaN
+
+    skiplist_destroy(skiplist)
+
+    return np.asarray(output)
+
+
 def roll_apply(object obj,
                ndarray[int64_t] start, ndarray[int64_t] end,
                int64_t minp,
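The ``TIEBREAK_AVERAGE`` arithmetic above can be sanity-checked in plain Python (illustration only). For the window ``[3, 5, 3]`` from the docstring examples, the two ``3``s occupy ranks 1 and 2, so the closed-form sum of the tied ranks divided by the group size gives 1.5:

    # rank     : highest rank of the tied group, as returned by skiplist_insert
    # rank_min : lowest rank of the tied group, as returned by skiplist_min_rank
    rank, rank_min = 2, 1
    avg = ((rank * (rank + 1) / 2) - ((rank_min - 1) * rank_min / 2)) / (rank - rank_min + 1)
    assert avg == 1.5  # matches the last value of s.rolling(3).rank() for [1, 4, 2, 3, 5, 3]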
diff --git a/pandas/_typing.py b/pandas/_typing.py
index 5077e659410e3..9ed31dc3738f3 100644
--- a/pandas/_typing.py
+++ b/pandas/_typing.py
@@ -219,3 +219,6 @@
 PositionalIndexer = Union[ScalarIndexer, SequenceIndexer]
 PositionalIndexerTuple = Tuple[PositionalIndexer, PositionalIndexer]
 PositionalIndexer2D = Union[PositionalIndexer, PositionalIndexerTuple]
+
+# Windowing rank methods
+WindowingRankType = Literal["average", "min", "max"]
diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py
index 45ebee6db5ad4..2f460267bfec4 100644
--- a/pandas/core/window/expanding.py
+++ b/pandas/core/window/expanding.py
@@ -10,6 +10,7 @@
 from pandas._typing import (
     Axis,
     FrameOrSeries,
+    WindowingRankType,
 )
 
 if TYPE_CHECKING:
@@ -564,6 +565,81 @@ def quantile(
             **kwargs,
         )
 
+    @doc(
+        template_header,
+        ".. versionadded:: 1.4.0 \n\n",
+        create_section_header("Parameters"),
+        dedent(
+            """
+        method : {{'average', 'min', 'max'}}, default 'average'
+            How to rank the group of records that have the same value (i.e. ties):
+
+            * average: average rank of the group
+            * min: lowest rank in the group
+            * max: highest rank in the group
+
+        ascending : bool, default True
+            Whether or not the elements should be ranked in ascending order.
+        pct : bool, default False
+            Whether or not to display the returned rankings in percentile
+            form.
+        """
+        ).replace("\n", "", 1),
+        kwargs_compat,
+        create_section_header("Returns"),
+        template_returns,
+        create_section_header("See Also"),
+        template_see_also,
+        create_section_header("Examples"),
+        dedent(
+            """
+        >>> s = pd.Series([1, 4, 2, 3, 5, 3])
+        >>> s.expanding().rank()
+        0    1.0
+        1    2.0
+        2    2.0
+        3    3.0
+        4    5.0
+        5    3.5
+        dtype: float64
+
+        >>> s.expanding().rank(method="max")
+        0    1.0
+        1    2.0
+        2    2.0
+        3    3.0
+        4    5.0
+        5    4.0
+        dtype: float64
+
+        >>> s.expanding().rank(method="min")
+        0    1.0
+        1    2.0
+        2    2.0
+        3    3.0
+        4    5.0
+        5    3.0
+        dtype: float64
+        """
+        ).replace("\n", "", 1),
+        window_method="expanding",
+        aggregation_description="rank",
+        agg_method="rank",
+    )
+    def rank(
+        self,
+        method: WindowingRankType = "average",
+        ascending: bool = True,
+        pct: bool = False,
+        **kwargs,
+    ):
+        return super().rank(
+            method=method,
+            ascending=ascending,
+            pct=pct,
+            **kwargs,
+        )
+
     @doc(
         template_header,
         create_section_header("Parameters"),
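The ``pct`` flag documented above maps to ``output[i] = rank / nobs`` in ``roll_rank``, i.e. each rank is divided by the number of non-NaN observations in its window. A sketch of the expected result for the docstring's series, computed by hand from that rule (shown at pandas' default display precision):

    >>> import pandas as pd
    >>> s = pd.Series([1, 4, 2, 3, 5, 3])
    >>> s.expanding().rank(pct=True)
    0    1.000000
    1    1.000000
    2    0.666667
    3    0.750000
    4    1.000000
    5    0.583333
    dtype: float64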
+ """ + ).replace("\n", "", 1), + kwargs_compat, + create_section_header("Returns"), + template_returns, + create_section_header("See Also"), + template_see_also, + create_section_header("Examples"), + dedent( + """ + >>> s = pd.Series([1, 4, 2, 3, 5, 3]) + >>> s.expanding().rank() + 0 1.0 + 1 2.0 + 2 2.0 + 3 3.0 + 4 5.0 + 5 3.5 + dtype: float64 + + >>> s.expanding().rank(method="max") + 0 1.0 + 1 2.0 + 2 2.0 + 3 3.0 + 4 5.0 + 5 4.0 + dtype: float64 + + >>> s.expanding().rank(method="min") + 0 1.0 + 1 2.0 + 2 2.0 + 3 3.0 + 4 5.0 + 5 3.0 + dtype: float64 + """ + ).replace("\n", "", 1), + window_method="expanding", + aggregation_description="rank", + agg_method="rank", + ) + def rank( + self, + method: WindowingRankType = "average", + ascending: bool = True, + pct: bool = False, + **kwargs, + ): + return super().rank( + method=method, + ascending=ascending, + pct=pct, + **kwargs, + ) + @doc( template_header, create_section_header("Parameters"), diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index f6e991c7d7cd2..b3592679752fd 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -28,6 +28,7 @@ ArrayLike, Axis, FrameOrSeries, + WindowingRankType, ) from pandas.compat._optional import import_optional_dependency from pandas.compat.numpy import function as nv @@ -1410,6 +1411,22 @@ def quantile(self, quantile: float, interpolation: str = "linear", **kwargs): return self._apply(window_func, name="quantile", **kwargs) + def rank( + self, + method: WindowingRankType = "average", + ascending: bool = True, + pct: bool = False, + **kwargs, + ): + window_func = partial( + window_aggregations.roll_rank, + method=method, + ascending=ascending, + percentile=pct, + ) + + return self._apply(window_func, name="rank", **kwargs) + def cov( self, other: DataFrame | Series | None = None, @@ -2161,6 +2178,81 @@ def quantile(self, quantile: float, interpolation: str = "linear", **kwargs): **kwargs, ) + @doc( + template_header, + ".. versionadded:: 1.4.0 \n\n", + create_section_header("Parameters"), + dedent( + """ + method : {{'average', 'min', 'max'}}, default 'average' + How to rank the group of records that have the same value (i.e. ties): + + * average: average rank of the group + * min: lowest rank in the group + * max: highest rank in the group + + ascending : bool, default True + Whether or not the elements should be ranked in ascending order. + pct : bool, default False + Whether or not to display the returned rankings in percentile + form. 
+ """ + ).replace("\n", "", 1), + kwargs_compat, + create_section_header("Returns"), + template_returns, + create_section_header("See Also"), + template_see_also, + create_section_header("Examples"), + dedent( + """ + >>> s = pd.Series([1, 4, 2, 3, 5, 3]) + >>> s.rolling(3).rank() + 0 NaN + 1 NaN + 2 2.0 + 3 2.0 + 4 3.0 + 5 1.5 + dtype: float64 + + >>> s.rolling(3).rank(method="max") + 0 NaN + 1 NaN + 2 2.0 + 3 2.0 + 4 3.0 + 5 2.0 + dtype: float64 + + >>> s.rolling(3).rank(method="min") + 0 NaN + 1 NaN + 2 2.0 + 3 2.0 + 4 3.0 + 5 1.0 + dtype: float64 + """ + ).replace("\n", "", 1), + window_method="rolling", + aggregation_description="rank", + agg_method="rank", + ) + def rank( + self, + method: WindowingRankType = "average", + ascending: bool = True, + pct: bool = False, + **kwargs, + ): + return super().rank( + method=method, + ascending=ascending, + pct=pct, + **kwargs, + ) + @doc( template_header, create_section_header("Parameters"), diff --git a/pandas/tests/window/test_expanding.py b/pandas/tests/window/test_expanding.py index 1b9259fd8240e..680ac3654222a 100644 --- a/pandas/tests/window/test_expanding.py +++ b/pandas/tests/window/test_expanding.py @@ -264,3 +264,27 @@ def test_expanding_skew_kurt_numerical_stability(method): s = s + 5000 result = getattr(s.expanding(3), method)() tm.assert_series_equal(result, expected) + + +@pytest.mark.parametrize("window", [1, 3, 10, 20]) +@pytest.mark.parametrize("method", ["min", "max", "average"]) +@pytest.mark.parametrize("pct", [True, False]) +@pytest.mark.parametrize("ascending", [True, False]) +@pytest.mark.parametrize("test_data", ["default", "duplicates", "nans"]) +def test_rank(window, method, pct, ascending, test_data): + length = 20 + if test_data == "default": + ser = Series(data=np.random.rand(length)) + elif test_data == "duplicates": + ser = Series(data=np.random.choice(3, length)) + elif test_data == "nans": + ser = Series( + data=np.random.choice([1.0, 0.25, 0.75, np.nan, np.inf, -np.inf], length) + ) + + expected = ser.expanding(window).apply( + lambda x: x.rank(method=method, pct=pct, ascending=ascending).iloc[-1] + ) + result = ser.expanding(window).rank(method=method, pct=pct, ascending=ascending) + + tm.assert_series_equal(result, expected) diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index 2edf22d96a9ba..ed1039223e831 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -1500,3 +1500,27 @@ def test_rolling_numeric_dtypes(): dtype="float64", ) tm.assert_frame_equal(result, expected) + + +@pytest.mark.parametrize("window", [1, 3, 10, 20]) +@pytest.mark.parametrize("method", ["min", "max", "average"]) +@pytest.mark.parametrize("pct", [True, False]) +@pytest.mark.parametrize("ascending", [True, False]) +@pytest.mark.parametrize("test_data", ["default", "duplicates", "nans"]) +def test_rank(window, method, pct, ascending, test_data): + length = 20 + if test_data == "default": + ser = Series(data=np.random.rand(length)) + elif test_data == "duplicates": + ser = Series(data=np.random.choice(3, length)) + elif test_data == "nans": + ser = Series( + data=np.random.choice([1.0, 0.25, 0.75, np.nan, np.inf, -np.inf], length) + ) + + expected = ser.rolling(window).apply( + lambda x: x.rank(method=method, pct=pct, ascending=ascending).iloc[-1] + ) + result = ser.rolling(window).rank(method=method, pct=pct, ascending=ascending) + + tm.assert_series_equal(result, expected)