From 97418a761355dd7f93cc050c3e4d367e2c90ddfa Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 31 Jan 2025 14:31:13 -0800 Subject: [PATCH 1/8] allow for reading improperly encode UINT_8 and UINT_16 parquet data --- .../src/arrow/array_reader/primitive_array.rs | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 709d0f8bb16e..1acdac4a97f7 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -23,11 +23,10 @@ use crate::column::page::PageIterator; use crate::data_type::{DataType, Int96}; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use arrow_array::Decimal256Array; use arrow_array::{ builder::TimestampNanosecondBufferBuilder, ArrayRef, BooleanArray, Decimal128Array, - Float32Array, Float64Array, Int32Array, Int64Array, TimestampNanosecondArray, UInt32Array, - UInt64Array, + Decimal256Array, Float32Array, Float64Array, Int32Array, Int64Array, TimestampNanosecondArray, + UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; use arrow_buffer::{i256, BooleanBuffer, Buffer}; use arrow_data::ArrayDataBuilder; @@ -210,7 +209,29 @@ where // These are: // - date64: cast int32 to date32, then date32 to date64. // - decimal: cast int32 to decimal, int64 to decimal + // + // Some Parquet writers do not properly write UINT_8 and UINT_16 types + // (they will emit a negative 32-bit integer in some cases). To handle + // these incorrect files, we need to do some explicit casting to unsigned, + // rather than relying on num::cast::cast as used by arrow-cast (it will not + // cast a negative INT32 to UINT8 or UINT16). let array = match target_type { + ArrowType::UInt8 if *(array.data_type()) == ArrowType::Int32 => { + let array = array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|i| i as u8) as UInt8Array; + Arc::new(array) as ArrayRef + } + ArrowType::UInt16 if *(array.data_type()) == ArrowType::Int32 => { + let array = array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|i| i as u16) as UInt16Array; + Arc::new(array) as ArrayRef + } ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => { // this is cheap as it internally reinterprets the data let a = arrow_cast::cast(&array, &ArrowType::Date32)?; From 6ca4aad5e9ff40f5f545b3c6db919c63e8359e67 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 5 Feb 2025 10:51:47 -0800 Subject: [PATCH 2/8] add some benchmarks --- arrow-array/src/array/primitive_array.rs | 1 + parquet/benches/arrow_reader.rs | 60 +++++++++++++++++++ .../src/arrow/array_reader/primitive_array.rs | 8 +++ 3 files changed, 69 insertions(+) diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index 57aa23bf9040..0ad46ff41fc5 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -1002,6 +1002,7 @@ impl PrimitiveArray { match op(unsafe { self.value_unchecked(idx) }) { Some(v) => unsafe { *slice.get_unchecked_mut(idx) = v }, None => { + println!("oof {:?}", self.value(idx)); out_null_count += 1; null_builder.set_bit(idx, false); } diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index e5165fee212c..2cf41d5d3d1d 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -76,6 +76,14 @@ fn build_test_schema() -> SchemaDescPtr { OPTIONAL FIXED_LEN_BYTE_ARRAY (8) optional_flba8_leaf; REQUIRED FIXED_LEN_BYTE_ARRAY (16) mandatory_flba16_leaf; OPTIONAL FIXED_LEN_BYTE_ARRAY (16) optional_flba16_leaf; + REQUIRED INT32 mandatory_uint8_leaf (INTEGER(8, false)); + OPTIONAL INT32 optional_uint8_leaf (INTEGER(8, false)); + REQUIRED INT32 mandatory_uint16_leaf (INTEGER(16, false)); + OPTIONAL INT32 optional_uint16_leaf (INTEGER(16, false)); + REQUIRED INT32 mandatory_uint32_leaf (INTEGER(32, false)); + OPTIONAL INT32 optional_uint32_leaf (INTEGER(32, false)); + REQUIRED INT32 mandatory_int8_leaf (INTEGER(8, true)); + OPTIONAL INT32 optional_int8_leaf (INTEGER(8, true)); } "; parse_message_type(message_type) @@ -1279,6 +1287,14 @@ fn add_benches(c: &mut Criterion) { let string_list_desc = schema.column(14); let mandatory_binary_column_desc = schema.column(15); let optional_binary_column_desc = schema.column(16); + let mandatory_uint8_column_desc = schema.column(27); + let optional_uint8_column_desc = schema.column(28); + let mandatory_uint16_column_desc = schema.column(29); + let optional_uint16_column_desc = schema.column(30); + let mandatory_uint32_column_desc = schema.column(31); + let optional_uint32_column_desc = schema.column(32); + let mandatory_int8_column_desc = schema.column(33); + let optional_int8_column_desc = schema.column(34); // primitive / int32 benchmarks // ============================= @@ -1293,6 +1309,50 @@ fn add_benches(c: &mut Criterion) { ); group.finish(); + // primitive int32 / logical uint8 benchmarks + let mut group = c.benchmark_group("arrow_array_reader/UInt8Array"); + bench_primitive::( + &mut group, + &mandatory_uint8_column_desc, + &optional_uint8_column_desc, + 0, + 256, + ); + group.finish(); + + // primitive int32 / logical int8 benchmarks + let mut group = c.benchmark_group("arrow_array_reader/Int8Array"); + bench_primitive::( + &mut group, + &mandatory_int8_column_desc, + &optional_int8_column_desc, + 0, + 128, + ); + group.finish(); + + // primitive int32 / logical uint16 benchmarks + let mut group = c.benchmark_group("arrow_array_reader/UInt16Array"); + bench_primitive::( + &mut group, + &mandatory_uint16_column_desc, + &optional_uint16_column_desc, + 0, + 65536, + ); + group.finish(); + + // primitive int32 / logical uint32 benchmarks + let mut group = c.benchmark_group("arrow_array_reader/UInt32Array"); + bench_primitive::( + &mut group, + &mandatory_uint32_column_desc, + &optional_uint32_column_desc, + 0, + 1000, + ); + group.finish(); + // primitive / int64 benchmarks // ============================= diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 1acdac4a97f7..e6b613631875 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -224,6 +224,14 @@ where .unary(|i| i as u8) as UInt8Array; Arc::new(array) as ArrayRef } + ArrowType::Int8 if *(array.data_type()) == ArrowType::Int32 => { + let array = array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|i| i as i8) as arrow_array::Int8Array; + Arc::new(array) as ArrayRef + } ArrowType::UInt16 if *(array.data_type()) == ArrowType::Int32 => { let array = array .as_any() From 244e4b4c932af5d1f434a6f2d1badf7bba1d1e7e Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 7 Feb 2025 15:38:52 -0800 Subject: [PATCH 3/8] remove print --- arrow-array/src/array/primitive_array.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index 0ad46ff41fc5..57aa23bf9040 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -1002,7 +1002,6 @@ impl PrimitiveArray { match op(unsafe { self.value_unchecked(idx) }) { Some(v) => unsafe { *slice.get_unchecked_mut(idx) = v }, None => { - println!("oof {:?}", self.value(idx)); out_null_count += 1; null_builder.set_bit(idx, false); } From 75adc3d9ec0c4de550106b4ca36a5eb96adc7640 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 13 Feb 2025 08:36:34 -0800 Subject: [PATCH 4/8] checkpoint some experimental code --- parquet/src/arrow/array_reader/primitive_array.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index e6b613631875..172e340e2d3a 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -29,6 +29,7 @@ use arrow_array::{ UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; use arrow_buffer::{i256, BooleanBuffer, Buffer}; +use arrow_cast::CastOptions; use arrow_data::ArrayDataBuilder; use arrow_schema::{DataType as ArrowType, TimeUnit}; use std::any::Any; @@ -225,12 +226,15 @@ where Arc::new(array) as ArrayRef } ArrowType::Int8 if *(array.data_type()) == ArrowType::Int32 => { + /* let array = array .as_any() .downcast_ref::() .unwrap() .unary(|i| i as i8) as arrow_array::Int8Array; - Arc::new(array) as ArrayRef + Arc::new(array) as ArrayRef*/ + arrow_cast::cast_with_options(&array, target_type, &CastOptions{safe: false, ..Default::default()})? + //arrow_cast::cast(&array, target_type)? } ArrowType::UInt16 if *(array.data_type()) == ArrowType::Int32 => { let array = array From 3d2b87f464805c341c46e00aa055c13a2648d164 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Mon, 7 Apr 2025 12:57:27 -0700 Subject: [PATCH 5/8] checkpoint --- .../src/arrow/array_reader/primitive_array.rs | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 5a3112ea89b8..bf94620bc60e 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -29,12 +29,11 @@ use arrow_array::{ TimestampNanosecondBufferBuilder, TimestampSecondBufferBuilder, }, ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array, - Int32Array, Int64Array, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, - UInt8Array, + Int16Array, Int32Array, Int64Array, Int8Array, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, + UInt32Array, UInt64Array, UInt8Array, }; use arrow_buffer::{i256, BooleanBuffer, Buffer}; -use arrow_cast::CastOptions; use arrow_data::ArrayDataBuilder; use arrow_schema::{DataType as ArrowType, TimeUnit}; use std::any::Any; @@ -278,22 +277,12 @@ where Arc::new(array) as ArrayRef } ArrowType::Int8 if *(array.data_type()) == ArrowType::Int32 => { - /* let array = array .as_any() .downcast_ref::() .unwrap() - .unary(|i| i as i8) as arrow_array::Int8Array; - Arc::new(array) as ArrayRef*/ - arrow_cast::cast_with_options( - &array, - target_type, - &CastOptions { - safe: false, - ..Default::default() - }, - )? - //arrow_cast::cast(&array, target_type)? + .unary(|i| i as i8) as Int8Array; + Arc::new(array) as ArrayRef } ArrowType::UInt16 if *(array.data_type()) == ArrowType::Int32 => { let array = array @@ -303,6 +292,14 @@ where .unary(|i| i as u16) as UInt16Array; Arc::new(array) as ArrayRef } + ArrowType::Int16 if *(array.data_type()) == ArrowType::Int32 => { + let array = array + .as_any() + .downcast_ref::() + .unwrap() + .unary(|i| i as i16) as Int16Array; + Arc::new(array) as ArrayRef + } ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => { // this is cheap as it internally reinterprets the data let a = arrow_cast::cast(&array, &ArrowType::Date32)?; From db0823a6cc72b3a4dc1c0919e339fd4a64308625 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 10 Apr 2025 13:42:24 -0700 Subject: [PATCH 6/8] add a few more types --- parquet/benches/arrow_reader.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index 83de9dffd940..321424b8206c 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -84,6 +84,10 @@ fn build_test_schema() -> SchemaDescPtr { OPTIONAL INT32 optional_uint32_leaf (INTEGER(32, false)); REQUIRED INT32 mandatory_int8_leaf (INTEGER(8, true)); OPTIONAL INT32 optional_int8_leaf (INTEGER(8, true)); + REQUIRED INT32 mandatory_int16_leaf (INTEGER(16, true)); + OPTIONAL INT32 optional_int16_leaf (INTEGER(16, true)); + REQUIRED INT64 mandatory_uint64_leaf (INTEGER(64, false)); + OPTIONAL INT64 optional_uint64_leaf (INTEGER(64, false)); } "; parse_message_type(message_type) @@ -1296,6 +1300,10 @@ fn add_benches(c: &mut Criterion) { let optional_uint32_column_desc = schema.column(32); let mandatory_int8_column_desc = schema.column(33); let optional_int8_column_desc = schema.column(34); + let mandatory_int16_column_desc = schema.column(35); + let optional_int16_column_desc = schema.column(36); + let mandatory_uint64_column_desc = schema.column(37); + let optional_uint64_column_desc = schema.column(38); // primitive / int32 benchmarks // ============================= @@ -1343,6 +1351,17 @@ fn add_benches(c: &mut Criterion) { ); group.finish(); + // primitive int32 / logical int16 benchmarks + let mut group = c.benchmark_group("arrow_array_reader/Int16Array"); + bench_primitive::( + &mut group, + &mandatory_int16_column_desc, + &optional_int16_column_desc, + 0, + 32768, + ); + group.finish(); + // primitive int32 / logical uint32 benchmarks let mut group = c.benchmark_group("arrow_array_reader/UInt32Array"); bench_primitive::( @@ -1367,6 +1386,17 @@ fn add_benches(c: &mut Criterion) { ); group.finish(); + // primitive int64 / logical uint64 benchmarks + let mut group = c.benchmark_group("arrow_array_reader/UInt64Array"); + bench_primitive::( + &mut group, + &mandatory_uint64_column_desc, + &optional_uint64_column_desc, + 0, + 1000, + ); + group.finish(); + // string benchmarks //============================== From dd9e262f92cb6b932ccbc6bd9fdc60e4a811d9b7 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 10 Apr 2025 15:22:02 -0700 Subject: [PATCH 7/8] modify comment --- parquet/src/arrow/array_reader/primitive_array.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index bf94620bc60e..d9a0c1bb1b85 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -261,13 +261,14 @@ where // These are: // - date64: cast int32 to date32, then date32 to date64. // - decimal: cast int32 to decimal, int64 to decimal - // - // Some Parquet writers do not properly write UINT_8 and UINT_16 types - // (they will emit a negative 32-bit integer in some cases). To handle - // these incorrect files, we need to do some explicit casting to unsigned, - // rather than relying on num::cast::cast as used by arrow-cast (it will not - // cast a negative INT32 to UINT8 or UINT16). let array = match target_type { + // Using `arrow_cast::cast` has been found to be very slow for converting + // INT32 physical type to lower bitwidth logical types. Since rust casts + // are infallible, instead use `unary` which is much faster. + // One consequence of this approach is that some malformed integer columns + // will return (an arguably correct) result rather than null. + // See https://github.com/apache/arrow-rs/issues/7040 for a discussion of this + // issue. ArrowType::UInt8 if *(array.data_type()) == ArrowType::Int32 => { let array = array .as_any() From 6f583c41ce828d4ff7a8e5b2fa892e83a8ef6a9c Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 10 Apr 2025 15:24:34 -0700 Subject: [PATCH 8/8] another edit --- parquet/src/arrow/array_reader/primitive_array.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index d9a0c1bb1b85..76b1e1cad52d 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -264,7 +264,7 @@ where let array = match target_type { // Using `arrow_cast::cast` has been found to be very slow for converting // INT32 physical type to lower bitwidth logical types. Since rust casts - // are infallible, instead use `unary` which is much faster. + // are infallible, instead use `unary` which is much faster (by up to 40%). // One consequence of this approach is that some malformed integer columns // will return (an arguably correct) result rather than null. // See https://github.com/apache/arrow-rs/issues/7040 for a discussion of this