Skip to content

Commit ae10be6

Browse files
committed
Make is_monotonic/is_monotonic_decreasing distributed
1 parent da3740d commit ae10be6

File tree

3 files changed

+93
-35
lines changed

3 files changed

+93
-35
lines changed

databricks/koalas/base.py

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -319,10 +319,10 @@ def is_monotonic(self):
319319
"""
320320
Return boolean if values in the object are monotonically increasing.
321321
322-
.. note:: the current implementation of is_monotonic_increasing uses Spark's
323-
Window without specifying partition specification. This leads to move all data into
324-
single partition in single machine and could cause serious
325-
performance degradation. Avoid this method against very large dataset.
322+
.. note:: the current implementation of is_monotonic requires shuffling
323+
and aggregating multiple times to check the order locally and globally,
324+
which is potentially expensive. In case of a multi-index, all data are
325+
transferred to a single node, which can easily cause an out-of-memory error.
326326
327327
Returns
328328
-------
@@ -385,12 +385,7 @@ def is_monotonic(self):
385385
>>> midx.is_monotonic
386386
False
387387
"""
388-
return self._is_monotonic().all()
389-
390-
def _is_monotonic(self):
391-
col = self._scol
392-
window = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(-1, -1)
393-
return self._with_new_scol((col >= F.lag(col, 1).over(window)) & col.isNotNull())
388+
return self._is_monotonic("increasing")
394389

395390
is_monotonic_increasing = is_monotonic
396391

@@ -399,10 +394,10 @@ def is_monotonic_decreasing(self):
399394
"""
400395
Return boolean if values in the object are monotonically decreasing.
401396
402-
.. note:: the current implementation of is_monotonic_decreasing uses Spark's
403-
Window without specifying partition specification. This leads to move all data into
404-
single partition in single machine and could cause serious
405-
performance degradation. Avoid this method against very large dataset.
397+
.. note:: the current implementation of is_monotonic_decreasing requires shuffling
398+
and aggregating multiple times to check the order locally and globally,
399+
which is potentially expensive. In case of a multi-index, all data are transferred
400+
to a single node, which can easily cause an out-of-memory error.
406401
407402
Returns
408403
-------
@@ -465,12 +460,80 @@ def is_monotonic_decreasing(self):
465460
>>> midx.is_monotonic_decreasing
466461
True
467462
"""
468-
return self._is_monotonic_decreasing().all()
463+
return self._is_monotonic("decreasing")
469464

