Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 50 additions & 28 deletions databricks/koalas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -9655,34 +9655,8 @@ def __setitem__(self, key, value):
isinstance(value, DataFrame) and value is not self
):
# Different Series or DataFrames
if isinstance(value, Series):
value = value.to_frame()
else:
assert isinstance(value, DataFrame), type(value)
value = value.copy()
level = self._internal.column_labels_level

value.columns = pd.MultiIndex.from_tuples(
[
tuple([name_like_string(label)] + ([""] * (level - 1)))
for label in value._internal.column_labels
]
)

if isinstance(key, str):
key = [(key,)]
elif isinstance(key, tuple):
key = [key]
else:
key = [k if isinstance(k, tuple) else (k,) for k in key]

if any(len(label) > level for label in key):
raise KeyError(
"Key length ({}) exceeds index depth ({})".format(
max(len(label) for label in key), level
)
)
key = [tuple(list(label) + ([""] * (level - len(label)))) for label in key]
key = self._index_normalized_label(key)
value = self._index_normalized_frame(value)

def assign_columns(kdf, this_column_labels, that_column_labels):
assert len(key) == len(that_column_labels)
Expand All @@ -9707,6 +9681,54 @@ def assign_columns(kdf, this_column_labels, that_column_labels):

self._internal = kdf._internal

def _index_normalized_label(self, labels):
"""
Returns a label that is normalized against the current column index level.
For example, the key "abc" can be ("abc", "", "") if the current Frame has
a multi-index for its column
"""
level = self._internal.column_labels_level

if isinstance(labels, str):
labels = [(labels,)]
elif isinstance(labels, tuple):
labels = [labels]
else:
labels = [k if isinstance(k, tuple) else (k,) for k in labels]

if any(len(label) > level for label in labels):
raise KeyError(
"Key length ({}) exceeds index depth ({})".format(
max(len(label) for label in labels), level
)
)
return [tuple(list(label) + ([""] * (level - len(label)))) for label in labels]

def _index_normalized_frame(self, kser_or_kdf):
    """
    Returns a frame that is normalized against the current column index level.
    For example, the name in `pd.Series([...], name="abc")` can be
    ("abc", "", "") if the current DataFrame has a multi-index for its columns.

    A Series input is first converted to a one-column DataFrame; a DataFrame
    input is shallow-copied so the caller's object is never mutated. The
    returned frame always has MultiIndex columns whose depth matches this
    frame's column index depth, padded with empty strings.
    """

    # Imported here to avoid a circular import between frame.py and series.py.
    from databricks.koalas.series import Series

    level = self._internal.column_labels_level
    if isinstance(kser_or_kdf, Series):
        kdf = kser_or_kdf.to_frame()
    else:
        assert isinstance(kser_or_kdf, DataFrame), type(kser_or_kdf)
        kdf = kser_or_kdf.copy()

    # Rebuild the columns as a MultiIndex of the target depth: each existing
    # label is flattened to a string and right-padded with "" entries.
    kdf.columns = pd.MultiIndex.from_tuples(
        [
            tuple([name_like_string(label)] + ([""] * (level - 1)))
            for label in kdf._internal.column_labels
        ]
    )

    return kdf

