Skip to content

Commit 58e2c1c

Browse files
authored
Add splice column API (apache#4155) (apache#4269)
* Add splice column API (apache#4155) * Review feedback * Re-encode offset index
1 parent 3adca53 commit 58e2c1c

File tree

2 files changed

+201
-36
lines changed

2 files changed

+201
-36
lines changed

parquet/src/arrow/arrow_writer/byte_array.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,12 @@ impl<'a> ByteArrayWriter<'a> {
104104
/// Returns a new [`ByteArrayWriter`]
105105
pub fn new(
106106
descr: ColumnDescPtr,
107-
props: &'a WriterPropertiesPtr,
107+
props: WriterPropertiesPtr,
108108
page_writer: Box<dyn PageWriter + 'a>,
109109
on_close: OnCloseColumnChunk<'a>,
110110
) -> Result<Self> {
111111
Ok(Self {
112-
writer: GenericColumnWriter::new(descr, props.clone(), page_writer),
112+
writer: GenericColumnWriter::new(descr, props, page_writer),
113113
on_close: Some(on_close),
114114
})
115115
}

parquet/src/file/writer.rs

Lines changed: 199 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
use crate::bloom_filter::Sbbf;
2222
use crate::format as parquet;
2323
use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
24-
use std::io::{BufWriter, IoSlice};
24+
use std::io::{BufWriter, IoSlice, Read};
2525
use std::{io::Write, sync::Arc};
2626
use thrift::protocol::{TCompactOutputProtocol, TSerializable};
2727

@@ -35,6 +35,7 @@ use crate::column::{
3535
};
3636
use crate::data_type::DataType;
3737
use crate::errors::{ParquetError, Result};
38+
use crate::file::reader::ChunkReader;
3839
use crate::file::{
3940
metadata::*, properties::WriterPropertiesPtr,
4041
statistics::to_thrift as statistics_to_thrift, PARQUET_MAGIC,
@@ -423,27 +424,15 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
423424
}
424425
}
425426

426-
/// Returns the next column writer, if available, using the factory function;
427-
/// otherwise returns `None`.
428-
pub(crate) fn next_column_with_factory<'b, F, C>(
429-
&'b mut self,
430-
factory: F,
431-
) -> Result<Option<C>>
432-
where
433-
F: FnOnce(
434-
ColumnDescPtr,
435-
&'b WriterPropertiesPtr,
436-
Box<dyn PageWriter + 'b>,
437-
OnCloseColumnChunk<'b>,
438-
) -> Result<C>,
439-
{
440-
self.assert_previous_writer_closed()?;
441-
442-
if self.column_index >= self.descr.num_columns() {
443-
return Ok(None);
444-
}
445-
let page_writer = Box::new(SerializedPageWriter::new(self.buf));
427+
/// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
428+
fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
429+
let ret = self.descr.columns().get(self.column_index)?.clone();
430+
self.column_index += 1;
431+
Some(ret)
432+
}
446433

434+
/// Returns [`OnCloseColumnChunk`] for the next writer
435+
fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
447436
let total_bytes_written = &mut self.total_bytes_written;
448437
let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
449438
let total_rows_written = &mut self.total_rows_written;
@@ -475,28 +464,115 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
475464

476465
Ok(())
477466
};
467+
(self.buf, Box::new(on_close))
468+
}
478469

479-
let column = self.descr.column(self.column_index);
480-
self.column_index += 1;
481-
482-
Ok(Some(factory(
483-
column,
484-
&self.props,
485-
page_writer,
486-
Box::new(on_close),
487-
)?))
470+
/// Returns the next column writer, if available, using the factory function;
471+
/// otherwise returns `None`.
472+
pub(crate) fn next_column_with_factory<'b, F, C>(
473+
&'b mut self,
474+
factory: F,
475+
) -> Result<Option<C>>
476+
where
477+
F: FnOnce(
478+
ColumnDescPtr,
479+
WriterPropertiesPtr,
480+
Box<dyn PageWriter + 'b>,
481+
OnCloseColumnChunk<'b>,
482+
) -> Result<C>,
483+
{
484+
self.assert_previous_writer_closed()?;
485+
Ok(match self.next_column_desc() {
486+
Some(column) => {
487+
let props = self.props.clone();
488+
let (buf, on_close) = self.get_on_close();
489+
let page_writer = Box::new(SerializedPageWriter::new(buf));
490+
Some(factory(column, props, page_writer, Box::new(on_close))?)
491+
}
492+
None => None,
493+
})
488494
}
489495

