Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 107 additions & 1 deletion databricks/koalas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
A wrapper class for Spark DataFrame to behave similar to pandas DataFrame.
"""
from functools import partial, reduce
from typing import Any, List, Union
from typing import Any, List, Tuple, Union

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -1467,6 +1467,112 @@ def shape(self):
"""
return len(self), len(self.columns)

def merge(self, right: 'DataFrame', how: str = 'inner', on: str = None,
suffixes: Tuple[str, str] = ('_x', '_y')) -> 'DataFrame':
"""
Merge DataFrame objects with a database-style join.

Parameters
----------
right: Object to merge with.
how: Type of merge to be performed.
{‘left’, ‘right’, ‘outer’, ‘inner’}, default ‘inner’

left: use only keys from left frame, similar to a SQL left outer join; preserve key
order.
right: use only keys from right frame, similar to a SQL right outer join; preserve key
order.
outer: use union of keys from both frames, similar to a SQL full outer join; sort keys
lexicographically.
inner: use intersection of keys from both frames, similar to a SQL inner join;
preserve the order of the left keys.
on: Column or index level names to join on. These must be found in both DataFrames. If on
is None and not merging on indexes then this defaults to the intersection of the
columns in both DataFrames.
suffixes: Suffix to apply to overlapping column names in the left and right side,
respectively.

Returns
-------
DataFrame
A DataFrame of the two merged objects.

Examples
--------
>>> left_kdf = ks.DataFrame({'A': [1, 2]})
>>> right_kdf = ks.DataFrame({'B': ['x', 'y']}, index=[1, 2])

>>> left_kdf.merge(right_kdf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case what is the join key?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is outdated. Now it should be

left_kdf.merge(right_kdf, left_index=True, right_index=True)

to join on the indices.

A B
0 2 x

>>> left_kdf.merge(right_kdf, how='left')
A B
0 1 None
1 2 x

>>> left_kdf.merge(right_kdf, how='right')
A B
0 2.0 x
1 NaN y

>>> left_kdf.merge(right_kdf, how='outer')
A B
0 1.0 None
1 2.0 x
2 NaN y

Notes
-----
As described in #263, joining string columns currently returns None for missing values
instead of NaN.
"""
if how == 'full':
print("Warning: While Koalas will accept 'full', you should use 'outer' instead to",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use warnings package like warnings.warn("...", UserWarning)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I've changed it to warnings.warn(...) with 4e7c429.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need one more space at the end of the string: instead to "

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When comma-separating strings in a print statement, Python will automatically add spaces in between. However, since 4e7c429 moves from print() to warnings.warn(), this has been taken care of.

"be compatible with the pandas merge API")
if how == 'outer':
# 'outer' in pandas equals 'full' in Spark
how = 'full'
if how not in ('inner', 'left', 'right', 'full'):
raise ValueError("The 'how' parameter has to be amongst the following values: ",
"['inner', 'left', 'right', 'full', 'outer']")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should include 'full' in the message since it is optional.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. I've removed the full option with 0ad39b8.


if on is None:
# FIXME Move index string to constant?
on = '__index_level_0__'

left_table = self._sdf.alias('left_table')
right_table = right._sdf.alias('right_table')

# Unpack suffixes tuple for convenience
left_suffix = suffixes[0]
right_suffix = suffixes[1]

# Append suffixes to columns with the same name to avoid conflicts later
duplicate_columns = list(self.columns & right.columns)
if duplicate_columns:
for duplicate_column_name in duplicate_columns:
left_table = left_table.withColumnRenamed(duplicate_column_name,
duplicate_column_name + left_suffix)
right_table = right_table.withColumnRenamed(duplicate_column_name,
duplicate_column_name + right_suffix)

join_condition = (left_table[on] == right_table[on] if on not in duplicate_columns
else left_table[on + left_suffix] == right_table[on + right_suffix])
joined_table = left_table.join(right_table, join_condition, how=how)

if on in duplicate_columns:
# Merge duplicate key columns
joined_table = joined_table.withColumnRenamed(on + left_suffix, on)
joined_table = joined_table.drop(on + right_suffix)

# Remove auxiliary index
# FIXME Move index string to constant?
joined_table = joined_table.drop('__index_level_0__')

kdf = DataFrame(joined_table)
return kdf