470-
def _is_monotonic_decreasing(self):
471-
col = self._scol
472-
window = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(-1, -1)
473-
return self._with_new_scol((col <= F.lag(col, 1).over(window)) & col.isNotNull())
465+
def _is_locally_monotonic_spark_column(self, order):
466+
window = (
467+
Window.partitionBy(F.col("__partition_id"))
468+
.orderBy(NATURAL_ORDER_COLUMN_NAME)
469+
.rowsBetween(-1, -1)
470+
)
471+
472+
if order == "increasing":
473+
return (F.col("__origin") >= F.lag(F.col("__origin"), 1).over(window)) & F.col(
474+
"__origin"
475+
).isNotNull()
476+
else:
477+
return (F.col("__origin") <= F.lag(F.col("__origin"), 1).over(window)) & F.col(
478+
"__origin"
479+
).isNotNull()
480+
481+
def _is_monotonic(self, order):
482+
assert order in ("increasing", "decreasing")
483+
484+
sdf = self._internal.spark_frame
485+
486+
sdf = (
487+
sdf.select(
488+
F.spark_partition_id().alias(
489+
"__partition_id"
490+
), # Make sure we use the same partition id in the whole job.
491+
F.col(NATURAL_ORDER_COLUMN_NAME),
492+
self._scol.alias("__origin"),
493+
)
494+
.select(
495+
F.col("__partition_id"),
496+
F.col("__origin"),
497+
self._is_locally_monotonic_spark_column(order).alias(
498+
"__comparison_within_partition"
499+
),
500+
)
501+
.groupby(F.col("__partition_id"))
502+
.agg(
503+
F.min(F.col("__origin")).alias("__partition_min"),
504+
F.max(F.col("__origin")).alias("__partition_max"),
505+
F.min(F.coalesce(F.col("__comparison_within_partition"), F.lit(True))).alias(
506+
"__comparison_within_partition"
507+
),
508+
)
509+
)
510+
511+
# Now we're windowing the aggregation results without partition specification.
512+
# The number of rows here will be the same as the number of partitions, which is expected
513+
# to be small.
514+
window = Window.orderBy(F.col("__partition_id")).rowsBetween(-1, -1)
515+
if order == "increasing":
516+
comparison_col = F.col("__partition_min") >= F.lag(F.col("__partition_max"), 1).over(
517+
window
518+
)
519+
else:
520+
comparison_col = F.col("__partition_min") <= F.lag(F.col("__partition_max"), 1).over(
521+
window
522+
)
523+
524+
sdf = sdf.select(
525+
comparison_col.alias("__comparison_between_partitions"),
526+
F.col("__comparison_within_partition"),
527+
)
528+
529+
ret = sdf.select(
530+
F.min(F.coalesce(F.col("__comparison_between_partitions"), F.lit(True)))
531+
& F.min(F.coalesce(F.col("__comparison_within_partition"), F.lit(True)))
532+
).collect()[0][0]
533+
if ret is None:
534+
return True
535+
else:
536+
return ret
474537

475538
@property
476539
def ndim(self):

databricks/koalas/indexes.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1930,7 +1930,13 @@ def _comparator_for_monotonic_increasing(data_type):
19301930
else:
19311931
return compare_null_last
19321932

1933-
def _is_monotonic(self):
1933+
def _is_monotonic(self, order):
1934+
if order == "increasing":
1935+
return self._is_monotonic_increasing().all()
1936+
else:
1937+
return self._is_monotonic_decreasing().all()
1938+
1939+
def _is_monotonic_increasing(self):
19341940
scol = self._scol
19351941
window = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(-1, -1)
19361942
prev = F.lag(scol, 1).over(window)

databricks/koalas/indexing.py

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -568,30 +568,19 @@ def _select_rows(self, rows_sel):
568568
if (start is None and rows_sel.start is not None) or (
569569
stop is None and rows_sel.stop is not None
570570
):
571-
inc, dec = (
572-
sdf.select(
573-
index_column._is_monotonic()._scol.alias("__increasing__"),
574-
index_column._is_monotonic_decreasing()._scol.alias("__decreasing__"),
575-
)
576-
.select(
577-
F.min(F.coalesce("__increasing__", F.lit(True))),
578-
F.min(F.coalesce("__decreasing__", F.lit(True))),
579-
)
580-
.first()
581-
)
582571
if start is None and rows_sel.start is not None:
583572
start = rows_sel.start
584-
if inc is not False:
573+
if index_column.is_monotonic_increasing is not False:
585574
cond.append(index_column._scol >= F.lit(start).cast(index_data_type))
586-
elif dec is not False:
575+
elif index_column.is_monotonic_decreasing is not False:
587576
cond.append(index_column._scol <= F.lit(start).cast(index_data_type))
588577
else:
589578
raise KeyError(rows_sel.start)
590579
if stop is None and rows_sel.stop is not None:
591580
stop = rows_sel.stop
592-
if inc is not False:
581+
if index_column.is_monotonic_increasing is not False:
593582
cond.append(index_column._scol <= F.lit(stop).cast(index_data_type))
594-
elif dec is not False:
583+
elif index_column.is_monotonic_decreasing is not False:
595584
cond.append(index_column._scol >= F.lit(stop).cast(index_data_type))
596585
else:
597586
raise KeyError(rows_sel.stop)

0 commit comments

Comments
 (0)