Commit b5676f3
Reduce the number of joins for mod/rmod. (#1409)
This is a follow-up of #1399. When performing mod/rmod, if the operands are Series from different DataFrames, we previously needed three joins.

```py
>>> kser = ks.Series([100, None, -300, None, 500, -700], name="Koalas")
>>> (kser % ks.Series([150] * 6)).to_frame().explain()
== Physical Plan ==
*(9) Project [CASE WHEN isnotnull(__index_level_0__#317L) THEN __index_level_0__#317L ELSE __index_level_0__#228L END AS __index_level_0__#378L, (Koalas#364 % cast(0#229L as double)) AS Koalas#425]
+- SortMergeJoin [__index_level_0__#317L], [__index_level_0__#228L], FullOuter
   :- *(7) Sort [__index_level_0__#317L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(__index_level_0__#317L, 200)
   :     +- *(6) Project [CASE WHEN isnotnull(__index_level_0__#254L) THEN __index_level_0__#254L ELSE __index_level_0__#228L END AS __index_level_0__#317L, (Koalas#303 + cast(0#229L as double)) AS Koalas#364]
   :        +- SortMergeJoin [__index_level_0__#254L], [__index_level_0__#228L], FullOuter
   :           :- *(4) Sort [__index_level_0__#254L ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(__index_level_0__#254L, 200)
   :           :     +- *(3) Project [CASE WHEN isnotnull(__index_level_0__#0L) THEN __index_level_0__#0L ELSE __index_level_0__#228L END AS __index_level_0__#254L, (Koalas#1 % cast(0#229L as double)) AS Koalas#303]
   :           :        +- SortMergeJoin [__index_level_0__#0L], [__index_level_0__#228L], FullOuter
   :           :           :- *(1) Sort [__index_level_0__#0L ASC NULLS FIRST], false, 0
   :           :           :  +- Exchange hashpartitioning(__index_level_0__#0L, 200)
   :           :           :     +- Scan ExistingRDD[__index_level_0__#0L,Koalas#1]
   :           :           +- *(2) Sort [__index_level_0__#228L ASC NULLS FIRST], false, 0
   :           :              +- Exchange hashpartitioning(__index_level_0__#228L, 200)
   :           :                 +- Scan ExistingRDD[__index_level_0__#228L,0#229L]
   :           +- *(5) Sort [__index_level_0__#228L ASC NULLS FIRST], false, 0
   :              +- ReusedExchange [__index_level_0__#228L, 0#229L], Exchange hashpartitioning(__index_level_0__#228L, 200)
   +- *(8) Sort [__index_level_0__#228L ASC NULLS FIRST], false, 0
      +- ReusedExchange [__index_level_0__#228L, 0#229L], Exchange hashpartitioning(__index_level_0__#228L, 200)
```

We can reduce the number of joins to one.

```py
>>> (kser % ks.Series([150] * 6)).to_frame().explain()
== Physical Plan ==
*(3) Project [CASE WHEN isnotnull(__index_level_0__#0L) THEN __index_level_0__#0L ELSE __index_level_0__#98L END AS __index_level_0__#118L, (((Koalas#1 % cast(0#99L as double)) + cast(0#99L as double)) % cast(0#99L as double)) AS Koalas#165]
+- SortMergeJoin [__index_level_0__#0L], [__index_level_0__#98L], FullOuter
   :- *(1) Sort [__index_level_0__#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(__index_level_0__#0L, 200)
   :     +- Scan ExistingRDD[__index_level_0__#0L,Koalas#1]
   +- *(2) Sort [__index_level_0__#98L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(__index_level_0__#98L, 200)
         +- Scan ExistingRDD[__index_level_0__#98L,0#99L]
```
1 parent 2554970 commit b5676f3
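
For context, the `((left % right) + right) % right` expression exists because Spark's `%` keeps the sign of the dividend (Java semantics), while pandas follows Python's convention of keeping the sign of the divisor. Below is a minimal plain-Python sketch of that equivalence, using `math.fmod` as a stand-in for Spark's `%` (the stand-in and the helper names are assumptions for illustration, not koalas code):

```py
import math

def spark_style_mod(a, b):
    # Spark SQL's % keeps the sign of the dividend, as Java does;
    # math.fmod behaves the same way for these inputs, so it stands in here.
    return math.fmod(a, b)

def pandas_style_mod(a, b):
    # The rewrite used by __mod__: ((left % right) + right) % right,
    # evaluated entirely with the Spark-style operator.
    return spark_style_mod(spark_style_mod(a, b) + b, b)

# pandas/Python keep the sign of the divisor; the rewrite reproduces that.
for a, b in [(100, -150), (-300, 150), (500, -150), (-700, 150)]:
    assert pandas_style_mod(a, b) == a % b
```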

File tree

3 files changed (+49, -11 lines)

databricks/koalas/base.py

Lines changed: 9 additions & 11 deletions
```diff
@@ -23,7 +23,7 @@
 
 import numpy as np
 import pandas as pd
-from pandas.api.types import is_list_like, is_scalar
+from pandas.api.types import is_list_like
 from pyspark import sql as spark
 from pyspark.sql import functions as F, Window
 from pyspark.sql.types import DoubleType, FloatType, LongType, StringType, TimestampType
@@ -188,11 +188,10 @@ def __sub__(self, other):
     __truediv__ = _numpy_column_op(spark.Column.__truediv__)
 
     def __mod__(self, other):
-        if is_scalar(other):
-            return self._with_new_scol((self._scol % other + other) % other)
-        else:
-            result_spark = _column_op(spark.Column.__mod__)(self, other)
-            return _column_op(spark.Column.__mod__)(result_spark + other, other)
+        def mod(left, right):
+            return ((left % right) + right) % right
+
+        return _column_op(mod)(self, other)
 
     def __radd__(self, other):
         # Handle 'literal' + df['col']
@@ -217,11 +216,10 @@ def __rfloordiv__(self, other):
         )
 
     def __rmod__(self, other):
-        if is_scalar(other):
-            return self._with_new_scol((other % self._scol + self._scol) % self._scol)
-        else:
-            result_spark = _column_op(spark.Column.__mod__)(other, self)
-            return _column_op(spark.Column.__mod__)(result_spark + self, self)
+        def rmod(left, right):
+            return ((right % left) + left) % left
+
+        return _column_op(rmod)(self, other)
 
     __pow__ = _column_op(spark.Column.__pow__)
     __rpow__ = _column_op(spark.Column.__rpow__)
```
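
Roughly, the win comes from composing the whole `((left % right) + right) % right` rewrite into a single Spark column expression before alignment, so the two Series are joined on their index only once instead of once per chained `%` operation. A minimal PySpark sketch of the resulting plan shape (the DataFrames, column names, and explicit join below are illustrative assumptions, not koalas internals):

```py
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[2]").appName("mod-sketch").getOrCreate()

left = spark.createDataFrame([(0, 100.0), (1, None), (2, -300.0)], ["idx", "Koalas"])
right = spark.createDataFrame([(0, 150), (1, 150), (2, 150)], ["idx", "val"])

# Align the two "Series" on the index once, then compute the whole
# ((l % r) + r) % r expression as a single projected column.
joined = left.join(right, "idx", "full_outer")
l, r = F.col("Koalas"), F.col("val")
result = joined.select("idx", (((l % r) + r) % r).alias("Koalas"))
result.explain()  # one join in the plan, instead of one per chained % operation
```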

databricks/koalas/tests/test_ops_on_diff_frames.py

Lines changed: 32 additions & 0 deletions
```diff
@@ -310,6 +310,38 @@ def test_arithmetic_chain(self):
             (kser1 + kser2 * kser3).sort_index(), (pser1 + pser2 * pser3).sort_index(), almost=True
         )
 
+    def test_mod(self):
+        pser = pd.Series([100, None, -300, None, 500, -700], name="Koalas")
+        pser_other = pd.Series([-150] * 6)
+        kser = ks.from_pandas(pser)
+        kser_other = ks.from_pandas(pser_other)
+
+        self.assert_eq(
+            repr(kser.mod(kser_other).sort_index()), repr(pser.mod(pser_other).rename("Koalas"))
+        )
+        self.assert_eq(
+            repr(kser.mod(kser_other).sort_index()), repr(pser.mod(pser_other).rename("Koalas"))
+        )
+        self.assert_eq(
+            repr(kser.mod(kser_other).sort_index()), repr(pser.mod(pser_other).rename("Koalas"))
+        )
+
+    def test_rmod(self):
+        pser = pd.Series([100, None, -300, None, 500, -700], name="Koalas")
+        pser_other = pd.Series([-150] * 6)
+        kser = ks.from_pandas(pser)
+        kser_other = ks.from_pandas(pser_other)
+
+        self.assert_eq(
+            repr(kser.rmod(kser_other).sort_index()), repr(pser.rmod(pser_other).rename("Koalas"))
+        )
+        self.assert_eq(
+            repr(kser.rmod(kser_other).sort_index()), repr(pser.rmod(pser_other).rename("Koalas"))
+        )
+        self.assert_eq(
+            repr(kser.rmod(kser_other).sort_index()), repr(pser.rmod(pser_other).rename("Koalas"))
+        )
+
     def test_getitem_boolean_series(self):
         pdf1 = pd.DataFrame(
             {"A": [0, 1, 2, 3, 4], "B": [100, 200, 300, 400, 500]}, index=[20, 10, 30, 0, 50]
```

databricks/koalas/tests/test_series.py

Lines changed: 8 additions & 0 deletions
```diff
@@ -1381,6 +1381,10 @@ def test_mod(self):
         self.assert_eq(repr(kser.mod(0)), repr(pser.mod(0)))
         self.assert_eq(repr(kser.mod(150)), repr(pser.mod(150)))
 
+        pdf = pd.DataFrame({"a": [100, None, -300, None, 500, -700], "b": [150] * 6})
+        kdf = ks.from_pandas(pdf)
+        self.assert_eq(repr(kdf.a.mod(kdf.b)), repr(pdf.a.mod(pdf.b).rename("a")))
+
     def test_rmod(self):
         pser = pd.Series([100, None, -300, None, 500, -700], name="Koalas")
         kser = ks.from_pandas(pser)
@@ -1389,6 +1393,10 @@ def test_rmod(self):
         self.assert_eq(repr(kser.rmod(0)), repr(pser.rmod(0)))
         self.assert_eq(repr(kser.rmod(150)), repr(pser.rmod(150)))
 
+        pdf = pd.DataFrame({"a": [100, None, -300, None, 500, -700], "b": [150] * 6})
+        kdf = ks.from_pandas(pdf)
+        self.assert_eq(repr(kdf.a.rmod(kdf.b)), repr(pdf.a.rmod(pdf.b).rename("a")))
+
     def test_asof(self):
         pser = pd.Series([1, 2, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
         kser = ks.from_pandas(pser)
```