def _pd_getitem(self, key):
from databricks.koalas.series import Series
if key is None:
Expand Down
1 change: 0 additions & 1 deletion databricks/koalas/missing/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ class _MissingPandasLikeDataFrame(object):
median = unsupported_function('median')
melt = unsupported_function('melt')
memory_usage = unsupported_function('memory_usage')
merge = unsupported_function('merge')
mod = unsupported_function('mod')
mode = unsupported_function('mode')
mul = unsupported_function('mul')
Expand Down
38 changes: 38 additions & 0 deletions databricks/koalas/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,41 @@ def test_isin(self):
msg = "Values should be iterable, Series, DataFrame or dict."
with self.assertRaisesRegex(TypeError, msg):
kdf.isin(1)

def test_merge(self):
left_kdf = koalas.DataFrame({'A': [1, 2]})
right_kdf = koalas.DataFrame({'B': ['x', 'y']}, index=[1, 2])

msg = ("The 'how' parameter has to be amongst the following values: ['inner', 'left', " +
"'right', 'full', 'outer']")
with self.assertRaises(ValueError, msg=msg):
left_kdf.merge(right_kdf, how='foo')

# Assert inner join
res = left_kdf.merge(right_kdf)
self.assert_eq(res, pd.DataFrame({'A': [2], 'B': ['x']}))
Copy link
Collaborator

@ueshin ueshin May 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this behavior following pandas'?
I might miss something, but I got an error from pandas:

>>> import pandas as pd
>>> pd.__version__
'0.24.2'
>>> left_pdf = pd.DataFrame({'A': [1, 2]})
>>> right_pdf = pd.DataFrame({'B': ['x', 'y']}, index=[1, 2])
>>> left_pdf.merge(right_pdf)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../pandas/core/frame.py", line 6868, in merge
    copy=copy, indicator=indicator, validate=validate)
  File "/.../pandas/core/reshape/merge.py", line 47, in merge
    validate=validate)
  File "/.../pandas/core/reshape/merge.py", line 524, in __init__
    self._validate_specification()
  File "/.../pandas/core/reshape/merge.py", line 1033, in _validate_specification
    lidx=self.left_index, ridx=self.right_index))
pandas.errors.MergeError: No common columns to perform merge on. Merge options: left_on=None, right_on=None, left_index=False, right_index=False

Btw, I think basically we should use exactly the same pandas code as the counterpart of the assert_eq().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Without specifying any columns to join on, the current implementation defaults to the pandas equivalent of left_pdf.merge(right_pdf, left_index=True, right_index=True). Would you prefer the addition of those parameters and the respective MergeError to follow the behavior of pandas?

Regarding the tests, I totally agree with you. But because of #263, it currently fails as some data types mismatch.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to add the parameters here, but at least we should respect the default value of pandas, e.g., assuming left_index=False, and right_index=False.

As for the MergeError, hmm, interesting.
I remember I added SparkPandasIndexingError for pandas IndexingError before. Maybe we can add SparkPandasMergeError.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've now added the SparkPandasMergeError with abfe0c9.


# Assert inner join on non-default column
left_kdf_with_id = koalas.DataFrame({'A': [1, 2], 'id': [0, 1]})
right_kdf_with_id = koalas.DataFrame({'B': ['x', 'y'], 'id': [0, 1]}, index=[1, 2])
res = left_kdf_with_id.merge(right_kdf_with_id, on='id')
self.assert_eq(res, pd.DataFrame({'A': [1, 2], 'id': [0, 1], 'B': ['x', 'y']}))

# Assert left join
res = left_kdf.merge(right_kdf, how='left')
# FIXME Replace None with np.nan once #263 is solved
self.assert_eq(res, pd.DataFrame({'A': [1, 2], 'B': [None, 'x']}))

# Assert right join
res = left_kdf.merge(right_kdf, how='right')
self.assert_eq(res, pd.DataFrame({'A': [2, np.nan], 'B': ['x', 'y']}))

# Assert full outer join
res = left_kdf.merge(right_kdf, how='outer')
# FIXME Replace None with np.nan once #263 is solved
self.assert_eq(res, pd.DataFrame({'A': [1, 2, np.nan], 'B': [None, 'x', 'y']}))

# Assert full outer join also works with 'full' keyword
res = left_kdf.merge(right_kdf, how='full')
# FIXME Replace None with np.nan once #263 is solved
self.assert_eq(res, pd.DataFrame({'A': [1, 2, np.nan], 'B': [None, 'x', 'y']}))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add tests to use suffixes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done with 1407922.