Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 47 additions & 40 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use datafusion_common::config::TableParquetOptions;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::{EquivalenceProperties, conjunction};
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
Expand All @@ -54,7 +54,7 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;

#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use itertools::Itertools;
use object_store::ObjectStore;
#[cfg(feature = "parquet_encryption")]
Expand Down Expand Up @@ -288,9 +288,6 @@ pub struct ParquetSource {
pub(crate) projection: ProjectionExprs,
#[cfg(feature = "parquet_encryption")]
pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
/// The ordering of data within the files
/// This is set by FileScanConfig when it knows the file ordering
file_ordering: Option<LexOrdering>,
/// If true, read files in reverse order and reverse the row groups within each file.
/// Rows inside a row group are not guaranteed to be in reverse order, so the output
/// still needs to be sorted after reading; the reverse scan is therefore inexact.
Expand Down Expand Up @@ -320,7 +317,6 @@ impl ParquetSource {
metadata_size_hint: None,
#[cfg(feature = "parquet_encryption")]
encryption_factory: None,
file_ordering: None,
reverse_row_groups: false,
}
}
Expand Down Expand Up @@ -397,12 +393,6 @@ impl ParquetSource {
self
}

/// Sets the known ordering of the data within the files being read.
///
/// Stores `ordering` in the `file_ordering` field, replacing any previous value;
/// passing `None` clears it. Consumes and returns `self` so calls can be chained.
pub fn with_file_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
self.file_ordering = ordering;
self
}

/// Return the value described in [`Self::with_pushdown_filters`]
pub(crate) fn pushdown_filters(&self) -> bool {
self.table_parquet_options.global.pushdown_filters
Expand Down Expand Up @@ -769,44 +759,61 @@ impl FileSource for ParquetSource {
fn try_reverse_output(
&self,
order: &[PhysicalSortExpr],
eq_properties: &EquivalenceProperties,
) -> datafusion_common::Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
// Check if we have file ordering information
let file_ordering = match &self.file_ordering {
Some(ordering) => ordering,
None => return Ok(SortOrderPushdownResult::Unsupported),
};

// Create a LexOrdering from the requested order to use the is_reverse method
let Some(requested_ordering) = LexOrdering::new(order.to_vec()) else {
// Empty ordering requested, cannot optimize
if order.is_empty() {
return Ok(SortOrderPushdownResult::Unsupported);
};
}

// Check if reversing the file ordering would satisfy the requested ordering
if file_ordering.is_reverse(&requested_ordering) {
// Phase 1: Enable reverse row group scanning
let new_source = self.clone().with_reverse_row_groups(true);
// Build new equivalence properties with the reversed ordering.
// This allows us to check if the reversed ordering satisfies the request
// by leveraging:
// - Function monotonicity (e.g., extract_year_month preserves ordering)
// - Constant columns (from filters)
// - Other equivalence relationships
//
// Example flow:
// 1. File ordering: [extract_year_month(ws) DESC, ws DESC]
// 2. After reversal: [extract_year_month(ws) ASC, ws ASC]
// 3. Requested: [ws ASC]
// 4. Through extract_year_month's monotonicity property, the reversed
// ordering satisfies [ws ASC] even though it has an additional prefix expression
let reversed_eq_properties = {
let mut new = eq_properties.clone();
new.clear_orderings();

// Reverse each ordering in the equivalence properties
let reversed_orderings = eq_properties
.oeq_class()
.iter()
.map(|ordering| {
ordering
.iter()
.map(|expr| expr.reverse())
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();

new.add_orderings(reversed_orderings);
new
};

// Return Inexact because we're only reversing row group order,
// not guaranteeing perfect row-level ordering
return Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(new_source) as Arc<dyn FileSource>,
});
// Check if the reversed ordering satisfies the requested ordering
if !reversed_eq_properties.ordering_satisfy(order.iter().cloned())? {
return Ok(SortOrderPushdownResult::Unsupported);
}

// Return Inexact because we're only reversing row group order,
// not guaranteeing perfect row-level ordering
let new_source = self.clone().with_reverse_row_groups(true);
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(new_source) as Arc<dyn FileSource>,
})

// TODO Phase 2: Add support for other optimizations:
// - File reordering based on min/max statistics
// - Detection of exact ordering (return Exact to remove Sort operator)
// - Partial sort pushdown for prefix matches

Ok(SortOrderPushdownResult::Unsupported)
}

/// Returns a copy of this source with `ordering` recorded as its file ordering.
///
/// Clones `self`, applies [`Self::with_file_ordering`], and wraps the result in an
/// `Arc<dyn FileSource>`. This implementation always returns `Ok`.
fn with_file_ordering_info(
&self,
ordering: Option<LexOrdering>,
) -> datafusion_common::Result<Arc<dyn FileSource>> {
Ok(Arc::new(self.clone().with_file_ordering(ordering)))
}
}

