
Replace bigSizedJoin with SubPartitionHashJoin in SizedHashJoin to avoid CudfColumnSizeOverflow #12734


Draft · wants to merge 6 commits into base: branch-25.08

Conversation

@thirtiseven (Collaborator) commented May 15, 2025

Closes #12353

I think it's still at an early stage; at a minimum it needs more tests.

Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven requested a review from binmahone on May 15, 2025 10:02
  private def realTargetBatchSize(): Long = {
    val configValue = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)
    // The 10k is mostly for tests, hopefully no one is setting anything that low in production.
    Math.max(configValue, 10 * 1024)
  }

nit: why must we set this?

@binmahone

Need tests on: 1. NDS; 2. customer queries (we can selectively pick 20 queries whose per-task build & stream side sizes are both big).

@thirtiseven requested a review from Copilot on May 19, 2025 08:54

@Copilot Copilot AI left a comment


Pull Request Overview

This PR replaces the big-sized join implementation with a sub-partition hash join variant to mitigate potential overflow issues with large build-side batches.

  • Introduces mixins for GpuHashJoin and GpuSubPartitionHashJoin
  • Adds a new method, realTargetBatchSize, to enforce a minimum GPU batch size
  • Removes the legacy BigSizedJoinIterator and updates join execution to use the new sub-partitioning approach
Comments suppressed due to low confidence (1)

sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala:409

  • Ensure that unit tests validate the behavior of realTargetBatchSize, especially for configuration values below 10 * 1024, to confirm that the enforced lower limit is working as expected.
private def realTargetBatchSize(): Long = {
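
As an illustration of the kind of unit test suggested above, here is a minimal ScalaTest sketch. It exercises only the clamping logic shown in the snippet, using an illustrative stand-in function rather than the real private method or the RapidsConf machinery; the suite and helper names are hypothetical.

```scala
import org.scalatest.funsuite.AnyFunSuite

class TargetBatchSizeClampSuite extends AnyFunSuite {
  // Illustrative stand-in for the clamp inside realTargetBatchSize:
  // the configured batch size is never allowed to drop below 10 KiB.
  private def clampTargetBatchSize(configValue: Long): Long =
    Math.max(configValue, 10L * 1024)

  test("values below 10 KiB are raised to the floor") {
    assert(clampTargetBatchSize(1024L) === 10L * 1024)
  }

  test("values at or above 10 KiB pass through unchanged") {
    assert(clampTargetBatchSize(10L * 1024) === 10L * 1024)
    assert(clampTargetBatchSize(1L << 30) === (1L << 30))
  }
}
```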

@thirtiseven (Collaborator, author) commented May 27, 2025

We are still running tests on the customer side to see what happens when we really hit the bigSizedJoin. It will probably take a few more days to get the results back.

I just ran NDS on two A100s with 3K data 6 times and got an 8.61% average gain.

@abellina could you please review the code if you have time, since the 25.06 release is approaching? Thanks!

Update: Note that both the NDS and customer runs are against #12354.

@binmahone

This PR should also close #12387, so we may need to revert #12372 once this PR is merged.

@sameerz added the "bug (Something isn't working)" label on Jun 2, 2025
@thirtiseven changed the base branch from branch-25.06 to branch-25.08 on June 4, 2025 01:49
@thirtiseven (Collaborator, author) commented Jun 12, 2025

Got some results comparing #12354 vs this PR:

In the following query, we can see that the sub-partition hash join (this PR, on the right) is significantly slower than the bigSizedJoin (Mahone's #12354, on the left).

[Screenshot: 2025-06-12 12:00:01]

Also, the sub-partition hash join's spill size in this query is twice that of the bigSizedJoin's. This seems unnatural, because #12354 reads all following batches into a spillable queue and so should spill more.

Mahone pointed out that this could be because the sub-partition hash join uses smaller batches, which take less GPU memory, so dynamic concurrentGpuTasks (#12374) kicks in, and the larger number of concurrent GPU tasks causes the larger spill size.
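
To isolate that effect in an A/B run, the dynamic behavior can be turned off explicitly. A minimal sketch, assuming a spark-shell session where the setting can be applied at session level (otherwise it would be passed with --conf at submit time):

```scala
// Pin concurrency so both join strategies run under the same number of
// concurrent GPU tasks, making the spill-size comparison apples-to-apples.
spark.conf.set("spark.rapids.sql.concurrentGpuTasks.dynamic", "false")
```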

Setting spark.rapids.sql.concurrentGpuTasks.dynamic=false causes the sub-partition hash join to use 90% less memory than the bigSizedJoin, but it is still slow:

[Screenshot: 2025-06-12 12:01:05]

In another query, we can see that some downstream nodes of the sub-partition hash join also got slower, apparently because the batch sizes got smaller.

[Screenshot: 2025-06-12 12:09:36]

So I think we can switch back to PR #12354's approach for now.

What do you think? @binmahone @abellina , thanks!

@binmahone

@abellina It looks like it's worth paying the price of risking more spilling. Do you think we should switch back to #12354? Let me know if you need a face-to-face discussion on this.

@abellina

> @abellina It looks like it's worth paying the price of risking more spilling. Do you think we should switch back to #12354? Let me know if you need a face-to-face discussion on this.

Looking at #12734 (comment), one thing I see is that the row counts are different between the two joins. Is that a metrics issue in this draft? (Potentially, given the second graph shows a project with the right row count.)

It feels like we need to get an idea of why there is a perf difference. I see likely causes in the comments, but no definitive "this is the reason for the slowness". In other words, should we be improving sub-partitioning instead of moving away from it?

@binmahone

> @abellina It looks like it's worth paying the price of risking more spilling. Do you think we should switch back to #12354? Let me know if you need a face-to-face discussion on this.
>
> Looking at #12734 (comment), one thing I see is that the row counts are different between the two joins. Is that a metrics issue in this draft? (Potentially, given the second graph shows a project with the right row count.)
>
> It feels like we need to get an idea of why there is a perf difference. I see likely causes in the comments, but no definitive "this is the reason for the slowness". In other words, should we be improving sub-partitioning instead of moving away from it?

Hi @abellina, we'll investigate the row count diff in the first query (I checked the second query and saw the row counts are the same, but didn't notice the diff in the first query). We should definitely align the row counts before doing any meaningful analysis.

For the second query, to be honest, we haven't conducted an in-depth analysis yet, because intuitively the new approach starts by splitting the data into 16 parts for separate processing, which comes with inherent overhead. So it's not surprising if it's slower (at least in some cases). However, if you're still keen on understanding the new approach better, we will dive deeper into its NSYS profile and flame graph.

But taking one step back, is the original solution (#12354) that bad? It does risk more spills, but it also comes up with a better number of sub-partitions, right?
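
For context on what "splitting the data into 16 parts" means here, below is a highly simplified, CPU-only sketch of the sub-partitioning idea using plain Scala collections. It is not the actual GpuSubPartitionHashJoin implementation (which operates on spillable GPU columnar batches); the function and parameter names are illustrative.

```scala
// Hash-partition both sides into N buckets and join each bucket independently,
// so no single hash table ever has to hold the entire build side at once.
def subPartitionHashJoin[K, A, B](
    build: Seq[(K, A)],
    stream: Seq[(K, B)],
    numSubPartitions: Int = 16): Seq[(K, (A, B))] = {
  def bucket(k: K): Int = Math.floorMod(k.hashCode(), numSubPartitions)
  val buildParts  = build.groupBy { case (k, _) => bucket(k) }
  val streamParts = stream.groupBy { case (k, _) => bucket(k) }
  (0 until numSubPartitions).flatMap { p =>
    // A small hash table covering only this sub-partition of the build side.
    val table = buildParts.getOrElse(p, Seq.empty).groupBy(_._1)
    streamParts.getOrElse(p, Seq.empty).flatMap { case (k, b) =>
      table.getOrElse(k, Seq.empty).map { case (_, a) => (k, (a, b)) }
    }
  }
}
```

The per-bucket hash tables are what keep each sub-partition's working set (and therefore the output batch sizes) small; the extra partitioning pass and the smaller downstream batches are the overhead being weighed against #12354's approach in the discussion above.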

@binmahone

@abellina Just double-confirmed with Haoyang: the row count difference should be a metrics problem. He ran the two runs at the same time, so the input data should be identical.

Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven

> Looking at #12734 (comment), one thing I see is that the row counts are different between the two joins. Is that a metrics issue in this draft? (Potentially, given the second graph shows a project with the right row count.)

Yes, it's a metrics bug, fixed in c098407. The test data should be the same.

Labels: bug (Something isn't working)

Successfully merging this pull request may close these issues.

[BUG] AsymmetricJoinSizer passes wrong buildSize to JoinInfo