Skip to content

Conversation

@LucasG0
Copy link
Contributor

@LucasG0 LucasG0 commented Mar 20, 2020

Manage broadcast hint for dataframes.
Provides broadcast function which, from a given koalas DataFrame, returns a new one with broadcast hint.
Broadcast join can now be performed in DataFrame.join, DataFrame.merge and DataFrame.update.
A broadcast join may be more efficient than sort merge join (Spark default) between a small dataframe and a big daframe. It gives every node a copy of a the small dataframe, which reduces the number of shuffle between partitions. By default, Spark performs it if a dataframe is smaller than ~10MB, but the user should be able to force it.

@codecov-io
Copy link

codecov-io commented Mar 20, 2020

Codecov Report

Merging #1360 into master will increase coverage by 0.02%.
The diff coverage is 100.00%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #1360      +/-   ##
==========================================
+ Coverage   95.23%   95.26%   +0.02%     
==========================================
  Files          34       34              
  Lines        7742     7785      +43     
==========================================
+ Hits         7373     7416      +43     
  Misses        369      369              
Impacted Files Coverage Δ
databricks/koalas/frame.py 96.82% <ø> (+0.05%) ⬆️
databricks/koalas/namespace.py 88.79% <100.00%> (+0.12%) ⬆️
databricks/koalas/indexing.py 94.56% <0.00%> (+0.01%) ⬆️
databricks/koalas/internal.py 96.05% <0.00%> (+0.01%) ⬆️
databricks/koalas/generic.py 97.54% <0.00%> (+0.40%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update d4012b6...789bd83. Read the comment docs.

@ueshin
Copy link
Collaborator

ueshin commented Mar 20, 2020

@LucasG0 Thanks for your first contribution!
I really like the idea to have a way to indicate broadcast join.

I guess we should have a discussion about how we should have the way.

  1. Adding an argument to indicate broadcast join (the current way here)
    • Pros:
      • Users can easily know which function supports broadcast join by its signature.
    • Cons:
      • We usually hesitate to add extra arguments without a special reason.
      • We might need another way to indicate left side should be broadcasted.
        • (we can make broadcast take both left and right, or something, though.)
  2. Adding broadcast method to DataFrame.
    • Pros:
      • We don't need to manage each function one-by-one.
      • Users can easily apply both left and right sides.
    • Cons:
      • Users might not know which function will be affected.
      • We usually hesitate to add extra functions without a special reason.
  3. Adding broadcast function separately as the same as PySpark's functions.py.
    • Pros:
      • We don't need to manage each function one-by-one.
      • Users can easily apply both left and right sides.
    • Cons:
      • Users might not know which function will be affected.

cc @HyukjinKwon @itholic

@itholic
Copy link
Contributor

itholic commented Mar 21, 2020

@ueshin , Thanks for arranging the discussions! Let me take a look at this tomorrow.

@LucasG0 , I really welcome to your first contribution to Koalas. Thanks very much :D

@LucasG0
Copy link
Contributor Author

LucasG0 commented Mar 21, 2020

Thanks for your answers @ueshin @itholic ! :)

First, I would like to notice that only one side needs to be broadcasted during a brodcast join.
About the different ways to implement it :

  1. Adding an argument to indicate broadcast join (the current way here).

    It seems to be the only way to use the existing update method with a broadcast join.
    However, I agree that managing functions one par one may be a significant issue.

  2. Adding broadcast method to DataFrame.

    I think it is a good way. We should choose between small_df.broadcast(large_df) or
    large_df.broadcast(small_df). The first way seems more intuitive, as the small DataFrame
    is the one to be broadcasted.
    We may call it broadcast_join, to explicit that broadcast is used for joining purpose. It could
    avoid confusion with broadcast variables in Spark, which are not necessarily used to join.

  3. Adding broadcast function separately as the same as PySpark's functions.py.

    As for the 2nd way, it could be broadcast_join(small_df,large_df).
    However, I find the 2nd way fits more, as we want this functionality on DataFrame. Moreover, it
    seems less explicit for user on which DataFrame should be broadcasted.

So I think the second way is the more interesting. :)

@ueshin
Copy link
Collaborator

ueshin commented Mar 21, 2020

Sorry, I have to describe more about 2) and 3).

I meant broadcast method or function should return Koalas DataFrame containing Spark DataFrame with broadcast hint.

So the usage of 2) and 3) in my mind were like:

left_df.merge(right_df.broadcast(), ...)

or

left_df.merge(F.broadcast(right_df), ...)

instead of adding broadcast_join.

Then we can reuse the current implementation without any additional work in each join-like function and Spark will handle the broadcast hint properly.

@LucasG0
Copy link
Contributor Author

LucasG0 commented Mar 22, 2020

Alright, these ways are indeed better !
We could use 2) as it targets specifically DataFrame or 3) to avoid adding extra methods in DataFrame.

@itholic
Copy link
Contributor

itholic commented Mar 23, 2020

