Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 96 additions & 1 deletion databricks/koalas/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
A wrapper class for Spark DataFrame to behave similar to pandas DataFrame.
"""
from functools import partial, reduce
from typing import Any, List, Union
from typing import Any, List, Tuple, Union

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -1467,6 +1467,101 @@ def shape(self):
"""
return len(self), len(self.columns)

def merge(self, right: 'DataFrame', how: str = 'inner', on: str = None,
suffixes: Tuple[str, str] = ('_x', '_y')) -> 'DataFrame':
"""
Perform a database (SQL) merge operation between two DataFrame objects.

Parameters
----------
right: the DataFrame for the right side of the join operation
how: a join method out of ['inner', 'left', 'right', 'full', 'outer']
on: the column name to be joined on. Defaults to the index if not specified
suffixes: suffix to apply to overlapping column names in the left and right side,
respectively

Returns
-------
DataFrame
The joined DataFrame

Examples
--------
>>> left_kdf = koalas.DataFrame({'A': [1, 2]})
>>> right_kdf = koalas.DataFrame({'B': ['x', 'y']}, index=[1, 2])

>>> left_kdf.merge(right_kdf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, what is the join key?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is outdated. Now it should be

left_kdf.merge(right_kdf, left_index=True, right_index=True)

to join on the indices.

A B
0 2 x

>>> left_kdf.merge(right_kdf, how='left')
A B
0 1 None
1 2 x

>>> left_kdf.merge(right_kdf, how='right')
A B
0 2.0 x
1 NaN y

>>> left_kdf.merge(right_kdf, how='outer')
A B
0 1.0 None
1 2.0 x
2 NaN y

Notes
-----
As described in #263, joining string columns currently returns None for missing values
instead of NaN.
"""
if how == 'full':
print("Warning: While Koalas will accept 'full', you should use 'outer' instead to",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the warnings package instead, e.g. warnings.warn("...", UserWarning)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I've changed it to warnings.warn(...) with 4e7c429.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need one more space at the end of the string: instead to "

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When comma-separating strings in a print statement, Python will automatically add spaces in between. However, since 4e7c429 moves from print() to warnings.warn(), this has been taken care of.

"be compatible with the pandas merge API")
pass
if how == 'outer':
# 'outer' in pandas equals 'full' in Spark
how = 'full'
if how not in ('inner', 'left', 'right', 'full'):
raise ValueError("The 'how' parameter has to be amongst the following values: ",
"['inner', 'left', 'right', 'full', 'outer']")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should include 'full' in the message since it is optional.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. I've removed the full option with 0ad39b8.


if on is None:
# FIXME Move index string to constant?
on = '__index_level_0__'

left_table = self._sdf.alias('left_table')
right_table = right._sdf.alias('right_table')

# Unpack suffixes tuple for convenience
left_suffix = suffixes[0]
right_suffix = suffixes[1]

# Append suffixes to columns with the same name to avoid conflicts later
duplicate_columns = list(self.columns & right.columns)
if duplicate_columns:
for duplicate_column_name in duplicate_columns:
left_table = left_table.withColumnRenamed(duplicate_column_name,
duplicate_column_name + left_suffix)
right_table = right_table.withColumnRenamed(duplicate_column_name,
duplicate_column_name + right_suffix)

join_condition = (left_table[on] == right_table[on] if on not in duplicate_columns
else left_table[on + left_suffix] == right_table[on + right_suffix])
joined_table = left_table.join(right_table, join_condition, how=how)

if on in duplicate_columns:
# Merge duplicate key columns
joined_table = joined_table.withColumnRenamed(on + left_suffix, on)
joined_table = joined_table.drop(on + right_suffix)

# Remove auxiliary index
# FIXME Move index string to constant?
joined_table = joined_table.drop('__index_level_0__')

kdf = DataFrame(joined_table)
return kdf

