Skip to content

Commit 6e85375

Browse files
committed
Add an option to enable operations on different DataFrames
1 parent 37e783c commit 6e85375

File tree

4 files changed

+555
-25
lines changed

4 files changed

+555
-25
lines changed

databricks/koalas/base.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
3333
from databricks.koalas.internal import _InternalFrame
3434
from databricks.koalas.typedef import pandas_wraps
35+
from databricks.koalas.utils import align_diff_series
3536

3637

3738
def _column_op(f):
@@ -47,16 +48,23 @@ def _column_op(f):
4748
"""
4849
@wraps(f)
4950
def wrapper(self, *args):
50-
assert all((not isinstance(arg, IndexOpsMixin))
51-
or (arg._kdf is self._kdf) for arg in args), \
52-
"Cannot combine column argument because it comes from a different dataframe"
53-
5451
# It is possible for the function `f` takes other arguments than Spark Column.
5552
# To cover this case, explicitly check if the argument is Koalas Series and
5653
# extract Spark Column. For other arguments, they are used as are.
57-
args = [arg._scol if isinstance(arg, IndexOpsMixin) else arg for arg in args]
58-
scol = f(self._scol, *args)
59-
return self._with_new_scol(scol)
54+
cols = [arg for arg in args if isinstance(arg, IndexOpsMixin)]
55+
if all(self._kdf is col._kdf for col in cols):
56+
# Same DataFrame anchors
57+
args = [arg._scol if isinstance(arg, IndexOpsMixin) else arg for arg in args]
58+
scol = f(self._scol, *args)
59+
60+
return self._with_new_scol(scol)
61+
else:
62+
# Different DataFrame anchors
63+
def apply_func(this_column, *that_columns):
64+
return f(this_column, *that_columns)
65+
66+
return align_diff_series(apply_func, self, *args, how="full")
67+
6068
return wrapper
6169

6270

databricks/koalas/frame.py

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
"""
1818
A wrapper class for Spark DataFrame to behave similar to pandas DataFrame.
1919
"""
20+
from collections import OrderedDict
2021
from distutils.version import LooseVersion
2122
import re
2223
import warnings
2324
import inspect
2425
from functools import partial, reduce
2526
import sys
27+
from itertools import zip_longest
2628
from typing import Any, Optional, List, Tuple, Union, Generic, TypeVar
2729

2830
import numpy as np
@@ -42,7 +44,7 @@
4244
from pyspark.sql.functions import pandas_udf
4345

4446
from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
45-
from databricks.koalas.utils import validate_arguments_and_invoke_function
47+
from databricks.koalas.utils import validate_arguments_and_invoke_function, align_diff_frames
4648
from databricks.koalas.generic import _Frame, max_display_count
4749
from databricks.koalas.internal import _InternalFrame, IndexMap
4850
from databricks.koalas.missing.frame import _MissingPandasLikeDataFrame
@@ -386,19 +388,33 @@ def calculate_columns_axis(*cols):
386388

387389
# Arithmetic Operators
388390
def _map_series_op(self, op, other):
389-
if isinstance(other, DataFrame) or is_sequence(other):
391+
if not isinstance(other, DataFrame) and is_sequence(other):
390392
raise ValueError(
391-
"%s with another DataFrame or a sequence is currently not supported; "
393+
"%s with a sequence is currently not supported; "
392394
"however, got %s." % (op, type(other)))
393395

394396
applied = []
395-
for column in self._internal.data_columns:
396-
applied.append(getattr(self[column], op)(other))
397+
if isinstance(other, DataFrame) and self is not other:
398+
# Different DataFrames
399+
def apply_op(kdf, this_columns, that_columns):
400+
for this_column, that_column in zip(this_columns, that_columns):
401+
yield getattr(kdf[this_column], op)(kdf[that_column])
402+
403+
return align_diff_frames(
404+
apply_op, self, other, fillna=True, how="full", include_all_that_columns=False)
405+
elif isinstance(other, DataFrame) and self is other:
406+
# Same DataFrames
407+
for column in self._internal.data_columns:
408+
applied.append(getattr(self[column], op)(other[column]))
409+
else:
410+
# Series (same anchor) or scalar operand
411+
for column in self._internal.data_columns:
412+
applied.append(getattr(self[column], op)(other))
397413

398-
sdf = self._sdf.select(
399-
self._internal.index_scols + [c._scol for c in applied])
400-
internal = self._internal.copy(sdf=sdf, data_columns=[c.name for c in applied])
401-
return DataFrame(internal)
414+
sdf = self._sdf.select(
415+
self._internal.index_scols + [c._scol for c in applied])
416+
internal = self._internal.copy(sdf=sdf, data_columns=[c.name for c in applied])
417+
return DataFrame(internal)
402418

403419
def __add__(self, other):
404420
return self._map_series_op("add", other)
@@ -6337,17 +6353,36 @@ def __getitem__(self, key):
63376353

63386354
def __setitem__(self, key, value):
63396355
from databricks.koalas.series import Series
6340-
# For now, we don't support realignment against different dataframes.
6341-
# This is too expensive in Spark.
6342-
# Are we assigning against a column?
6343-
if isinstance(value, Series):
6344-
assert value._kdf is self, \
6345-
"Cannot combine column argument because it comes from a different dataframe"
6346-
if isinstance(key, (tuple, list)):
6347-
assert isinstance(value.schema, StructType)
6348-
field_names = value.schema.fieldNames()
6356+
6357+
if ((isinstance(value, Series) and value._kdf is not self) or
6358+
(isinstance(value, DataFrame) and value is not self)):
6359+
# Different (anchor) DataFrames
6360+
if isinstance(value, Series):
6361+
value = value.to_frame()
6362+
6363+
if not isinstance(key, (tuple, list)):
6364+
key = [key]
6365+
6366+
def assign_columns(kdf, this_columns, that_columns):
6367+
assert len(key) == len(that_columns)
6368+
# Note that `zip_longest` is used intentionally here so that every
6369+
# column in that_columns is consumed even after key/this_columns are exhausted.
6370+
for k, this_column, that_column in zip_longest(key, this_columns, that_columns):
6371+
yield kdf[that_column].rename(k)
6372+
if this_column is not None:
6373+
# If both sides have the same column, the first one takes priority.
6374+
yield kdf[this_column]
6375+
6376+
kdf = align_diff_frames(
6377+
assign_columns, self, value, fillna=False,
6378+
how="left", include_all_that_columns=True)
6379+
elif isinstance(key, (tuple, list)):
6380+
assert isinstance(value, DataFrame)
6381+
# Same DataFrames.
6382+
field_names = value.columns
63496383
kdf = self.assign(**{k: value[c] for k, c in zip(key, field_names)})
63506384
else:
6385+
# Same anchor DataFrames.
63516386
kdf = self.assign(**{key: value})
63526387

63536388
self._internal = kdf._internal

0 commit comments

Comments
 (0)