Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions databricks/koalas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,25 +599,45 @@ def _map_series_op(self, op, other):
"however, got %s." % (op, type(other))
)

if isinstance(other, DataFrame) and not same_anchor(self, other):
if isinstance(other, DataFrame):
if self._internal.column_labels_level != other._internal.column_labels_level:
raise ValueError("cannot join with no overlapping index names")

# Different DataFrames
def apply_op(kdf, this_column_labels, that_column_labels):
for this_label, that_label in zip(this_column_labels, that_column_labels):
yield (
getattr(kdf._kser_for(this_label), op)(kdf._kser_for(that_label)),
this_label,
)
if not same_anchor(self, other):
# Different DataFrames
def apply_op(kdf, this_column_labels, that_column_labels):
for this_label, that_label in zip(this_column_labels, that_column_labels):
yield (
getattr(kdf._kser_for(this_label), op)(kdf._kser_for(that_label)),
this_label,
)

return align_diff_frames(apply_op, self, other, fillna=True, how="full")
else:
# DataFrame and Series
if isinstance(other, DataFrame):
return self._apply_series_op(lambda kser: getattr(kser, op)(other[kser.name]))
return align_diff_frames(apply_op, self, other, fillna=True, how="full")
else:
return self._apply_series_op(lambda kser: getattr(kser, op)(other))
applied = []
column_labels = []
for label in self._internal.column_labels:
if label in other._internal.column_labels:
applied.append(getattr(self._kser_for(label), op)(other._kser_for(label)))
else:
applied.append(
F.lit(None)
.cast(self._internal.spark_type_for(label))
.alias(name_like_string(label))
)
column_labels.append(label)
for label in other._internal.column_labels:
if label not in column_labels:
applied.append(
F.lit(None)
.cast(other._internal.spark_type_for(label))
.alias(name_like_string(label))
)
column_labels.append(label)
internal = self._internal.with_new_columns(applied, column_labels)
return DataFrame(internal)
else:
return self._apply_series_op(lambda kser: getattr(kser, op)(other))

def __add__(self, other):
return self._map_series_op("add", other)
Expand Down
8 changes: 6 additions & 2 deletions databricks/koalas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -2175,9 +2175,13 @@ def assign_columns(kdf, this_column_labels, that_column_labels):
@staticmethod
def _resolve_grouping(kdf: DataFrame, by: List[Union[Series, Tuple[str, ...]]]) -> List[Series]:
new_by_series = []
for col_or_s in by:
for i, col_or_s in enumerate(by):
if isinstance(col_or_s, Series):
new_by_series.append(col_or_s)
if col_or_s._kdf is kdf:
new_by_series.append(col_or_s)
else:
# Rename to distinguish the key from a different DataFrame.
new_by_series.append(col_or_s.rename("__tmp_groupkey_{}__".format(i)))
elif isinstance(col_or_s, tuple):
kser = kdf[col_or_s]
if not isinstance(kser, Series):
Expand Down
7 changes: 7 additions & 0 deletions databricks/koalas/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,13 @@ def test_clip(self):
self.assert_eq(str_kdf.clip(1, 3), str_kdf)

def test_binary_operators(self):
pdf = pd.DataFrame(
{"A": [0, 2, 4], "B": [4, 2, 0], "X": [-1, 10, 0]}, index=np.random.rand(3)
)
kdf = ks.from_pandas(pdf)

self.assert_eq(kdf + kdf.copy(), pdf + pdf.copy())

self.assertRaisesRegex(
ValueError,
"it comes from a different dataframe",
Expand Down
78 changes: 43 additions & 35 deletions databricks/koalas/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,18 +237,18 @@ def test_split_apply_combine_on_series(self):
almost=almost,
)

kkey, pkey = (kdf.b + 1, pdf.b + 1)
with self.subTest(as_index=as_index, func=func, key=pkey):
self.assert_eq(
sort(getattr(kdf.groupby(kkey, as_index=as_index).a, func)()),
sort(getattr(pdf.groupby(pkey, as_index=as_index).a, func)()),
almost=almost,
)
self.assert_eq(
sort(getattr(kdf.groupby(kkey, as_index=as_index), func)()),
sort(getattr(pdf.groupby(pkey, as_index=as_index), func)()),
almost=almost,
)
for kkey, pkey in [(kdf.b + 1, pdf.b + 1), (kdf.copy().b, pdf.copy().b)]:
with self.subTest(as_index=as_index, func=func, key=pkey):
self.assert_eq(
sort(getattr(kdf.groupby(kkey, as_index=as_index).a, func)()),
sort(getattr(pdf.groupby(pkey, as_index=as_index).a, func)()),
almost=almost,
)
self.assert_eq(
sort(getattr(kdf.groupby(kkey, as_index=as_index), func)()),
sort(getattr(pdf.groupby(pkey, as_index=as_index), func)()),
almost=almost,
)

for almost, func in funcs:
for i in [0, 4, 7]:
Expand All @@ -265,7 +265,11 @@ def test_split_apply_combine_on_series(self):
)

