Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

ResultSet refactoring and clean-up [07/N] #220

Merged
merged 1 commit into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ class RaExecutionDesc {
QueryMemoryDescriptor(),
nullptr,
nullptr,
nullptr,
0,
0),
{}) {}
Expand Down
11 changes: 1 addition & 10 deletions omniscidb/QueryEngine/Execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ void Executor::resetCodeCache() {

Executor::Executor(const ExecutorId executor_id,
Data_Namespace::DataMgr* data_mgr,
BufferProvider* buffer_provider,
ConfigPtr config,
const std::string& debug_dir,
const std::string& debug_file)
Expand All @@ -157,7 +156,6 @@ Executor::Executor(const ExecutorId executor_id,
, debug_dir_(debug_dir)
, debug_file_(debug_file)
, data_mgr_(data_mgr)
, buffer_provider_(buffer_provider)
, temporary_tables_(nullptr)
, input_table_info_cache_(this)
, thread_id_(logger::thread_id()) {
Expand Down Expand Up @@ -386,7 +384,6 @@ Executor::CgenStateManager::~CgenStateManager() {
}

std::shared_ptr<Executor> Executor::getExecutor(Data_Namespace::DataMgr* data_mgr,
BufferProvider* buffer_provider,
ConfigPtr config,
const std::string& debug_dir,
const std::string& debug_file) {
Expand All @@ -397,7 +394,7 @@ std::shared_ptr<Executor> Executor::getExecutor(Data_Namespace::DataMgr* data_mg
}

return std::make_shared<Executor>(
executor_id_ctr_++, data_mgr, buffer_provider, config, debug_dir, debug_file);
executor_id_ctr_++, data_mgr, config, debug_dir, debug_file);
}

void Executor::clearMemory(const Data_Namespace::MemoryLevel memory_level,
Expand Down Expand Up @@ -1119,7 +1116,6 @@ TemporaryTable Executor::resultsUnion(SharedKernelContext& shared_context,
QueryMemoryDescriptor(),
row_set_mem_owner_,
data_mgr_,
buffer_provider_,
blockSize(),
gridSize());
}
Expand Down Expand Up @@ -1167,7 +1163,6 @@ ResultSetPtr Executor::reduceMultiDeviceResults(
QueryMemoryDescriptor(),
nullptr,
data_mgr_,
buffer_provider_,
blockSize(),
gridSize());
}
Expand Down Expand Up @@ -1240,7 +1235,6 @@ ResultSetPtr Executor::reduceMultiDeviceResultSets(
query_mem_desc,
row_set_mem_owner,
data_mgr_,
buffer_provider_,
blockSize(),
gridSize());
auto result_storage = reduced_results->allocateStorage(plan_state_->init_agg_vals_);
Expand Down Expand Up @@ -2003,7 +1997,6 @@ TemporaryTable Executor::executeWorkUnitImpl(
QueryMemoryDescriptor(),
nullptr,
data_mgr_,
buffer_provider_,
blockSize(),
gridSize());
}
Expand Down Expand Up @@ -2108,7 +2101,6 @@ ResultSetPtr Executor::executeTableFunction(
ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
this->getRowSetMemoryOwner(),
data_mgr_,
buffer_provider_,
this->blockSize(),
this->gridSize());
}
Expand Down Expand Up @@ -2305,7 +2297,6 @@ ResultSetPtr build_row_for_empty_input(
query_mem_desc,
row_set_mem_owner,
executor->getDataMgr(),
executor->getBufferProvider(),
executor->blockSize(),
executor->gridSize());
rs->allocateStorage();
Expand Down
7 changes: 2 additions & 5 deletions omniscidb/QueryEngine/Execute.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ class Executor {
// executors map is populated
Executor(const ExecutorId id,
Data_Namespace::DataMgr* data_mgr,
BufferProvider* buffer_provider,
ConfigPtr config,
const std::string& debug_dir,
const std::string& debug_file);
Expand All @@ -266,7 +265,6 @@ class Executor {
void reset(const bool discard_runtime_modules_only = false);

static std::shared_ptr<Executor> getExecutor(Data_Namespace::DataMgr* data_mgr,
BufferProvider* buffer_provider,
ConfigPtr config = nullptr,
const std::string& debug_dir = "",
const std::string& debug_file = "");
Expand Down Expand Up @@ -353,8 +351,8 @@ class Executor {
}

BufferProvider* getBufferProvider() const {
CHECK(buffer_provider_);
return buffer_provider_;
CHECK(data_mgr_);
return data_mgr_->getBufferProvider();
}

const Config& getConfig() const { return *config_; }
Expand Down Expand Up @@ -1002,7 +1000,6 @@ class Executor {

SchemaProviderPtr schema_provider_;
Data_Namespace::DataMgr* data_mgr_;
BufferProvider* buffer_provider_;
const TemporaryTables* temporary_tables_;
TableIdToNodeMap table_id_to_node_map_;

Expand Down
1 change: 0 additions & 1 deletion omniscidb/QueryEngine/ExternalExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ std::unique_ptr<ResultSet> SqliteMemDatabase::runSelect(
query_mem_desc,
output_spec.executor->getRowSetMemoryOwner(),
nullptr,
nullptr,
0,
0);
const auto storage = rs->allocateStorage();
Expand Down
12 changes: 4 additions & 8 deletions omniscidb/QueryEngine/QueryExecutionContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ ResultSetPtr QueryExecutionContext::groupBufferToDeinterleavedResults(
deinterleaved_query_mem_desc,
row_set_mem_owner_,
executor_->getDataMgr(),
executor_->getBufferProvider(),
executor_->blockSize(),
executor_->gridSize());
auto deinterleaved_storage =
Expand Down Expand Up @@ -414,11 +413,8 @@ std::vector<int64_t*> QueryExecutionContext::launchGpuCode(
shared_memory_size ? 1 : block_size_x * grid_size_x * num_fragments;
const auto output_buffer_size_per_agg = num_results_per_agg_col * sizeof(int64_t);
if (ra_exe_unit.estimator) {
estimator_result_set_.reset(new ResultSet(ra_exe_unit.estimator,
ExecutorDeviceType::GPU,
device_id,
data_mgr,
buffer_provider));
estimator_result_set_.reset(new ResultSet(
ra_exe_unit.estimator, ExecutorDeviceType::GPU, device_id, data_mgr));
out_vec_dev_buffers.push_back(
reinterpret_cast<int8_t*>(estimator_result_set_->getDeviceEstimatorBuffer()));
} else {
Expand Down Expand Up @@ -564,8 +560,8 @@ std::vector<int64_t*> QueryExecutionContext::launchCpuCode(
// Subfragments collect the result from multiple runs in a single
// result set.
if (!estimator_result_set_) {
estimator_result_set_.reset(new ResultSet(
ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr, nullptr));
estimator_result_set_.reset(
new ResultSet(ra_exe_unit.estimator, ExecutorDeviceType::CPU, 0, nullptr));
}
out_vec.push_back(
reinterpret_cast<int64_t*>(estimator_result_set_->getHostEstimatorBuffer()));
Expand Down
2 changes: 0 additions & 2 deletions omniscidb/QueryEngine/QueryMemoryInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,6 @@ QueryMemoryInitializer::QueryMemoryInitializer(
ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
row_set_mem_owner_,
executor->getDataMgr(),
executor->getBufferProvider(),
executor->blockSize(),
executor->gridSize()));
result_sets_.back()->allocateStorage(reinterpret_cast<int8_t*>(group_by_buffer),
Expand Down Expand Up @@ -491,7 +490,6 @@ QueryMemoryInitializer::QueryMemoryInitializer(
ResultSet::fixupQueryMemoryDescriptor(query_mem_desc),
row_set_mem_owner_,
executor->getDataMgr(),
executor->getBufferProvider(),
executor->blockSize(),
executor->gridSize()));
result_sets_.back()->allocateStorage(reinterpret_cast<int8_t*>(group_by_buffer),
Expand Down
20 changes: 8 additions & 12 deletions omniscidb/QueryEngine/RelAlgExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1649,7 +1649,6 @@ ExecutionResult RelAlgExecutor::executeTableFunction(
QueryMemoryDescriptor(),
nullptr,
executor_->getDataMgr(),
executor_->getBufferProvider(),
executor_->blockSize(),
executor_->gridSize()),
{}};
Expand Down Expand Up @@ -2285,7 +2284,6 @@ ExecutionResult RelAlgExecutor::executeWorkUnit(
QueryMemoryDescriptor(),
nullptr,
executor_->getDataMgr(),
executor_->getBufferProvider(),
executor_->blockSize(),
executor_->gridSize()),
{}};
Expand Down Expand Up @@ -2453,16 +2451,14 @@ ExecutionResult RelAlgExecutor::handleOutOfMemoryRetry(
auto ra_exe_unit_in = work_unit.exe_unit;
ra_exe_unit_in.use_bump_allocator = false;

auto result =
ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
co.device_type,
QueryMemoryDescriptor(),
nullptr,
executor_->getDataMgr(),
executor_->getBufferProvider(),
executor_->blockSize(),
executor_->gridSize()),
{}};
auto result = ExecutionResult{std::make_shared<ResultSet>(std::vector<TargetInfo>{},
co.device_type,
QueryMemoryDescriptor(),
nullptr,
executor_->getDataMgr(),
executor_->blockSize(),
executor_->gridSize()),
{}};

const auto table_infos = get_table_infos(ra_exe_unit_in, executor_);
auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
Expand Down
33 changes: 17 additions & 16 deletions omniscidb/QueryEngine/ResultSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
const QueryMemoryDescriptor& query_mem_desc,
const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
Data_Namespace::DataMgr* data_mgr,
BufferProvider* buffer_provider,
const unsigned block_size,
const unsigned grid_size)
: targets_(targets)
Expand All @@ -81,7 +80,6 @@ ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
, block_size_(block_size)
, grid_size_(grid_size)
, data_mgr_(data_mgr)
, buffer_provider_(buffer_provider)
, separate_varlen_storage_valid_(false)
, just_explain_(false)
, for_validation_only_(false)
Expand All @@ -97,7 +95,6 @@ ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
const QueryMemoryDescriptor& query_mem_desc,
const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
Data_Namespace::DataMgr* data_mgr,
BufferProvider* buffer_provider,
const unsigned block_size,
const unsigned grid_size)
: targets_(targets)
Expand All @@ -116,7 +113,6 @@ ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
, frag_offsets_{frag_offsets}
, consistent_frag_sizes_{consistent_frag_sizes}
, data_mgr_(data_mgr)
, buffer_provider_(buffer_provider)
, separate_varlen_storage_valid_(false)
, just_explain_(false)
, for_validation_only_(false)
Expand All @@ -125,25 +121,23 @@ ResultSet::ResultSet(const std::vector<TargetInfo>& targets,
ResultSet::ResultSet(const std::shared_ptr<const Analyzer::Estimator> estimator,
const ExecutorDeviceType device_type,
const int device_id,
Data_Namespace::DataMgr* data_mgr,
BufferProvider* buffer_provider)
Data_Namespace::DataMgr* data_mgr)
: device_type_(device_type)
, device_id_(device_id)
, query_mem_desc_{}
, crt_row_buff_idx_(0)
, estimator_(estimator)
, data_mgr_(data_mgr)
, buffer_provider_(buffer_provider)
, separate_varlen_storage_valid_(false)
, just_explain_(false)
, for_validation_only_(false)
, cached_row_count_(uninitialized_cached_row_count) {
if (device_type == ExecutorDeviceType::GPU) {
device_estimator_buffer_ = GpuAllocator::allocGpuAbstractBuffer(
buffer_provider_, estimator_->getBufferSize(), device_id_);
buffer_provider_->zeroDeviceMem(device_estimator_buffer_->getMemoryPtr(),
estimator_->getBufferSize(),
device_id_);
getBufferProvider(), estimator_->getBufferSize(), device_id_);
getBufferProvider()->zeroDeviceMem(device_estimator_buffer_->getMemoryPtr(),
estimator_->getBufferSize(),
device_id_);
} else {
host_estimator_buffer_ =
static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
Expand Down Expand Up @@ -690,7 +684,7 @@ void ResultSet::syncEstimatorBuffer() const {
static_cast<int8_t*>(checked_calloc(estimator_->getBufferSize(), 1));
CHECK(device_estimator_buffer_);
auto device_buffer_ptr = device_estimator_buffer_->getMemoryPtr();
buffer_provider_->copyFromDevice(
getBufferProvider()->copyFromDevice(
host_estimator_buffer_, device_buffer_ptr, estimator_->getBufferSize(), device_id_);
}

Expand Down Expand Up @@ -1332,7 +1326,7 @@ void ResultSet::radixSortOnGpu(
const std::list<hdk::ir::OrderEntry>& order_entries) const {
auto timer = DEBUG_TIMER(__func__);
const int device_id{0};
GpuAllocator cuda_allocator(buffer_provider_, device_id);
GpuAllocator cuda_allocator(getBufferProvider(), device_id);
CHECK_GT(block_size_, 0);
CHECK_GT(grid_size_, 0);
std::vector<int64_t*> group_by_buffers(block_size_);
Expand All @@ -1352,10 +1346,13 @@ void ResultSet::radixSortOnGpu(
/*use_bump_allocator=*/false,
/*has_varlen_output=*/false,
/*insitu_allocator*=*/nullptr);
inplace_sort_gpu(
order_entries, query_mem_desc_, dev_group_by_buffers, buffer_provider_, device_id);
inplace_sort_gpu(order_entries,
query_mem_desc_,
dev_group_by_buffers,
getBufferProvider(),
device_id);
copy_group_by_buffers_from_gpu(
buffer_provider_,
getBufferProvider(),
group_by_buffers,
query_mem_desc_.getBufferSizeBytes(ExecutorDeviceType::GPU),
dev_group_by_buffers.data,
Expand Down Expand Up @@ -1584,3 +1581,7 @@ bool result_set::can_use_parallel_algorithms(const ResultSet& rows) {
bool result_set::use_parallel_algorithms(const ResultSet& rows) {
return result_set::can_use_parallel_algorithms(rows) && rows.entryCount() >= 20000;
}

BufferProvider* ResultSet::getBufferProvider() const {
return data_mgr_ ? data_mgr_->getBufferProvider() : nullptr;
}
7 changes: 2 additions & 5 deletions omniscidb/QueryEngine/ResultSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ class ResultSet {
const QueryMemoryDescriptor& query_mem_desc,
const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
Data_Namespace::DataMgr* data_mgr,
BufferProvider* buffer_provider,
const unsigned block_size,
const unsigned grid_size);

Expand All @@ -182,15 +181,13 @@ class ResultSet {
const QueryMemoryDescriptor& query_mem_desc,
const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
Data_Namespace::DataMgr* data_mgr,
BufferProvider* buffer_provider,
const unsigned block_size,
const unsigned grid_size);

ResultSet(const std::shared_ptr<const Analyzer::Estimator>,
const ExecutorDeviceType device_type,
const int device_id,
Data_Namespace::DataMgr* data_mgr,
BufferProvider* buffer_provider);
Data_Namespace::DataMgr* data_mgr);

ResultSet(const std::string& explanation);

Expand Down Expand Up @@ -803,6 +800,7 @@ class ResultSet {
size_t rowCountImpl(const bool force_parallel) const;

Data_Namespace::DataMgr* getDataManager() const;
BufferProvider* getBufferProvider() const;

int getGpuCount() const;

Expand Down Expand Up @@ -855,7 +853,6 @@ class ResultSet {
Data_Namespace::AbstractBuffer* device_estimator_buffer_{nullptr};
mutable int8_t* host_estimator_buffer_{nullptr};
Data_Namespace::DataMgr* data_mgr_{nullptr};
BufferProvider* buffer_provider_{nullptr};

// only used by serialization
using SerializedVarlenBufferStorage = std::vector<std::string>;
Expand Down
1 change: 0 additions & 1 deletion omniscidb/QueryEngine/ResultSetBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ ResultSet* ResultSetBuilder::makeResultSet(
query_mem_desc,
row_set_mem_owner,
executor ? executor->getDataMgr() : nullptr,
executor ? executor->getBufferProvider() : nullptr,
executor ? executor->blockSize() : 0,
executor ? executor->gridSize() : 0);
}
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/QueryEngine/ResultSetIteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ InternalTargetValue ResultSet::getVarlenOrderEntry(const int64_t str_ptr,
cpu_buffer.resize(str_len);
const auto executor = query_mem_desc_.getExecutor();
CHECK(executor);
buffer_provider_->copyFromDevice(
getBufferProvider()->copyFromDevice(
&cpu_buffer[0], reinterpret_cast<const int8_t*>(str_ptr), str_len, device_id_);
host_str_ptr = reinterpret_cast<char*>(&cpu_buffer[0]);
} else {
Expand Down
1 change: 0 additions & 1 deletion omniscidb/QueryEngine/ResultSetReduction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,6 @@ ResultSet* ResultSetManager::reduce(std::vector<ResultSet*>& result_sets,
query_mem_desc,
row_set_mem_owner,
result_rs->data_mgr_,
result_rs->buffer_provider_,
0,
0));
auto result_storage = rs_->allocateStorage(first_result.target_init_vals_);
Expand Down
Loading