Add coalesce kernel and BatchCoalescer for statefully combining selected batches #7597

Merged (4 commits) Jun 5, 2025

Conversation

@alamb (Contributor) commented Jun 3, 2025

Which issue does this PR close?

Rationale for this change

The pattern of combining multiple small RecordBatches to form one larger one for
subsequent processing is common in query engines like DataFusion, which filter or
partition incoming Arrays. The current best practice is to use the filter or take kernels
followed by the concat kernel, as explained in

This pattern also appears in my attempt to improve parquet filter performance (to cache the result
of applying a filter rather than re-decoding the results). See

The current pattern is suboptimal as it requires:

  1. At least 2x peak memory (holding the input and output of concat)
  2. 2 copies of the data (to create the output of filter and then create the output of concat)

The theory is that with sufficient optimization we can reduce the peak memory
requirements and (possibly) make it faster as well.

However, when I tried to add a benchmark for this filter+concat pattern, I basically had
nothing to benchmark: there needed to be an API to call first.
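The stateful coalescing pattern described above can be sketched with a std-only toy model. This is illustrative only: it buffers plain `Vec<i32>` "columns" by row count, while the real `BatchCoalescer` in arrow-select operates on `RecordBatch` and copies array data; the `ToyCoalescer` name and its methods are hypothetical stand-ins.

```rust
// Toy model of the coalescing pattern: accumulate input "batches"
// until `target_batch_size` rows are buffered, then emit completed
// batches of exactly the target size, plus a final smaller remainder.
struct ToyCoalescer {
    target_batch_size: usize,
    buffered: Vec<i32>,
    completed: Vec<Vec<i32>>,
}

impl ToyCoalescer {
    fn new(target_batch_size: usize) -> Self {
        Self { target_batch_size, buffered: Vec::new(), completed: Vec::new() }
    }

    // Push an input batch; split off full batches of exactly the target size.
    fn push_batch(&mut self, mut batch: Vec<i32>) {
        self.buffered.append(&mut batch);
        while self.buffered.len() >= self.target_batch_size {
            let rest = self.buffered.split_off(self.target_batch_size);
            let full = std::mem::replace(&mut self.buffered, rest);
            self.completed.push(full);
        }
    }

    // Flush any remaining buffered rows as a final (smaller) batch.
    fn finish(&mut self) {
        if !self.buffered.is_empty() {
            self.completed.push(std::mem::take(&mut self.buffered));
        }
    }

    // Retrieve the next completed batch, if any.
    fn next_batch(&mut self) -> Option<Vec<i32>> {
        if self.completed.is_empty() { None } else { Some(self.completed.remove(0)) }
    }
}

fn main() {
    let mut c = ToyCoalescer::new(4);
    c.push_batch(vec![1, 2, 3]);    // not enough rows yet, nothing emitted
    c.push_batch(vec![4, 5, 6, 7]); // completes one batch of exactly 4 rows
    c.finish();                     // flush the remaining 3 rows
    assert_eq!(c.next_batch(), Some(vec![1, 2, 3, 4]));
    assert_eq!(c.next_batch(), Some(vec![5, 6, 7]));
    assert!(c.next_batch().is_none());
    println!("ok");
}
```

The key property this models is the one the PR aims for: outputs are exactly the target batch size, so downstream allocations can be sized up front.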

What changes are included in this PR?

I ported the code from downstream DataFusion upstream into arrow-rs so that:

  1. We can use it in the parquet reader

  2. We can benchmark and optimize it appropriately

  3. Add BatchCoalescer to arrow-select, and tests

  4. Update documentation

  5. Add examples

  6. Add a pub export in arrow

  7. Add Benchmark

Are there any user-facing changes?

This is a new API.

I next plan to make a benchmark for this particular kernel.

@github-actions github-actions bot added the arrow Changes to the arrow crate label Jun 3, 2025
/// assert!(coalescer.next_batch().is_none());
/// ```
///
/// # Background
alamb (Contributor, Author) commented:

The major differences between this and what is in DataFusion:

  1. does not implement limit which I think is more appropriate to leave to a higher level structure
  2. Outputs exactly the target batch size -- which will (hopefully) result in the ability to avoid any additional allocations

@alamb alamb changed the title Add coalesce / BatchCoalescer for statefully combining selected b…atches: Add coalesce kernel andBatchCoalescer for statefully combining selected b…atches: Jun 3, 2025
@alamb alamb force-pushed the alamb/coalesce branch from 6a4df48 to ac051b7 Compare June 3, 2025 21:08
/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
/// `StringViewArray` may only refer to a small portion of the buffer,
/// significantly increasing memory usage.
fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
A Contributor commented:

Suggested change (take the batch by value rather than by reference):

fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {

This can avoid some allocations / Arc clones in the implementation.

let Some(s) = c.as_string_view_opt() else {
return Arc::clone(c);
};
let ideal_buffer_size: usize = s
A Contributor commented:

I think it makes sense to have another fast path here before looking into views: if the data buffer is small compared to the views, gc doesn't have much impact.

if actual_buffer_size > (ideal_buffer_size * 2) {
// We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerates later concat_batches.
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
let mut builder = StringViewBuilder::with_capacity(s.len());
A Contributor commented:

Reusing the views is likely quite a bit faster (I didn't test the transmute; I think it should go through from / as_u128 if it proves faster in benchmarks):

Suggested change
let mut builder = StringViewBuilder::with_capacity(s.len());
let mut buffer: Vec<u8> = Vec::with_capacity(ideal_buffer_size);
let views: Vec<u128> = s
    .views()
    .as_ref()
    .iter()
    .cloned()
    .map(|v| {
        // SAFETY: ByteView has the same memory layout as u128
        let mut b: ByteView = unsafe { std::mem::transmute(v) };
        if b.length > 12 {
            let offset = buffer.len() as u32;
            buffer.extend_from_slice(
                buffers[b.buffer_index as usize]
                    .get(b.offset as usize..b.offset as usize + b.length as usize)
                    .expect("Invalid buffer slice"),
            );
            b.offset = offset;
            // Set buffer index to 0, as the new array has only one buffer
            b.buffer_index = 0;
        }
        unsafe { std::mem::transmute(b) }
    })
    .collect();

return Ok(());
}

let mut batch = gc_string_view_batch(&batch);
A Contributor commented:

I think ideally we would support gc'ing multiple batches together, which would make both the gc and the concat faster (there is also the risk here of buffering too many small batches).

alamb (Contributor, Author) replied:

Indeed -- this is a great idea. I hope to have some benchmarks written up today so that we can start optimizing this kernel substantially.

///
/// See [`Self::next_batch()`] to retrieve any completed batches.
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
if batch.num_rows() == 0 {
A Contributor commented:

Another fast path could be pushing the batch through as-is if the set of buffered batches is empty and the added batch is bigger than a certain limit (e.g. batch_size / 2). This avoids concatenating batches that are already large enough.
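The proposed condition can be sketched as a small std-only predicate. The helper name `take_fast_path` is hypothetical, not the arrow-select API; it only captures the decision logic described in the comment above.

```rust
// Sketch of the proposed pass-through fast path: if nothing is
// buffered yet and the incoming batch is already at least half the
// target size, forward it unchanged instead of copying it into the
// coalescing buffer and concatenating later.
fn take_fast_path(buffered_rows: usize, incoming_rows: usize, batch_size: usize) -> bool {
    buffered_rows == 0 && incoming_rows >= batch_size / 2
}

fn main() {
    assert!(take_fast_path(0, 5000, 8192));    // 5000 >= 4096: pass through
    assert!(!take_fast_path(0, 1000, 8192));   // too small: buffer it
    assert!(!take_fast_path(100, 5000, 8192)); // already buffering: must concat
    println!("ok");
}
```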

alamb added a commit to alamb/datafusion that referenced this pull request Jun 4, 2025
@alamb alamb force-pushed the alamb/coalesce branch from 80c49c7 to d75a71b Compare June 4, 2025 20:05
@alamb alamb marked this pull request as ready for review June 4, 2025 20:06
@alamb (Contributor, Author) commented Jun 4, 2025

Ok, I think this PR is ready for a real review and hopefully merge.

My proposed next steps are:

  1. Merge this PR
  2. Iterate in subsequent PRs to improve the kernel using the benchmarks
  3. Eventually work up to adding a special push_filtered method for skipping the filter step

It pains me to leave so much potential performance on the table (I really want to try several of the optimizations that @Dandandan has listed), but I think we can do them as follow-on PRs because:

  1. This API should remain stable
  2. The kernel is basically "state of the art" (aka I copied it from DataFusion) in terms of performance.

@Dandandan (Contributor) left a comment:

This looks nice to start iterating on.

@alamb (Contributor, Author) commented Jun 5, 2025

Thank you for the review @Dandandan

I'll plan to merge this tomorrow so we can begin iterating.

cc @zhuqi-lucas and @tustvold

@zhuqi-lucas (Contributor) left a comment:

LGTM thank you @alamb , let's go!

///
/// # Heuristic
///
/// If the average size of each view is larger than 32 bytes, we compact the array.
A Contributor commented:

Minor question: do we have any benchmark results for the "average size of each view is larger than 32 bytes" heuristic?

A Contributor replied:

I think the current heuristic is a bit different: the array is compacted when the total buffer size is more than 2x the combined length of the non-inlined (>12 byte) views.
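The heuristic as described can be sketched std-only. This is illustrative, not the actual `gc_string_view_batch` code: the `should_compact` helper is hypothetical, and it assumes (per the Arrow view layout) that views of 12 bytes or fewer are inlined and need no buffer space, while longer views reference `length` bytes in a data buffer.

```rust
// Sketch of the 2x compaction heuristic: sum the buffer bytes the
// views actually reference (only views longer than 12 bytes, since
// shorter ones are inlined in the u128 view itself), and compact only
// when the buffers hold more than twice that many bytes.
fn should_compact(view_lengths: &[u32], actual_buffer_size: usize) -> bool {
    let ideal_buffer_size: usize = view_lengths
        .iter()
        .map(|&len| if len > 12 { len as usize } else { 0 })
        .sum();
    // Re-creating the array copies data, so only do it when sparse
    actual_buffer_size > ideal_buffer_size * 2
}

fn main() {
    // 100 referenced bytes held in 1000-byte buffers: sparse, compact
    assert!(should_compact(&[50, 50, 8], 1000));
    // 100 referenced bytes held in 150-byte buffers: dense, skip the copy
    assert!(!should_compact(&[50, 50, 8], 150));
    println!("ok");
}
```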

A Contributor replied:

Yeah @Dandandan, it seems there is no 32-byte check in the code implementation now.


// Re-creating the array copies data and can be time consuming.
// We only do it if the array is sparse
if actual_buffer_size > (ideal_buffer_size * 2) {
A Contributor commented:

Here we change to only one buffer; maybe we can investigate emitting a StringArray for those cases that naturally need only one buffer, and compare the performance.

I remember that for some comparison ops, StringArray has better performance, especially when the StringViewArray buffers are large.

alamb (Contributor, Author) replied:

I think StringArray can be faster for some cases (where all the strings are used and are longer than 12 bytes, for example) so this is an interesting idea.

The challenge is that kernels typically have a known output type: the output type depends only on the input type, not on the input values.

It would be pretty hard to use a kernel that sometimes returns a StringViewArray and sometimes returns a StringArray given the current code structure.

A Contributor replied:

Thank you @alamb, I agree; it is hard for us to push this investigation forward.

@Dandandan Dandandan merged commit f92ff18 into apache:main Jun 5, 2025
30 checks passed
@Dandandan (Contributor):

Thanks @alamb let's try some suggestions.

@alamb (Contributor, Author) commented Jun 6, 2025

Thanks @alamb let's try some suggestions.

@Dandandan made a great PR here:

Dandandan pushed a commit that referenced this pull request Jun 6, 2025
…ws (#7619)

# Which issue does this PR close?

- Follow on to #7597

# Rationale for this change


While reviewing the code and the concat kernel for
- #7617

I realized there is a non-trivial difference between all-inlined views, some inlined views, and mostly large strings, so the benchmarks should capture that.


# What changes are included in this PR?

1. Add variations of benchmark with different size strings in
StringViewArray

# Are there any user-facing changes?

If there are user-facing changes then we may require documentation to be
updated before approving the PR.

If there are any breaking changes to public APIs, please call them out.
Dandandan added a commit that referenced this pull request Jun 7, 2025
# Which issue does this PR close?
- Closes #7615
- Follow on to #7597



# Rationale for this change

Improve performance of `gc_string_view_batch`

```
filter: mixed_utf8view, 8192, nulls: 0, selectivity: 0.001       1.00     30.4±1.05ms        ? ?/sec    1.29     39.3±0.88ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0, selectivity: 0.01        1.00      4.3±0.17ms        ? ?/sec    1.20      5.2±0.15ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0, selectivity: 0.1         1.00  1805.1±25.77µs        ? ?/sec    1.32      2.4±0.20ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0, selectivity: 0.8         1.00      2.6±0.12ms        ? ?/sec    1.48      3.8±0.11ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0.1, selectivity: 0.001     1.00     42.5±0.48ms        ? ?/sec    1.23     52.2±1.33ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0.1, selectivity: 0.01      1.00      5.8±0.12ms        ? ?/sec    1.28      7.4±0.20ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0.1, selectivity: 0.1       1.00      2.2±0.02ms        ? ?/sec    1.37      3.1±0.18ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0.1, selectivity: 0.8       1.00      3.6±0.15ms        ? ?/sec    1.43      5.1±0.12ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0, selectivity: 0.001      1.00     51.0±0.59ms        ? ?/sec    1.38     70.3±1.11ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0, selectivity: 0.01       1.00      6.7±0.03ms        ? ?/sec    1.32      8.8±0.16ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0, selectivity: 0.1        1.00      3.0±0.01ms        ? ?/sec    1.41      4.3±0.09ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0, selectivity: 0.8        1.00      4.5±0.34ms        ? ?/sec    1.71      7.7±0.28ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0.1, selectivity: 0.001    1.00     64.2±0.74ms        ? ?/sec    1.33     85.1±1.52ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0.1, selectivity: 0.01     1.00      9.4±0.09ms        ? ?/sec    1.35     12.6±0.26ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0.1, selectivity: 0.1      1.00      3.8±0.03ms        ? ?/sec    1.46      5.6±0.11ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0.1, selectivity: 0.8      1.00      5.7±0.28ms        ? ?/sec    1.73      9.9±0.27ms        ? ?/sec
```

# What changes are included in this PR?

* Avoiding recreating the views from scratch.
* Specialize concat for view types
* Takes owned RecordBatch (effect on performance is small, might be
measurable with smaller batch size / more columns).

# Are there any user-facing changes?

no

---------

Co-authored-by: Andrew Lamb <[email protected]>
3 participants