
Commit 7ff6c7e

Add coerce int96 option for Parquet to support different TimeUnits, test int96_from_spark.parquet from parquet-testing (#15537)
1 parent 63f37a3 commit 7ff6c7e

21 files changed: +290 -24 lines

datafusion/common/src/config.rs

Lines changed: 8 additions & 0 deletions

@@ -459,6 +459,14 @@ config_namespace! {
         /// BLOB instead.
         pub binary_as_string: bool, default = false
 
+        /// (reading) If set, the parquet reader will read columns of
+        /// physical type int96 as originating from a different resolution
+        /// than nanosecond. This is useful for reading data from systems like Spark
+        /// which store microsecond-resolution timestamps in an int96, allowing them
+        /// to write values with a larger date range than 64-bit timestamps with
+        /// nanosecond resolution.
+        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
+
         // The following options affect writing to parquet files
         // and map to parquet::file::properties::WriterProperties
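
To opt in from user code, the new key is set like any other parquet option. A minimal sketch, assuming the full key name follows the `datafusion.execution.parquet` namespace this entry is declared in (the set of accepted values beyond the "ns" and "us" exercised by the tests below is likewise an assumption):

use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Read int96 columns as microsecond timestamps; the value is lowercased
    // by the `transform = str::to_lowercase` declared above.
    let config = SessionConfig::new()
        .set_str("datafusion.execution.parquet.coerce_int96", "us");
    let _ctx = SessionContext::new_with_config(config);
}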

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 3 additions & 0 deletions

@@ -239,6 +239,7 @@ impl ParquetOptions {
             bloom_filter_on_read: _, // reads not used for writer props
             schema_force_view_types: _,
             binary_as_string: _, // not used for writer props
+            coerce_int96: _, // not used for writer props
             skip_arrow_metadata: _,
         } = self;
 
@@ -516,6 +517,7 @@ mod tests {
             schema_force_view_types: defaults.schema_force_view_types,
             binary_as_string: defaults.binary_as_string,
             skip_arrow_metadata: defaults.skip_arrow_metadata,
+            coerce_int96: None,
         }
     }
 
@@ -622,6 +624,7 @@ mod tests {
                 schema_force_view_types: global_options_defaults.schema_force_view_types,
                 binary_as_string: global_options_defaults.binary_as_string,
                 skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
+                coerce_int96: None,
             },
             column_specific_options,
             key_value_metadata,
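
The exhaustive destructuring in the first hunk is deliberate: naming every field of the options struct (binding read-only ones to `_`) turns "someone added an option but forgot to decide its writer-props behavior" into a compile error. A toy illustration of the pattern, not code from this commit:

struct Options {
    compression: Option<String>,
    coerce_int96: Option<String>,
}

fn to_writer_props(opts: &Options) {
    // Fails to compile if `Options` gains a field that is not listed here:
    let Options {
        compression,
        coerce_int96: _, // read-side option, ignored for writer props
    } = opts;
    let _ = compression;
}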

datafusion/core/src/datasource/file_format/avro.rs

Lines changed: 10 additions & 1 deletion

@@ -382,6 +382,15 @@ mod tests {
         let testdata = test_util::arrow_test_data();
         let store_root = format!("{testdata}/avro");
         let format = AvroFormat {};
-        scan_format(state, &format, &store_root, file_name, projection, limit).await
+        scan_format(
+            state,
+            &format,
+            None,
+            &store_root,
+            file_name,
+            projection,
+            limit,
+        )
+        .await
     }
 }

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 3 additions & 1 deletion

@@ -250,6 +250,7 @@ mod tests {
         let exec = scan_format(
             &state,
             &format,
+            None,
             root,
             "aggregate_test_100_with_nulls.csv",
             projection,
@@ -300,6 +301,7 @@ mod tests {
         let exec = scan_format(
             &state,
             &format,
+            None,
             root,
             "aggregate_test_100_with_nulls.csv",
             projection,
@@ -582,7 +584,7 @@ mod tests {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let root = format!("{}/csv", arrow_test_data());
         let format = CsvFormat::default().with_has_header(has_header);
-        scan_format(state, &format, &root, file_name, projection, limit).await
+        scan_format(state, &format, None, &root, file_name, projection, limit).await
     }
 
     #[tokio::test]

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 1 addition & 1 deletion

@@ -149,7 +149,7 @@ mod tests {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let filename = "tests/data/2.json";
         let format = JsonFormat::default();
-        scan_format(state, &format, ".", filename, projection, limit).await
+        scan_format(state, &format, None, ".", filename, projection, limit).await
     }
 
     #[tokio::test]

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 10 additions & 5 deletions

@@ -36,19 +36,20 @@ pub use datafusion_datasource::write;
 
 #[cfg(test)]
 pub(crate) mod test_util {
-    use std::sync::Arc;
-
+    use arrow_schema::SchemaRef;
     use datafusion_catalog::Session;
     use datafusion_common::Result;
     use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
     use datafusion_datasource::{file_format::FileFormat, PartitionedFile};
     use datafusion_execution::object_store::ObjectStoreUrl;
+    use std::sync::Arc;
 
     use crate::test::object_store::local_unpartitioned_file;
 
     pub async fn scan_format(
         state: &dyn Session,
         format: &dyn FileFormat,
+        schema: Option<SchemaRef>,
         store_root: &str,
         file_name: &str,
         projection: Option<Vec<usize>>,
@@ -57,9 +58,13 @@ pub(crate) mod test_util {
         let store = Arc::new(object_store::local::LocalFileSystem::new()) as _;
         let meta = local_unpartitioned_file(format!("{store_root}/{file_name}"));
 
-        let file_schema = format
-            .infer_schema(state, &store, std::slice::from_ref(&meta))
-            .await?;
+        let file_schema = if let Some(file_schema) = schema {
+            file_schema
+        } else {
+            format
+                .infer_schema(state, &store, std::slice::from_ref(&meta))
+                .await?
+        };
 
         let statistics = format
             .infer_stats(state, &store, file_schema.clone(), &meta)
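
The new third parameter lets a caller pin the table schema instead of inferring it from the file; every existing call site simply passes `None`. Paraphrasing the int96 test later in this commit, which is the one caller that passes `Some`:

let exec = scan_format(
    &state,
    &ParquetFormat::default().with_coerce_int96(Some("us".to_string())),
    Some(schema.clone()), // pre-built SchemaRef; skips infer_schema()
    &testdata,
    "int96_from_spark.parquet",
    Some(vec![0]),
    None,
)
.await?;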

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 4 additions & 1 deletion

@@ -1075,7 +1075,10 @@ mod tests {
             .map(|factory| factory.create(state, &Default::default()).unwrap())
             .unwrap_or(Arc::new(ParquetFormat::new()));
 
-        scan_format(state, &*format, &testdata, file_name, projection, limit).await
+        scan_format(
+            state, &*format, None, &testdata, file_name, projection, limit,
+        )
+        .await
     }
 
     /// Test that 0-byte files don't break while reading

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 88 additions & 1 deletion

@@ -38,7 +38,7 @@ mod tests {
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::array::{
-        ArrayRef, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
+        ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
         StructArray,
     };
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
@@ -1109,6 +1109,7 @@ mod tests {
         let parquet_exec = scan_format(
             &state,
             &ParquetFormat::default(),
+            None,
             &testdata,
             filename,
             Some(vec![0, 1, 2]),
@@ -1141,6 +1142,92 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_exec_with_int96_from_spark() -> Result<()> {
+        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
+        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
+        // anyway, so this should be a zero-copy non-modifying cast at the SchemaAdapter.
+
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let filename = "int96_from_spark.parquet";
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let task_ctx = state.task_ctx();
+
+        let time_units_and_expected = vec![
+            (
+                None, // Same as "ns" time_unit
+                Arc::new(Int64Array::from(vec![
+                    Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
+                    Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
+                    Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
+                    Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
+                    None,
+                    Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
+                ])),
+            ),
+            (
+                Some("ns".to_string()),
+                Arc::new(Int64Array::from(vec![
+                    Some(1704141296123456000),
+                    Some(1704070800000000000),
+                    Some(-4852191831933722624),
+                    Some(1735599600000000000),
+                    None,
+                    Some(-4864435138808946688),
+                ])),
+            ),
+            (
+                Some("us".to_string()),
+                Arc::new(Int64Array::from(vec![
+                    Some(1704141296123456),
+                    Some(1704070800000000),
+                    Some(253402225200000000),
+                    Some(1735599600000000),
+                    None,
+                    Some(9089380393200000000),
+                ])),
+            ),
+        ];
+
+        for (time_unit, expected) in time_units_and_expected {
+            let parquet_exec = scan_format(
+                &state,
+                &ParquetFormat::default().with_coerce_int96(time_unit.clone()),
+                Some(schema.clone()),
+                &testdata,
+                filename,
+                Some(vec![0]),
+                None,
+            )
+            .await
+            .unwrap();
+            assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
+
+            let mut results = parquet_exec.execute(0, task_ctx.clone())?;
+            let batch = results.next().await.unwrap()?;
+
+            assert_eq!(6, batch.num_rows());
+            assert_eq!(1, batch.num_columns());
+
+            assert_eq!(batch.num_columns(), 1);
+            let column = batch.column(0);
+
+            assert_eq!(column.len(), expected.len());
+
+            column
+                .as_primitive::<arrow::datatypes::Int64Type>()
+                .iter()
+                .zip(expected.iter())
+                .for_each(|(lhs, rhs)| {
+                    assert_eq!(lhs, rhs);
+                });
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_exec_with_range() -> Result<()> {
         fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
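
A note on the negative `None`/"ns" expectations above: each is exactly the two's-complement wraparound of the corresponding "us" value scaled by 1,000, which is why those timestamps cannot be represented at nanosecond resolution in an i64. A standalone check of that arithmetic (not code from this commit):

fn main() {
    // Year 9999, in microseconds (third "us" expectation above).
    let year_9999_us: i64 = 253402225200000000;
    // Scaling to nanoseconds overflows i64 and wraps to the "ns" expectation.
    assert_eq!(year_9999_us.wrapping_mul(1000), -4852191831933722624);

    // Likewise for the year-290000 value (last expectation).
    let year_290000_us: i64 = 9089380393200000000;
    assert_eq!(year_290000_us.wrapping_mul(1000), -4864435138808946688);
}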

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 61 additions & 8 deletions

@@ -24,9 +24,18 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use arrow::array::RecordBatch;
+use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
+use datafusion_datasource::file_compression_type::FileCompressionType;
+use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
+use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer};
+
+use datafusion_datasource::file_format::{
+    FileFormat, FileFormatFactory, FilePushdownSupport,
+};
+use datafusion_datasource::write::demux::DemuxedStreamReceiver;
+
 use arrow::compute::sum;
 use arrow::datatypes::{DataType, Field, FieldRef};
-use arrow::datatypes::{Fields, Schema, SchemaRef};
 use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
@@ -38,15 +47,8 @@ use datafusion_common::{HashMap, Statistics};
 use datafusion_common_runtime::{JoinSet, SpawnedTask};
 use datafusion_datasource::display::FileGroupDisplay;
 use datafusion_datasource::file::FileSource;
-use datafusion_datasource::file_compression_type::FileCompressionType;
-use datafusion_datasource::file_format::{
-    FileFormat, FileFormatFactory, FilePushdownSupport,
-};
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
-use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
 use datafusion_datasource::sink::{DataSink, DataSinkExec};
-use datafusion_datasource::write::demux::DemuxedStreamReceiver;
-use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer};
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
@@ -76,11 +78,13 @@ use parquet::arrow::arrow_writer::{
 };
 use parquet::arrow::async_reader::MetadataFetch;
 use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
+use parquet::basic::Type;
 use parquet::errors::ParquetError;
 use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use parquet::file::writer::SerializedFileWriter;
 use parquet::format::FileMetaData;
+use parquet::schema::types::SchemaDescriptor;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 
@@ -268,6 +272,15 @@ impl ParquetFormat {
         self.options.global.binary_as_string = binary_as_string;
         self
     }
+
+    pub fn coerce_int96(&self) -> Option<String> {
+        self.options.global.coerce_int96.clone()
+    }
+
+    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
+        self.options.global.coerce_int96 = time_unit;
+        self
+    }
 }
 
 /// Clears all metadata (Schema level and field level) on an iterator
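
Builder-style usage of the new accessors, mirroring the test added in this commit (the `datafusion::datasource::file_format::parquet` re-export path is assumed here):

use datafusion::datasource::file_format::parquet::ParquetFormat;

fn main() {
    let format = ParquetFormat::default().with_coerce_int96(Some("us".to_string()));
    assert_eq!(format.coerce_int96(), Some("us".to_string()));
}
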
@@ -569,6 +582,46 @@ pub fn apply_file_schema_type_coercions(
     ))
 }
 
+/// Coerces the file schema's Timestamps to the provided TimeUnit if the Parquet schema contains INT96.
+pub fn coerce_int96_to_resolution(
+    parquet_schema: &SchemaDescriptor,
+    file_schema: &Schema,
+    time_unit: &TimeUnit,
+) -> Option<Schema> {
+    let mut transform = false;
+    let parquet_fields: HashMap<_, _> = parquet_schema
+        .columns()
+        .iter()
+        .map(|f| {
+            let dt = f.physical_type();
+            if dt.eq(&Type::INT96) {
+                transform = true;
+            }
+            (f.name(), dt)
+        })
+        .collect();
+
+    if !transform {
+        return None;
+    }
+
+    let transformed_fields: Vec<Arc<Field>> = file_schema
+        .fields
+        .iter()
+        .map(|field| match parquet_fields.get(field.name().as_str()) {
+            Some(Type::INT96) => {
+                field_with_new_type(field, DataType::Timestamp(*time_unit, None))
+            }
+            _ => Arc::clone(field),
+        })
+        .collect();
+
+    Some(Schema::new_with_metadata(
+        transformed_fields,
+        file_schema.metadata.clone(),
+    ))
+}
+
 /// Coerces the file schema if the table schema uses a view type.
 #[deprecated(
     since = "47.0.0",
