diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 5728746e904b..d0af96329b5f 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -29,6 +29,7 @@ use arrow_ipc::reader::FileDecoder; use datafusion_common::Statistics; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::PartitionedFile; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::StreamExt; @@ -121,7 +122,11 @@ pub struct ArrowOpener { } impl FileOpener for ArrowOpener { - fn open(&self, file_meta: FileMeta) -> Result { + fn open( + &self, + file_meta: FileMeta, + _file: PartitionedFile, + ) -> Result { let object_store = Arc::clone(&self.object_store); let projection = self.projection.clone(); Ok(Box::pin(async move { diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 3ef403013452..e33761a0abb3 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -369,7 +369,8 @@ mod tests { .build(); // Add partition columns - config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)]; + config.table_partition_cols = + vec![Arc::new(Field::new("date", DataType::Utf8, false))]; config.file_groups[0][0].partition_values = vec![ScalarValue::from("2021-10-26")]; // We should be able to project on the partition column diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 87fa70c07a69..958dd8f59e93 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -57,7 +57,11 @@ pub struct TestOpener { } impl FileOpener for TestOpener { - fn open(&self, _file_meta: FileMeta) -> Result { + fn open( + &self, + _file_meta: FileMeta, + _file: PartitionedFile, + ) -> Result { let mut batches = self.batches.clone(); if let Some(batch_size) = self.batch_size { let batch = concat_batches(&batches[0].schema(), &batches)?; diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index 3254f48bab39..948049f5a747 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -145,7 +145,9 @@ mod private { use super::*; use bytes::Buf; - use datafusion_datasource::{file_meta::FileMeta, file_stream::FileOpenFuture}; + use datafusion_datasource::{ + file_meta::FileMeta, file_stream::FileOpenFuture, PartitionedFile, + }; use futures::StreamExt; use object_store::{GetResultPayload, ObjectStore}; @@ -155,7 +157,11 @@ mod private { } impl FileOpener for AvroOpener { - fn open(&self, file_meta: FileMeta) -> Result { + fn open( + &self, + file_meta: FileMeta, + _file: PartitionedFile, + ) -> Result { let config = Arc::clone(&self.config); let object_store = Arc::clone(&self.object_store); Ok(Box::pin(async move { diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 3af1f2b345ba..6c994af940d1 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -29,7 +29,8 @@ use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_meta::FileMeta; use 
datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::{ - as_file_source, calculate_range, FileRange, ListingTableUrl, RangeCalculation, + as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile, + RangeCalculation, }; use arrow::csv; @@ -322,7 +323,11 @@ impl FileOpener for CsvOpener { /// A,1,2,3,4,5,6,7,8,9\n /// A},1,2,3,4,5,6,7,8,9\n /// The lines read would be: [1, 2] - fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> { + fn open( + &self, + file_meta: FileMeta, + _file: PartitionedFile, + ) -> Result<FileOpenFuture> { // `self.config.has_header` controls whether to skip reading the 1st line header // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle // partition, then don't skip first line diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index af37e1033ef1..d318928e5c6b 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -32,7 +32,7 @@ use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ - as_file_source, calculate_range, ListingTableUrl, RangeCalculation, + as_file_source, calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation, }; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; @@ -176,7 +176,11 @@ impl FileOpener for JsonOpener { /// are applied to determine which lines to read: /// 1. The first line of the partition is the line in which the index of the first character >= `start`. /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides. - fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> { + fn open( + &self, + file_meta: FileMeta, + _file: PartitionedFile, + ) -> Result<FileOpenFuture> { let store = Arc::clone(&self.object_store); let schema = Arc::clone(&self.projected_schema); let batch_size = self.batch_size; diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 3213d0201295..9acd39b37e83 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -27,6 +27,10 @@ use datafusion_physical_plan::metrics::{ /// [`ParquetFileReaderFactory`]: super::ParquetFileReaderFactory #[derive(Debug, Clone)] pub struct ParquetFileMetrics { + /// Number of files pruned by partition or file level statistics + /// This often happens at planning time but may happen at execution time + /// if dynamic filters (e.g. from a join) result in additional pruning.
+ pub files_pruned_statistics: Count, /// Number of times the predicate could not be evaluated pub predicate_evaluation_errors: Count, /// Number of row groups whose bloom filters were checked and matched (not pruned) @@ -122,7 +126,11 @@ impl ParquetFileMetrics { .with_new_label("filename", filename.to_string()) .subset_time("metadata_load_time", partition); + let files_pruned_statistics = + MetricBuilder::new(metrics).counter("files_pruned_statistics", partition); + Self { + files_pruned_statistics, predicate_evaluation_errors, row_groups_matched_bloom_filter, row_groups_pruned_bloom_filter, diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 9e14425074f7..285044803d73 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -29,14 +29,20 @@ use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; -use arrow::datatypes::{SchemaRef, TimeUnit}; +use arrow::datatypes::{FieldRef, Schema, SchemaRef, TimeUnit}; use arrow::error::ArrowError; +use datafusion_common::pruning::{ + CompositePruningStatistics, PartitionPruningStatistics, PrunableStatistics, + PruningStatistics, +}; use datafusion_common::{exec_err, Result}; +use datafusion_datasource::PartitionedFile; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; use futures::{StreamExt, TryStreamExt}; +use itertools::Itertools; use log::debug; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::AsyncFileReader; @@ -58,6 +64,8 @@ pub(super) struct ParquetOpener { /// Schema of the output table without partition columns. /// This is the schema we coerce the physical file schema into. 
pub logical_file_schema: SchemaRef, + /// Partition columns + pub partition_fields: Vec<FieldRef>, /// Optional hint for how large the initial request to read parquet metadata /// should be pub metadata_size_hint: Option<usize>, @@ -85,7 +93,10 @@ pub(super) struct ParquetOpener { } impl FileOpener for ParquetOpener { - fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> { + fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> { + let predicate_creation_errors = MetricBuilder::new(&self.metrics) + .global_counter("num_predicate_creation_errors"); + let file_range = file_meta.range.clone(); let extensions = file_meta.extensions.clone(); let file_name = file_meta.location().to_string(); @@ -97,7 +108,7 @@ impl FileOpener for ParquetOpener { let mut async_file_reader: Box<dyn AsyncFileReader> = self.parquet_file_reader_factory.create_reader( self.partition_index, - file_meta, + file_meta.clone(), metadata_size_hint, &self.metrics, )?; @@ -112,6 +123,7 @@ .create(projected_schema, Arc::clone(&self.logical_file_schema)); let predicate = self.predicate.clone(); let logical_file_schema = Arc::clone(&self.logical_file_schema); + let partition_fields = self.partition_fields.clone(); let reorder_predicates = self.reorder_filters; let pushdown_filters = self.pushdown_filters; let coerce_int96 = self.coerce_int96; @@ -119,12 +131,69 @@ let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning; let limit = self.limit; - let predicate_creation_errors = MetricBuilder::new(&self.metrics) - .global_counter("num_predicate_creation_errors"); - let enable_page_index = self.enable_page_index; Ok(Box::pin(async move { + // Prune this file using its partition values and file level statistics. + // Since dynamic filters may have been updated since planning, it is possible that we are able + // to prune files now that we couldn't prune at planning time. + if let Some(predicate) = &predicate { + // Build a pruning schema that combines the file fields and partition fields. + // Partition fields are always at the end. + let pruning_schema = Arc::new( + Schema::new( + logical_file_schema + .fields() + .iter() + .cloned() + .chain(partition_fields.iter().cloned()) + .collect_vec(), + ) + .with_metadata(logical_file_schema.metadata().clone()), + ); + let pruning_predicate = build_pruning_predicate( + Arc::clone(predicate), + &pruning_schema, + &predicate_creation_errors, + ); + if let Some(pruning_predicate) = pruning_predicate { + // The partition column schema is the schema of the table minus the schema of the file + let mut pruning = Box::new(PartitionPruningStatistics::try_new( + vec![file.partition_values], + partition_fields.clone(), + )?)
+ as Box; + if let Some(stats) = file.statistics { + let stats_pruning = Box::new(PrunableStatistics::new( + vec![stats], + Arc::clone(&pruning_schema), + )); + pruning = Box::new(CompositePruningStatistics::new(vec![ + pruning, + stats_pruning, + ])); + } + match pruning_predicate.prune(pruning.as_ref()) { + Ok(values) => { + assert!(values.len() == 1); + // We expect a single container -> if all containers are false skip this file + if values.into_iter().all(|v| !v) { + // Return an empty stream + file_metrics.files_pruned_statistics.add(1); + return Ok(futures::stream::empty().boxed()); + } + } + // Stats filter array could not be built, so we can't prune + Err(e) => { + debug!( + "Ignoring error building pruning predicate for file '{}': {e}", + file_meta.location(), + ); + predicate_creation_errors.add(1); + } + } + } + } // Don't load the page index yet. Since it is not stored inline in // the footer, loading the page index if it is not needed will do // unecessary I/O. We decide later if it is needed to evaluate the @@ -454,3 +523,360 @@ fn should_enable_page_index( .map(|p| p.filter_number() > 0) .unwrap_or(false) } + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow::datatypes::{DataType, Field, Schema}; + use bytes::{BufMut, BytesMut}; + use chrono::Utc; + use datafusion_common::{ + record_batch, stats::Precision, ColumnStatistics, ScalarValue, Statistics, + }; + use datafusion_datasource::{ + file_meta::FileMeta, file_stream::FileOpener, + schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile, + }; + use datafusion_expr::{col, lit}; + use datafusion_physical_expr::planner::logical2physical; + use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; + use futures::{Stream, StreamExt}; + use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore}; + use parquet::arrow::ArrowWriter; + + use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory}; + + async fn count_batches_and_rows( + mut stream: std::pin::Pin< + Box< + dyn Stream< + Item = Result< + arrow::array::RecordBatch, + arrow::error::ArrowError, + >, + > + Send, + >, + >, + ) -> (usize, usize) { + let mut num_batches = 0; + let mut num_rows = 0; + while let Some(Ok(batch)) = stream.next().await { + num_rows += batch.num_rows(); + num_batches += 1; + } + (num_batches, num_rows) + } + + async fn write_parquet( + store: Arc, + filename: &str, + batch: arrow::record_batch::RecordBatch, + ) -> usize { + let mut out = BytesMut::new().writer(); + { + let mut writer = + ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + let data = out.into_inner().freeze(); + let data_len = data.len(); + store.put(&Path::from(filename), data.into()).await.unwrap(); + data_len + } + + #[tokio::test] + async fn test_prune_on_statistics() { + let store = Arc::new(InMemory::new()) as Arc; + + let batch = record_batch!( + ("a", Int32, vec![Some(1), Some(2), Some(2)]), + ("b", Float32, vec![Some(1.0), Some(2.0), None]) + ) + .unwrap(); + + let data_size = + write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await; + + let schema = batch.schema(); + let file = PartitionedFile::new( + "file.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ) + .with_statistics(Arc::new( + Statistics::new_unknown(&schema) + .add_column_statistics(ColumnStatistics::new_unknown()) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float32(Some(1.0)))) + 
.with_max_value(Precision::Exact(ScalarValue::Float32(Some(2.0)))) + .with_null_count(Precision::Exact(1)), + ), + )); + + let make_opener = |predicate| { + ParquetOpener { + partition_index: 0, + projection: Arc::new([0, 1]), + batch_size: 1024, + limit: None, + predicate: Some(predicate), + logical_file_schema: schema.clone(), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new( + DefaultParquetFileReaderFactory::new(Arc::clone(&store)), + ), + partition_fields: vec![], + pushdown_filters: false, // note that this is false! + reorder_filters: false, + enable_page_index: false, + enable_bloom_filter: false, + schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + enable_row_group_stats_pruning: true, + coerce_int96: None, + } + }; + + let make_meta = || FileMeta { + object_meta: ObjectMeta { + location: Path::from("test.parquet"), + last_modified: Utc::now(), + size: u64::try_from(data_size).unwrap(), + e_tag: None, + version: None, + }, + range: None, + extensions: None, + metadata_size_hint: None, + }; + + // A filter on "a" should not exclude any rows even if it matches the data + let expr = col("a").eq(lit(1)); + let predicate = logical2physical(&expr, &schema); + let opener = make_opener(predicate); + let stream = opener + .open(make_meta(), file.clone()) + .unwrap() + .await + .unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 1); + assert_eq!(num_rows, 3); + + // A filter on `b = 5.0` should exclude all rows + let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0)))); + let predicate = logical2physical(&expr, &schema); + let opener = make_opener(predicate); + let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 0); + assert_eq!(num_rows, 0); + } + + #[tokio::test] + async fn test_prune_on_partition_statistics() { + let store = Arc::new(InMemory::new()) as Arc; + + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let data_size = + write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await; + + let file_schema = batch.schema(); + let mut file = PartitionedFile::new( + "part=1/file.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + file.partition_values = vec![ScalarValue::Int32(Some(1))]; + + let table_schema = Arc::new(Schema::new(vec![ + Field::new("part", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + ])); + + let make_opener = |predicate| { + ParquetOpener { + partition_index: 0, + projection: Arc::new([0]), + batch_size: 1024, + limit: None, + predicate: Some(predicate), + logical_file_schema: file_schema.clone(), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new( + DefaultParquetFileReaderFactory::new(Arc::clone(&store)), + ), + partition_fields: vec![Arc::new(Field::new( + "part", + DataType::Int32, + false, + ))], + pushdown_filters: false, // note that this is false! 
+ reorder_filters: false, + enable_page_index: false, + enable_bloom_filter: false, + schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + enable_row_group_stats_pruning: true, + coerce_int96: None, + } + }; + + let make_meta = || FileMeta { + object_meta: ObjectMeta { + location: Path::from("part=1/file.parquet"), + last_modified: Utc::now(), + size: u64::try_from(data_size).unwrap(), + e_tag: None, + version: None, + }, + range: None, + extensions: None, + metadata_size_hint: None, + }; + + // Filter should match the partition value + let expr = col("part").eq(lit(1)); + let predicate = logical2physical(&expr, &table_schema); + let opener = make_opener(predicate); + let stream = opener + .open(make_meta(), file.clone()) + .unwrap() + .await + .unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 1); + assert_eq!(num_rows, 3); + + // Filter should not match the partition value + let expr = col("part").eq(lit(2)); + let predicate = logical2physical(&expr, &table_schema); + let opener = make_opener(predicate); + let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 0); + assert_eq!(num_rows, 0); + } + + #[tokio::test] + async fn test_prune_on_partition_values_and_file_statistics() { + let store = Arc::new(InMemory::new()) as Arc; + + let batch = record_batch!( + ("a", Int32, vec![Some(1), Some(2), Some(3)]), + ("b", Float64, vec![Some(1.0), Some(2.0), None]) + ) + .unwrap(); + let data_size = + write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await; + let file_schema = batch.schema(); + let mut file = PartitionedFile::new( + "part=1/file.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + file.partition_values = vec![ScalarValue::Int32(Some(1))]; + file.statistics = Some(Arc::new( + Statistics::new_unknown(&file_schema) + .add_column_statistics(ColumnStatistics::new_unknown()) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(1.0)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(2.0)))) + .with_null_count(Precision::Exact(1)), + ), + )); + let table_schema = Arc::new(Schema::new(vec![ + Field::new("part", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Float32, true), + ])); + let make_opener = |predicate| { + ParquetOpener { + partition_index: 0, + projection: Arc::new([0]), + batch_size: 1024, + limit: None, + predicate: Some(predicate), + logical_file_schema: file_schema.clone(), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new( + DefaultParquetFileReaderFactory::new(Arc::clone(&store)), + ), + partition_fields: vec![Arc::new(Field::new( + "part", + DataType::Int32, + false, + ))], + pushdown_filters: false, // note that this is false! 
+ reorder_filters: false, + enable_page_index: false, + enable_bloom_filter: false, + schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + enable_row_group_stats_pruning: true, + coerce_int96: None, + } + }; + let make_meta = || FileMeta { + object_meta: ObjectMeta { + location: Path::from("part=1/file.parquet"), + last_modified: Utc::now(), + size: u64::try_from(data_size).unwrap(), + e_tag: None, + version: None, + }, + range: None, + extensions: None, + metadata_size_hint: None, + }; + + // Filter should match the partition value and file statistics + let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0))); + let predicate = logical2physical(&expr, &table_schema); + let opener = make_opener(predicate); + let stream = opener + .open(make_meta(), file.clone()) + .unwrap() + .await + .unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 1); + assert_eq!(num_rows, 3); + + // Should prune based on partition value but not file statistics + let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0))); + let predicate = logical2physical(&expr, &table_schema); + let opener = make_opener(predicate); + let stream = opener + .open(make_meta(), file.clone()) + .unwrap() + .await + .unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 0); + assert_eq!(num_rows, 0); + + // Should prune based on file statistics but not partition value + let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0))); + let predicate = logical2physical(&expr, &table_schema); + let opener = make_opener(predicate); + let stream = opener + .open(make_meta(), file.clone()) + .unwrap() + .await + .unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 0); + assert_eq!(num_rows, 0); + + // Should prune based on both partition value and file statistics + let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0))); + let predicate = logical2physical(&expr, &table_schema); + let opener = make_opener(predicate); + let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let (num_batches, num_rows) = count_batches_and_rows(stream).await; + assert_eq!(num_batches, 0); + assert_eq!(num_rows, 0); + } +} diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 0412288d6875..004952d5bd24 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -492,6 +492,7 @@ impl FileSource for ParquetSource { limit: base_config.limit, predicate: self.predicate.clone(), logical_file_schema: Arc::clone(&base_config.file_schema), + partition_fields: base_config.table_partition_cols.clone(), metadata_size_hint: self.metadata_size_hint, metrics: self.metrics().clone(), parquet_file_reader_factory, diff --git a/datafusion/datasource/src/file_meta.rs b/datafusion/datasource/src/file_meta.rs index 098a15eeb38a..ed7d958c6020 100644 --- a/datafusion/datasource/src/file_meta.rs +++ b/datafusion/datasource/src/file_meta.rs @@ -22,6 +22,7 @@ use object_store::{path::Path, ObjectMeta}; use crate::FileRange; /// A single file or part of a file that should be read, along with its schema, statistics +#[derive(Debug, Clone)] pub struct FileMeta { /// Path for the file (e.g. 
URL, filesystem path, etc) pub object_meta: ObjectMeta, diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index b6cdd05e5230..5dff8bcaeeb5 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -35,6 +35,7 @@ use crate::{ statistics::MinMaxStatistics, PartitionedFile, }; +use arrow::datatypes::FieldRef; use arrow::{ array::{ ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch, @@ -172,7 +173,7 @@ pub struct FileScanConfig { /// all records after filtering are returned. pub limit: Option<usize>, /// The partitioning columns - pub table_partition_cols: Vec<Field>, + pub table_partition_cols: Vec<FieldRef>, /// All equivalent lexicographical orderings that describe the schema. pub output_ordering: Vec<LexOrdering>, /// File compression type @@ -253,7 +254,7 @@ pub struct FileScanConfigBuilder { limit: Option<usize>, projection: Option<Vec<usize>>, - table_partition_cols: Vec<Field>, + table_partition_cols: Vec<FieldRef>, constraints: Option<Constraints>, file_groups: Vec<FileGroup>, statistics: Option<Statistics>, @@ -317,7 +318,10 @@ impl FileScanConfigBuilder { /// Set the partitioning columns pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self { - self.table_partition_cols = table_partition_cols; + self.table_partition_cols = table_partition_cols + .into_iter() + .map(|f| Arc::new(f) as FieldRef) + .collect(); self } @@ -735,7 +739,9 @@ impl FileScanConfig { self.file_schema.field(idx).clone() } else { let partition_idx = idx - self.file_schema.fields().len(); - self.table_partition_cols[partition_idx].clone() + Arc::unwrap_or_clone(Arc::clone( + &self.table_partition_cols[partition_idx], + )) } }) .collect(); @@ -798,7 +804,10 @@ impl FileScanConfig { /// Set the partitioning columns of the files #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self { - self.table_partition_cols = table_partition_cols; + self.table_partition_cols = table_partition_cols + .into_iter() + .map(|f| Arc::new(f) as FieldRef) + .collect(); self } @@ -2363,6 +2372,7 @@ mod tests { let new_config = new_builder.build(); // Verify properties match + let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>(); assert_eq!(new_config.object_store_url, object_store_url); assert_eq!(new_config.file_schema, schema); assert_eq!(new_config.projection, Some(vec![0, 2])); diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 1dc53bd6b931..25546b3263c9 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -120,16 +120,17 @@ impl FileStream { let part_file = self.file_iter.pop_front()?; let file_meta = FileMeta { - object_meta: part_file.object_meta, - range: part_file.range, - extensions: part_file.extensions, + object_meta: part_file.object_meta.clone(), + range: part_file.range.clone(), + extensions: part_file.extensions.clone(), metadata_size_hint: part_file.metadata_size_hint, }; + let partition_values = part_file.partition_values.clone(); Some( self.file_opener - .open(file_meta) - .map(|future| (future, part_file.partition_values)), + .open(file_meta, part_file) + .map(|future| (future, partition_values)), ) } @@ -367,7 +368,7 @@ impl Default for OnError { pub trait FileOpener: Unpin + Send + Sync { /// Asynchronously open the specified file and return a stream /// of [`RecordBatch`] - fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>; + fn open(&self, file_meta:
FileMeta, file: PartitionedFile) -> Result<FileOpenFuture>; } /// Represents the state of the next `FileOpenFuture`. Since we need to poll @@ -555,7 +556,11 @@ mod tests { } impl FileOpener for TestOpener { - fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> { + fn open( + &self, + _file_meta: FileMeta, + _file: PartitionedFile, + ) -> Result<FileOpenFuture> { let idx = self.current_idx.fetch_add(1, Ordering::SeqCst); if self.error_opening_idx.contains(&idx) { diff --git a/datafusion/physical-optimizer/src/pruning.rs b/datafusion/physical-optimizer/src/pruning.rs index e2378b5f42df..95dc5888d36b 100644 --- a/datafusion/physical-optimizer/src/pruning.rs +++ b/datafusion/physical-optimizer/src/pruning.rs @@ -469,7 +469,10 @@ impl PruningPredicate { /// simplified version `b`. See [`ExprSimplifier`] to simplify expressions. /// /// [`ExprSimplifier`]: https://docs.rs/datafusion/latest/datafusion/optimizer/simplify_expressions/struct.ExprSimplifier.html - pub fn prune<S: PruningStatistics>(&self, statistics: &S) -> Result<Vec<bool>> { + pub fn prune<S: PruningStatistics + ?Sized>( + &self, + statistics: &S, + ) -> Result<Vec<bool>> { let mut builder = BoolVecBuilder::new(statistics.num_containers()); // Try to prove the predicate can't be true for the containers based on @@ -851,7 +854,7 @@ impl From> for RequiredColumns { /// -------+-------- /// 5 | 1000 /// ``` -fn build_statistics_record_batch<S: PruningStatistics>( +fn build_statistics_record_batch<S: PruningStatistics + ?Sized>( statistics: &S, required_columns: &RequiredColumns, ) -> Result<RecordBatch> { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index d1b1f51ae107..d0538e7df3cf 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -506,7 +506,7 @@ pub fn serialize_file_scan_config( .iter() .cloned() .collect::<Vec<_>>(); - fields.extend(conf.table_partition_cols.iter().cloned().map(Arc::new)); + fields.extend(conf.table_partition_cols.iter().cloned()); let schema = Arc::new(arrow::datatypes::Schema::new(fields.clone())); Ok(protobuf::FileScanExecConf { diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 3922e0d45d88..4f938f6c2758 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -146,6 +146,14 @@ working but no one knows due to lack of test coverage). [api deprecation guidelines]: https://datafusion.apache.org/contributor-guide/api-health.html#deprecation-guidelines +### `PartitionedFile` added as an argument to the `FileOpener` trait + +This is necessary to properly fix filter pushdown for filters that combine partition +columns and file columns (e.g. `day = username['dob']`). + +If you implemented a custom `FileOpener`, you will need to add the `PartitionedFile` argument +to your `open` method, but you are not required to use it in any way. + ## DataFusion `47.0.0` This section calls out some of the major changes in the `47.0.0` release of DataFusion.
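For custom `FileOpener` implementations, the migration in the upgrading note above is mechanical. Below is a minimal sketch of what it looks like; `MyOpener` and its body are hypothetical and shown only to illustrate the new signature from this patch:

```rust
use datafusion_common::Result;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::PartitionedFile;

/// Hypothetical custom opener, used only to illustrate the signature change.
struct MyOpener;

impl FileOpener for MyOpener {
    // Previously: fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>
    // Now a `PartitionedFile` (partition values, optional statistics, ...) is passed
    // in as well. Prefix the parameter with `_` if you do not need it.
    fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
        // ... open `file_meta.object_meta.location` and build a record batch stream,
        // exactly as before ...
        unimplemented!("illustrative sketch only")
    }
}
```

Openers that want to prune whole files at execution time (as the parquet opener now does in this patch) can inspect `file.partition_values` and `file.statistics` instead of ignoring the argument.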