Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ keywords = ["arrow", "arrow-rs", "datafusion"]
rust-version = "1.80"

[dependencies]
arrow = "55"
arrow-schema = "55"
arrow = "55.1.0"
arrow-schema = "55.1.0"
async-trait = "0.1"
dashmap = "6"
datafusion = "47"
datafusion-common = "47"
datafusion-expr = "47"
datafusion-functions = "47"
datafusion-functions-aggregate = "47"
datafusion-optimizer = "47"
datafusion-physical-expr = "47"
datafusion-physical-plan = "47"
datafusion-sql = "47"
datafusion = "48"
datafusion-common = "48"
datafusion-expr = "48"
datafusion-functions = "48"
datafusion-functions-aggregate = "48"
datafusion-optimizer = "48"
datafusion-physical-expr = "48"
datafusion-physical-plan = "48"
datafusion-sql = "48"
futures = "0.3"
itertools = "0.14"
log = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion src/materialized/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl TableFunctionImpl for StaleFilesUdtf {
/// Extract table name from args passed to TableFunctionImpl::call()
fn get_table_name(args: &[Expr]) -> Result<&String> {
match &args[0] {
Expr::Literal(ScalarValue::Utf8(Some(table_name))) => Ok(table_name),
Expr::Literal(ScalarValue::Utf8(Some(table_name)), _) => Ok(table_name),
_ => Err(DataFusionError::Plan(
"expected a single string literal argument to mv_dependencies".to_string(),
)),
Expand Down
9 changes: 8 additions & 1 deletion src/rewrite/exploitation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,14 @@ impl ExecutionPlan for OneOfExec {
}

fn statistics(&self) -> Result<datafusion_common::Statistics> {
self.candidates[self.best].statistics()
self.candidates[self.best].partition_statistics(None)
}

fn partition_statistics(
&self,
partition: Option<usize>,
) -> Result<datafusion_common::Statistics> {
self.candidates[self.best].partition_statistics(partition)
}
}

Expand Down
23 changes: 14 additions & 9 deletions src/rewrite/normal_form.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ impl Predicate {
self.eq_classes[idx].columns.insert(c2.clone());
}
(Some(&i), Some(&j)) => {
if i == j {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DF48 will generate the following plan:

+--------------+---------------------------------------------------------------------------------------------------------------------------------+
| plan_type    | plan                                                                                                                            |
+--------------+---------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: t1.column1, t1.column2                                                                                              |
|              |   Filter: t1.column3 = t1.column1 AND t1.column1 >= Utf8("2022")                                                                |
|              |     TableScan: t1 projection=[column1, column2, column3], partial_filters=[t1.column3 = t1.column1, t1.column1 >= Utf8("2022")] |
+--------------+---------------------------------------------------------------------------------------------------------------------------------+

That is, t1.column3 = t1.column1 will be processed twice, and the second time they have the same index -> 0. If we don't add the fast return, it'll make column2's index 0, which will break the test (I guess we didn't consider the i==j case before)

// The two columns are already in the same equivalence class.
return Ok(());
}
// We need to merge two existing column eq classes.

// Delete the eq class with a larger index,
Expand Down Expand Up @@ -593,15 +597,15 @@ impl Predicate {
/// Add a binary expression to our collection of filters.
fn insert_binary_expr(&mut self, left: &Expr, op: Operator, right: &Expr) -> Result<()> {
match (left, op, right) {
(Expr::Column(c), op, Expr::Literal(v)) => {
(Expr::Column(c), op, Expr::Literal(v, _)) => {
if let Err(e) = self.add_range(c, &op, v) {
// Add a range can fail in some cases, so just fallthrough
log::debug!("failed to add range filter: {e}");
} else {
return Ok(());
}
}
(Expr::Literal(_), op, Expr::Column(_)) => {
(Expr::Literal(_, _), op, Expr::Column(_)) => {
if let Some(swapped) = op.swap() {
return self.insert_binary_expr(right, swapped, left);
}
Expand Down Expand Up @@ -714,22 +718,22 @@ impl Predicate {
extra_range_filters.push(Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(other_column.clone())),
op: Operator::Eq,
right: Box::new(Expr::Literal(range.lower().clone())),
right: Box::new(Expr::Literal(range.lower().clone(), None)),
}))
} else {
if !range.lower().is_null() {
extra_range_filters.push(Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(other_column.clone())),
op: Operator::GtEq,
right: Box::new(Expr::Literal(range.lower().clone())),
right: Box::new(Expr::Literal(range.lower().clone(), None)),
}))
}

if !range.upper().is_null() {
extra_range_filters.push(Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(other_column.clone())),
op: Operator::LtEq,
right: Box::new(Expr::Literal(range.upper().clone())),
right: Box::new(Expr::Literal(range.upper().clone(), None)),
}))
}
}
Expand Down Expand Up @@ -984,7 +988,8 @@ mod test {
let ctx = SessionContext::new_with_config(
SessionConfig::new()
.set_bool("datafusion.execution.parquet.pushdown_filters", true)
.set_bool("datafusion.explain.logical_plan_only", true),
.set_bool("datafusion.explain.logical_plan_only", true)
.set_bool("datafusion.sql_parser.map_varchar_to_utf8view", false),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DF48 made the config true by default, so it'll break the test_rewrite test:

TestCase {
    name: "range filter + equality predicate",
    base:
        "SELECT column1, column2 FROM t1 WHERE column1 = column3 AND column1 >= '2022'",
    query:
    // Since column1 = column3 in the original view,
    // we are allowed to substitute column1 for column3 and vice versa.
        "SELECT column2, column3 FROM t1 WHERE column1 = column3 AND column3 >= '2023'",
},

->

+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type    | plan                                                                                                                                                  |
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: t1.column1, t1.column2                                                                                                                    |
|              |   Filter: t1.column1 = CAST(t1.column3 AS Utf8View) AND t1.column1 >= Utf8View("2022")                                                                |
|              |     TableScan: t1 projection=[column1, column2, column3], partial_filters=[t1.column1 = CAST(t1.column3 AS Utf8View), t1.column1 >= Utf8View("2022")] |
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+

Currently, the insert_binary_expr method in Predicate can't infer eq class from t1.column1 = CAST(t1.column3 AS Utf8View).

t1 is from here:

CREATE EXTERNAL TABLE t1 (
    column1 VARCHAR, 
    column2 BIGINT, 
    column3 CHAR
)

So I'm wondering if CHAR also should be mapped to utf8view.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue in upstream: apache/datafusion#16277

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's unfortunate, it may be possible to generalize the equivalence class logic to handle certain types of casts (particularly invertible ones). It would probably be somewhat complex to implement though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can add this to our backlist.

);

let t1_path = tempdir()?;
Expand All @@ -996,11 +1001,11 @@ mod test {
ctx.sql(&format!(
"
CREATE EXTERNAL TABLE t1 (
column1 VARCHAR,
column2 BIGINT,
column1 VARCHAR,
column2 BIGINT,
column3 CHAR
)
STORED AS PARQUET
STORED AS PARQUET
LOCATION '{}'",
t1_path.path().to_string_lossy()
))
Expand Down