Skip to content

Commit 7178a63

Browse files
committed
Add late pruning of files based on file level statistics
1 parent 25727d4 commit 7178a63

File tree

16 files changed

+518
-31
lines changed

16 files changed

+518
-31
lines changed

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use arrow_ipc::reader::FileDecoder;
2929
use datafusion_common::Statistics;
3030
use datafusion_datasource::file::FileSource;
3131
use datafusion_datasource::file_scan_config::FileScanConfig;
32+
use datafusion_datasource::PartitionedFile;
3233
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
3334

3435
use futures::StreamExt;
@@ -121,7 +122,11 @@ pub struct ArrowOpener {
121122
}
122123

123124
impl FileOpener for ArrowOpener {
124-
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
125+
fn open(
126+
&self,
127+
file_meta: FileMeta,
128+
_file: PartitionedFile,
129+
) -> Result<FileOpenFuture> {
125130
let object_store = Arc::clone(&self.object_store);
126131
let projection = self.projection.clone();
127132
Ok(Box::pin(async move {

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,8 @@ mod tests {
369369
.build();
370370

371371
// Add partition columns
372-
config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)];
372+
config.table_partition_cols =
373+
vec![Arc::new(Field::new("date", DataType::Utf8, false))];
373374
config.file_groups[0][0].partition_values = vec![ScalarValue::from("2021-10-26")];
374375

375376
// We should be able to project on the partition column

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ pub struct TestOpener {
5757
}
5858

5959
impl FileOpener for TestOpener {
60-
fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
60+
fn open(
61+
&self,
62+
_file_meta: FileMeta,
63+
_file: PartitionedFile,
64+
) -> Result<FileOpenFuture> {
6165
let mut batches = self.batches.clone();
6266
if let Some(batch_size) = self.batch_size {
6367
let batch = concat_batches(&batches[0].schema(), &batches)?;

datafusion/datasource-avro/src/source.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,9 @@ mod private {
145145
use super::*;
146146

147147
use bytes::Buf;
148-
use datafusion_datasource::{file_meta::FileMeta, file_stream::FileOpenFuture};
148+
use datafusion_datasource::{
149+
file_meta::FileMeta, file_stream::FileOpenFuture, PartitionedFile,
150+
};
149151
use futures::StreamExt;
150152
use object_store::{GetResultPayload, ObjectStore};
151153

@@ -155,7 +157,11 @@ mod private {
155157
}
156158

157159
impl FileOpener for AvroOpener {
158-
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
160+
fn open(
161+
&self,
162+
file_meta: FileMeta,
163+
_file: PartitionedFile,
164+
) -> Result<FileOpenFuture> {
159165
let config = Arc::clone(&self.config);
160166
let object_store = Arc::clone(&self.object_store);
161167
Ok(Box::pin(async move {

datafusion/datasource-csv/src/source.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
2929
use datafusion_datasource::file_meta::FileMeta;
3030
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
3131
use datafusion_datasource::{
32-
as_file_source, calculate_range, FileRange, ListingTableUrl, RangeCalculation,
32+
as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
33+
RangeCalculation,
3334
};
3435

3536
use arrow::csv;
@@ -322,7 +323,11 @@ impl FileOpener for CsvOpener {
322323
/// A,1,2,3,4,5,6,7,8,9\n
323324
/// A},1,2,3,4,5,6,7,8,9\n
324325
/// The lines read would be: [1, 2]
325-
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
326+
fn open(
327+
&self,
328+
file_meta: FileMeta,
329+
_file: PartitionedFile,
330+
) -> Result<FileOpenFuture> {
326331
// `self.config.has_header` controls whether to skip reading the 1st line header
327332
// If the .csv file is read in parallel and this `CsvOpener` is only reading some middle
328333
// partition, then don't skip first line

datafusion/datasource-json/src/source.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use datafusion_datasource::file_meta::FileMeta;
3232
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
3333
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
3434
use datafusion_datasource::{
35-
as_file_source, calculate_range, ListingTableUrl, RangeCalculation,
35+
as_file_source, calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation,
3636
};
3737
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3838

@@ -176,7 +176,11 @@ impl FileOpener for JsonOpener {
176176
/// are applied to determine which lines to read:
177177
/// 1. The first line of the partition is the line in which the index of the first character >= `start`.
178178
/// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.
179-
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
179+
fn open(
180+
&self,
181+
file_meta: FileMeta,
182+
_file: PartitionedFile,
183+
) -> Result<FileOpenFuture> {
180184
let store = Arc::clone(&self.object_store);
181185
let schema = Arc::clone(&self.projected_schema);
182186
let batch_size = self.batch_size;

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ use datafusion_physical_plan::metrics::{
2727
/// [`ParquetFileReaderFactory`]: super::ParquetFileReaderFactory
2828
#[derive(Debug, Clone)]
2929
pub struct ParquetFileMetrics {
30+
/// Number of files pruned by partition or file level statistics
31+
/// This often happens at planning time but may happen at execution time
32+
/// if dynamic filters (e.g. from a join) result in additional pruning.
33+
pub files_pruned_statistics: Count,
3034
/// Number of times the predicate could not be evaluated
3135
pub predicate_evaluation_errors: Count,
3236
/// Number of row groups whose bloom filters were checked and matched (not pruned)
@@ -122,7 +126,11 @@ impl ParquetFileMetrics {
122126
.with_new_label("filename", filename.to_string())
123127
.subset_time("metadata_load_time", partition);
124128

129+
let files_pruned_statistics =
130+
MetricBuilder::new(metrics).counter("files_pruned_statistics", partition);
131+
125132
Self {
133+
files_pruned_statistics,
126134
predicate_evaluation_errors,
127135
row_groups_matched_bloom_filter,
128136
row_groups_pruned_bloom_filter,

0 commit comments

Comments
 (0)