Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 121 additions & 34 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,17 +706,21 @@ struct ArrowColumnChunkData {
length: usize,
store: Box<dyn PageStore>,
keys: Vec<PageKey>,
/// The dictionary page's serialized blobs (header ‖ data), held in memory
/// rather than the store.
/// Handles to the dictionary page's blobs (header then data) in the store.
///
/// A dictionary page is produced at most once and bounded by
/// `dict_page_size_limit`, but it must be written *first* in the chunk even
/// though the data pages reach the writer before it (see
/// [`PageWriter::defers_dictionary_ordering`]). Spilling it would only
/// round-trip ~1 page to the backend and straight back, so it is kept here
/// and emitted ahead of the data pages at splice. Empty for non-dictionary
/// columns.
dictionary: Vec<Bytes>,
/// [`PageWriter::defers_dictionary_ordering`]). Its header and data are `put`
/// into the store like any other page — which keeps the store uniform, and
/// lets an oversized dictionary page spill — and their handles are held apart
/// so they can be emitted ahead of the data pages at splice.
/// Empty for non-dictionary columns.
dictionary_keys: Vec<PageKey>,
/// Serialized length of the dictionary page (0 if there is none), recorded
/// so the data pages can be shifted past it when offsets are rewritten to a
/// dictionary-first layout at splice.
dictionary_len: usize,
}

impl ArrowColumnChunkData {
Expand All @@ -725,7 +729,8 @@ impl ArrowColumnChunkData {
length: 0,
store,
keys: Vec::new(),
dictionary: Vec::new(),
dictionary_keys: Vec::new(),
dictionary_len: 0,
}
}

Expand All @@ -737,47 +742,55 @@ impl ArrowColumnChunkData {
Ok(())
}

/// Retain a dictionary-page blob in memory (emitted first at splice).
fn push_dictionary(&mut self, value: Bytes) {
self.dictionary.push(value);
}

/// Total serialized size of the in-memory dictionary page, in bytes.
fn dictionary_len(&self) -> usize {
self.dictionary.iter().map(Bytes::len).sum()
/// Store a dictionary-page blob (header or data) in the page store,
/// recording its handle so it can be emitted first at splice, and adding its
/// size to the dictionary page's serialized length.
///
/// # Errors
///
/// Returns the store's error if `put` fails. In that case neither
/// `dictionary_keys` nor `dictionary_len` is modified, so the recorded
/// length only ever counts blobs that also have a recorded handle.
fn push_dictionary(&mut self, value: Bytes) -> Result<()> {
    // `put` takes `value` by value, so capture its length up front, but only
    // commit it once the store has accepted the blob.
    let len = value.len();
    let key = self.store.put(value)?;
    self.dictionary_keys.push(key);
    self.dictionary_len += len;
    Ok(())
}

/// Bytes this chunk currently holds on the heap: whatever the store keeps
/// resident (zero for a spilling backend) plus the in-memory dictionary
/// page.
/// resident (zero for a spilling backend).
fn memory_size(&self) -> usize {
self.store.memory_size() + self.dictionary_len()
self.store.memory_size()
}
}

/// A streaming [`Read`] over one column chunk's buffered pages, in final file
/// order: the in-memory dictionary page (if any) first, then the data pages.
/// order: the dictionary page (if any) first, then the data pages.
///
/// Each data-page blob is taken back out of the [`PageStore`] *as it is
/// Each blob is taken back out of the [`PageStore`] *as it is
/// consumed* and released immediately afterwards, so splicing a chunk into the
/// output file never materializes more than a single page in memory at a time.
/// This is what keeps the splice phase within the memory bound for a spilling
/// backend (an in-memory store already holds the bytes, so it is unaffected).
struct StreamingColumnChunkReader {
/// Dictionary-page blobs, emitted before any data page.
dictionary: IntoIter<Bytes>,
store: Box<dyn PageStore>,
/// Page handles in final file order: the dictionary page first (if any),
/// then the data pages.
keys: IntoIter<PageKey>,
/// The blob currently being drained into the output; emptied as it is read.
current: Bytes,
}