left_df.merge(right_df.broadcast(), ...)

I also think this way above seems good for now.

(But in the future, It would be better to use something like AQE to make it work automatically ??)

@HyukjinKwon
Copy link
Member

We could even do both 2) and 3) like DataFrame.merge, koalas.merge, DataFrame.melt and koalas.melt.

One nit on 2) is though, DataFrame.broadcast isn't friendly to users who come from PySpark compared to koalas.broadcast. But I don't really have a strong preference. Let me leave it to @ueshin.

@ueshin
Copy link
Collaborator

ueshin commented Mar 23, 2020

Then shall we take 3) ?
@LucasG0 Could you update this PR as we discussed? Thanks!

@itholic
Copy link
Contributor

itholic commented Mar 24, 2020

Then shall we take 3) ?

Yes I agree. I feel Hyukjin's latest comment makes sense.

@LucasG0
Copy link
Contributor Author

LucasG0 commented Mar 24, 2020

Let's go for 3) then !
It seems that there is no way to test that a PySpark DataFrame has broadcast hint.
It is possible to test if a broadcast join occurred, by using the test of PySpark itself
https://github.com/apache/spark/pull/8801/files#diff-7c2fe8530271c0635fb99f7b49e0c4a4R1086.
However, I wonder if it is relevant to reproduce this test in Koalas tests, so for now I just tested equality between koalas DataFrame and the one with broadcast hint.

Copy link
Collaborator

@ueshin ueshin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a line for a doc in docs/source/reference/general_functions.rst? Around line 28 should be good.
Also, could you add See Also link in the doc for each function?


if not isinstance(obj, DataFrame):
raise ValueError("Invalid type : expected DataFrame got {}".format(type(obj)))
return ks.DataFrame(data=spark.functions.broadcast(obj._internal.to_external_spark_frame))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use with_new_sdf:

return DataFrame(obj._interval.with_new_sdf(F.broadcast(obj._sdf)))

return pd.to_numeric(arg)


#
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to add a comment here? Otherwise, please remove the unrelated line.

... columns=['rkey', 'value'])
>>> merged = df1.merge(ks.broadcast(df2), left_on='lkey', right_on='rkey')
"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we show the result? Or how about explain?

>>> merged.explain()  # doctest: +ELLIPSIS
== Physical Plan ==
...
...Broadcast...
...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merged.explain() seems to have a different behavior depending on runtime environment.
That is why I did not detailed Physical Plan and removed # doctest : +ELLIPSIS.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we need ELLIPSIS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I was not familiar with this notation.
I added it back.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, I was just curious why it works without # doctest: +ELLIPSIS.

raise ValueError(
"columns overlap but no suffix specified: " "{rename}".format(rename=common)
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you revert unrelated changes?

>>> df1 = ks.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
... 'value': [1, 2, 3, 5]},
... columns=['lkey', 'value'])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove an extra line.

>>> df2 = ks.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
... 'value': [5, 6, 7, 8]},
... columns=['rkey', 'value'])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.


#
def broadcast(obj):
""" Marks a DataFrame as small enough for use in broadcast joins.
Copy link
Contributor

@itholic itholic Mar 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could you move this docstring to next line like other methods?

like:

def broadcast(obj):
    """
    Marks a DataFrame as small enough for use in broadcast joins
    ...
    """

@LucasG0 LucasG0 force-pushed the master branch 2 times, most recently from ba64e4d to e8318cb Compare March 26, 2020 12:39
See Also
--------
DataFrame.merge : Merge DataFrame objects with a database-style join.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add DataFrame.join and DataFrame.update as well?
Also, could you add a link to DataFrame.broadcast from the docs for each function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add them.
Do you mean ks.broadcast in "See Also" bloc of these methods ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. sorry, ks.broadcast is right, and maybe only broadcast should work in the docstring.

See Also
--------
broadcast : ...

Copy link
Contributor Author

@LucasG0 LucasG0 Mar 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In deed, ks.broadcast does not work in the docstring.

... columns=['rkey', 'value'])
>>> merged = df1.merge(ks.broadcast(df2), left_on='lkey', right_on='rkey')
"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, I was just curious why it works without # doctest: +ELLIPSIS.

Copy link
Collaborator

@ueshin ueshin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise, LGTM.

@ueshin
Copy link
Collaborator

ueshin commented Mar 27, 2020

LGTM.
@LucasG0 Could you update the PR description?

@itholic
Copy link
Contributor

itholic commented Mar 27, 2020

LGTM, too. 👍

@LucasG0 LucasG0 changed the title Add broadcast join for dataframes. Manage broadcast hint for dataframes. Mar 27, 2020
@ueshin
Copy link
Collaborator

ueshin commented Mar 27, 2020

Thanks! merging.

@ueshin ueshin merged commit 0366308 into databricks:master Mar 27, 2020
@LucasG0
Copy link
Contributor Author

LucasG0 commented Mar 29, 2020

Nice, thanks !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants