-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Add GPU batch concatenation for small cuDF batches #16201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,95 @@ | ||||||||||||||||
| #include "velox/experimental/cudf/CudfConfig.h" | ||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This file needs a license |
||||||||||||||||
| #include "velox/experimental/cudf/exec/CudfBatchConcat.h" | ||||||||||||||||
| #include "velox/experimental/cudf/exec/Utilities.h" | ||||||||||||||||
|
|
||||||||||||||||
| namespace facebook::velox::cudf_velox { | ||||||||||||||||
|
|
||||||||||||||||
| std::unique_ptr<exec::Operator> CudfBatchConcatTranslator::toOperator( | ||||||||||||||||
| exec::DriverCtx* ctx, | ||||||||||||||||
| int32_t id, | ||||||||||||||||
| const core::PlanNodePtr& node) { | ||||||||||||||||
| if (auto batchConcatNode = | ||||||||||||||||
| std::dynamic_pointer_cast<const core::CudfBatchConcatNode>(node)) { | ||||||||||||||||
| return std::make_unique<CudfBatchConcat>(id, ctx, batchConcatNode); | ||||||||||||||||
| } | ||||||||||||||||
| return nullptr; | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| CudfBatchConcat::CudfBatchConcat( | ||||||||||||||||
| int32_t operatorId, | ||||||||||||||||
| exec::DriverCtx* driverCtx, | ||||||||||||||||
| std::shared_ptr<const core::PlanNode> planNode) | ||||||||||||||||
| : exec::Operator( | ||||||||||||||||
| driverCtx, | ||||||||||||||||
| planNode->outputType(), | ||||||||||||||||
| operatorId, | ||||||||||||||||
| planNode->id(), | ||||||||||||||||
| "CudfBatchConcat"), | ||||||||||||||||
| CudfOperator(operatorId, planNode->id()), | ||||||||||||||||
| driverCtx_(driverCtx), | ||||||||||||||||
| targetRows_(CudfConfig::getInstance().batchSizeMinThreshold) {} | ||||||||||||||||
|
|
||||||||||||||||
| void CudfBatchConcat::addInput(RowVectorPtr input) { | ||||||||||||||||
| auto cudfVector = std::dynamic_pointer_cast<CudfVector>(input); | ||||||||||||||||
| VELOX_CHECK_NOT_NULL(cudfVector, "CudfBatchConcat expects CudfVector input"); | ||||||||||||||||
|
|
||||||||||||||||
| // Push input cudf table to buffer | ||||||||||||||||
| currentNumRows_ += cudfVector->getTableView().num_rows(); | ||||||||||||||||
| buffer_.push_back(std::move(cudfVector)); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| RowVectorPtr CudfBatchConcat::getOutput() { | ||||||||||||||||
| // Drain the queue if there is any output to be flushed | ||||||||||||||||
| if (!outputQueue_.empty()) { | ||||||||||||||||
|
Comment on lines
+41
to
+43
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Recommend annotating this with an nvtx range.
Suggested change
|
||||||||||||||||
| auto table = std::move(outputQueue_.front()); | ||||||||||||||||
| auto rowCount = table->num_rows(); | ||||||||||||||||
| outputQueue_.pop(); | ||||||||||||||||
| auto stream = cudfGlobalStreamPool().get_stream(); | ||||||||||||||||
| return std::make_shared<CudfVector>( | ||||||||||||||||
| pool(), outputType_, rowCount, std::move(table), stream); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // Merge tables if there are enough rows | ||||||||||||||||
| if (currentNumRows_ >= targetRows_ || (noMoreInput_ && !buffer_.empty())) { | ||||||||||||||||
| // Use stream from existing buffer vectors | ||||||||||||||||
| auto stream = buffer_[0]->stream(); | ||||||||||||||||
| auto tables = getConcatenatedTableBatched(buffer_, outputType_, stream); | ||||||||||||||||
|
|
||||||||||||||||
| buffer_.clear(); | ||||||||||||||||
| currentNumRows_ = 0; | ||||||||||||||||
|
|
||||||||||||||||
| for (size_t i = 0; i < tables.size(); ++i) { | ||||||||||||||||
| bool isLast = (i == tables.size() - 1); | ||||||||||||||||
|
Comment on lines
+61
to
+62
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can be an interator based loop. currently i is used only for last, which can also be done with an iterator. |
||||||||||||||||
| auto rowCount = tables[i]->num_rows(); | ||||||||||||||||
|
|
||||||||||||||||
| // Do not push the last batch into the queue if it is smaller than | ||||||||||||||||
| // targetRows_ But push it if it is the final batch | ||||||||||||||||
| if (isLast && !noMoreInput_ && rowCount < targetRows_) { | ||||||||||||||||
| currentNumRows_ = rowCount; | ||||||||||||||||
| buffer_.push_back( | ||||||||||||||||
| std::make_shared<CudfVector>( | ||||||||||||||||
| pool(), outputType_, rowCount, std::move(tables[i]), stream)); | ||||||||||||||||
| } else { | ||||||||||||||||
|
Comment on lines
+65
to
+72
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can go outside the loop. The instead of the loop this becomes two statements
|
||||||||||||||||
| outputQueue_.push(std::move(tables[i])); | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // Return the first batch from the new queue | ||||||||||||||||
| if (!outputQueue_.empty()) { | ||||||||||||||||
| auto table = std::move(outputQueue_.front()); | ||||||||||||||||
| stream = cudfGlobalStreamPool().get_stream(); | ||||||||||||||||
| auto rowCount = table->num_rows(); | ||||||||||||||||
| outputQueue_.pop(); | ||||||||||||||||
| return std::make_shared<CudfVector>( | ||||||||||||||||
| pool(), outputType_, rowCount, std::move(table), stream); | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| return nullptr; | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| bool CudfBatchConcat::isFinished() { | ||||||||||||||||
| return noMoreInput_ && buffer_.empty() && outputQueue_.empty(); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| } // namespace facebook::velox::cudf_velox | ||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| #pragma once | ||
|
|
||
| #include "velox/experimental/cudf/exec/CudfOperator.h" | ||
| #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" | ||
| #include "velox/experimental/cudf/vector/CudfVector.h" | ||
|
|
||
| #include "velox/exec/Operator.h" | ||
|
|
||
| #include <queue> | ||
|
|
||
| namespace facebook::velox::cudf_velox { | ||
|
|
||
| class CudfBatchConcat : public exec::Operator, public CudfOperator { | ||
| public: | ||
| CudfBatchConcat( | ||
| int32_t operatorId, | ||
| exec::DriverCtx* driverCtx, | ||
| std::shared_ptr<const core::PlanNode> planNode); | ||
|
|
||
| bool needsInput() const override { | ||
| return !noMoreInput_ && outputQueue_.empty() && | ||
| currentNumRows_ < targetRows_; | ||
| } | ||
|
|
||
| void addInput(RowVectorPtr input) override; | ||
|
|
||
| RowVectorPtr getOutput() override; | ||
|
|
||
| void noMoreInput() override { | ||
| noMoreInput_ = true; | ||
| } | ||
|
|
||
| exec::BlockingReason isBlocked(ContinueFuture* /*future*/) override { | ||
| return exec::BlockingReason::kNotBlocked; | ||
| } | ||
|
|
||
| bool isFinished() override; | ||
|
|
||
| private: | ||
| exec::DriverCtx* const driverCtx_; | ||
| std::vector<CudfVectorPtr> buffer_; | ||
| std::queue<std::unique_ptr<cudf::table>> outputQueue_; | ||
| size_t currentNumRows_{0}; | ||
| const size_t targetRows_{0}; | ||
| bool noMoreInput_{false}; | ||
| }; | ||
|
|
||
| class CudfBatchConcatTranslator : public exec::Operator::PlanNodeTranslator { | ||
| public: | ||
| std::unique_ptr<exec::Operator> | ||
| toOperator(exec::DriverCtx* ctx, int32_t id, const core::PlanNodePtr& node); | ||
| }; | ||
|
|
||
| } // namespace facebook::velox::cudf_velox |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| #include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h" | ||
| #include "velox/experimental/cudf/connectors/hive/CudfHiveDataSource.h" | ||
| #include "velox/experimental/cudf/exec/CudfAssignUniqueId.h" | ||
| #include "velox/experimental/cudf/exec/CudfBatchConcat.h" | ||
| #include "velox/experimental/cudf/exec/CudfConversion.h" | ||
| #include "velox/experimental/cudf/exec/CudfFilterProject.h" | ||
| #include "velox/experimental/cudf/exec/CudfHashAggregation.h" | ||
|
|
@@ -217,6 +218,35 @@ bool CompileState::compile(bool allowCpuFallback) { | |
| operators.end(), | ||
| isSupportedGpuOperators.begin(), | ||
| isSupportedGpuOperator); | ||
|
|
||
| // Operators which break batch continuity and need concat before next operator | ||
| auto breaksBatchContinuity = [](const exec::Operator* op) { | ||
| return isAnyOf< | ||
| exec::LocalPartition, | ||
| exec::HashProbe, | ||
| exec::HashBuild, | ||
| exec::HashAggregation, | ||
| exec::StreamingAggregation, | ||
| exec::Limit, | ||
| exec::OrderBy, | ||
| exec::TopN, | ||
| CudfOperator>(op); | ||
| }; | ||
|
|
||
| // Operators which are stateless and thus concat can safely be inserted before | ||
| auto isStatelessOperator = [](const exec::Operator* op) { | ||
| return isAnyOf<exec::FilterProject>(op); | ||
| }; | ||
|
|
||
| // Determines if concat is needed before an operator | ||
| auto needsConcat = [breaksBatchContinuity, isStatelessOperator]( | ||
| const exec::Operator* currentOp, | ||
| const exec::Operator* nextOp) { | ||
| // Add concat if current breaks batch and next is stateless | ||
| return currentOp != nullptr && nextOp != nullptr && | ||
| breaksBatchContinuity(currentOp) && isStatelessOperator(nextOp); | ||
| }; | ||
|
Comment on lines
+223
to
+248
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems too limiting. IIUC, with this, concat can only appear before |
||
|
|
||
| auto acceptsGpuInput = [isFilterProjectSupported, | ||
| isJoinSupported, | ||
| isAggregationSupported](const exec::Operator* op) { | ||
|
|
@@ -373,6 +403,17 @@ bool CompileState::compile(bool allowCpuFallback) { | |
| keepOperator = 1; | ||
| } | ||
|
|
||
| // Check if batch concat needs to be added before the next operator | ||
| exec::Operator* nextOper = nullptr; | ||
| if (operatorIndex < operators.size() - 1) { | ||
| nextOper = operators[operatorIndex + 1]; | ||
| } | ||
| if (needsConcat(oper, nextOper) && !nextOperatorIsNotGpu) { | ||
| auto planNode = getPlanNode(oper->planNodeId()); | ||
| replaceOp.push_back( | ||
| std::make_unique<CudfBatchConcat>(oper->operatorId(), ctx, planNode)); | ||
| } | ||
|
|
||
| if (producesGpuOutput(oper) and | ||
| (nextOperatorIsNotGpu or isLastOperatorOfTask)) { | ||
| auto planNode = getPlanNode(oper->planNodeId()); | ||
|
|
@@ -414,6 +455,7 @@ bool CompileState::compile(bool allowCpuFallback) { | |
| auto condition = (GpuReplacedOperator(oper) && !replaceOp.empty() && | ||
| keepOperator == 0) || | ||
| (GpuRetainedOperator(oper) && replaceOp.empty() && keepOperator == 1); | ||
|
|
||
| if (CudfConfig::getInstance().debugEnabled) { | ||
| LOG(INFO) << "GpuReplacedOperator = " << GpuReplacedOperator(oper) | ||
| << ", GpuRetainedOperator = " << GpuRetainedOperator(oper) | ||
|
|
@@ -500,6 +542,8 @@ void registerCudf() { | |
|
|
||
| exec::Operator::registerOperator( | ||
| std::make_unique<CudfHashJoinBridgeTranslator>()); | ||
| exec::Operator::registerOperator( | ||
| std::make_unique<CudfBatchConcatTranslator>()); | ||
| CudfDriverAdapter cda{CudfConfig::getInstance().allowCpuFallback}; | ||
| exec::DriverAdapter cudfAdapter{kCudfAdapterName, {}, cda}; | ||
| exec::DriverFactory::registerAdapter(cudfAdapter); | ||
|
|
@@ -544,6 +588,14 @@ void CudfConfig::initialize( | |
| if (config.find(kCudfMemoryPercent) != config.end()) { | ||
| memoryPercent = folly::to<int32_t>(config[kCudfMemoryPercent]); | ||
| } | ||
| if (config.find(kCudfBatchSizeMinThreshold) != config.end()) { | ||
| batchSizeMinThreshold = | ||
| folly::to<int32_t>(config[kCudfBatchSizeMinThreshold]); | ||
| } | ||
| if (config.find(kCudfBatchSizeMaxThreshold) != config.end()) { | ||
| batchSizeMaxThreshold = | ||
| folly::to<int32_t>(config[kCudfBatchSizeMaxThreshold]); | ||
| } | ||
| if (config.find(kCudfFunctionNamePrefix) != config.end()) { | ||
| functionNamePrefix = config[kCudfFunctionNamePrefix]; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We prefer keeping core velox untouched and utilize its extensibility to achieve cudf support.
Since we're relying on driver adapter, we don't need a separate plan node anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you can, move this to a source inside experimental/cudf