Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions databricks/koalas/indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1632,6 +1632,58 @@ def repeat(self, repeats: int) -> "Index":
else:
return ks.concat([kdf] * repeats).index

def asof(self, label):
    """
    Look up `label` in a sorted index, falling back to the closest earlier entry.

    When `label` itself is one of the index labels it is returned unchanged.
    Otherwise the label that precedes it in the index order is returned.

    .. note:: This API is dependent on :meth:`Index.is_monotonic_increasing`
        which can be expensive.

    Parameters
    ----------
    label : object
        The label up to which the method returns the latest index label.

    Returns
    -------
    object
        `label` when the index contains it, otherwise the preceding label of
        the sorted index, or `NaN` when no such label exists.

    Raises
    ------
    ValueError
        If the index is neither monotonic increasing nor monotonic decreasing.

    Examples
    --------
    `Index.asof` returns the latest index label up to the passed label.

    >>> idx = ks.Index(['2013-12-31', '2014-01-02', '2014-01-03'])
    >>> idx.asof('2014-01-01')
    '2013-12-31'

    If the label is in the index, the method returns the passed label.

    >>> idx.asof('2014-01-02')
    '2014-01-02'

    If all of the labels in the index are later than the passed label,
    NaN is returned.

    >>> idx.asof('1999-01-02')
    nan
    """
    frame = self._internal._sdf
    # Choose the row filter and the aggregate according to the sort direction:
    # ascending keeps labels <= `label` and takes the largest of them,
    # descending keeps labels >= `label` and takes the smallest.
    if self.is_monotonic_increasing:
        keep, reduce_col = self._scol <= label, F.max
    elif self.is_monotonic_decreasing:
        keep, reduce_col = self._scol >= label, F.min
    else:
        raise ValueError("index must be monotonic increasing or decreasing")
    first_row = frame.select(self._scol).where(keep).select(reduce_col(self._scol)).head()
    found = first_row[0]
    # The aggregate yields None when no label passed the filter.
    if found is None:
        return np.nan
    return found

def union(self, other, sort=None):
"""
Form the union of two Index objects.
Expand Down Expand Up @@ -2366,6 +2418,11 @@ def argmax(self):
def argmin(self):
    """Always raise ``TypeError``: the 'argmin' reduction is rejected here."""
    message = "reduction operation 'argmin' not allowed for this dtype"
    raise TypeError(message)

def asof(self, label):
    """Always raise ``NotImplementedError``; `label` is never inspected."""
    message = "only the default get_loc method is currently supported for MultiIndex"
    raise NotImplementedError(message)

@property
def is_all_dates(self):
"""
Expand Down
2 changes: 0 additions & 2 deletions databricks/koalas/missing/indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class _MissingPandasLikeIndex(object):

# Functions
argsort = unsupported_function("argsort")
asof = unsupported_function("asof")
asof_locs = unsupported_function("asof_locs")
delete = unsupported_function("delete")
factorize = unsupported_function("factorize")
Expand Down Expand Up @@ -111,7 +110,6 @@ class _MissingPandasLikeMultiIndex(object):

# Functions
argsort = unsupported_function("argsort")
asof = unsupported_function("asof")
asof_locs = unsupported_function("asof_locs")
delete = unsupported_function("delete")
equal_levels = unsupported_function("equal_levels")
Expand Down
5 changes: 2 additions & 3 deletions databricks/koalas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -3551,9 +3551,8 @@ def truncate(self, before=None, after=None, copy=True):
Truncates a sorted Series before and/or after some particular index value.
Series should have sorted index.

.. note:: The current implementation of truncate uses is_monotonic_increasing internally.
This moves all data into a single partition on a single machine and could cause
serious performance degradation. Avoid this method on very large datasets.
.. note:: This API is dependent on :meth:`Index.is_monotonic_increasing`
which can be expensive.

Parameters
----------
Expand Down
27 changes: 27 additions & 0 deletions databricks/koalas/tests/test_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,33 @@ def test_unique(self):
self.assert_eq(kmidx.unique().sort_values(), pmidx.unique().sort_values())
self.assert_eq(kmidx.unique().sort_values(), pmidx.unique().sort_values())

def test_asof(self):
    """Compare ``asof`` on Koalas indexes with pandas for each index ordering."""
    # Ascending labels: in-between and exact probes must agree with pandas.
    p_ascending = pd.Index(["2013-12-31", "2014-01-02", "2014-01-03"])
    k_ascending = ks.from_pandas(p_ascending)
    for probe in ("2014-01-01", "2014-01-02"):
        self.assert_eq(k_ascending.asof(probe), p_ascending.asof(probe))
    # A probe earlier than every label is expected to give NaN on both sides.
    for candidate in (k_ascending, p_ascending):
        self.assert_eq(np.isnan(candidate.asof("1999-01-02")), True)

    # Descending labels.
    p_descending = pd.Index(["2014-01-03", "2014-01-02", "2013-12-31"])
    k_descending = ks.from_pandas(p_descending)
    for probe in ("2014-01-01", "2014-01-02", "1999-01-02"):
        self.assert_eq(k_descending.asof(probe), p_descending.asof(probe))
    # Here it is the probe later than every label that is expected to give NaN.
    for candidate in (k_descending, p_descending):
        self.assert_eq(np.isnan(candidate.asof("2015-01-02")), True)

    # Labels in neither ascending nor descending order are rejected.
    k_unordered = ks.Index(["2013-12-31", "2015-01-02", "2014-01-03"])
    with self.assertRaises(ValueError):
        k_unordered.asof("2013-12-31")

    # MultiIndex does not implement asof.
    k_multi = ks.MultiIndex.from_tuples([("a", "a"), ("a", "b"), ("a", "c")])
    with self.assertRaises(NotImplementedError):
        k_multi.asof(("a", "b"))

def test_union(self):
# Index
pidx1 = pd.Index([1, 2, 3, 4])
Expand Down
1 change: 1 addition & 0 deletions docs/source/reference/indexing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ Selecting
.. autosummary::
:toctree: api/

Index.asof
Index.isin

.. _api.multiindex:
Expand Down