Skip to content

Commit cb8d4c0

Browse files
authored
fix: correct accounting in DictEncoder::estimated_memory_size, Interner::estimated_memory_size (#9720)
# Which issue does this PR close?

- Closes #9719, #9744.

# Rationale for this change

The returned value should estimate the actual memory usage, but instead it used the encoded size of the dictionary data and omitted the hash table memory used by the `Interner` member. The implementation of `Storage::estimated_memory_size` for the unique key storage was also incorrect, but it was unused.

# What changes are included in this PR?

Correct both problems by making `KeyStorage`'s implementation of `estimated_memory_size` return the size of the allocated `uniques` vector plus the sizes of the values where applicable, and by making `DictEncoder::estimated_memory_size` delegate to the `interner`, which calls the `KeyStorage` method and adds accounting for its own data structure.

# Are these changes tested?

Added tests verifying that at least the expected added amounts are accounted for when values are added. Overreporting is hard to disprove because it depends on allocation behavior internal to other libraries.

# Are there any user-facing changes?

No.
1 parent 4fa8d2f commit cb8d4c0

2 files changed

Lines changed: 283 additions & 5 deletions

File tree

parquet/src/encodings/encoding/dict_encoder.rs

Lines changed: 282 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,12 @@ impl<T: DataType> Storage for KeyStorage<T> {
6464
}
6565

6666
fn estimated_memory_size(&self) -> usize {
67-
self.size_in_bytes + self.uniques.capacity() * std::mem::size_of::<T::T>()
67+
let uniques_heap_bytes = match T::get_physical_type() {
68+
Type::FIXED_LEN_BYTE_ARRAY => self.type_length * self.uniques.len(),
69+
_ => <Self::Value as ParquetValueType>::variable_length_bytes(&self.uniques)
70+
.unwrap_or(0) as usize,
71+
};
72+
self.uniques.capacity() * std::mem::size_of::<T::T>() + uniques_heap_bytes
6873
}
6974
}
7075