for almost, func in funcs:
for kkey, pkey in [(kdf.b, pdf.b), (kdf.b + 1, pdf.b + 1)]:
for kkey, pkey in [
(kdf.b, pdf.b),
(kdf.b + 1, pdf.b + 1),
(kdf.copy().b, pdf.copy().b),
]:
with self.subTest(func=func, key=pkey):
self.assert_eq(
getattr(kdf.a.groupby(kkey), func)().sort_index(),
Expand Down Expand Up @@ -330,28 +334,32 @@ def test_aggregate(self):
sort(pdf.groupby(pkey, as_index=True).agg(["sum"]).reset_index()),
)

kkey, pkey = (kdf.A + 1, pdf.A + 1)
with self.subTest(as_index=as_index, key=pkey):
self.assert_eq(
sort(kdf.groupby(kkey, as_index=as_index).agg("sum")),
sort(pdf.groupby(pkey, as_index=as_index).agg("sum")),
)
self.assert_eq(
sort(kdf.groupby(kkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
sort(pdf.groupby(pkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
)
self.assert_eq(
sort(
kdf.groupby(kkey, as_index=as_index).agg({"B": ["min", "max"], "C": "sum"})
),
sort(
pdf.groupby(pkey, as_index=as_index).agg({"B": ["min", "max"], "C": "sum"})
),
)
self.assert_eq(
sort(kdf.groupby(kkey, as_index=as_index).agg(["sum"])),
sort(pdf.groupby(pkey, as_index=as_index).agg(["sum"])),
)
for kkey, pkey in [(kdf.A + 1, pdf.A + 1), (kdf.copy().A, pdf.copy().A)]:
with self.subTest(as_index=as_index, key=pkey):
self.assert_eq(
sort(kdf.groupby(kkey, as_index=as_index).agg("sum")),
sort(pdf.groupby(pkey, as_index=as_index).agg("sum")),
)
self.assert_eq(
sort(kdf.groupby(kkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
sort(pdf.groupby(pkey, as_index=as_index).agg({"B": "min", "C": "sum"})),
)
self.assert_eq(
sort(
kdf.groupby(kkey, as_index=as_index).agg(
{"B": ["min", "max"], "C": "sum"}
)
),
sort(
pdf.groupby(pkey, as_index=as_index).agg(
{"B": ["min", "max"], "C": "sum"}
)
),
)
self.assert_eq(
sort(kdf.groupby(kkey, as_index=as_index).agg(["sum"])),
sort(pdf.groupby(pkey, as_index=as_index).agg(["sum"])),
)

expected_error_message = (
r"aggs must be a dict mapping from column name \(string or "
Expand Down
8 changes: 8 additions & 0 deletions databricks/koalas/tests/test_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,14 @@ def test_loc_with_series(self):
pdf = self.pdf

self.assert_eq(kdf.loc[kdf.a % 2 == 0], pdf.loc[pdf.a % 2 == 0])
self.assert_eq(kdf.loc[kdf.a % 2 == 0, "a"], pdf.loc[pdf.a % 2 == 0, "a"])
self.assert_eq(kdf.loc[kdf.a % 2 == 0, ["a"]], pdf.loc[pdf.a % 2 == 0, ["a"]])
self.assert_eq(kdf.a.loc[kdf.a % 2 == 0], pdf.a.loc[pdf.a % 2 == 0])

self.assert_eq(kdf.loc[kdf.copy().a % 2 == 0], pdf.loc[pdf.copy().a % 2 == 0])
self.assert_eq(kdf.loc[kdf.copy().a % 2 == 0, "a"], pdf.loc[pdf.copy().a % 2 == 0, "a"])
self.assert_eq(kdf.loc[kdf.copy().a % 2 == 0, ["a"]], pdf.loc[pdf.copy().a % 2 == 0, ["a"]])
self.assert_eq(kdf.a.loc[kdf.copy().a % 2 == 0], pdf.a.loc[pdf.copy().a % 2 == 0])

def test_loc_noindex(self):
kdf = self.kdf
Expand Down
21 changes: 10 additions & 11 deletions databricks/koalas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,16 @@ def same_anchor(
from databricks.koalas.base import IndexOpsMixin
from databricks.koalas.frame import DataFrame

if isinstance(this, DataFrame):
this_kdf = this
else:
assert isinstance(this, IndexOpsMixin), type(this)
this_kdf = this._kdf
if isinstance(that, DataFrame):
that_kdf = that
else:
assert isinstance(that, IndexOpsMixin), type(that)
that_kdf = that._kdf
return this_kdf is that_kdf
assert isinstance(this, (DataFrame, IndexOpsMixin)), type(this)
this_internal = this._internal

assert isinstance(that, (DataFrame, IndexOpsMixin)), type(that)
that_internal = that._internal

return (
this_internal.spark_frame is that_internal.spark_frame
and this_internal.index_map == that_internal.index_map
)


def combine_frames(this, *args, how="full", preserve_order_column=False):
Expand Down