Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 48 additions & 21 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use datafusion_common::config::TableParquetOptions;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr::{conjunction, EquivalenceProperties};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
Expand Down Expand Up @@ -769,37 +769,64 @@ impl FileSource for ParquetSource {
fn try_reverse_output(
&self,
order: &[PhysicalSortExpr],
eq_properties: &EquivalenceProperties,
) -> datafusion_common::Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
// Check if we have file ordering information
let file_ordering = match &self.file_ordering {
Some(ordering) => ordering,
None => return Ok(SortOrderPushdownResult::Unsupported),
};

// Create a LexOrdering from the requested order to use the is_reverse method
let Some(requested_ordering) = LexOrdering::new(order.to_vec()) else {
// Empty ordering requested, cannot optimize
if order.is_empty() {
return Ok(SortOrderPushdownResult::Unsupported);
};
}

// Check if reversing the file ordering would satisfy the requested ordering
if file_ordering.is_reverse(&requested_ordering) {
// Phase 1: Enable reverse row group scanning
let new_source = self.clone().with_reverse_row_groups(true);
// Create the reversed file ordering by flipping sort directions.
// This handles complex orderings including monotonic functions.
//
// Example:
// File ordering: [extract_year_month(ws) DESC, ws DESC]
// Reversed: [extract_year_month(ws) ASC, ws ASC]
let reversed_file_ordering: Vec<PhysicalSortExpr> = file_ordering
.iter()
.map(|expr| expr.clone().reverse())
.collect();

// Return Inexact because we're only reversing row group order,
// not guaranteeing perfect row-level ordering
return Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(new_source) as Arc<dyn FileSource>,
});
// Build new equivalence properties with the reversed ordering.
// This allows us to check if the reversed ordering satisfies the request
// by leveraging:
// - Function monotonicity (e.g., extract_year_month preserves ordering)
// - Constant columns (from filters)
// - Other equivalence relationships
//
// Example flow:
// 1. File ordering: [extract_year_month(ws) DESC, ws DESC]
// 2. After reversal: [extract_year_month(ws) ASC, ws ASC]
// 3. Requested: [ws ASC]
// 4. Through extract_year_month's monotonicity property, the reversed
// ordering satisfies [ws ASC] even though it has additional prefix
let mut reversed_eq_properties = eq_properties.clone();
reversed_eq_properties.clear_orderings();
reversed_eq_properties.add_ordering(reversed_file_ordering.clone());

// Check if the reversed file ordering satisfies the requested ordering.
// This uses ordering_satisfy which internally:
// - Normalizes both orderings (removes constants)
// - Checks monotonic functions
// - Handles prefix matching
if !reversed_eq_properties.ordering_satisfy(order.to_vec())? {
return Ok(SortOrderPushdownResult::Unsupported);
}

// TODO Phase 2: Add support for other optimizations:
// - File reordering based on min/max statistics
// - Detection of exact ordering (return Exact to remove Sort operator)
// - Partial sort pushdown for prefix matches

Ok(SortOrderPushdownResult::Unsupported)
// The reversed ordering satisfies the request - enable reverse row group scanning.
// Note: This returns "Inexact" because we only reverse the order of row groups,
// not the rows within each row group. A Sort operator may still be needed
// upstream for perfect ordering, but this optimization helps with:
// - Limit pushdown (get first N results faster)
// - Reducing sort cost (partially sorted input)
let new_source = self.clone().with_reverse_row_groups(true);
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(new_source) as Arc<dyn FileSource>,
})
}