def __getattr__(self, key: str) -> Any:
if key.startswith("__"):
raise AttributeError(key)
Expand Down
110 changes: 99 additions & 11 deletions databricks/koalas/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
from databricks.koalas.base import IndexOpsMixin
from databricks.koalas.utils import default_session, name_like_string, scol_for, validate_axis
from databricks.koalas.utils import (
default_session,
name_like_string,
scol_for,
validate_axis,
align_diff_frames,
)
from databricks.koalas.frame import DataFrame, _reduce_spark_multi
from databricks.koalas.internal import _InternalFrame
from databricks.koalas.typedef import pandas_wraps
Expand Down Expand Up @@ -107,6 +113,9 @@ def from_pandas(pobj: Union["pd.DataFrame", "pd.Series"]) -> Union["Series", "Da
raise ValueError("Unknown data type: {}".format(type(pobj)))


_range = range # built-in range


def range(
start: int, end: Optional[int] = None, step: int = 1, num_partitions: Optional[int] = None
) -> DataFrame:
Expand Down Expand Up @@ -1539,11 +1548,11 @@ def concat(objs, axis=0, join="outer", ignore_index=False):
objs : a sequence of Series or DataFrame
Any None objects will be dropped silently unless
they are all None in which case a ValueError will be raised
axis : {0/'index'}, default 0
axis : {0/'index', 1/'columns'}, default 0
The axis to concatenate along.
join : {'inner', 'outer'}, default 'outer'
How to handle indexes on other axis(es)
ignore_index : boolean, default False
How to handle indexes on other axis (or axes).
ignore_index : bool, default False
If True, do not use the index values along the concatenation axis. The
resulting axis will be labeled 0, ..., n - 1. This is useful if you are
concatenating objects where the concatenation axis does not have
Expand All @@ -1552,14 +1561,17 @@ def concat(objs, axis=0, join="outer", ignore_index=False):

Returns
-------
concatenated : object, type of objs
object, type of objs
When concatenating all ``Series`` along the index (axis=0), a
``Series`` is returned. When ``objs`` contains at least one
``DataFrame``, a ``DataFrame`` is returned.
``DataFrame``, a ``DataFrame`` is returned. When concatenating along
the columns (axis=1), a ``DataFrame`` is returned.

See Also
--------
DataFrame.merge
Series.append : Concatenate Series.
DataFrame.join : Join DataFrames using indexes.
DataFrame.merge : Merge DataFrames by indexes or columns.

Examples
--------
Expand Down Expand Up @@ -1645,6 +1657,17 @@ def concat(objs, axis=0, join="outer", ignore_index=False):
1 b 2
0 c 3
1 d 4

>>> df4 = ks.DataFrame([['bird', 'polly'], ['monkey', 'george']],
... columns=['animal', 'name'])

Combine with column axis.

>>> ks.concat([df1, df4], axis=1)
letter number animal name
0 a 1 bird polly
1 b 2 monkey george

"""
if isinstance(objs, (DataFrame, IndexOpsMixin)) or not isinstance(
objs, Iterable
Expand All @@ -1655,10 +1678,6 @@ def concat(objs, axis=0, join="outer", ignore_index=False):
'"{name}"'.format(name=type(objs).__name__)
)

axis = validate_axis(axis)
if axis != 0:
raise NotImplementedError('axis should be either 0 or "index" currently.')

if len(objs) == 0:
raise ValueError("No objects to concatenate")
objs = list(filter(lambda obj: obj is not None, objs))
Expand All @@ -1674,6 +1693,75 @@ def concat(objs, axis=0, join="outer", ignore_index=False):
"and ks.DataFrame are valid".format(name=type(objs).__name__)
)

axis = validate_axis(axis)
if axis == 1:
if isinstance(objs[0], ks.Series):
concat_kdf = objs[0].to_frame()
else:
concat_kdf = objs[0]

with ks.option_context("compute.ops_on_diff_frames", True):

def assign_columns(kdf, this_column_labels, that_column_labels):
duplicated_names = set(
this_column_label[1:] for this_column_label in this_column_labels
).intersection(
set(that_column_label[1:] for that_column_label in that_column_labels)
)
if len(duplicated_names) > 0:
pretty_names = [
name_like_string(column_label) for column_label in duplicated_names
]
raise ValueError(
"Labels have to be unique; however, got "
"duplicated labels %s." % pretty_names
)

# Note that here intentionally uses `zip_longest` that combine
# all columns.
for this_label, that_label in itertools.zip_longest(
this_column_labels, that_column_labels
):
# duplicated columns will be distinct within `align_diff_frames`.
if this_label is not None:
yield (kdf._kser_for(this_label), this_label)
if that_label is not None:
yield (kdf._kser_for(that_label), that_label)

for kser_or_kdf in objs[1:]:
if isinstance(kser_or_kdf, Series):
# TODO: there is a corner case to optimize - when the series are from
# the same DataFrame.
that_kdf = kser_or_kdf.to_frame()
else:
that_kdf = kser_or_kdf

this_index_level = concat_kdf._internal.column_labels_level
that_index_level = that_kdf._internal.column_labels_level

if this_index_level > that_index_level:
concat_kdf = that_kdf._index_normalized_frame(concat_kdf)
if this_index_level < that_index_level:
that_kdf = concat_kdf._index_normalized_frame(that_kdf)

if join == "inner":
concat_kdf = align_diff_frames(
assign_columns, concat_kdf, that_kdf, fillna=False, how="inner",
)
elif join == "outer":
concat_kdf = align_diff_frames(
assign_columns, concat_kdf, that_kdf, fillna=False, how="full",
)
else:
raise ValueError(
"Only can inner (intersect) or outer (union) join the other axis."
)

if ignore_index:
concat_kdf.columns = list(map(str, _range(len(concat_kdf.columns))))

return concat_kdf

# Series, Series ...
# We should return Series if objects are all Series.
should_return_series = all(map(lambda obj: isinstance(obj, Series), objs))
Expand Down
77 changes: 73 additions & 4 deletions databricks/koalas/tests/test_namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import itertools

import pandas as pd

Expand Down Expand Up @@ -92,10 +93,6 @@ def test_concat(self):

self.assertRaisesRegex(ValueError, "All objects passed", lambda: ks.concat([None, None]))

self.assertRaisesRegex(
NotImplementedError, "axis should be either 0 or", lambda: ks.concat([kdf, kdf], axis=1)
)

pdf3 = pdf.copy()
kdf3 = kdf.copy()

Expand Down Expand Up @@ -128,3 +125,75 @@ def test_concat(self):
r"Only can inner \(intersect\) or outer \(union\) join the other axis.",
lambda: ks.concat([kdf, kdf4], join=""),
)

self.assertRaisesRegex(
ValueError,
r"Only can inner \(intersect\) or outer \(union\) join the other axis.",
lambda: ks.concat([kdf, kdf4], join="", axis=1),
)

self.assertRaisesRegex(
ValueError,
r"Only can inner \(intersect\) or outer \(union\) join the other axis.",
lambda: ks.concat([kdf.A, kdf4.B], join="", axis=1),
)

self.assertRaisesRegex(
ValueError,
r"Labels have to be unique; however, got duplicated labels \['A'\].",
lambda: ks.concat([kdf.A, kdf4.A], join="inner", axis=1),
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test also was added to verify duplicated column cases.


def test_concat_column_axis(self):
    """
    Verify ``ks.concat(..., axis=1)`` matches ``pd.concat(..., axis=1)``.

    Covers Series/Series, Series/DataFrame and DataFrame/DataFrame inputs,
    partially overlapping indexes, single- vs multi-index columns, and every
    combination of ``join`` ("inner"/"outer") and ``ignore_index``.

    NOTE: the pasted original contained review-comment UI text embedded in
    the ``objs`` list, which made it invalid Python; that noise is removed.
    """
    pdf1 = pd.DataFrame({"A": [0, 2, 4], "B": [1, 3, 5]}, index=[1, 2, 3])
    pdf2 = pd.DataFrame({"C": [1, 2, 3], "D": [4, 5, 6]}, index=[1, 3, 5])
    kdf1 = ks.from_pandas(pdf1)
    kdf2 = ks.from_pandas(pdf2)

    kdf3 = kdf1.copy()
    kdf4 = kdf2.copy()
    pdf3 = pdf1.copy()
    pdf4 = pdf2.copy()

    # Multi-index columns to exercise the level-normalization path.
    columns = pd.MultiIndex.from_tuples([("X", "A"), ("X", "B")])
    pdf3.columns = columns
    kdf3.columns = columns

    columns = pd.MultiIndex.from_tuples([("X", "C"), ("X", "D")])
    pdf4.columns = columns
    kdf4.columns = columns

    ignore_indexes = [True, False]
    joins = ["inner", "outer"]

    objs = [
        ([kdf1.A, kdf2.C], [pdf1.A, pdf2.C]),
        ([kdf1, kdf2.C], [pdf1, pdf2.C]),
        ([kdf1.A, kdf2], [pdf1.A, pdf2]),
        ([kdf1.A, kdf2.C], [pdf1.A, pdf2.C]),
        ([kdf1.A, kdf1.A.rename("B")], [pdf1.A, pdf1.A.rename("B")]),
        ([kdf3[("X", "A")], kdf4[("X", "C")]], [pdf3[("X", "A")], pdf4[("X", "C")]]),
        ([kdf3, kdf4[("X", "C")]], [pdf3, pdf4[("X", "C")]]),
        ([kdf3[("X", "A")], kdf4], [pdf3[("X", "A")], pdf4]),
        ([kdf3, kdf4], [pdf3, pdf4]),
        ([kdf3[("X", "A")], kdf3[("X", "B")]], [pdf3[("X", "A")], pdf3[("X", "B")]],),
        # Single-index Series mixed with multi-index columns, both orders.
        (
            [kdf3[("X", "A")], kdf3[("X", "B")].rename("ABC")],
            [pdf3[("X", "A")], pdf3[("X", "B")].rename("ABC")],
        ),
        (
            [kdf3[("X", "A")].rename("ABC"), kdf3[("X", "B")]],
            [pdf3[("X", "A")].rename("ABC"), pdf3[("X", "B")]],
        ),
    ]

    for ignore_index, join in itertools.product(ignore_indexes, joins):
        for obj in objs:
            kdfs, pdfs = obj
            with self.subTest(ignore_index=ignore_index, join=join, objs=obj):
                actual = ks.concat(kdfs, axis=1, ignore_index=ignore_index, join=join)
                expected = pd.concat(pdfs, axis=1, ignore_index=ignore_index, join=join)
                # Row order is non-deterministic in koalas; compare after a
                # stable sort and index reset.
                self.assert_eq(
                    repr(actual.sort_values(list(actual.columns)).reset_index(drop=True)),
                    repr(expected.sort_values(list(expected.columns)).reset_index(drop=True)),
                )