POC: Sketch out parquet cached filter result API #7513
Conversation
filters: Vec<BooleanArray>,
}

impl CachedPredicateResultBuilder {
Nice, this makes it very clear how to get the cached result!
/// TODO: potentially incrementally build the result of the predicate
/// evaluation without holding all the batches in memory. See
/// <https://github.com/apache/arrow-rs/issues/6692>
in_progress_arrays: Vec<Box<dyn InProgressArray>>,
Thank you @alamb,
Does this mean the in_progress_arrays is not the final result used to generate the final batch?
For example:
Predicate a > 1 => in_progress_array_a filtered by a > 1
Predicate b > 2 => in_progress_array_b filtered by b > 2 and also by a > 1, but we don't update in_progress_array_a
This is an excellent question.
What I was thinking is that CachedPredicateResult would manage the "currently" applied predicate.
So in the case where there are multiple predicates, I was thinking of a CachedPredicateResult::merge method which could take the result of filtering by a and apply the result of filtering by b.
We can then put heuristics / logic for if/when we materialize the filters into CachedPredicateResult.
But that is sort of speculation at this point -- I don't have it all worked out yet.
My plan is to get far enough to show this structure works and can improve performance, and then I'll work on the trickier logic of applying multiple filters.
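To make that concrete, here is a rough sketch of what such a merge could look like. This is purely illustrative: CachedPredicateResult::merge is speculative in this thread, and the field names and the scatter-based merge below are my assumptions, not the PR's API.

use arrow_array::BooleanArray;

/// Result of evaluating one predicate, as boolean masks over the original rows
struct CachedPredicateResult {
    filters: Vec<BooleanArray>,
}

impl CachedPredicateResult {
    /// Fold in the result of a later predicate, whose masks were computed only
    /// over the rows this result already selected.
    fn merge(&mut self, other: &CachedPredicateResult) {
        for (mine, theirs) in self.filters.iter_mut().zip(&other.filters) {
            // walk `theirs` in step with the rows `mine` selected, clearing any
            // row that the later predicate filtered out
            let mut kept = theirs.iter();
            let merged: BooleanArray = mine
                .iter()
                .map(|selected| match selected {
                    Some(true) => kept.next().flatten(),
                    not_selected => not_selected,
                })
                .collect();
            *mine = merged;
        }
    }
}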
CachedPredicateResult::merge method which could take the result of filtering a and apply the result of filtering by b
Great idea!
But that is sort of speculation at this point -- I don't have it all worked out yet
Sure, I will continue to review, thank you @alamb!
I tested this branch using a query that filters and selects the same column (NOTE it is critical to NOT use ...):
cargo bench --features="arrow async" --bench arrow_reader_clickbench -- Q24
Here are the benchmark results: 30ms --> 22ms (25% faster)
I realize this branch currently uses more memory (to buffer the filter results), but I think the additional memory growth can be limited with a setting.
Amazing result! I think this will be the perfect approach instead of a page cache, because page caching can have cache misses, but this PR will always cache the result!
Thanks -- I think one potential problem is that the cached results may consume too much memory (but I will try to handle that shortly). I think we should proceed with starting to merge some refactorings; I left some suggestions here:
It makes sense! Thank you @alamb.
🤖: Benchmark completed (details omitted)
It seems there is a regression for Q36/Q37.
Yes, I agree -- I will figure out why.
I did some profiling:
samply record target/release/deps/arrow_reader_clickbench-aef15514767c9665 --bench arrow_reader_clickbench/sync/Q36
Basically, the issue is that calling ... I added some printlns and it seems like we have 181k rows in total that pass, but the number of buffers is crazy (I think this is related to concat not compacting the ByteViewArray). Working on this...
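To illustrate the effect (this snippet is mine, not from the PR): after filter + concat, a StringViewArray can still reference every data buffer of every input chunk, which is what makes the buffer count explode; gc() rewrites it into freshly compacted buffers.

use arrow_array::{Array, BooleanArray, StringViewArray};
use arrow_select::{concat::concat, filter::filter};

fn filter_and_compact(chunks: &[StringViewArray], masks: &[BooleanArray]) -> StringViewArray {
    // filter each chunk, then concat the survivors
    let filtered: Vec<_> = chunks
        .iter()
        .zip(masks)
        .map(|(chunk, mask)| filter(chunk, mask).unwrap())
        .collect();
    let refs: Vec<&dyn Array> = filtered.iter().map(|a| a.as_ref()).collect();
    let combined = concat(&refs).unwrap();
    let views = combined.as_any().downcast_ref::<StringViewArray>().unwrap();
    // even if only a few rows survive, this can report a huge number of buffers
    println!("buffers before gc: {}", views.data_buffers().len());
    views.gc()
}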
🤖: Benchmark completed (details omitted)
Well, that is looking quite a bit better. I am now working on a way to reduce buffering requirements (it will require incremental concat'ing).
Amazing result @alamb, it looks pretty cool!
I looked at the profiling for using the incremental record batch builder in DataFusion -- 5% of the time is taken copying StringView data. Maybe we can improve that as well.
let view_len = *v as u32;
// if view_len is 12 or less, data is inlined and doesn't need an update
// if view_len is more than 12, need to update the buffer offset
if view_len > 12 {
I think the check can be omitted (and will be faster without it).
Views with a length of 12 or less have the entire string inlined -- so if we update the buffer_index "field" for such small views, we may well be updating the inlined string bytes. So I do think we need the check here.
}

// Update any views that point to the old buffers
for v in views.as_slice_mut()[starting_view..].iter_mut() {
It would be faster to extend in one go, rather than first append_from_slice (in views.append_slice(array.views())) and later iter_mut.
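Roughly what that single pass could look like (a sketch assuming the views are collected into a plain Vec<u128> and reusing a remap helper like the one sketched above):

use arrow_array::{types::ByteViewType, GenericByteViewArray};

fn append_remapped<T: ByteViewType + ?Sized>(
    views: &mut Vec<u128>,
    array: &GenericByteViewArray<T>,
    buffer_index_delta: u32,
) {
    // one pass: remap each view while appending, instead of append_slice
    // followed by a second pass with iter_mut
    views.extend(array.views().iter().map(|v| remap_view(*v, buffer_index_delta)));
}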
// if view_len is 12 or less, data is inlined and doesn't need an update
// if view_len is more than 12, need to update the buffer offset
if view_len > 12 {
    let mut view = ByteView::from(*v);
ByteView can use std::mem::transmute for from / as_u128, which I think would be faster.
Thank you. I need to make some benchmarks so I can test these ideas more easily. Right now the cycle time is pretty long as I need to rebuild datafusion and then kick off a benchmark run.
} else {
    // otherwise the array is sparse so copy the data into a single new
    // buffer as well as updating the views
    let mut new_buffer: Vec<u8> = Vec::with_capacity(used_buffer_size);
I wonder if this creates too small a buffer after filtering? Shouldn't we create an in-progress buffer with a larger size if used_buffer_size is smaller than that (based on the allocation strategy), and then use that one instead of creating small buffers and flushing them every time?
(That could be one benefit from concat / potentially buffering filter - we can know the exact capacity upfront).
I think you are 100% right - the copying should be handled in a similar way to the rest of the builder: fill up any remaining allocation first and allocate new buffers following the normal allocation strategy. I will pursue that approach
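A minimal sketch of that idea, with a plain Vec<u8> standing in for the in-progress buffer (names and the exact growth policy are assumptions, not the PR's code):

fn ensure_capacity(in_progress: &mut Vec<u8>, bytes_needed: usize) {
    let remaining = in_progress.capacity() - in_progress.len();
    if remaining < bytes_needed {
        // grow by at least a full doubling so a run of small, highly selective
        // filters does not allocate many tiny, exactly-sized buffers
        in_progress.reserve(bytes_needed.max(in_progress.capacity()));
    }
}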
let mut new_buffer: Vec<u8> = Vec::with_capacity(used_buffer_size);
let new_buffer_index = buffers.len() as u32; // making one new buffer
// Update any views that point to the old buffers.
for v in views.as_slice_mut()[starting_view..].iter_mut() {
same - better to extend
I added some insights on the string view data from looking at the concat and string gc implementations.
@@ -538,6 +538,18 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {

        l_full_data.cmp(r_full_data)
    }

    /// return the total number of bytes required to hold all strings pointed to by views in this array
    pub fn minimum_buffer_size(&self) -> usize {
Probably compiles to the same, but I think using sum is slightly more idiomatic:
self.views.iter()
.map(|v| {
let len = (*v as u32) as usize;
if len > 12 {
len
} else {
0
}
})
.sum()
Yeah, I used sum before and it showed up in a profile -- I was trying to see if I could get the code faster but I didn't do it scientifically
///
/// This method optimizes for the case where the filter selects all or no rows
/// and ensures all output arrays in `current` are at most `batch_size` rows long.
pub fn append_filtered(
I think for any binary data it makes sense to buffer the BooleanArrays up to the batch_size row limit (or a number-of-arrays limit if it gets too large), so the final capacity of the data buffer can be determined in one go.
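For illustration, once the filters are buffered the exact data capacity can be computed before copying anything, along the lines of the minimum_buffer_size calculation (this helper is a sketch, not code from the PR):

use arrow_array::{BooleanArray, StringViewArray};

// exact number of data-buffer bytes needed for the rows that survive the buffered filters
fn exact_data_capacity(buffered: &[(StringViewArray, BooleanArray)]) -> usize {
    buffered
        .iter()
        .map(|(array, mask)| {
            array
                .views()
                .iter()
                .zip(mask.iter())
                .filter(|(_, keep)| matches!(keep, Some(true)))
                .map(|(v, _)| {
                    // only strings longer than 12 bytes occupy data buffer space
                    let len = (*v as u32) as usize;
                    if len > 12 { len } else { 0 }
                })
                .sum::<usize>()
        })
        .sum()
}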
The challenge with this strategy is that it may hold on to many, many buffers (for example, if the array only keeps 1 row from each batch, each 8MB-or-whatever buffer is mostly unused).
I will make a benchmark to explore the options more fully.
Yeah, true, I thought about this as well (right now this is the case with concat after filtering views too, but I think we can do better by e.g. setting a memory budget).
Well, I started writing a benchmark and then realized I had nothing to benchmark. So I ported a version of code from DataFusion here:
I plan to make a benchmark tomorrow and hopefully start playing around with the optimizations
// Flush the in-progress buffer
unsafe {
    // All views were copied from `array`
    self.finalize_copied_views(starting_view, array);
It would be good to delay this until finishing the buffer (so it can allocate a larger buffer and go through the views in one go).
Right, but as mentioned above that will hold on to quite a bit more memory for selective filters. I have some other ideas of how to improve it
/// Append all views from the given array into the in-progress builder
///
/// Will copy the underlying views based on the value of target_buffer_load_factor
pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
This can be used in concat as well (a similar implementation is almost 1.5x as fast on string views).
I will file a ticket for this idea which we can pursue separately
…lected b…atches: (#7597)

# Which issue does this PR close?

- Part of #6692
- Part of #7589

# Rationale for this change

The pattern of combining multiple small RecordBatches to form one larger one for subsequent processing is common in query engines like DataFusion, which filter or partition incoming Arrays. Current best practice is to use the `filter` or `take` kernels and then the `concat` kernel, as explained in
- #6692

This pattern also appears in my attempt to improve parquet filter performance (to cache the result of applying a filter rather than re-decoding the results). See
- apache/datafusion#3463
- #7513

The current pattern is non-optimal as it requires:
1. At least 2x peak memory (holding the input and output of `concat`)
2. 2 copies of the data (to create the output of `filter` and then create the output of `concat`)

The theory is that with sufficient optimization we can reduce the peak memory requirements and (possibly) make it faster as well. However, to add a benchmark for this filter+concat pattern, I basically had nothing to benchmark. Specifically, there needed to be an API to call.
- Note I also made a PR to DataFusion showing this API can be used and it is not slower: apache/datafusion#16249

# What changes are included in this PR?

I ported the code from downstream DataFusion upstream into arrow-rs so:
1. We can use it in the parquet reader
2. We can benchmark and optimize it appropriately

Specifically:
1. Add `BatchCoalescer` to `arrow-select`, and tests
2. Update documentation
3. Add examples
4. Add a `pub` export in `arrow`
5. Add a benchmark

# Are there any user-facing changes?

This is a new API. I next plan to make a benchmark for this particular …
In case anyone is following along, we added the coalesce kernel and have started optimizing it.
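For anyone who wants to try it, usage looks roughly like the following (the module path and method names are my reading of the ported arrow-select coalesce API; double-check against the current docs):

use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    // combine many 10-row input batches into output batches of up to 8192 rows
    let mut coalescer = BatchCoalescer::new(schema.clone(), 8192);
    for start in 0..1_000i64 {
        let array: ArrayRef = Arc::new(Int64Array::from_iter_values(start * 10..(start + 1) * 10));
        let small_batch = RecordBatch::try_new(schema.clone(), vec![array])?;
        coalescer.push_batch(small_batch)?;
        // completed batches become available as soon as enough rows accumulate
        while let Some(big_batch) = coalescer.next_completed_batch() {
            assert!(big_batch.num_rows() <= 8192);
        }
    }
    // flush whatever is still buffered
    coalescer.finish_buffered_batch()?;
    while let Some(big_batch) = coalescer.next_completed_batch() {
        assert!(big_batch.num_rows() <= 8192);
    }
    Ok(())
}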
Draft until:
Which issue does this PR close?
ReadPlan to encapsulate the calculation of what parquet rows to decode #7502
Rationale for this change
I am trying to sketch out enough of a cached filter result API to show performance improvements. Once I have done that, I will start proposing how to break it up into smaller PRs
What changes are included in this PR?
Are there any user-facing changes?