Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -5468,6 +5468,40 @@ class TopNRowNumberNode : public PlanNode {

using TopNRowNumberNodePtr = std::shared_ptr<const TopNRowNumberNode>;

// A basic plan node for the GPU batch concatenation operation.
class CudfBatchConcatNode : public core::PlanNode {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We prefer keeping core velox untouched and utilize its extensibility to achieve cudf support.
Since we're relying on driver adapter, we don't need a separate plan node anyway.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can, move this to a source inside experimental/cudf

public:
CudfBatchConcatNode(const core::PlanNodeId& id, core::PlanNodePtr source)
: PlanNode(id), sources_({std::move(source)}) {}

const RowTypePtr& outputType() const override {
return sources_[0]->outputType();
}

const std::vector<core::PlanNodePtr>& sources() const override {
return sources_;
}

std::string_view name() const override {
return "CudfBatchConcat";
}

void addDetails(std::stringstream& stream) const override {
stream << name();
}

folly::dynamic serialize() const override {
auto obj = PlanNode::serialize();
obj["sources"] = ISerializable::serialize(sources_);
return obj;
}

private:
const std::vector<core::PlanNodePtr> sources_;
};

using CudfBatchConcatNodePtr = std::shared_ptr<const CudfBatchConcatNode>;

class PlanNodeVisitorContext {
public:
virtual ~PlanNodeVisitorContext() = default;
Expand Down
13 changes: 13 additions & 0 deletions velox/experimental/cudf/CudfConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <optional>
#include <string>
#include <unordered_map>

Expand All @@ -35,6 +36,10 @@ struct CudfConfig {
"cudf.ast_expression_priority"};
static constexpr const char* kCudfAllowCpuFallback{"cudf.allow_cpu_fallback"};
static constexpr const char* kCudfLogFallback{"cudf.log_fallback"};
static constexpr const char* kCudfBatchSizeMinThreshold{
"cudf.batch_size_min_threshold"};
static constexpr const char* kCudfBatchSizeMaxThreshold{
"cudf.batch_size_max_threshold"};

/// Singleton CudfConfig instance.
/// Clients must set the configs below before invoking registerCudf().
Expand Down Expand Up @@ -78,6 +83,14 @@ struct CudfConfig {

/// Whether to log a reason for falling back to Velox CPU execution.
bool logFallback{true};

/// Minimum rows to accumulate before GPU-side concatenation in
/// `CudfBatchConcat` (default 100k).
int32_t batchSizeMinThreshold{100000};

/// Maximum rows allowed in a concatenated batch (user configurable).
/// When not set, cuDF's own `size_type::max()` is used.
std::optional<int32_t> batchSizeMaxThreshold;
};

} // namespace facebook::velox::cudf_velox
1 change: 1 addition & 0 deletions velox/experimental/cudf/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ add_library(
CudfLimit.cpp
CudfLocalPartition.cpp
CudfOrderBy.cpp
CudfBatchConcat.cpp
CudfTopN.cpp
DebugUtil.cpp
ToCudf.cpp
Expand Down
95 changes: 95 additions & 0 deletions velox/experimental/cudf/exec/CudfBatchConcat.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "velox/experimental/cudf/exec/CudfBatchConcat.h"

#include "velox/experimental/cudf/CudfConfig.h"
#include "velox/experimental/cudf/exec/NvtxHelper.h"
#include "velox/experimental/cudf/exec/Utilities.h"

namespace facebook::velox::cudf_velox {

// Translates a CudfBatchConcatNode into its operator; returns nullptr for any
// other plan node so the next translator in the chain can try.
std::unique_ptr<exec::Operator> CudfBatchConcatTranslator::toOperator(
    exec::DriverCtx* ctx,
    int32_t id,
    const core::PlanNodePtr& node) {
  auto concatNode =
      std::dynamic_pointer_cast<const core::CudfBatchConcatNode>(node);
  if (concatNode == nullptr) {
    return nullptr;
  }
  return std::make_unique<CudfBatchConcat>(id, ctx, concatNode);
}

// Constructs the concat operator. Output type is taken from the plan node
// (pass-through), and the concatenation threshold comes from the global
// CudfConfig singleton (cudf.batch_size_min_threshold).
CudfBatchConcat::CudfBatchConcat(
    int32_t operatorId,
    exec::DriverCtx* driverCtx,
    std::shared_ptr<const core::PlanNode> planNode)
    : exec::Operator(
          driverCtx,
          planNode->outputType(),
          operatorId,
          planNode->id(),
          "CudfBatchConcat"),
      CudfOperator(operatorId, planNode->id()),
      driverCtx_(driverCtx),
      targetRows_(CudfConfig::getInstance().batchSizeMinThreshold) {}

// Buffers one incoming GPU batch. Input must already be a CudfVector; the
// driver adapter guarantees this when the upstream operator is GPU-resident.
void CudfBatchConcat::addInput(RowVectorPtr input) {
  auto gpuInput = std::dynamic_pointer_cast<CudfVector>(input);
  VELOX_CHECK_NOT_NULL(gpuInput, "CudfBatchConcat expects CudfVector input");

  // Track the buffered row count, then take ownership of the batch.
  const auto numRows = gpuInput->getTableView().num_rows();
  currentNumRows_ += numRows;
  buffer_.emplace_back(std::move(gpuInput));
}

// Emits one concatenated batch per call.
//
// 1. If previously concatenated tables are queued, pop and return the front.
// 2. Otherwise, once targetRows_ rows are buffered (or input has ended with a
//    non-empty buffer), concatenate the buffer into batches. All batches but
//    the last are full-size and go straight to the output queue; an undersized
//    trailing batch is re-buffered unless input has ended.
// 3. Returns nullptr when there is nothing ready yet.
RowVectorPtr CudfBatchConcat::getOutput() {
  VELOX_NVTX_OPERATOR_FUNC_RANGE();

  // Drain the queue if there is any output to be flushed.
  if (!outputQueue_.empty()) {
    auto table = std::move(outputQueue_.front());
    auto rowCount = table->num_rows();
    outputQueue_.pop();
    auto stream = cudfGlobalStreamPool().get_stream();
    return std::make_shared<CudfVector>(
        pool(), outputType_, rowCount, std::move(table), stream);
  }

  // Merge tables if there are enough rows, or flush everything at end of
  // input.
  if (currentNumRows_ >= targetRows_ || (noMoreInput_ && !buffer_.empty())) {
    // Use the stream from the existing buffer vectors so concatenation is
    // ordered after the producers' work.
    auto stream = buffer_[0]->stream();
    auto tables = getConcatenatedTableBatched(buffer_, outputType_, stream);

    buffer_.clear();
    currentNumRows_ = 0;

    if (!tables.empty()) {
      // All batches except the last are guaranteed full-size: enqueue them.
      for (auto it = tables.begin(); it + 1 != tables.end(); ++it) {
        outputQueue_.push(std::move(*it));
      }

      // The trailing batch may be undersized. Keep buffering it until more
      // input arrives, unless this is the final flush.
      auto& last = tables.back();
      const auto lastRows = last->num_rows();
      if (!noMoreInput_ && static_cast<size_t>(lastRows) < targetRows_) {
        currentNumRows_ = lastRows;
        buffer_.push_back(
            std::make_shared<CudfVector>(
                pool(), outputType_, lastRows, std::move(last), stream));
      } else {
        outputQueue_.push(std::move(last));
      }
    }

    // Return the first batch from the newly filled queue, if any.
    if (!outputQueue_.empty()) {
      auto table = std::move(outputQueue_.front());
      outputQueue_.pop();
      const auto rowCount = table->num_rows();
      stream = cudfGlobalStreamPool().get_stream();
      return std::make_shared<CudfVector>(
          pool(), outputType_, rowCount, std::move(table), stream);
    }
  }

  return nullptr;
}

// Finished once input is exhausted and nothing remains buffered or queued.
bool CudfBatchConcat::isFinished() {
  const bool drained = buffer_.empty() && outputQueue_.empty();
  return noMoreInput_ && drained;
}

} // namespace facebook::velox::cudf_velox
54 changes: 54 additions & 0 deletions velox/experimental/cudf/exec/CudfBatchConcat.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include "velox/experimental/cudf/exec/CudfOperator.h"
#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
#include "velox/experimental/cudf/vector/CudfVector.h"

#include "velox/exec/Operator.h"

#include <queue>

namespace facebook::velox::cudf_velox {

/// Operator that buffers incoming GPU batches (CudfVector) and concatenates
/// them into larger batches before handing them downstream. Rows accumulate
/// until CudfConfig::batchSizeMinThreshold is reached, then concatenated
/// tables are emitted one per getOutput() call.
///
/// NOTE(review): the original declared a private `noMoreInput_` member and a
/// `noMoreInput()` override that only set it; both shadowed the protected
/// `exec::Operator::noMoreInput_` / base `noMoreInput()` that provide exactly
/// this behavior. They are removed so the flag has a single source of truth —
/// confirm exec::Operator exposes the protected member in this Velox version.
class CudfBatchConcat : public exec::Operator, public CudfOperator {
 public:
  CudfBatchConcat(
      int32_t operatorId,
      exec::DriverCtx* driverCtx,
      std::shared_ptr<const core::PlanNode> planNode);

  /// Accepts more input only while nothing is queued for output and the
  /// buffered row count is still below the concatenation threshold.
  bool needsInput() const override {
    return !noMoreInput_ && outputQueue_.empty() &&
        currentNumRows_ < targetRows_;
  }

  void addInput(RowVectorPtr input) override;

  RowVectorPtr getOutput() override;

  /// Never blocks: all work is local to this operator.
  exec::BlockingReason isBlocked(ContinueFuture* /*future*/) override {
    return exec::BlockingReason::kNotBlocked;
  }

  bool isFinished() override;

 private:
  exec::DriverCtx* const driverCtx_;

  // Input batches accumulated since the last concatenation.
  std::vector<CudfVectorPtr> buffer_;

  // Concatenated tables ready to be emitted by getOutput().
  std::queue<std::unique_ptr<cudf::table>> outputQueue_;

  // Total rows currently held in buffer_.
  size_t currentNumRows_{0};

  // Minimum rows to accumulate before concatenating (from CudfConfig).
  const size_t targetRows_{0};
};

/// Translator registered with the operator framework: creates a
/// CudfBatchConcat for CudfBatchConcatNode plans and returns nullptr for any
/// other node type. `override` added so signature drift against
/// PlanNodeTranslator is caught at compile time.
class CudfBatchConcatTranslator : public exec::Operator::PlanNodeTranslator {
 public:
  std::unique_ptr<exec::Operator> toOperator(
      exec::DriverCtx* ctx,
      int32_t id,
      const core::PlanNodePtr& node) override;
};

} // namespace facebook::velox::cudf_velox
52 changes: 52 additions & 0 deletions velox/experimental/cudf/exec/ToCudf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h"
#include "velox/experimental/cudf/connectors/hive/CudfHiveDataSource.h"
#include "velox/experimental/cudf/exec/CudfAssignUniqueId.h"
#include "velox/experimental/cudf/exec/CudfBatchConcat.h"
#include "velox/experimental/cudf/exec/CudfConversion.h"
#include "velox/experimental/cudf/exec/CudfFilterProject.h"
#include "velox/experimental/cudf/exec/CudfHashAggregation.h"
Expand Down Expand Up @@ -217,6 +218,35 @@ bool CompileState::compile(bool allowCpuFallback) {
operators.end(),
isSupportedGpuOperators.begin(),
isSupportedGpuOperator);

// Operators which break batch continuity and need concat before next operator
auto breaksBatchContinuity = [](const exec::Operator* op) {
return isAnyOf<
exec::LocalPartition,
exec::HashProbe,
exec::HashBuild,
exec::HashAggregation,
exec::StreamingAggregation,
exec::Limit,
exec::OrderBy,
exec::TopN,
CudfOperator>(op);
};

// Operators which are stateless and thus concat can safely be inserted before
auto isStatelessOperator = [](const exec::Operator* op) {
return isAnyOf<exec::FilterProject>(op);
};

// Determines if concat is needed before an operator
auto needsConcat = [breaksBatchContinuity, isStatelessOperator](
const exec::Operator* currentOp,
const exec::Operator* nextOp) {
// Add concat if current breaks batch and next is stateless
return currentOp != nullptr && nextOp != nullptr &&
breaksBatchContinuity(currentOp) && isStatelessOperator(nextOp);
};
Comment on lines +223 to +248
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems too limiting. IIUC, with this, concat can only appear before FilterProject. A HashProbe should also be fairly easily able to take a concatenated batch. Similarly for HashAggregation. The operators that don't make sense having a concat before are the ones that do the concat themselves because they collect all input before emitting output. Like OrderBy and HashBuild


auto acceptsGpuInput = [isFilterProjectSupported,
isJoinSupported,
isAggregationSupported](const exec::Operator* op) {
Expand Down Expand Up @@ -373,6 +403,17 @@ bool CompileState::compile(bool allowCpuFallback) {
keepOperator = 1;
}

// Check if batch concat needs to be added before the next operator
exec::Operator* nextOper = nullptr;
if (operatorIndex < operators.size() - 1) {
nextOper = operators[operatorIndex + 1];
}
if (needsConcat(oper, nextOper) && !nextOperatorIsNotGpu) {
auto planNode = getPlanNode(oper->planNodeId());
replaceOp.push_back(
std::make_unique<CudfBatchConcat>(oper->operatorId(), ctx, planNode));
}

if (producesGpuOutput(oper) and
(nextOperatorIsNotGpu or isLastOperatorOfTask)) {
auto planNode = getPlanNode(oper->planNodeId());
Expand Down Expand Up @@ -414,6 +455,7 @@ bool CompileState::compile(bool allowCpuFallback) {
auto condition = (GpuReplacedOperator(oper) && !replaceOp.empty() &&
keepOperator == 0) ||
(GpuRetainedOperator(oper) && replaceOp.empty() && keepOperator == 1);

if (CudfConfig::getInstance().debugEnabled) {
LOG(INFO) << "GpuReplacedOperator = " << GpuReplacedOperator(oper)
<< ", GpuRetainedOperator = " << GpuRetainedOperator(oper)
Expand Down Expand Up @@ -500,6 +542,8 @@ void registerCudf() {

exec::Operator::registerOperator(
std::make_unique<CudfHashJoinBridgeTranslator>());
exec::Operator::registerOperator(
std::make_unique<CudfBatchConcatTranslator>());
CudfDriverAdapter cda{CudfConfig::getInstance().allowCpuFallback};
exec::DriverAdapter cudfAdapter{kCudfAdapterName, {}, cda};
exec::DriverFactory::registerAdapter(cudfAdapter);
Expand Down Expand Up @@ -544,6 +588,14 @@ void CudfConfig::initialize(
if (config.find(kCudfMemoryPercent) != config.end()) {
memoryPercent = folly::to<int32_t>(config[kCudfMemoryPercent]);
}
if (config.find(kCudfBatchSizeMinThreshold) != config.end()) {
batchSizeMinThreshold =
folly::to<int32_t>(config[kCudfBatchSizeMinThreshold]);
}
if (config.find(kCudfBatchSizeMaxThreshold) != config.end()) {
batchSizeMaxThreshold =
folly::to<int32_t>(config[kCudfBatchSizeMaxThreshold]);
}
if (config.find(kCudfFunctionNamePrefix) != config.end()) {
functionNamePrefix = config[kCudfFunctionNamePrefix];
}
Expand Down
7 changes: 5 additions & 2 deletions velox/experimental/cudf/exec/Utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "velox/experimental/cudf/CudfConfig.h"
#include "velox/experimental/cudf/exec/Utilities.h"
#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"

Expand Down Expand Up @@ -228,8 +229,10 @@ std::vector<std::unique_ptr<cudf::table>> getConcatenatedTableBatched(
}

std::vector<std::unique_ptr<cudf::table>> outputTables;
auto const maxRows =
static_cast<size_t>(std::numeric_limits<cudf::size_type>::max());
const auto& cudfConfig = CudfConfig::getInstance();
auto const maxRows = cudfConfig.batchSizeMaxThreshold
? static_cast<size_t>(cudfConfig.batchSizeMaxThreshold.value())
: static_cast<size_t>(std::numeric_limits<cudf::size_type>::max());
size_t startpos = 0;
size_t runningRows = 0;
for (size_t i = 0; i < tableViews.size(); ++i) {
Expand Down
Loading