fn with_file_ordering_info(
Expand Down
37 changes: 34 additions & 3 deletions datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::schema_adapter::SchemaAdapterFactory;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, not_impl_err};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
use datafusion_physical_plan::DisplayFormatType;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
Expand Down Expand Up @@ -150,15 +150,46 @@ pub trait FileSource: Send + Sync {

/// Try to create a new FileSource that can produce data in the specified sort order.
///
/// This method attempts to optimize data retrieval to match the requested ordering.
/// It receives both the requested ordering and equivalence properties that describe
/// relationships between expressions (e.g., constant columns, monotonic functions).
///
/// # Parameters
/// * `order` - The requested sort ordering
/// * `eq_properties` - Equivalence properties that can be used to determine if a reversed
Comment thread
zhuqi-lucas marked this conversation as resolved.
Outdated
/// ordering satisfies the request. This includes information about:
/// - Constant columns (e.g., from filters like `ticker = 'AAPL'`)
/// - Monotonic functions (e.g., `extract_year_month(timestamp)`)
/// - Other equivalence relationships
///
/// # Examples
///
/// ## Example 1: Simple reverse
/// ```text
/// File ordering: [a ASC, b DESC]
/// Requested: [a DESC]
/// Reversed file: [a DESC, b ASC]
/// Result: Satisfies request (prefix match) → Inexact
/// ```
///
/// ## Example 2: Monotonic function
/// ```text
/// File ordering: [extract_year_month(ts) ASC, ts ASC]
/// Requested: [ts DESC]
/// Reversed file: [extract_year_month(ts) DESC, ts DESC]
/// Result: Through monotonicity, satisfies [ts DESC] → Inexact
/// ```
///
/// # Returns
/// * `Exact` - Created a source that guarantees perfect ordering
/// * `Inexact` - Created a source optimized for ordering (e.g., reordered files) but not perfectly sorted
/// * `Exact` - Created a source that guarantees perfect ordering (e.g., file reordering)
/// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted
/// * `Unsupported` - Cannot optimize for this ordering
///
/// Default implementation returns `Unsupported`.
fn try_reverse_output(
&self,
_order: &[PhysicalSortExpr],
_eq_properties: &EquivalenceProperties,
) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
Ok(SortOrderPushdownResult::Unsupported)
}
Expand Down
40 changes: 34 additions & 6 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,25 +835,53 @@ impl DataSource for FileScanConfig {
return Ok(SortOrderPushdownResult::Unsupported);
}

// Use the trait method instead of downcasting
// Try to provide file ordering info to the source
// If not supported (e.g., CsvSource), fall back to original source
// Build equivalence properties which include:
// - Orderings from file schema
// - Constant columns from filters (e.g., ticker = 'AAPL')
// - Equivalence relationships
let eq_properties = self.eq_properties();

// First check if the requested ordering is already satisfied in the forward direction.
// ordering_satisfy internally:
// - Normalizes the ordering (removes constant columns)
// - Checks monotonic functions
// - Handles prefix matching
//
// This handles cases where:
// - Exact match: [a ASC] requested, [a ASC, b DESC] available
// - Through monotonicity: [ws DESC] requested, [extract_year_month(ws) DESC, ws DESC] available
// - Through constants: [a ASC] requested, but 'a' is constant (from WHERE a = 5)
if eq_properties.ordering_satisfy(order.to_vec())? {
// Already satisfied - no optimization needed
return Ok(SortOrderPushdownResult::Unsupported);
}

// Provide the file ordering to FileSource so it has context
// about what ordering the files naturally provide.
let file_source_with_ordering = self
.file_source
.with_file_ordering_info(file_ordering)
.unwrap_or_else(|_| Arc::clone(&self.file_source));

// Try to reverse the datasource with ordering info,
// and currently only ParquetSource supports it with inexact reverse with row groups.
let pushdown_result = file_source_with_ordering.try_reverse_output(order)?;
// Delegate to FileSource to check if reverse scanning can satisfy the request.
// FileSource will:
// 1. Reverse the file ordering
// 2. Use eq_properties.ordering_satisfy() to check if reversed ordering works
// 3. Return Inexact/Exact if it can help, Unsupported otherwise
let pushdown_result = file_source_with_ordering
.try_reverse_output(order, &eq_properties)?;