490496
/// Returns the next column writer, if available; otherwise returns `None`.
491497
/// In case of any IO error or Thrift error, or if row group writer has already been
492498
/// closed returns `Err`.
493499
pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
494500
self.next_column_with_factory(|descr, props, page_writer, on_close| {
495-
let column_writer = get_column_writer(descr, props.clone(), page_writer);
501+
let column_writer = get_column_writer(descr, props, page_writer);
496502
Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
497503
})
498504
}
499505

506+
/// Append an encoded column chunk from another source without decoding it
507+
///
508+
/// This can be used for efficiently concatenating or projecting parquet data,
509+
/// or encoding parquet data to temporary in-memory buffers
510+
///
511+
/// See [`Self::next_column`] for writing data that isn't already encoded
512+
pub fn append_column<R: ChunkReader>(
513+
&mut self,
514+
reader: &R,
515+
mut close: ColumnCloseResult,
516+
) -> Result<()> {
517+
self.assert_previous_writer_closed()?;
518+
let desc = self.next_column_desc().ok_or_else(|| {
519+
general_err!("exhausted columns in SerializedRowGroupWriter")
520+
})?;
521+
522+
let metadata = close.metadata;
523+
524+
if metadata.column_descr() != desc.as_ref() {
525+
return Err(general_err!(
526+
"column descriptor mismatch, expected {:?} got {:?}",
527+
desc,
528+
metadata.column_descr()
529+
));
530+
}
531+
532+
let src_dictionary_offset = metadata.dictionary_page_offset();
533+
let src_data_offset = metadata.data_page_offset();
534+
let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
535+
let src_length = metadata.compressed_size();
536+
537+
let write_offset = self.buf.bytes_written();
538+
let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
539+
let write_length = std::io::copy(&mut read, &mut self.buf)?;
540+
541+
if src_length as u64 != write_length {
542+
return Err(general_err!(
543+
"Failed to splice column data, expected {read_length} got {write_length}"
544+
));
545+
}
546+
547+
let file_offset = self.buf.bytes_written() as i64;
548+
549+
let map_offset = |x| x - src_offset + write_offset as i64;
550+
let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
551+
.set_compression(metadata.compression())
552+
.set_encodings(metadata.encodings().clone())
553+
.set_file_offset(file_offset)
554+
.set_total_compressed_size(metadata.compressed_size())
555+
.set_total_uncompressed_size(metadata.uncompressed_size())
556+
.set_num_values(metadata.num_values())
557+
.set_data_page_offset(map_offset(src_data_offset))
558+
.set_dictionary_page_offset(src_dictionary_offset.map(map_offset));
559+
560+
if let Some(statistics) = metadata.statistics() {
561+
builder = builder.set_statistics(statistics.clone())
562+
}
563+
close.metadata = builder.build()?;
564+
565+
if let Some(offsets) = close.offset_index.as_mut() {
566+
for location in &mut offsets.page_locations {
567+
location.offset = map_offset(location.offset)
568+
}
569+
}
570+
571+
SerializedPageWriter::new(self.buf).write_metadata(&metadata)?;
572+
let (_, on_close) = self.get_on_close();
573+
on_close(close)
574+
}
575+
500576
/// Closes this row group writer and returns row group metadata.
501577
pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
502578
if self.row_group_metadata.is_none() {
@@ -516,9 +592,9 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
516592
if let Some(on_close) = self.on_close.take() {
517593
on_close(
518594
metadata,
519-
self.bloom_filters.clone(),
520-
self.column_indexes.clone(),
521-
self.offset_indexes.clone(),
595+
self.bloom_filters,
596+
self.column_indexes,
597+
self.offset_indexes,
522598
)?
523599
}
524600
}
@@ -720,9 +796,11 @@ mod tests {
720796

721797
use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
722798
use crate::column::page::PageReader;
799+
use crate::column::reader::get_typed_column_reader;
723800
use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
724801
use crate::data_type::{BoolType, Int32Type};
725802
use crate::file::reader::ChunkReader;
803+
use crate::file::serialized_reader::ReadOptionsBuilder;
726804
use crate::file::{
727805
properties::{ReaderProperties, WriterProperties, WriterVersion},
728806
reader::{FileReader, SerializedFileReader, SerializedPageReader},
@@ -1540,4 +1618,91 @@ mod tests {
15401618
assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
15411619
assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
15421620
}
1621+
1622+
#[test]
1623+
fn test_spliced_write() {
1624+
let message_type = "
1625+
message test_schema {
1626+
REQUIRED INT32 i32 (INTEGER(32,true));
1627+
REQUIRED INT32 u32 (INTEGER(32,false));
1628+
}
1629+
";
1630+
let schema = Arc::new(parse_message_type(message_type).unwrap());
1631+
let props = Arc::new(WriterProperties::builder().build());
1632+
1633+
let mut file = Vec::with_capacity(1024);
1634+
let mut file_writer =
1635+
SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
1636+
1637+
let columns = file_writer.descr.columns();
1638+
let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
1639+
.iter()
1640+
.map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
1641+
.collect();
1642+
1643+
let mut column_state_slice = column_state.as_mut_slice();
1644+
let mut column_writers = Vec::with_capacity(columns.len());
1645+
for c in columns {
1646+
let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
1647+
column_state_slice = tail;
1648+
1649+
let page_writer = Box::new(SerializedPageWriter::new(buf));
1650+
let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
1651+
column_writers.push(SerializedColumnWriter::new(
1652+
col_writer,
1653+
Some(Box::new(|on_close| {
1654+
*out = Some(on_close);
1655+
Ok(())
1656+
})),
1657+
));
1658+
}
1659+
1660+
let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
1661+
1662+
// Interleaved writing to the column writers
1663+
for (writer, batch) in column_writers.iter_mut().zip(column_data) {
1664+
let writer = writer.typed::<Int32Type>();
1665+
writer.write_batch(&batch, None, None).unwrap();
1666+
}
1667+
1668+
// Close the column writers
1669+
for writer in column_writers {
1670+
writer.close().unwrap()
1671+
}
1672+
1673+
// Splice column data into a row group
1674+
let mut row_group_writer = file_writer.next_row_group().unwrap();
1675+
for (write, close) in column_state {
1676+
let buf = Bytes::from(write.into_inner().unwrap());
1677+
row_group_writer
1678+
.append_column(&buf, close.unwrap())
1679+
.unwrap();
1680+
}
1681+
row_group_writer.close().unwrap();
1682+
file_writer.close().unwrap();
1683+
1684+
// Check data was written correctly
1685+
let file = Bytes::from(file);
1686+
let test_read = |reader: SerializedFileReader<Bytes>| {
1687+
let row_group = reader.get_row_group(0).unwrap();
1688+
1689+
let mut out = [0; 4];
1690+
let c1 = row_group.get_column_reader(0).unwrap();
1691+
let mut c1 = get_typed_column_reader::<Int32Type>(c1);
1692+
c1.read_batch(4, None, None, &mut out).unwrap();
1693+
assert_eq!(out, column_data[0]);
1694+
1695+
let c2 = row_group.get_column_reader(1).unwrap();
1696+
let mut c2 = get_typed_column_reader::<Int32Type>(c2);
1697+
c2.read_batch(4, None, None, &mut out).unwrap();
1698+
assert_eq!(out, column_data[1]);
1699+
};
1700+
1701+
let reader = SerializedFileReader::new(file.clone()).unwrap();
1702+
test_read(reader);
1703+
1704+
let options = ReadOptionsBuilder::new().with_page_index().build();
1705+
let reader = SerializedFileReader::new_with_options(file, options).unwrap();
1706+
test_read(reader);
1707+
}
15431708
}

0 commit comments

Comments
 (0)