Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion databricks/koalas/missing/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class _MissingPandasLikeSeries(object):
align = unsupported_function("align")
argsort = unsupported_function("argsort")
asfreq = unsupported_function("asfreq")
asof = unsupported_function("asof")
at_time = unsupported_function("at_time")
autocorr = unsupported_function("autocorr")
between_time = unsupported_function("between_time")
Expand Down
82 changes: 82 additions & 0 deletions databricks/koalas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import pandas as pd
from pandas.core.accessor import CachedAccessor
from pandas.io.formats.printing import pprint_thing
from pandas.api.types import is_list_like

from databricks.koalas.typedef import as_python_type
from pyspark import sql as spark
Expand Down Expand Up @@ -4518,6 +4519,87 @@ def repeat(self, repeats: int) -> "Series":
else:
return _col(ks.concat([kdf] * repeats))

def asof(self, where, subset=None):
    """
    Return the last value(s) without any NaNs at or before `where`.

    For each label in `where`, the value of the row with the greatest
    index that is less than or equal to the label, and whose value is
    not missing, is taken.

    If there is no good value, NaN is returned.

    Parameters
    ----------
    where : index or array-like of indices
    subset : str or array-like of str, default `None`
        Ignored by this implementation.

    Returns
    -------
    scalar or Series

        The return can be:

        * scalar : when `self` is a Series and `where` is a scalar
        * Series: when `self` is a Series and `where` is an array-like

    Raises
    ------
    ValueError
        If the index is a MultiIndex, if `where` is a DataFrame, or if
        the index is not monotonically increasing.

    Notes
    -----
    Indices are assumed to be sorted. Raises if this is not the case.

    Examples
    --------
    >>> s = ks.Series([1, 2, np.nan, 4], index=[10, 20, 30, 40])
    >>> s
    10    1.0
    20    2.0
    30    NaN
    40    4.0
    Name: 0, dtype: float64

    A scalar `where`.

    >>> s.asof(20)
    2.0

    For a sequence `where`, a Series is returned. The first value is
    NaN, because the first element of `where` is before the first
    index value.

    >>> s.asof([5, 20]).sort_index()
    5     NaN
    20    2.0
    Name: 0, dtype: float64

    Missing values are not considered. The following is ``2.0``, not
    NaN, even though NaN is at the index location for ``30``.

    >>> s.asof(30)
    2.0
    """
    if isinstance(self.index, ks.MultiIndex):
        raise ValueError("asof is not supported for a MultiIndex")
    if isinstance(where, ks.DataFrame):
        raise ValueError("where cannot be a DataFrame")
    if not self.index.is_monotonic_increasing:
        raise ValueError("asof requires a sorted index")

    should_return_series = is_list_like(where)
    if not should_return_series:
        where = [where]

    sdf = self._internal._sdf
    index_scol = self._internal.index_spark_columns[0]
    value_scol = self._scol

    # Structs are ordered field by field, so the maximum of (index, value)
    # over the qualifying rows is the row with the greatest index, i.e. the
    # *last* valid observation. Taking the maximum of the values themselves
    # would instead return the largest value seen so far, which is only
    # correct when the values happen to be non-decreasing.
    # NOTE: for duplicated index labels the tie is broken by the larger
    # value rather than by row position.
    indexed_value = F.struct(index_scol.alias("index"), value_scol.alias("value"))
    lookups = [
        F.max(F.when((index_scol <= label) & value_scol.isNotNull(), indexed_value))
        .getField("value")
        .alias("asof_{}".format(position))
        for position, label in enumerate(where)
    ]

    # All labels are resolved by one aggregation (a single Spark job)
    # instead of one job per label. A global aggregate always yields
    # exactly one row; unmatched labels come back as None.
    results = list(sdf.select(lookups).head()) if lookups else []

    if should_return_series:
        return ks.Series(results, index=where, name=self.name)
    result = results[0]
    return result if result is not None else np.nan

def _cum(self, func, skipna, part_cols=()):
# This is used to cummin, cummax, cumsum, etc.

Expand Down
24 changes: 24 additions & 0 deletions databricks/koalas/tests/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1372,3 +1372,27 @@ def test_take(self):
self.assertRaises(ValueError, lambda: kser.take("1"))
self.assertRaises(ValueError, lambda: kser.take({1, 2}))
self.assertRaises(ValueError, lambda: kser.take({1: None, 2: None}))

def test_asof(self):
    """Check Series.asof against pandas and verify its argument validation."""
    expected = pd.Series([1, 2, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
    actual = ks.from_pandas(expected)

    # Scalar lookups are compared directly; list lookups are sorted first
    # so the comparison does not depend on the order of the returned rows.
    for where in (20, [5, 20], 100, -100, [-100, 100]):
        got, want = actual.asof(where), expected.asof(where)
        if isinstance(where, list):
            got, want = got.sort_index(), want.sort_index()
        self.assert_eq(repr(got), repr(want))

    # where cannot be a DataFrame
    with self.assertRaises(ValueError):
        actual.asof(ks.DataFrame({"A": [1, 2, 3]}))

    # asof is not supported for a MultiIndex
    expected.index = pd.MultiIndex.from_tuples(
        [("x", "a"), ("x", "b"), ("y", "c"), ("y", "d")]
    )
    actual = ks.from_pandas(expected)
    with self.assertRaises(ValueError):
        actual.asof(20)

    # asof requires a sorted index (More precisely, should be a monotonic increasing)
    for unsorted_index in ([10, 30, 20, 40], [40, 30, 20, 10]):
        actual = ks.Series([1, 2, np.nan, 4], index=unsorted_index, name="Koalas")
        with self.assertRaises(ValueError):
            actual.asof(20)
1 change: 1 addition & 0 deletions docs/source/reference/series.rst
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ Time series-related
.. autosummary::
:toctree: api/

Series.asof
Series.shift
Series.first_valid_index

Expand Down