Skip to content

Commit 52e7b82

Browse files
authored
Add a parquet reader utility to update output null masks (#19370)
Contributes to PR #19308 This PR adds a parquet reader utility to update the null masks of output buffers in order to nullify rows corresponding to the pruned out pages. No independent tests for this PR here. Instead this is tested in #19308 Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - Tianyu Liu (https://github.com/kingcrimsontianyu) - Bradley Dice (https://github.com/bdice) - Vukasin Milovanovic (https://github.com/vuule) URL: #19370
1 parent cd54e33 commit 52e7b82

File tree

4 files changed

+88
-5
lines changed

4 files changed

+88
-5
lines changed

cpp/src/io/parquet/page_data.cu

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -473,8 +473,8 @@ struct mask_tform {
473473

474474
} // anonymous namespace
475475

476-
uint32_t GetAggregatedDecodeKernelMask(cudf::detail::hostdevice_span<PageInfo const> pages,
477-
rmm::cuda_stream_view stream)
476+
uint32_t get_aggregated_decode_kernel_mask(cudf::detail::hostdevice_span<PageInfo const> pages,
477+
rmm::cuda_stream_view stream)
478478
{
479479
// determine which kernels to invoke
480480
auto mask_iter = thrust::make_transform_iterator(pages.device_begin(), mask_tform{});

cpp/src/io/parquet/parquet_gpu.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -726,8 +726,8 @@ void build_string_dictionary_index(ColumnChunkDesc* chunks,
726726
* @param[in] stream CUDA stream to use
727727
* @return Bitwise OR of all page `kernel_mask` values
728728
*/
729-
uint32_t GetAggregatedDecodeKernelMask(cudf::detail::hostdevice_span<PageInfo const> pages,
730-
rmm::cuda_stream_view stream);
729+
uint32_t get_aggregated_decode_kernel_mask(cudf::detail::hostdevice_span<PageInfo const> pages,
730+
rmm::cuda_stream_view stream);
731731

732732
/**
733733
* @brief Compute page output size information.

cpp/src/io/parquet/reader_impl.cpp

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ void reader_impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_
7070
});
7171

7272
// figure out which kernels to run
73-
auto const kernel_mask = GetAggregatedDecodeKernelMask(subpass.pages, _stream);
73+
auto const kernel_mask = get_aggregated_decode_kernel_mask(subpass.pages, _stream);
7474

7575
// Check to see if there are any string columns present. If so, then we need to get size info
7676
// for each string page. This size info will be used to pre-allocate memory for the column,
@@ -445,6 +445,9 @@ void reader_impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_
445445
page_nesting.device_to_host_async(_stream);
446446
page_nesting_decode.device_to_host_async(_stream);
447447

448+
// Invalidate output buffer nullmasks at row indices spanned by pruned pages
449+
update_output_nullmasks_for_pruned_pages(host_page_mask);
450+
448451
// Copy over initial string offsets from device
449452
auto h_initial_str_offsets = cudf::detail::make_host_vector_async(initial_str_offsets, _stream);
450453

@@ -862,6 +865,79 @@ bool reader_impl::has_next()
862865
return has_more_work() or is_first_output_chunk();
863866
}
864867

868+
void reader_impl::update_output_nullmasks_for_pruned_pages(cudf::host_span<bool const> page_mask)
869+
{
870+
auto const& subpass = _pass_itm_data->subpass;
871+
auto const& pages = subpass->pages;
872+
auto const& chunks = _pass_itm_data->chunks;
873+
auto const num_columns = _input_columns.size();
874+
875+
CUDF_EXPECTS(pages.size() == page_mask.size(), "Page mask size mismatch");
876+
877+
// Return early if page mask is empty or all pages are required
878+
if (page_mask.empty() or std::all_of(page_mask.begin(), page_mask.end(), std::identity{})) {
879+
return;
880+
}
881+
882+
auto page_and_mask_begin =
883+
thrust::make_zip_iterator(thrust::make_tuple(pages.host_begin(), page_mask.begin()));
884+
885+
auto null_masks = std::vector<bitmask_type*>{};
886+
auto begin_bits = std::vector<cudf::size_type>{};
887+
auto end_bits = std::vector<cudf::size_type>{};
888+
889+
std::for_each(
890+
page_and_mask_begin, page_and_mask_begin + pages.size(), [&](auto const& page_and_mask_pair) {
891+
// Return early if the page is valid
892+
if (thrust::get<1>(page_and_mask_pair)) { return; }
893+
894+
auto const& page = thrust::get<0>(page_and_mask_pair);
895+
auto const chunk_idx = page.chunk_idx;
896+
auto const start_row = chunks[chunk_idx].start_row + page.chunk_row;
897+
auto const end_row = start_row + page.num_rows;
898+
auto& input_col = _input_columns[chunk_idx % num_columns];
899+
auto max_depth = input_col.nesting_depth();
900+
auto* cols = &_output_buffers;
901+
902+
for (size_t l_idx = 0; l_idx < max_depth; l_idx++) {
903+
auto& out_buf = (*cols)[input_col.nesting[l_idx]];
904+
cols = &out_buf.children;
905+
// Continue if the current column is a list column
906+
if (out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) { continue; }
907+
// Add the nullmask and bit bounds to corresponding lists
908+
null_masks.emplace_back(out_buf.null_mask());
909+
begin_bits.emplace_back(start_row);
910+
end_bits.emplace_back(end_row);
911+
912+
// Increment the null count by the number of rows in this page
913+
out_buf.null_count() += page.num_rows;
914+
}
915+
});
916+
917+
// Min number of nullmasks to use bulk update optimally
918+
constexpr auto min_nullmasks_for_bulk_update = 32;
919+
920+
// Bulk update the nullmasks if optimal
921+
if (null_masks.size() >= min_nullmasks_for_bulk_update) {
922+
auto valids = cudf::detail::make_host_vector<bool>(null_masks.size(), _stream);
923+
std::fill(valids.begin(), valids.end(), false);
924+
cudf::set_null_masks_safe(null_masks, begin_bits, end_bits, valids, _stream);
925+
}
926+
// Otherwise, update the nullmasks in a loop
927+
else {
928+
auto nullmask_iter = thrust::make_zip_iterator(
929+
thrust::make_tuple(null_masks.begin(), begin_bits.begin(), end_bits.begin()));
930+
std::for_each(
931+
nullmask_iter, nullmask_iter + null_masks.size(), [&](auto const& nullmask_tuple) {
932+
cudf::set_null_mask(thrust::get<0>(nullmask_tuple),
933+
thrust::get<1>(nullmask_tuple),
934+
thrust::get<2>(nullmask_tuple),
935+
false,
936+
_stream);
937+
});
938+
}
939+
}
940+
865941
namespace {
866942
parquet_column_schema walk_schema(aggregate_reader_metadata const* mt, int idx)
867943
{

cpp/src/io/parquet/reader_impl.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,13 @@ class reader_impl {
302302
*/
303303
void decode_page_data(read_mode mode, size_t skip_rows, size_t num_rows);
304304

305+
/**
306+
* @brief Invalidate output buffer nullmask for rows spanned by the pruned pages
307+
*
308+
* @param page_mask Boolean vector indicating if a page needs to be decoded or is pruned
309+
*/
310+
void update_output_nullmasks_for_pruned_pages(cudf::host_span<bool const> page_mask);
311+
305312
/**
306313
* @brief Creates file-wide parquet chunk information.
307314
*

0 commit comments

Comments
 (0)