impl StreamingColumnChunkReader {
fn new(data: ArrowColumnChunkData) -> Self {
// The dictionary page must be emitted first, ahead of the data pages,
// even though it was the last page produced.
let keys = if data.dictionary_keys.is_empty() {
data.keys
} else {
let mut keys = Vec::with_capacity(data.dictionary_keys.len() + data.keys.len());
keys.extend(data.dictionary_keys);
keys.extend(data.keys);
keys
};
Self {
dictionary: data.dictionary.into_iter(),
store: data.store,
keys: data.keys.into_iter(),
keys: keys.into_iter(),
current: Bytes::new(),
}
}
Expand All @@ -786,11 +799,9 @@ impl StreamingColumnChunkReader {
impl Read for StreamingColumnChunkReader {
fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
// Refill from the next blob whenever the current one is drained: the
// dictionary page first, then each data page from the store.
// dictionary page first, then each data page, all taken from the store.
while self.current.is_empty() {
if let Some(blob) = self.dictionary.next() {
self.current = blob;
} else if let Some(key) = self.keys.next() {
if let Some(key) = self.keys.next() {
self.current = self.store.take(key).map_err(std::io::Error::other)?;
} else {
return Ok(0);
Expand Down Expand Up @@ -885,10 +896,10 @@ impl PageWriter for ArrowPageWriter {

buf.length += compressed_size;
if spec.page_type == PageType::DICTIONARY_PAGE {
// Held in memory and emitted first at splice — see
// `ArrowColumnChunkData::dictionary`.
buf.push_dictionary(header);
buf.push_dictionary(data);
// Recorded apart from the data pages so it is emitted first at
// splice — see `ArrowColumnChunkData::dictionary_keys`.
buf.push_dictionary(header)?;
buf.push_dictionary(data)?;
} else {
buf.push(header)?;
buf.push(data)?;
Expand Down Expand Up @@ -975,7 +986,7 @@ impl ArrowColumnChunk {
// The dictionary page is produced *after* the data pages on this path (so
// they can stream straight through) but must be written *first*, so move
// it ahead of the data pages in the recorded offsets before the splice.
let close = close.update_dictionary_location(data.dictionary_len())?;
let close = close.update_dictionary_location(data.dictionary_len)?;

let reader = StreamingColumnChunkReader::new(data);
writer.append_column_from_read(reader, close)
Expand Down Expand Up @@ -2094,6 +2105,82 @@ mod tests {
assert_eq!(read_values, values);
}

/// A dictionary-encoded column chunk hands *all* of its serialized bytes to
/// the [`PageStore`] — the dictionary page is not kept aside in memory — so
/// the number of bytes the store was given must equal the chunk's compressed
/// size as reported by the file metadata.
#[test]
fn dictionary_page_is_routed_through_the_store() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Page store that keeps blobs in a `Vec` and tallies every byte passed
    /// to `put` in a counter shared with the test body.
    #[derive(Debug, Default)]
    struct SizeRecordingPageStore {
        blobs: Vec<Bytes>,
        bytes_put: Arc<AtomicUsize>,
    }

    impl PageStore for SizeRecordingPageStore {
        fn put(&mut self, value: Bytes) -> Result<PageKey> {
            // The key is the blob's position in `blobs`.
            let index = self.blobs.len();
            self.bytes_put.fetch_add(value.len(), Ordering::Relaxed);
            self.blobs.push(value);
            Ok(PageKey::new(index as u64))
        }

        fn take(&mut self, key: PageKey) -> Result<Bytes> {
            // Move the blob out, leaving an empty `Bytes` in its slot.
            let slot = &mut self.blobs[key.get() as usize];
            Ok(std::mem::take(slot))
        }
    }

    /// Factory whose stores all report into the same shared counter.
    #[derive(Debug)]
    struct Factory {
        bytes_put: Arc<AtomicUsize>,
    }

    impl PageStoreFactory for Factory {
        fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
            let store = SizeRecordingPageStore {
                blobs: Vec::new(),
                bytes_put: Arc::clone(&self.bytes_put),
            };
            Ok(Box::new(store))
        }
    }

    let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));

    // Four distinct strings repeated over 2048 rows: low cardinality keeps
    // the column dictionary-encoded with a real, non-empty dictionary page.
    const WORDS: [&str; 4] = ["alpha", "beta", "gamma", "delta"];
    let values: Vec<&str> = WORDS.iter().copied().cycle().take(2048).collect();
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(values))])
        .unwrap();

    let bytes_put = Arc::new(AtomicUsize::new(0));
    let factory = Factory {
        bytes_put: Arc::clone(&bytes_put),
    };
    let opts = ArrowWriterOptions::new().with_page_store_factory(Arc::new(factory));

    // One batch with one column: the test relies on this producing a single
    // row group and a single store, so the counter maps to one column chunk.
    let mut buffer = Vec::new();
    let mut writer =
        ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
    let column = reader.metadata().row_group(0).column(0);
    assert!(
        column.dictionary_page_offset().is_some(),
        "expected the column to be dictionary-encoded"
    );

    // If the dictionary page bypassed the store, the counter would fall
    // short of the chunk size by exactly the dictionary page's size.
    let stored = bytes_put.load(Ordering::Relaxed) as i64;
    assert_eq!(
        stored,
        column.compressed_size(),
        "the dictionary page must pass through the store like any other page"
    );
}

#[test]
fn arrow_writer() {
// define schema
Expand Down
Loading