-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Add GPU batch concatenation for small cuDF batches #16201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Hi @hikey-dj! Thank you for your pull request and welcome to our community. Action RequiredIn order to merge any pull request (code, docs, etc.), we require contributors to sign our Contributor License Agreement, and we don't seem to have one on file for you. ProcessIn order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (eg your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA. Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged with If you have received this in error or have any questions, please contact us at [email protected]. Thanks! |
✅ Deploy Preview for meta-velox canceled.
|
|
Thank you for signing our Contributor License Agreement. We can now accept your code for this (and any) Meta Open Source project. Thanks! |
devavret
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking a stab at this.
| using TopNRowNumberNodePtr = std::shared_ptr<const TopNRowNumberNode>; | ||
|
|
||
| // A basic plan node for the GPU batch concatenation operation. | ||
| class CudfBatchConcatNode : public core::PlanNode { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We prefer keeping core velox untouched and utilize its extensibility to achieve cudf support.
Since we're relying on driver adapter, we don't need a separate plan node anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you can, move this to a source inside experimental/cudf
| @@ -0,0 +1,95 @@ | |||
| #include "velox/experimental/cudf/CudfConfig.h" | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file needs a license
| RowVectorPtr CudfBatchConcat::getOutput() { | ||
| // Drain the queue if there is any output to be flushed | ||
| if (!outputQueue_.empty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Recommend annotating this with an nvtx range.
| RowVectorPtr CudfBatchConcat::getOutput() { | |
| // Drain the queue if there is any output to be flushed | |
| if (!outputQueue_.empty()) { | |
| RowVectorPtr CudfBatchConcat::getOutput() { | |
| VELOX_NVTX_OPERATOR_FUNC_RANGE(); | |
| // Drain the queue if there is any output to be flushed | |
| if (!outputQueue_.empty()) { |
| for (size_t i = 0; i < tables.size(); ++i) { | ||
| bool isLast = (i == tables.size() - 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be an interator based loop. currently i is used only for last, which can also be done with an iterator.
| // Do not push the last batch into the queue if it is smaller than | ||
| // targetRows_ But push it if it is the final batch | ||
| if (isLast && !noMoreInput_ && rowCount < targetRows_) { | ||
| currentNumRows_ = rowCount; | ||
| buffer_.push_back( | ||
| std::make_shared<CudfVector>( | ||
| pool(), outputType_, rowCount, std::move(tables[i]), stream)); | ||
| } else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can go outside the loop. The instead of the loop this becomes two statements
- Move begin to end - 1 into outputQueue_ with some stl algo
- Check once at the end where the last one should go
| auto breaksBatchContinuity = [](const exec::Operator* op) { | ||
| return isAnyOf< | ||
| exec::LocalPartition, | ||
| exec::HashProbe, | ||
| exec::HashBuild, | ||
| exec::HashAggregation, | ||
| exec::StreamingAggregation, | ||
| exec::Limit, | ||
| exec::OrderBy, | ||
| exec::TopN, | ||
| CudfOperator>(op); | ||
| }; | ||
|
|
||
| // Operators which are stateless and thus concat can safely be inserted before | ||
| auto isStatelessOperator = [](const exec::Operator* op) { | ||
| return isAnyOf<exec::FilterProject>(op); | ||
| }; | ||
|
|
||
| // Determines if concat is needed before an operator | ||
| auto needsConcat = [breaksBatchContinuity, isStatelessOperator]( | ||
| const exec::Operator* currentOp, | ||
| const exec::Operator* nextOp) { | ||
| // Add concat if current breaks batch and next is stateless | ||
| return currentOp != nullptr && nextOp != nullptr && | ||
| breaksBatchContinuity(currentOp) && isStatelessOperator(nextOp); | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems too limiting. IIUC, with this, concat can only appear before FilterProject. A HashProbe should also be fairly easily able to take a concatenated batch. Similarly for HashAggregation. The operators that don't make sense having a concat before are the ones that do the concat themselves because they collect all input before emitting output. Like OrderBy and HashBuild
Summary:
Adds a reusable GPU batch concatenation mechanism to coalesce small cuDF batches between GPU operators, improving kernel efficiency. Currently the operator is triggered after functions which may make the batches smaller and before filter projection.
Motivation:
After exchanges and some operators, GPU batches can become very small.
This change inserts a plan-level batch coalescing step in the cuDF adapter
without altering operator semantics.
Addresses #16105