match pushdown_result {
SortOrderPushdownResult::Exact { inner } => {
// FileSource guarantees exact ordering.
// Rebuild FileScanConfig with reversed file groups.
Ok(SortOrderPushdownResult::Exact {
inner: self.rebuild_with_source(inner, true)?,
})
}
SortOrderPushdownResult::Inexact { inner } => {
// FileSource provides approximate ordering (e.g., reversed row groups).
// A Sort operator will still be needed, but this helps with limit pushdown.
Ok(SortOrderPushdownResult::Inexact {
inner: self.rebuild_with_source(inner, false)?,
})
Expand Down
24 changes: 23 additions & 1 deletion datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use itertools::Itertools;

use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
RecordBatchStream, SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
};
use crate::coalesce::LimitedBatchCoalescer;
use crate::coalesce::PushBatchStatus::LimitReached;
Expand Down Expand Up @@ -65,6 +65,7 @@ use datafusion_physical_expr::{
};

use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use futures::stream::{Stream, StreamExt};
use log::trace;

Expand Down Expand Up @@ -603,6 +604,27 @@ impl ExecutionPlan for FilterExec {
})
}

fn try_pushdown_sort(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A big picture question on this one: is it beneficial to push down sort order past a filter? If it's "free" of course. But if there's runtime cost to re-ordering rows, it may be cheaper to filter then re-order (if the filter is very selective).

I think in practice, for Parquet, everything ends up happening in the scan itself and filters always get applied first, so this is inconsequential. But in theory it could hurt data sources that e.g. don't support filter pushdown and hence rely on a FilterExec.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @adriangb for review, i agree, but currently i think it's good, i only add the following 3 basic cases, and don't add specific sort pushdown, just bypass to child sort pushdown to make it work:

FilterExec::try_pushdown_sort 
ProjectionExec::try_pushdown_sort 
CooperativeExec::try_pushdown_sort 

And they all just bypass self to the child sort pushdown, i was testing and also the new slt testing here, it seems need this, and they are the basic cases i think.

I also created a issue for the following operators to add more try_pushdown_sort implementation, we can investigate more for it:

#19394

&self,
order: &[PhysicalSortExpr],
) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
let child = self.input();

match child.try_pushdown_sort(order)? {
SortOrderPushdownResult::Exact { inner } => {
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
Ok(SortOrderPushdownResult::Exact { inner: new_exec })
}
SortOrderPushdownResult::Inexact { inner } => {
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
}
SortOrderPushdownResult::Unsupported => {
Ok(SortOrderPushdownResult::Unsupported)
}
}
}

fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Some(Arc::new(Self {
predicate: Arc::clone(&self.predicate),
Expand Down
27 changes: 25 additions & 2 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::expressions::{Column, Literal};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{
DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, Statistics,
SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
};
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
Expand All @@ -51,7 +51,9 @@ use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::projection::Projector;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement, PhysicalSortExpr,
};
// Re-exported from datafusion-physical-expr for backwards compatibility
// We recommend updating your imports to use datafusion-physical-expr directly
pub use datafusion_physical_expr::projection::{
Expand Down Expand Up @@ -357,6 +359,27 @@ impl ExecutionPlan for ProjectionExec {
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
}

fn try_pushdown_sort(
&self,
order: &[PhysicalSortExpr],
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the sort depends on the output of the projection? Presumably then the projection should say the sort can't be pushed down?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point @adriangb ! I will try to address this and add more tests.

) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
let child = self.input();

match child.try_pushdown_sort(order)? {
SortOrderPushdownResult::Exact { inner } => {
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
Ok(SortOrderPushdownResult::Exact { inner: new_exec })
}
SortOrderPushdownResult::Inexact { inner } => {
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
}
SortOrderPushdownResult::Unsupported => {
Ok(SortOrderPushdownResult::Unsupported)
}
}
}
}

impl ProjectionStream {
Expand Down
Loading
Loading