Expand Down
53 changes: 33 additions & 20 deletions datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::schema_adapter::SchemaAdapterFactory;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, not_impl_err};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
use datafusion_physical_plan::DisplayFormatType;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
Expand Down Expand Up @@ -150,15 +150,46 @@ pub trait FileSource: Send + Sync {

/// Try to create a new FileSource that can produce data in the specified sort order.
///
/// This method attempts to optimize data retrieval to match the requested ordering.
/// It receives both the requested ordering and equivalence properties that describe
/// relationships between expressions (e.g., constant columns, monotonic functions).
///
/// # Parameters
/// * `order` - The requested sort ordering
/// * `eq_properties` - Equivalence properties that can be used to determine if a reversed
Comment thread
zhuqi-lucas marked this conversation as resolved.
Outdated
/// ordering satisfies the request. This includes information about:
/// - Constant columns (e.g., from filters like `ticker = 'AAPL'`)
/// - Monotonic functions (e.g., `extract_year_month(timestamp)`)
/// - Other equivalence relationships
///
/// # Examples
///
/// ## Example 1: Simple reverse
/// ```text
/// File ordering: [a ASC, b DESC]
/// Requested: [a DESC]
/// Reversed file: [a DESC, b ASC]
/// Result: Satisfies request (prefix match) → Inexact
/// ```
///
/// ## Example 2: Monotonic function
/// ```text
/// File ordering: [extract_year_month(ts) ASC, ts ASC]
/// Requested: [ts DESC]
/// Reversed file: [extract_year_month(ts) DESC, ts DESC]
/// Result: Through monotonicity, satisfies [ts DESC] → Inexact
/// ```
///
/// # Returns
/// * `Exact` - Created a source that guarantees perfect ordering
/// * `Inexact` - Created a source optimized for ordering (e.g., reordered files) but not perfectly sorted
/// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted
/// * `Unsupported` - Cannot optimize for this ordering
///
/// Default implementation returns `Unsupported`.
fn try_reverse_output(
&self,
_order: &[PhysicalSortExpr],
_eq_properties: &EquivalenceProperties,
) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
Ok(SortOrderPushdownResult::Unsupported)
}
Expand Down Expand Up @@ -192,24 +223,6 @@ pub trait FileSource: Send + Sync {
Ok(None)
}

/// Set the file ordering information
///
/// This allows the file source to know how the files are sorted,
/// enabling it to make informed decisions about sort pushdown.
///
/// # Default Implementation
///
/// Returns a "not implemented" error (via `not_impl_err!`); it does not
/// return a clone of `self`. FileSource implementations that support
/// sort optimization should override this method.
fn with_file_ordering_info(
&self,
_ordering: Option<LexOrdering>,
) -> Result<Arc<dyn FileSource>> {
// Default: the ordering is ignored and a not-implemented error is returned.
// ParquetSource overrides this to record the ordering.
not_impl_err!("with_file_ordering_info not implemented for this FileSource")
}

/// Deprecated: Set optional schema adapter factory.
///
/// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
Expand Down
19 changes: 3 additions & 16 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,23 +829,10 @@ impl DataSource for FileScanConfig {
&self,
order: &[PhysicalSortExpr],
) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
let file_ordering = self.output_ordering.first().cloned();

if file_ordering.is_none() {
return Ok(SortOrderPushdownResult::Unsupported);
}

// Use the trait method instead of downcasting
// Try to provide file ordering info to the source
// If not supported (e.g., CsvSource), fall back to original source
let file_source_with_ordering = self
// Delegate to FileSource to check if reverse scanning can satisfy the request.
let pushdown_result = self
.file_source
.with_file_ordering_info(file_ordering)
.unwrap_or_else(|_| Arc::clone(&self.file_source));

// Try to reverse the datasource with ordering info,
// and currently only ParquetSource supports it with inexact reverse with row groups.
let pushdown_result = file_source_with_ordering.try_reverse_output(order)?;
.try_reverse_output(order, &self.eq_properties())?;

match pushdown_result {
SortOrderPushdownResult::Exact { inner } => {
Expand Down
24 changes: 23 additions & 1 deletion datafusion/physical-plan/src/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use crate::filter_pushdown::{
use crate::projection::ProjectionExec;
use crate::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
SendableRecordBatchStream,
SendableRecordBatchStream, SortOrderPushdownResult,
};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
Expand All @@ -91,6 +91,7 @@ use datafusion_execution::TaskContext;

use crate::execution_plan::SchedulingType;
use crate::stream::RecordBatchStreamAdapter;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use futures::{Stream, StreamExt};

/// A stream that passes record batches through unchanged while cooperating with the Tokio runtime.
Expand Down Expand Up @@ -328,6 +329,27 @@ impl ExecutionPlan for CooperativeExec {
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
}

fn try_pushdown_sort(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a good improvement but is orthogonal to the change to use EqProperties (just noting)

&self,
order: &[PhysicalSortExpr],
) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
let child = self.input();

match child.try_pushdown_sort(order)? {
SortOrderPushdownResult::Exact { inner } => {
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
Ok(SortOrderPushdownResult::Exact { inner: new_exec })
}
SortOrderPushdownResult::Inexact { inner } => {
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
}
SortOrderPushdownResult::Unsupported => {
Ok(SortOrderPushdownResult::Unsupported)
}
}
}
}

/// Creates a [`CooperativeStream`] wrapper around the given [`RecordBatchStream`].
Expand Down
27 changes: 25 additions & 2 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::expressions::{Column, Literal};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{
DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, Statistics,
SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
};
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
Expand All @@ -51,7 +51,9 @@ use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::projection::Projector;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement, PhysicalSortExpr,
};
// Re-exported from datafusion-physical-expr for backwards compatibility
// We recommend updating your imports to use datafusion-physical-expr directly
pub use datafusion_physical_expr::projection::{
Expand Down Expand Up @@ -357,6 +359,27 @@ impl ExecutionPlan for ProjectionExec {
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
}

fn try_pushdown_sort(
&self,
order: &[PhysicalSortExpr],
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the sort depends on the output of the projection? Presumably then the projection should say the sort can't be pushed down?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point @adriangb ! I will try to address this and add more tests.

) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
let child = self.input();

match child.try_pushdown_sort(order)? {
SortOrderPushdownResult::Exact { inner } => {
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
Ok(SortOrderPushdownResult::Exact { inner: new_exec })
}
SortOrderPushdownResult::Inexact { inner } => {
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
}
SortOrderPushdownResult::Unsupported => {
Ok(SortOrderPushdownResult::Unsupported)
}
}
}
}

impl ProjectionStream {
Expand Down
Loading