def _pd_getitem(self, key):
from databricks.koalas.series import Series
if key is None:
Expand Down
38 changes: 38 additions & 0 deletions databricks/koalas/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,41 @@ def test_isin(self):
msg = "Values should be iterable, Series, DataFrame or dict."
with self.assertRaisesRegex(TypeError, msg):
kdf.isin(1)

def test_merge(self):
left_kdf = koalas.DataFrame({'A': [1, 2]})
right_kdf = koalas.DataFrame({'B': ['x', 'y']}, index=[1, 2])

msg = ("The 'how' parameter has to be amongst the following values: ['inner', 'left', " +
"'right', 'full', 'outer']")
with self.assertRaises(ValueError, msg=msg):
left_kdf.merge(right_kdf, how='foo')

# Assert inner join
res = left_kdf.merge(right_kdf)
self.assert_eq(res, pd.DataFrame({'A': [2], 'B': ['x']}))
Copy link
Collaborator

@ueshin ueshin May 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this behavior follow pandas?
I might be missing something, but I got an error from pandas:

>>> import pandas as pd
>>> pd.__version__
'0.24.2'
>>> left_pdf = pd.DataFrame({'A': [1, 2]})
>>> right_pdf = pd.DataFrame({'B': ['x', 'y']}, index=[1, 2])
>>> left_pdf.merge(right_pdf)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../pandas/core/frame.py", line 6868, in merge
    copy=copy, indicator=indicator, validate=validate)
  File "/.../pandas/core/reshape/merge.py", line 47, in merge
    validate=validate)
  File "/.../pandas/core/reshape/merge.py", line 524, in __init__
    self._validate_specification()
  File "/.../pandas/core/reshape/merge.py", line 1033, in _validate_specification
    lidx=self.left_index, ridx=self.right_index))
pandas.errors.MergeError: No common columns to perform merge on. Merge options: left_on=None, right_on=None, left_index=False, right_index=False

Btw, I think we should basically run exactly the same code on pandas and pass its result as the expected value to assert_eq().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Without specifying any columns to join on, the current implementation defaults to the pandas equivalent of left_pdf.merge(right_pdf, left_index=True, right_index=True). Would you prefer the addition of those parameters and the respective MergeError to follow the behavior of pandas?

Regarding the tests, I totally agree with you. However, due to #263, such a comparison currently fails because some data types do not match.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to add the parameters here, but we should at least respect pandas' default values, i.e., assume left_index=False and right_index=False.

As for the MergeError, hmm, interesting.
I remember I added SparkPandasIndexingError for pandas IndexingError before. Maybe we can add SparkPandasMergeError.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've now added the SparkPandasMergeError with abfe0c9.


# Assert inner join on non-default column
left_kdf_with_id = koalas.DataFrame({'A': [1, 2], 'id': [0, 1]})
right_kdf_with_id = koalas.DataFrame({'B': ['x', 'y'], 'id': [0, 1]}, index=[1, 2])
res = left_kdf_with_id.merge(right_kdf_with_id, on='id')
self.assert_eq(res, pd.DataFrame({'A': [1, 2], 'id': [0, 1], 'B': ['x', 'y']}))

# Assert left join
res = left_kdf.merge(right_kdf, how='left')
# FIXME Replace None with np.nan once #263 is solved
self.assert_eq(res, pd.DataFrame({'A': [1, 2], 'B': [None, 'x']}))

# Assert right join
res = left_kdf.merge(right_kdf, how='right')
self.assert_eq(res, pd.DataFrame({'A': [2, np.nan], 'B': ['x', 'y']}))

# Assert full outer join
res = left_kdf.merge(right_kdf, how='outer')
# FIXME Replace None with np.nan once #263 is solved
self.assert_eq(res, pd.DataFrame({'A': [1, 2, np.nan], 'B': [None, 'x', 'y']}))

# Assert full outer join also works with 'full' keyword
res = left_kdf.merge(right_kdf, how='full')
# FIXME Replace None with np.nan once #263 is solved
self.assert_eq(res, pd.DataFrame({'A': [1, 2, np.nan], 'B': [None, 'x', 'y']}))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add tests that use suffixes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done with 1407922.