Skip to content

Commit 6fef829

Browse files
authored
Implement row group pruning with bloom filters in experimental PQ reader (#18545)
Contributes to #17896. Part of #18011. This PR implements row group pruning with bloom filters in the experimental Parquet reader optimized for hybrid scan queries. Dictionary based row group pruning is still WIP in a separate branch and so this PR has empty definitions where needed. Note: Unfortunately, we can't add any tests for this feature as we don't yet have capability of writing parquet files with bloom filters. However, the code that filters row groups with bloom filters is identical to already tested code at: https://github.com/rapidsai/cudf/blob/branch-25.06/cpp/src/io/parquet/predicate_pushdown.cpp#L198-L240 Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Nghia Truong (https://github.com/ttnghia) - Bradley Dice (https://github.com/bdice) URL: #18545
1 parent bf25e60 commit 6fef829

File tree

8 files changed

+501
-98
lines changed

8 files changed

+501
-98
lines changed

cpp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@ add_library(
546546
src/io/parquet/compact_protocol_reader.cpp
547547
src/io/parquet/compact_protocol_writer.cpp
548548
src/io/parquet/decode_preprocess.cu
549+
src/io/parquet/experimental/dictionary_page_filter.cu
549550
src/io/parquet/experimental/hybrid_scan.cpp
550551
src/io/parquet/experimental/hybrid_scan_helpers.cpp
551552
src/io/parquet/experimental/hybrid_scan_impl.cpp

cpp/include/cudf/io/experimental/hybrid_scan.hpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -339,12 +339,16 @@ class hybrid_scan_reader {
339339
rmm::cuda_stream_view stream) const;
340340

341341
/**
342-
* @brief Get byte ranges of bloom filters and dictionary pages (secondary filters) for
343-
* further row group pruning
342+
* @brief Get byte ranges of bloom filters and dictionary pages (secondary filters) for row group
343+
* pruning
344+
*
345+
* @note Device buffers for bloom filter byte ranges must be allocated using a 32 byte
346+
* aligned memory resource
344347
*
345348
* @param row_group_indices Input row groups indices
346349
* @param options Parquet reader options
347-
* @return Pair of vectors of byte ranges to per-column-chunk bloom filters and dictionary pages
350+
* @return Pair of vectors of byte ranges of column chunk with bloom filters and dictionary
351+
* pages subject to filter predicate
348352
*/
349353
[[nodiscard]] std::pair<std::vector<byte_range_info>, std::vector<byte_range_info>>
350354
secondary_filters_byte_ranges(cudf::host_span<size_type const> row_group_indices,
@@ -353,7 +357,8 @@ class hybrid_scan_reader {
353357
/**
354358
* @brief Filter the row groups using column chunk dictionary pages
355359
*
356-
* @param dictionary_page_data Device buffers containing per-column-chunk dictionary page data
360+
* @param dictionary_page_data Device buffers containing dictionary page data of column chunks
361+
* with (in)equality predicate
357362
* @param row_group_indices Input row groups indices
358363
* @param options Parquet reader options
359364
* @param stream CUDA stream used for device memory operations and kernel launches
@@ -368,7 +373,11 @@ class hybrid_scan_reader {
368373
/**
369374
* @brief Filter the row groups using column chunk bloom filters
370375
*
371-
* @param bloom_filter_data Device buffers containing per-column-chunk bloom filter data
376+
* @note The `bloom_filter_data` device buffers must be allocated using a 32
377+
* byte aligned memory resource
378+
*
379+
* @param bloom_filter_data Device buffers containing bloom filter data of column chunks with
380+
* an equality predicate
372381
* @param row_group_indices Input row groups indices
373382
* @param options Parquet reader options
374383
* @param stream CUDA stream used for device memory operations and kernel launches

cpp/src/io/parquet/bloom_filter_reader.cu

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -513,15 +513,15 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
513513
host_span<std::vector<ast::literal*> const> literals,
514514
size_type total_row_groups,
515515
host_span<data_type const> output_dtypes,
516-
host_span<int const> equality_col_schemas,
516+
host_span<int const> bloom_filter_col_schemas,
517517
std::reference_wrapper<ast::expression const> filter,
518518
rmm::cuda_stream_view stream) const
519519
{
520520
// Number of input table columns
521521
auto const num_input_columns = static_cast<cudf::size_type>(output_dtypes.size());
522522

523523
// Get parquet types for the predicate columns
524-
auto const parquet_types = get_parquet_types(input_row_group_indices, equality_col_schemas);
524+
auto const parquet_types = get_parquet_types(input_row_group_indices, bloom_filter_col_schemas);
525525

526526
// Create spans from bloom filter bitset buffers to use in cuco::bloom_filter_ref.
527527
std::vector<cudf::device_span<cuda::std::byte>> h_bloom_filter_spans;
@@ -542,7 +542,7 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
542542
bloom_filter_caster const bloom_filter_col{bloom_filter_spans,
543543
parquet_types,
544544
static_cast<size_t>(total_row_groups),
545-
equality_col_schemas.size()};
545+
bloom_filter_col_schemas.size()};
546546

547547
// Converts bloom filter membership for equality predicate columns to a table
548548
// containing a column for each `col[i] == literal` predicate to be evaluated.
@@ -584,8 +584,6 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
584584
stream);
585585
}
586586

587-
equality_literals_collector::equality_literals_collector() = default;
588-
589587
equality_literals_collector::equality_literals_collector(ast::expression const& expr,
590588
cudf::size_type num_input_columns)
591589
: _num_input_columns{num_input_columns}
@@ -604,7 +602,7 @@ std::reference_wrapper<ast::expression const> equality_literals_collector::visit
604602
ast::column_reference const& expr)
605603
{
606604
CUDF_EXPECTS(expr.get_table_source() == ast::table_reference::LEFT,
607-
"BloomfilterAST supports only left table");
605+
"DictionaryAST and BloomfilterAST support only left table");
608606
CUDF_EXPECTS(expr.get_column_index() < _num_input_columns,
609607
"Column index cannot be more than number of columns in the table");
610608
return expr;
@@ -613,7 +611,7 @@ std::reference_wrapper<ast::expression const> equality_literals_collector::visit
613611
std::reference_wrapper<ast::expression const> equality_literals_collector::visit(
614612
ast::column_name_reference const& expr)
615613
{
616-
CUDF_FAIL("Column name reference is not supported in BloomfilterAST");
614+
CUDF_FAIL("Column name reference is not supported in DictionaryAST and BloomfilterAST");
617615
}
618616

619617
std::reference_wrapper<ast::expression const> equality_literals_collector::visit(
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright (c) 2025, NVIDIA CORPORATION.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "hybrid_scan_helpers.hpp"
18+
19+
#include <cudf/ast/detail/expression_transformer.hpp>
20+
#include <cudf/ast/detail/operators.hpp>
21+
#include <cudf/ast/expressions.hpp>
22+
#include <cudf/detail/cuco_helpers.hpp>
23+
#include <cudf/detail/transform.hpp>
24+
#include <cudf/hashing/detail/xxhash_64.cuh>
25+
#include <cudf/io/parquet_schema.hpp>
26+
#include <cudf/logger.hpp>
27+
#include <cudf/utilities/span.hpp>
28+
#include <cudf/utilities/traits.hpp>
29+
#include <cudf/utilities/type_checks.hpp>
30+
31+
#include <rmm/cuda_stream_view.hpp>
32+
#include <rmm/device_buffer.hpp>
33+
#include <rmm/exec_policy.hpp>
34+
35+
#include <thrust/iterator/counting_iterator.h>
36+
#include <thrust/tabulate.h>
37+
38+
#include <future>
39+
#include <numeric>
40+
#include <optional>
41+
42+
namespace cudf::io::parquet::experimental::detail {
43+
44+
using parquet::detail::chunk_page_info;
45+
using parquet::detail::ColumnChunkDesc;
46+
using parquet::detail::decode_error;
47+
using parquet::detail::PageInfo;
48+
49+
dictionary_literals_collector::dictionary_literals_collector(ast::expression const& expr,
50+
cudf::size_type num_input_columns)
51+
{
52+
_num_input_columns = num_input_columns;
53+
_literals.resize(num_input_columns);
54+
expr.accept(*this);
55+
}
56+
57+
std::reference_wrapper<ast::expression const> dictionary_literals_collector::visit(
58+
ast::operation const& expr)
59+
{
60+
using cudf::ast::ast_operator;
61+
auto const operands = expr.get_operands();
62+
auto const op = expr.get_operator();
63+
64+
if (auto* v = dynamic_cast<ast::column_reference const*>(&operands[0].get())) {
65+
// First operand should be column reference, second should be literal.
66+
CUDF_EXPECTS(cudf::ast::detail::ast_operator_arity(op) == 2,
67+
"Only binary operations are supported on column reference");
68+
auto const literal_ptr = dynamic_cast<ast::literal const*>(&operands[1].get());
69+
CUDF_EXPECTS(literal_ptr != nullptr,
70+
"Second operand of binary operation with column reference must be a literal");
71+
v->accept(*this);
72+
73+
// Push to the corresponding column's literals and operators list iff EQUAL or NOT_EQUAL
74+
// operator is seen
75+
if (op == ast_operator::EQUAL or op == ast::ast_operator::NOT_EQUAL) {
76+
auto const col_idx = v->get_column_index();
77+
_literals[col_idx].emplace_back(const_cast<ast::literal*>(literal_ptr));
78+
}
79+
} else {
80+
// Visit the operands and ignore any output as we only want to collect literals
81+
std::ignore = visit_operands(operands);
82+
}
83+
84+
return expr;
85+
}
86+
87+
} // namespace cudf::io::parquet::experimental::detail

0 commit comments

Comments
 (0)