@@ -183,6 +188,281 @@ impl<T: DataType> Encoder<T> for DictEncoder<T> {
183188
///
184189
/// For this encoder, the indices are unencoded bytes (refer to [`Self::write_indices`]).
185190
fn estimated_memory_size(&self) -> usize {
186-
self.interner.storage().size_in_bytes + self.indices.len() * std::mem::size_of::<usize>()
191+
self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
192+
}
193+
}
194+
195+
#[cfg(test)]
196+
mod tests {
197+
use std::sync::Arc;
198+
199+
use super::*;
200+
use crate::data_type::{
201+
ByteArray, ByteArrayType, FixedLenByteArray, FixedLenByteArrayType, Int32Type,
202+
};
203+
use crate::encodings::encoding::Encoder;
204+
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
205+
206+
fn make_col_desc<T: DataType>() -> ColumnDescPtr {
207+
make_col_desc_with_length::<T>(-1)
208+
}
209+
210+
fn make_col_desc_with_length<T: DataType>(type_length: i32) -> ColumnDescPtr {
211+
let ty = SchemaType::primitive_type_builder("col", T::get_physical_type())
212+
.with_length(type_length)
213+
.build()
214+
.unwrap();
215+
Arc::new(ColumnDescriptor::new(
216+
Arc::new(ty),
217+
0,
218+
0,
219+
ColumnPath::new(vec![]),
220+
))
221+
}
222+
223+
#[test]
224+
fn test_estimated_memory_size_primitive_with_duplicates() {
225+
let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
226+
let empty_size = encoder.estimated_memory_size();
227+
228+
// 3 distinct values, repeated to produce 9 indices total.
229+
encoder.put(&[1, 2, 3, 1, 2, 3, 1, 2, 3]).unwrap();
230+
231+
let size = encoder.estimated_memory_size();
232+
233+
// Must account for the 3 unique dictionary entries.
234+
let dict_entry_size = 3 * std::mem::size_of::<i32>();
235+
assert!(
236+
size >= empty_size + dict_entry_size,
237+
"memory size {size} should grow by at least the dict storage ({dict_entry_size} bytes)"
238+
);
239+
240+
// Must also account for the 9 buffered indices.
241+
let indices_size = 9 * std::mem::size_of::<u64>();
242+
assert!(
243+
size >= empty_size + dict_entry_size + indices_size,
244+
"memory size {size} should include indices ({indices_size} bytes)"
245+
);
246+
}
247+
248+
#[test]
249+
fn test_estimated_memory_size_primitive_all_distinct() {
250+
let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
251+
let empty_size = encoder.estimated_memory_size();
252+
253+
let values: Vec<i32> = (0..100).collect();
254+
encoder.put(&values).unwrap();
255+
256+
let size = encoder.estimated_memory_size();
257+
258+
// Must account for the 100 unique dictionary entries.
259+
let dict_entry_size = 100 * std::mem::size_of::<i32>();
260+
assert!(
261+
size >= empty_size + dict_entry_size,
262+
"memory size {size} should grow by at least the dict storage ({dict_entry_size} bytes)"
263+
);
264+
265+
// Must also account for the 100 buffered indices.
266+
let indices_size = 100 * std::mem::size_of::<u64>();
267+
assert!(
268+
size >= empty_size + dict_entry_size + indices_size,
269+
"memory size {size} should include indices ({indices_size} bytes)"
270+
);
271+
}
272+
273+
#[test]
274+
fn test_estimated_memory_size_byte_array_with_duplicates() {
275+
let mut encoder = DictEncoder::<ByteArrayType>::new(make_col_desc::<ByteArrayType>());
276+
let empty_size = encoder.estimated_memory_size();
277+
278+
// 3 distinct byte strings ("foo", "bar", "baz" — 3 bytes each), repeated to produce
279+
// 9 indices total.
280+
let vals: Vec<ByteArray> = [
281+
"foo", "bar", "baz", "foo", "bar", "baz", "foo", "bar", "baz",
282+
]
283+
.iter()
284+
.map(|s| ByteArray::from(*s))
285+
.collect();
286+
encoder.put(&vals).unwrap();
287+
288+
let size = encoder.estimated_memory_size();
289+
290+
// Must account for the 3 unique dictionary entries, including their heap-allocated bytes.
291+
let dict_entry_size = 3 * std::mem::size_of::<ByteArray>() + 3 * 3; // 3 values × 3 bytes each
292+
assert!(
293+
size >= empty_size + dict_entry_size,
294+
"memory size {size} should grow by at least the dict storage ({dict_entry_size} bytes)"
295+
);
296+
297+
// Must also account for the 9 buffered indices.
298+
let indices_size = 9 * std::mem::size_of::<u64>();
299+
assert!(
300+
size >= empty_size + dict_entry_size + indices_size,
301+
"memory size {size} should include indices ({indices_size} bytes)"
302+
);
303+
}
304+
305+
#[test]
306+
fn test_estimated_memory_size_byte_array_all_distinct() {
307+
let mut encoder = DictEncoder::<ByteArrayType>::new(make_col_desc::<ByteArrayType>());
308+
let empty_size = encoder.estimated_memory_size();
309+
310+
// 100 distinct values: "0".."9" (1 byte each) and "10".."99" (2 bytes each).
311+
let values: Vec<ByteArray> = (0..100_u32)
312+
.map(|i| ByteArray::from(i.to_string().into_bytes()))
313+
.collect();
314+
let bytes_total: usize = values.iter().map(|v| v.len()).sum(); // 10×1 + 90×2 = 190
315+
encoder.put(&values).unwrap();
316+
317+
let size = encoder.estimated_memory_size();
318+
319+
// Must account for the 100 unique dictionary entries, including their heap-allocated bytes.
320+
let dict_entry_size = 100 * std::mem::size_of::<ByteArray>() + bytes_total;
321+
assert!(
322+
size >= empty_size + dict_entry_size,
323+
"memory size {size} should grow by at least the dict storage ({dict_entry_size} bytes)"
324+
);
325+
326+
// Must also account for the 100 buffered indices.
327+
let indices_size = 100 * std::mem::size_of::<u64>();
328+
assert!(
329+
size >= empty_size + dict_entry_size + indices_size,
330+
"memory size {size} should include indices ({indices_size} bytes)"
331+
);
332+
}
333+
334+
#[test]
335+
fn test_estimated_memory_size_fixed_len_byte_array_with_duplicates() {
336+
const TYPE_LEN: usize = 3;
337+
let mut encoder = DictEncoder::<FixedLenByteArrayType>::new(make_col_desc_with_length::<
338+
FixedLenByteArrayType,
339+
>(TYPE_LEN as i32));
340+
let empty_size = encoder.estimated_memory_size();
341+
342+
// 3 distinct 3-byte values, repeated to produce 9 indices total.
343+
let vals = [
344+
b"foo", b"bar", b"baz", b"foo", b"bar", b"baz", b"foo", b"bar", b"baz",
345+
]
346+
.iter()
347+
.map(|b| FixedLenByteArray::from(b.to_vec()))
348+
.collect::<Vec<_>>();
349+
encoder.put(&vals).unwrap();
350+
351+
let size = encoder.estimated_memory_size();
352+
353+
// Must account for the 3 unique dictionary entries: struct overhead plus the
354+
// fixed-length bytes allocated per entry.
355+
let dict_entry_size = 3 * std::mem::size_of::<FixedLenByteArray>() + 3 * TYPE_LEN;
356+
assert!(
357+
size >= empty_size + dict_entry_size,
358+
"memory size {size} should grow by at least the dict storage ({dict_entry_size} bytes)"
359+
);
360+
361+
// Must also account for the 9 buffered indices.
362+
let indices_size = 9 * std::mem::size_of::<u64>();
363+
assert!(
364+
size >= empty_size + dict_entry_size + indices_size,
365+
"memory size {size} should include indices ({indices_size} bytes)"
366+
);
367+
}
368+
369+
#[test]
370+
fn test_estimated_memory_size_fixed_len_byte_array_all_distinct() {
371+
const TYPE_LEN: usize = 3;
372+
let mut encoder = DictEncoder::<FixedLenByteArrayType>::new(make_col_desc_with_length::<
373+
FixedLenByteArrayType,
374+
>(TYPE_LEN as i32));
375+
let empty_size = encoder.estimated_memory_size();
376+
377+
// 100 distinct 3-byte values: zero-padded big-endian u8 indices.
378+
let values = (0..100_u8)
379+
.map(|i| FixedLenByteArray::from(vec![0u8, 0u8, i]))
380+
.collect::<Vec<_>>();
381+
encoder.put(&values).unwrap();
382+
383+
let size = encoder.estimated_memory_size();
384+
385+
// Must account for the 100 unique dictionary entries: struct overhead plus the
386+
// fixed-length bytes allocated per entry.
387+
let dict_entry_size = 100 * std::mem::size_of::<FixedLenByteArray>() + 100 * TYPE_LEN;
388+
assert!(
389+
size >= empty_size + dict_entry_size,
390+
"memory size {size} should grow by at least the dict storage ({dict_entry_size} bytes)"
391+
);
392+
393+
// Must also account for the 100 buffered indices.
394+
let indices_size = 100 * std::mem::size_of::<u64>();
395+
assert!(
396+
size >= empty_size + dict_entry_size + indices_size,
397+
"memory size {size} should include indices ({indices_size} bytes)"
398+
);
399+
}
400+
401+
#[test]
402+
fn test_estimated_memory_size_includes_interner_dedup_table() {
403+
// The dedup `HashTable` in `Interner` is preallocated with
404+
// `DEFAULT_DEDUP_CAPACITY` slots at construction, independent of any
405+
// values pushed.
406+
let encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
407+
408+
let size = encoder.estimated_memory_size();
409+
410+
assert!(
411+
size > 0,
412+
"memory size should include the preallocated dedup hash table"
413+
);
414+
}
415+
416+
#[test]
417+
fn test_estimated_memory_size_accounts_for_indices_capacity() {
418+
// Exercises the `indices.capacity()` (not `.len()`) accounting.
419+
// After a flush, `indices` is cleared but its capacity is retained; pushing a
420+
// smaller batch afterwards leaves capacity strictly greater than length.
421+
let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
422+
423+
let big: Vec<i32> = vec![0; 64];
424+
encoder.put(&big).unwrap();
425+
let _ = encoder.flush_buffer().unwrap();
426+
427+
let flushed_size = encoder.estimated_memory_size();
428+
429+
// Push a single value — indices.len() == 1 but indices.capacity() >= 64.
430+
// No change on the key storage since the value is already interned.
431+
encoder.put(&[0]).unwrap();
432+
433+
let size = encoder.estimated_memory_size();
434+
435+
assert_eq!(
436+
size, flushed_size,
437+
"memory size should include retained indices capacity",
438+
);
439+
}
440+
441+
#[test]
442+
fn test_estimated_memory_size_accounts_for_uniques_capacity() {
443+
let mut encoder = DictEncoder::<Int32Type>::new(make_col_desc::<Int32Type>());
444+
445+
let values: Vec<i32> = (0..64).collect();
446+
encoder.put(&values).unwrap();
447+
// Flush indices so they don't mask the uniques accounting in the lower bound.
448+
let _ = encoder.flush_buffer().unwrap();
449+
450+
let size1 = encoder.estimated_memory_size();
451+
452+
// Push more values to trigger uniques capacity growth.
453+
// The pre-allocated dedup hash table is unlikely to be resized.
454+
let values: Vec<i32> = (64..128).collect();
455+
encoder.put(&values).unwrap();
456+
// Flush indices so they don't mask the uniques accounting in the lower bound.
457+
let _ = encoder.flush_buffer().unwrap();
458+
459+
let size2 = encoder.estimated_memory_size();
460+
461+
let min_uniques_bytes = 64 * std::mem::size_of::<i32>();
462+
assert!(
463+
size2 >= size1 + min_uniques_bytes,
464+
"memory size {size2} should grow from {size1} by allocated uniques capacity \
465+
(at least {min_uniques_bytes} bytes)"
466+
);
187467
}
188468
}

parquet/src/util/interner.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,7 @@ impl<S: Storage> Interner<S> {
7777
/// Return estimate of the memory used, in bytes
7878
#[allow(dead_code)] // not used in parquet_derive, so is dead there
7979
pub fn estimated_memory_size(&self) -> usize {
80-
self.storage.estimated_memory_size() +
81-
// estimate size of dedup hashmap as just th size of the keys
82-
self.dedup.capacity() + std::mem::size_of::<S::Key>()
80+
self.storage.estimated_memory_size() + self.dedup.allocation_size()
8381
}
8482

8583
/// Returns the storage for this interner

0 commit comments

Comments
 (0)