
Commit 1e32e0c

Loosen anchor restriction. (#1522)
This loosens the anchor restriction: when two Koalas objects share the same underlying Spark DataFrame and their index metadata has not been modified, operations between them can work without joins (see the sketch below the summary).
1 parent 9f004ce · commit 1e32e0c

6 files changed: +108 −62 lines
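
As a rough illustration of what the loosened restriction enables (a sketch based on the tests added in this commit; the pdf/kdf names are illustrative, and the behavior is assumed with this patch applied):

    import pandas as pd
    import databricks.koalas as ks

    pdf = pd.DataFrame({"a": [0, 2, 4], "b": [4, 2, 0]})
    kdf = ks.from_pandas(pdf)

    # kdf.copy() wraps the same underlying Spark DataFrame with unchanged index
    # metadata, so these no longer require a join between two Koalas frames:
    kdf + kdf.copy()                   # element-wise, column-aligned arithmetic
    kdf.loc[kdf.copy().a % 2 == 0]     # boolean filtering with a key from a copy
    kdf.groupby(kdf.copy().b).a.sum()  # grouping by a key from a copy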

databricks/koalas/frame.py

Lines changed: 34 additions & 14 deletions
@@ -599,25 +599,45 @@ def _map_series_op(self, op, other):
                 "however, got %s." % (op, type(other))
             )

-        if isinstance(other, DataFrame) and not same_anchor(self, other):
+        if isinstance(other, DataFrame):
             if self._internal.column_labels_level != other._internal.column_labels_level:
                 raise ValueError("cannot join with no overlapping index names")

-            # Different DataFrames
-            def apply_op(kdf, this_column_labels, that_column_labels):
-                for this_label, that_label in zip(this_column_labels, that_column_labels):
-                    yield (
-                        getattr(kdf._kser_for(this_label), op)(kdf._kser_for(that_label)),
-                        this_label,
-                    )
+            if not same_anchor(self, other):
+                # Different DataFrames
+                def apply_op(kdf, this_column_labels, that_column_labels):
+                    for this_label, that_label in zip(this_column_labels, that_column_labels):
+                        yield (
+                            getattr(kdf._kser_for(this_label), op)(kdf._kser_for(that_label)),
+                            this_label,
+                        )

-            return align_diff_frames(apply_op, self, other, fillna=True, how="full")
-        else:
-            # DataFrame and Series
-            if isinstance(other, DataFrame):
-                return self._apply_series_op(lambda kser: getattr(kser, op)(other[kser.name]))
+                return align_diff_frames(apply_op, self, other, fillna=True, how="full")
             else:
-                return self._apply_series_op(lambda kser: getattr(kser, op)(other))
+                applied = []
+                column_labels = []
+                for label in self._internal.column_labels:
+                    if label in other._internal.column_labels:
+                        applied.append(getattr(self._kser_for(label), op)(other._kser_for(label)))
+                    else:
+                        applied.append(
+                            F.lit(None)
+                            .cast(self._internal.spark_type_for(label))
+                            .alias(name_like_string(label))
+                        )
+                    column_labels.append(label)
+                for label in other._internal.column_labels:
+                    if label not in column_labels:
+                        applied.append(
+                            F.lit(None)
+                            .cast(other._internal.spark_type_for(label))
+                            .alias(name_like_string(label))
+                        )
+                        column_labels.append(label)
+                internal = self._internal.with_new_columns(applied, column_labels)
+                return DataFrame(internal)
+        else:
+            return self._apply_series_op(lambda kser: getattr(kser, op)(other))

     def __add__(self, other):
         return self._map_series_op("add", other)
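
The null-filled columns in the new same-anchor branch mirror plain pandas alignment semantics, where a column present in only one operand comes back as all-NaN. For reference, this is standard pandas behavior (not part of this change):

    import pandas as pd

    pdf1 = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
    pdf2 = pd.DataFrame({"A": [10, 20], "C": [5, 6]})
    print(pdf1 + pdf2)
    #     A   B   C
    # 0  11 NaN NaN
    # 1  22 NaN NaN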

databricks/koalas/groupby.py

Lines changed: 6 additions & 2 deletions
@@ -2175,9 +2175,13 @@ def assign_columns(kdf, this_column_labels, that_column_labels):
     @staticmethod
     def _resolve_grouping(kdf: DataFrame, by: List[Union[Series, Tuple[str, ...]]]) -> List[Series]:
         new_by_series = []
-        for col_or_s in by:
+        for i, col_or_s in enumerate(by):
             if isinstance(col_or_s, Series):
-                new_by_series.append(col_or_s)
+                if col_or_s._kdf is kdf:
+                    new_by_series.append(col_or_s)
+                else:
+                    # Rename to distinguish the key from a different DataFrame.
+                    new_by_series.append(col_or_s.rename("__tmp_groupkey_{}__".format(i)))
             elif isinstance(col_or_s, tuple):
                 kser = kdf[col_or_s]
                 if not isinstance(kser, Series):
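
The temporary rename above lets a groupby key come from a different Koalas DataFrame. A usage sketch based on the tests added in this commit (behavior assumed with this patch applied; the renamed key such as "__tmp_groupkey_0__" is internal bookkeeping only):

    import databricks.koalas as ks

    kdf = ks.DataFrame({"a": [1, 2, 6, 4, 4], "b": [4, 2, 7, 3, 3]})

    # Group by a key taken from a copy of the frame; the key Series is renamed
    # internally so it cannot clash with kdf's own column labels.
    kdf.groupby(kdf.copy().b).a.sum()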

databricks/koalas/tests/test_dataframe.py

Lines changed: 7 additions & 0 deletions
@@ -1392,6 +1392,13 @@ def test_clip(self):
         self.assert_eq(str_kdf.clip(1, 3), str_kdf)

     def test_binary_operators(self):
+        pdf = pd.DataFrame(
+            {"A": [0, 2, 4], "B": [4, 2, 0], "X": [-1, 10, 0]}, index=np.random.rand(3)
+        )
+        kdf = ks.from_pandas(pdf)
+
+        self.assert_eq(kdf + kdf.copy(), pdf + pdf.copy())
+
         self.assertRaisesRegex(
             ValueError,
             "it comes from a different dataframe",

databricks/koalas/tests/test_groupby.py

Lines changed: 43 additions & 35 deletions
@@ -237,18 +237,18 @@ def test_split_apply_combine_on_series(self):
                         almost=almost,
                     )

-                kkey, pkey = (kdf.b + 1, pdf.b + 1)
-                with self.subTest(as_index=as_index, func=func, key=pkey):
-                    self.assert_eq(
-                        sort(getattr(kdf.groupby(kkey, as_index=as_index).a, func)()),
-                        sort(getattr(pdf.groupby(pkey, as_index=as_index).a, func)()),
-                        almost=almost,
-                    )
-                    self.assert_eq(
-                        sort(getattr(kdf.groupby(kkey, as_index=as_index), func)()),
-                        sort(getattr(pdf.groupby(pkey, as_index=as_index), func)()),
-                        almost=almost,
-                    )
+                for kkey, pkey in [(kdf.b + 1, pdf.b + 1), (kdf.copy().b, pdf.copy().b)]:
+                    with self.subTest(as_index=as_index, func=func, key=pkey):
+                        self.assert_eq(
+                            sort(getattr(kdf.groupby(kkey, as_index=as_index).a, func)()),
+                            sort(getattr(pdf.groupby(pkey, as_index=as_index).a, func)()),
+                            almost=almost,
+                        )
+                        self.assert_eq(
+                            sort(getattr(kdf.groupby(kkey, as_index=as_index), func)()),
+                            sort(getattr(pdf.groupby(pkey, as_index=as_index), func)()),
+                            almost=almost,
+                        )

         for almost, func in funcs:
             for i in [0, 4, 7]:
@@ -265,7 +265,11 @@ def test_split_apply_combine_on_series(self):
                 )

         for almost, func in funcs:
-            for kkey, pkey in [(kdf.b, pdf.b), (kdf.b + 1, pdf.b + 1)]:
+            for kkey, pkey in [
+                (kdf.b, pdf.b),
+                (kdf.b + 1, pdf.b + 1),
+                (kdf.copy().b, pdf.copy().b),
+            ]:
                 with self.subTest(func=func, key=pkey):
                     self.assert_eq(
                         getattr(kdf.a.groupby(kkey), func)().sort_index(),
@@ -330,28 +334,32 @@ def test_aggregate(self):
                    sort(pdf.groupby(pkey, as_index=True).agg(["sum"]).reset_index()),
                )

-            kkey, pkey = (kdf.A + 1, pdf.A + 1)
-            with self.subTest(as_index=as_index, key=pkey):
-                self.assert_eq(
-                    sort(kdf.groupby(kkey, as_index=as_index).agg("sum")),
-                    sort(pdf.groupby(pkey, as_index=as_index).agg("sum")),
-                )
-                self.assert_eq(
-                    sort(kdf.groupby(kkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
-                    sort(pdf.groupby(pkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
-                )
-                self.assert_eq(
-                    sort(
-                        kdf.groupby(kkey, as_index=as_index).agg({"B": ["min", "max"], "C": "sum"})
-                    ),
-                    sort(
-                        pdf.groupby(pkey, as_index=as_index).agg({"B": ["min", "max"], "C": "sum"})
-                    ),
-                )
-                self.assert_eq(
-                    sort(kdf.groupby(kkey, as_index=as_index).agg(["sum"])),
-                    sort(pdf.groupby(pkey, as_index=as_index).agg(["sum"])),
-                )
+            for kkey, pkey in [(kdf.A + 1, pdf.A + 1), (kdf.copy().A, pdf.copy().A)]:
+                with self.subTest(as_index=as_index, key=pkey):
+                    self.assert_eq(
+                        sort(kdf.groupby(kkey, as_index=as_index).agg("sum")),
+                        sort(pdf.groupby(pkey, as_index=as_index).agg("sum")),
+                    )
+                    self.assert_eq(
+                        sort(kdf.groupby(kkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
+                        sort(pdf.groupby(pkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
+                    )
+                    self.assert_eq(
+                        sort(
+                            kdf.groupby(kkey, as_index=as_index).agg(
+                                {"B": ["min", "max"], "C": "sum"}
+                            )
+                        ),
+                        sort(
+                            pdf.groupby(pkey, as_index=as_index).agg(
+                                {"B": ["min", "max"], "C": "sum"}
+                            )
+                        ),
+                    )
+                    self.assert_eq(
+                        sort(kdf.groupby(kkey, as_index=as_index).agg(["sum"])),
+                        sort(pdf.groupby(pkey, as_index=as_index).agg(["sum"])),
+                    )

         expected_error_message = (
             r"aggs must be a dict mapping from column name \(string or "

databricks/koalas/tests/test_indexing.py

Lines changed: 8 additions & 0 deletions
@@ -364,6 +364,14 @@ def test_loc_with_series(self):
         pdf = self.pdf

         self.assert_eq(kdf.loc[kdf.a % 2 == 0], pdf.loc[pdf.a % 2 == 0])
+        self.assert_eq(kdf.loc[kdf.a % 2 == 0, "a"], pdf.loc[pdf.a % 2 == 0, "a"])
+        self.assert_eq(kdf.loc[kdf.a % 2 == 0, ["a"]], pdf.loc[pdf.a % 2 == 0, ["a"]])
+        self.assert_eq(kdf.a.loc[kdf.a % 2 == 0], pdf.a.loc[pdf.a % 2 == 0])
+
+        self.assert_eq(kdf.loc[kdf.copy().a % 2 == 0], pdf.loc[pdf.copy().a % 2 == 0])
+        self.assert_eq(kdf.loc[kdf.copy().a % 2 == 0, "a"], pdf.loc[pdf.copy().a % 2 == 0, "a"])
+        self.assert_eq(kdf.loc[kdf.copy().a % 2 == 0, ["a"]], pdf.loc[pdf.copy().a % 2 == 0, ["a"]])
+        self.assert_eq(kdf.a.loc[kdf.copy().a % 2 == 0], pdf.a.loc[pdf.copy().a % 2 == 0])

     def test_loc_noindex(self):
         kdf = self.kdf

databricks/koalas/utils.py

Lines changed: 10 additions & 11 deletions
@@ -47,17 +47,16 @@ def same_anchor(
     from databricks.koalas.base import IndexOpsMixin
     from databricks.koalas.frame import DataFrame

-    if isinstance(this, DataFrame):
-        this_kdf = this
-    else:
-        assert isinstance(this, IndexOpsMixin), type(this)
-        this_kdf = this._kdf
-    if isinstance(that, DataFrame):
-        that_kdf = that
-    else:
-        assert isinstance(that, IndexOpsMixin), type(that)
-        that_kdf = that._kdf
-    return this_kdf is that_kdf
+    assert isinstance(this, (DataFrame, IndexOpsMixin)), type(this)
+    this_internal = this._internal
+
+    assert isinstance(that, (DataFrame, IndexOpsMixin)), type(that)
+    that_internal = that._internal
+
+    return (
+        this_internal.spark_frame is that_internal.spark_frame
+        and this_internal.index_map == that_internal.index_map
+    )


 def combine_frames(this, *args, how="full", preserve_order_column=False):
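
With the new definition, same_anchor compares the underlying Spark DataFrame and the index metadata rather than the identity of the Koalas DataFrame object. A rough sketch of the expected semantics (assuming this patch, and that copy() reuses the same Spark DataFrame, as the new tests exercise):

    import databricks.koalas as ks
    from databricks.koalas.utils import same_anchor

    kdf = ks.DataFrame({"a": [1, 2, 3]})

    same_anchor(kdf, kdf)         # True, as before
    same_anchor(kdf, kdf.copy())  # now True: same Spark frame and same index_map
    same_anchor(kdf, ks.DataFrame({"a": [1, 2, 3]}))  # False: a different Spark frame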
