Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

ResultSet refactoring and clean-up [17/N] #274

Merged
merged 1 commit into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 12 additions & 24 deletions omniscidb/QueryEngine/Descriptors/QueryMemoryDescriptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,18 @@

#include "QueryEngine/Descriptors/QueryMemoryDescriptor.h"

#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/algorithm/string.hpp>

#include "DataMgr/DataMgr.h"
#include "QueryEngine/ColRangeInfo.h"
#include "QueryEngine/StreamingTopN.h"

namespace {

// Reports whether at least one entry of `target_exprs` is an aggregate
// expression (hdk::ir::AggExpr) whose aggType() equals `agg_kind`.
// Entries for which the dynamic_cast yields null are skipped.
bool anyOf(std::vector<const hdk::ir::Expr*> const& target_exprs,
           hdk::ir::AggType agg_kind) {
  for (hdk::ir::Expr const* candidate : target_exprs) {
    auto const* const as_agg = dynamic_cast<hdk::ir::AggExpr const*>(candidate);
    if (as_agg != nullptr && as_agg->aggType() == agg_kind) {
      // Stop at the first match, same as any_of.
      return true;
    }
  }
  return false;
}

} // namespace

QueryMemoryDescriptor::QueryMemoryDescriptor(
Data_Namespace::DataMgr* data_mgr,
ConfigPtr config,
const RelAlgExecutionUnit& ra_exe_unit,
const std::vector<InputTableInfo>& query_infos,
const bool use_bump_allocator,
const bool approx_quantile,
const bool allow_multifrag,
const bool keyless_hash,
const bool interleaved_bins_on_gpu,
Expand Down Expand Up @@ -81,7 +70,7 @@ QueryMemoryDescriptor::QueryMemoryDescriptor(
sort_on_gpu_ = sort_on_gpu_hint && canOutputColumnar() && !keyless_hash_;

if (sort_on_gpu_) {
CHECK(!ra_exe_unit.use_bump_allocator);
CHECK(!use_bump_allocator);
output_columnar_ = true;
} else {
switch (query_desc_type_) {
Expand All @@ -91,11 +80,10 @@ QueryMemoryDescriptor::QueryMemoryDescriptor(
case QueryDescriptionType::GroupByPerfectHash:
case QueryDescriptionType::GroupByBaselineHash:
case QueryDescriptionType::NonGroupedAggregate:
output_columnar_ =
output_columnar_hint &&
QueryMemoryDescriptor::countDescriptorsLogicallyEmpty(
count_distinct_descriptors_) &&
!anyOf(ra_exe_unit.target_exprs, hdk::ir::AggType::kApproxQuantile);
output_columnar_ = output_columnar_hint &&
QueryMemoryDescriptor::countDescriptorsLogicallyEmpty(
count_distinct_descriptors_) &&
!approx_quantile;
break;
default:
output_columnar_ = false;
Expand All @@ -106,7 +94,7 @@ QueryMemoryDescriptor::QueryMemoryDescriptor(
if (isLogicalSizedColumnsAllowed()) {
// TODO(adb): Ensure fixed size buffer allocations are correct with all logical
// column sizes
CHECK(!ra_exe_unit.use_bump_allocator);
CHECK(!use_bump_allocator);
col_slot_context_.setAllSlotsPaddedSizeToLogicalSize();
col_slot_context_.validate();
}
Expand Down Expand Up @@ -453,12 +441,12 @@ size_t QueryMemoryDescriptor::getNextColOffInBytesRowOnly(const int8_t* col_ptr,
}

size_t QueryMemoryDescriptor::getBufferSizeBytes(
const RelAlgExecutionUnit& ra_exe_unit,
const size_t max_rows,
const unsigned thread_count,
const ExecutorDeviceType device_type) const {
if (use_streaming_top_n_) {
const size_t n = ra_exe_unit.sort_info.offset + ra_exe_unit.sort_info.limit;
return streaming_top_n::get_heap_size(getRowSize(), n, thread_count);
CHECK_GT(max_rows, size_t(0));
return streaming_top_n::get_heap_size(getRowSize(), max_rows, thread_count);
}
return getBufferSizeBytes(device_type, entry_count_);
}
Expand Down
23 changes: 3 additions & 20 deletions omniscidb/QueryEngine/Descriptors/QueryMemoryDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ class BufferProvider;
class QueryExecutionContext;
class RowSetMemoryOwner;
struct InputTableInfo;
struct RelAlgExecutionUnit;
class TResultSetBufferDescriptor;
struct ColRangeInfo;

Expand All @@ -77,8 +76,9 @@ class QueryMemoryDescriptor {
// constructor for init call
QueryMemoryDescriptor(Data_Namespace::DataMgr* data_mgr,
ConfigPtr config,
const RelAlgExecutionUnit& ra_exe_unit,
const std::vector<InputTableInfo>& query_infos,
const bool use_bump_allocator,
const bool approx_quantile,
const bool allow_multifrag,
const bool keyless_hash,
const bool interleaved_bins_on_gpu,
Expand Down Expand Up @@ -109,23 +109,6 @@ class QueryMemoryDescriptor {

bool operator==(const QueryMemoryDescriptor& other) const;

static std::unique_ptr<QueryMemoryDescriptor> init(
Data_Namespace::DataMgr* data_mgr,
ConfigPtr config,
const RelAlgExecutionUnit& ra_exe_unit,
const std::vector<InputTableInfo>& query_infos,
const ColRangeInfo& col_range_info,
const KeylessInfo& keyless_info,
const bool allow_multifrag,
const ExecutorDeviceType device_type,
const int8_t crt_min_byte_width,
const bool sort_on_gpu_hint,
const size_t max_groups_buffer_entry_count,
const CountDistinctDescriptors count_distinct_descriptors,
const bool must_use_baseline_sort,
const bool output_columnar_hint,
const bool streaming_top_n_hint);

static bool many_entries(const int64_t max_val,
const int64_t min_val,
const int64_t bucket) {
Expand Down Expand Up @@ -263,7 +246,7 @@ class QueryMemoryDescriptor {
size_t getKeyCount() const { return keyless_hash_ ? 0 : getGroupbyColCount(); }
size_t getBufferColSlotCount() const;

size_t getBufferSizeBytes(const RelAlgExecutionUnit& ra_exe_unit,
size_t getBufferSizeBytes(const size_t max_rows,
const unsigned thread_count,
const ExecutorDeviceType device_type) const;
size_t getBufferSizeBytes(const ExecutorDeviceType device_type) const;
Expand Down
18 changes: 16 additions & 2 deletions omniscidb/QueryEngine/MemoryLayoutBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "QueryEngine/OutputBufferInitialization.h"
#include "QueryEngine/UsedColumnsCollector.h"

#include <boost/algorithm/cxx11/any_of.hpp>

MemoryLayoutBuilder::MemoryLayoutBuilder(const RelAlgExecutionUnit& ra_exe_unit)
: ra_exe_unit_(ra_exe_unit) {
for (const auto& groupby_expr : ra_exe_unit_.groupby_exprs) {
Expand Down Expand Up @@ -753,6 +755,14 @@ std::vector<int64_t> target_expr_proj_indices(const RelAlgExecutionUnit& ra_exe_
return target_indices;
}

// Returns true when `target_exprs` contains an hdk::ir::AggExpr whose
// aggType() matches `agg_kind`; returns false otherwise (including for an
// empty vector). Non-aggregate entries fail the dynamic_cast and are ignored.
bool anyOf(std::vector<const hdk::ir::Expr*> const& target_exprs,
           hdk::ir::AggType agg_kind) {
  for (hdk::ir::Expr const* target : target_exprs) {
    auto const* const agg_expr = dynamic_cast<hdk::ir::AggExpr const*>(target);
    if (agg_expr == nullptr) {
      continue;
    }
    if (agg_expr->aggType() == agg_kind) {
      return true;
    }
  }
  return false;
}

std::unique_ptr<QueryMemoryDescriptor> build_query_memory_descriptor(
const Executor* executor,
const RelAlgExecutionUnit& ra_exe_unit,
Expand Down Expand Up @@ -790,8 +800,9 @@ std::unique_ptr<QueryMemoryDescriptor> build_query_memory_descriptor(
return std::make_unique<QueryMemoryDescriptor>(
executor->getDataMgr(),
executor->getConfigPtr(),
ra_exe_unit,
query_infos,
false,
false,
allow_multifrag,
false,
false,
Expand Down Expand Up @@ -928,10 +939,13 @@ std::unique_ptr<QueryMemoryDescriptor> build_query_memory_descriptor(
UNREACHABLE() << "Unknown query type";
}

auto approx_quantile =
anyOf(ra_exe_unit.target_exprs, hdk::ir::AggType::kApproxQuantile);
return std::make_unique<QueryMemoryDescriptor>(executor->getDataMgr(),
executor->getConfigPtr(),
ra_exe_unit,
query_infos,
ra_exe_unit.use_bump_allocator,
approx_quantile,
allow_multifrag,
keyless_hash,
interleaved_bins_on_gpu,
Expand Down
8 changes: 5 additions & 3 deletions omniscidb/QueryEngine/QueryMemoryInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,9 @@ QueryMemoryInitializer::QueryMemoryInitializer(
query_mem_desc.getRowSize();
}
} else {
size_t max_rows = ra_exe_unit.sort_info.offset + ra_exe_unit.sort_info.limit;
group_buffer_size =
query_mem_desc.getBufferSizeBytes(ra_exe_unit, thread_count, device_type);
query_mem_desc.getBufferSizeBytes(max_rows, thread_count, device_type);
}
CHECK_GE(group_buffer_size, size_t(0));

Expand Down Expand Up @@ -1288,10 +1289,11 @@ void QueryMemoryInitializer::applyStreamingTopNOffsetCpu(
const size_t buffer_start_idx = query_mem_desc.hasVarlenOutput() ? 1 : 0;
CHECK_EQ(group_by_buffers_.size(), buffer_start_idx + 1);

size_t max_rows = ra_exe_unit.sort_info.offset + ra_exe_unit.sort_info.limit;
const auto rows_copy = streaming_top_n::get_rows_copy_from_heaps(
group_by_buffers_[buffer_start_idx],
query_mem_desc.getBufferSizeBytes(ra_exe_unit, 1, ExecutorDeviceType::CPU),
ra_exe_unit.sort_info.offset + ra_exe_unit.sort_info.limit,
query_mem_desc.getBufferSizeBytes(max_rows, 1, ExecutorDeviceType::CPU),
max_rows,
1);
CHECK_EQ(rows_copy.size(),
query_mem_desc.getEntryCount() * query_mem_desc.getRowSize());
Expand Down
3 changes: 1 addition & 2 deletions omniscidb/QueryEngine/StreamingTopN.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ std::vector<int8_t> pick_top_n_rows_from_dev_heaps(
return pop_n_rows_from_merged_heaps_gpu(
buffer_provider,
dev_heaps_buffer,
query_mem_desc.getBufferSizeBytes(
ra_exe_unit, thread_count, ExecutorDeviceType::GPU),
query_mem_desc.getBufferSizeBytes(n, thread_count, ExecutorDeviceType::GPU),
n,
pod_oe,
oe_layout,
Expand Down