Add late pruning of Parquet files based on file level statistics #16014

Merged: 2 commits, Jun 10, 2025
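The change threads each `PartitionedFile` through `FileOpener::open`, so an opener can consult file-level statistics at execution time and skip ("late prune") files that a predicate rules out, including dynamic filters produced by a join that only become known after planning. A minimal sketch of the idea, assuming `PartitionedFile` exposes its collected statistics via a `statistics` field, and with `predicate_may_match` standing in for real pruning logic:

use std::sync::Arc;

use datafusion_common::{Result, Statistics};
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::PartitionedFile;
use futures::StreamExt;

/// Hypothetical opener that prunes whole files before delegating to an
/// inner opener (illustration only, not the PR's actual implementation).
struct PruningOpener {
    inner: Arc<dyn FileOpener>,
}

impl PruningOpener {
    /// Stand-in for real pruning logic, e.g. evaluating a
    /// `PruningPredicate` against the file's min/max statistics.
    fn predicate_may_match(&self, _stats: &Statistics) -> bool {
        true
    }
}

impl FileOpener for PruningOpener {
    fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> {
        // The new argument carries file-level statistics (when the table
        // collected them), so pruning can happen here at open time, not
        // only during planning.
        if let Some(stats) = &file.statistics {
            if !self.predicate_may_match(stats) {
                // Skip the file entirely by yielding an empty stream.
                return Ok(Box::pin(async move {
                    Ok(futures::stream::empty().boxed())
                }));
            }
        }
        self.inner.open(file_meta, file)
    }
}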
7 changes: 6 additions & 1 deletion datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -29,6 +29,7 @@
 use arrow_ipc::reader::FileDecoder;
 use datafusion_common::Statistics;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::PartitionedFile;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

 use futures::StreamExt;
@@ -121,7 +122,11 @@ pub struct ArrowOpener {
 }

 impl FileOpener for ArrowOpener {
-    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
+    fn open(
+        &self,
+        file_meta: FileMeta,
+        _file: PartitionedFile,
+    ) -> Result<FileOpenFuture> {
         let object_store = Arc::clone(&self.object_store);
         let projection = self.projection.clone();
         Ok(Box::pin(async move {
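The same mechanical change repeats in every `FileOpener` implementation below: `open` gains the `PartitionedFile` argument, bound as `_file` wherever the opener does not yet use it.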
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/physical_plan/csv.rs
@@ -369,7 +369,8 @@ mod tests {
             .build();

         // Add partition columns
-        config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)];
+        config.table_partition_cols =
+            vec![Arc::new(Field::new("date", DataType::Utf8, false))];
         config.file_groups[0][0].partition_values = vec![ScalarValue::from("2021-10-26")];

         // We should be able to project on the partition column
@@ -57,7 +57,11 @@ pub struct TestOpener {
 }

 impl FileOpener for TestOpener {
-    fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
+    fn open(
+        &self,
+        _file_meta: FileMeta,
+        _file: PartitionedFile,
+    ) -> Result<FileOpenFuture> {
         let mut batches = self.batches.clone();
         if let Some(batch_size) = self.batch_size {
             let batch = concat_batches(&batches[0].schema(), &batches)?;
10 changes: 8 additions & 2 deletions datafusion/datasource-avro/src/source.rs
@@ -145,7 +145,9 @@ mod private {
     use super::*;

     use bytes::Buf;
-    use datafusion_datasource::{file_meta::FileMeta, file_stream::FileOpenFuture};
+    use datafusion_datasource::{
+        file_meta::FileMeta, file_stream::FileOpenFuture, PartitionedFile,
+    };
     use futures::StreamExt;
     use object_store::{GetResultPayload, ObjectStore};
@@ -155,7 +157,11 @@
     }

     impl FileOpener for AvroOpener {
-        fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
+        fn open(
+            &self,
+            file_meta: FileMeta,
+            _file: PartitionedFile,
+        ) -> Result<FileOpenFuture> {
             let config = Arc::clone(&self.config);
             let object_store = Arc::clone(&self.object_store);
             Ok(Box::pin(async move {
9 changes: 7 additions & 2 deletions datafusion/datasource-csv/src/source.rs
@@ -29,7 +29,8 @@
 use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::{
-    as_file_source, calculate_range, FileRange, ListingTableUrl, RangeCalculation,
+    as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
+    RangeCalculation,
 };

 use arrow::csv;
@@ -322,7 +323,11 @@ impl FileOpener for CsvOpener {
     /// A,1,2,3,4,5,6,7,8,9\n
     /// A},1,2,3,4,5,6,7,8,9\n
     /// The lines read would be: [1, 2]
-    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
+    fn open(
+        &self,
+        file_meta: FileMeta,
+        _file: PartitionedFile,
+    ) -> Result<FileOpenFuture> {
         // `self.config.has_header` controls whether to skip reading the 1st line header
         // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle
         // partition, then don't skip first line
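The comment above encodes a subtlety of parallel CSV scans: only the partition that starts at byte offset 0 of the file actually contains the header line. A minimal sketch of that decision, assuming a range shaped like `FileRange { start, .. }` (the helper name is illustrative):

/// Should this opener skip a leading header line? Only the partition
/// that begins at byte offset 0 ever sees the file's header.
/// (`has_header` mirrors the opener's config flag; sketch only.)
fn skip_header(has_header: bool, range_start: Option<i64>) -> bool {
    match range_start {
        // Whole file, or the first partition: honor the flag.
        None | Some(0) => has_header,
        // A middle partition never starts at the header line.
        Some(_) => false,
    }
}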
8 changes: 6 additions & 2 deletions datafusion/datasource-json/src/source.rs
@@ -32,7 +32,7 @@
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::{
-    as_file_source, calculate_range, ListingTableUrl, RangeCalculation,
+    as_file_source, calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation,
 };
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
@@ -176,7 +176,11 @@ impl FileOpener for JsonOpener {
     /// are applied to determine which lines to read:
     /// 1. The first line of the partition is the line in which the index of the first character >= `start`.
     /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.
-    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
+    fn open(
+        &self,
+        file_meta: FileMeta,
+        _file: PartitionedFile,
+    ) -> Result<FileOpenFuture> {
         let store = Arc::clone(&self.object_store);
         let schema = Arc::clone(&self.projected_schema);
         let batch_size = self.batch_size;
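Rules 1 and 2 above together mean a partition over bytes `[start, end)` reads exactly the newline-delimited lines whose first byte falls inside that range, so every line is read by exactly one partition. A standalone illustration of that invariant (independent of the actual `calculate_range` helper):

/// 0-based indices of the lines a partition covering bytes
/// [start, end) of `data` reads, per rules 1 and 2 above.
fn lines_for_range(data: &[u8], start: usize, end: usize) -> Vec<usize> {
    let mut lines = Vec::new();
    let mut line_start = 0; // byte offset where the current line begins
    for (idx, line) in data.split_inclusive(|b| *b == b'\n').enumerate() {
        // Rule 1 picks the first line starting at or after `start`;
        // rule 2 stops at the line containing byte `end - 1`. Both are
        // equivalent to: the line's first byte lies in [start, end).
        if (start..end).contains(&line_start) {
            lines.push(idx);
        }
        line_start += line.len();
    }
    lines
}

fn main() {
    // A partition over bytes [1, 7) of "aa\nbb\ncc\n" starts inside
    // line 0 and ends inside line 2, so it reads lines 1 and 2.
    assert_eq!(lines_for_range(b"aa\nbb\ncc\n", 1, 7), vec![1, 2]);
}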
8 changes: 8 additions & 0 deletions datafusion/datasource-parquet/src/metrics.rs
@@ -27,6 +27,10 @@
 /// [`ParquetFileReaderFactory`]: super::ParquetFileReaderFactory
 #[derive(Debug, Clone)]
 pub struct ParquetFileMetrics {
+    /// Number of files pruned by partition or file level statistics.
+    /// This often happens at planning time but may happen at execution time
+    /// if dynamic filters (e.g. from a join) result in additional pruning.
+    pub files_pruned_statistics: Count,
     /// Number of times the predicate could not be evaluated
     pub predicate_evaluation_errors: Count,
     /// Number of row groups whose bloom filters were checked and matched (not pruned)
@@ -122,7 +126,11 @@ impl ParquetFileMetrics {
             .with_new_label("filename", filename.to_string())
             .subset_time("metadata_load_time", partition);

+        let files_pruned_statistics =
+            MetricBuilder::new(metrics).counter("files_pruned_statistics", partition);
+
         Self {
+            files_pruned_statistics,
             predicate_evaluation_errors,
             row_groups_matched_bloom_filter,
             row_groups_pruned_bloom_filter,
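The new counter is registered per file and per partition like the existing Parquet metrics; the code performing the late pruning is then responsible for bumping it. A hedged sketch of such a call site (the real wiring lives in the Parquet opener, not in this diff; the module path and helper name are assumptions):

use datafusion_datasource_parquet::metrics::ParquetFileMetrics;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

/// Record that `filename` was skipped based on its statistics
/// (illustrative helper, not part of the PR).
fn record_pruned_file(
    partition: usize,
    filename: &str,
    metrics: &ExecutionPlanMetricsSet,
) {
    let file_metrics = ParquetFileMetrics::new(partition, filename, metrics);
    // Each file pruned via partition or file level statistics counts once.
    file_metrics.files_pruned_statistics.add(1);
}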