-
Notifications
You must be signed in to change notification settings - Fork 10
Description
This is a bit of a weird corner case, but imagine I have the following fragment:
SELECT *
FROM dummy_fun_1() AS a
LEFT JOIN dummy_fun_2() AS b
where dummy_fun_1 and dummy_fun_2 are table-valued UDFs.
If dummy_fun_1() fails before ever providing any values, dummy_fun_2() is still executed even after dummy_fun_1() fails.
Concretely:
use partiql_catalog::call_defs::{CallDef, CallSpec, CallSpecArg};
use partiql_catalog::{Extension, TableFunction};
/// Catalog extension that registers the two dummy table functions used by
/// this reproduction.
#[derive(Debug)]
struct DummyCatalogExtensions;

impl Extension for DummyCatalogExtensions {
    /// Returns the extension name, `"dummy"`.
    fn name(&self) -> String {
        String::from("dummy")
    }

    /// Adds `DummyFun1` and then `DummyFun2` to `catalog` as table functions,
    /// propagating the first registration error, if any.
    fn load(
        &self,
        catalog: &mut dyn partiql_catalog::Catalog,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let first = TableFunction::new(Box::new(DummyFun1::new()));
        catalog.add_table_function(first)?;

        let second = TableFunction::new(Box::new(DummyFun2::new()));
        catalog.add_table_function(second)?;

        Ok(())
    }
}
#[derive(Debug)]
struct DummyFun1 {
call_def: CallDef,
}
impl DummyFun1 {
fn new() -> Self {
Self {
call_def: CallDef {
names: vec!["dummy_fun_1"],
overloads: vec![CallSpec {
input: vec![CallSpecArg::Positional],
output: Box::new(|args| {
partiql_logical::ValueExpr::Call(partiql_logical::CallExpr {
name: partiql_logical::CallName::ByName("dummy_fun_1".to_string()),
arguments: args,
})
}),
}],
},
}
}
}
/// Table-function metadata for `DummyFun1`: its call definition and the
/// evaluator to use for it.
impl partiql_catalog::BaseTableFunctionInfo for DummyFun1 {
    /// Returns the call definition stored in `self.call_def`.
    fn call_def(&self) -> &CallDef {
        &self.call_def
    }
    /// Returns a newly boxed `DummyFun1Eval` as the evaluator.
    fn plan_eval(&self) -> Box<dyn partiql_catalog::BaseTableExpr> {
        Box::new(DummyFun1Eval)
    }
}
/// Evaluator for `dummy_fun_1` that always fails, standing in for a table
/// function that errors before producing any value.
#[derive(Debug)]
struct DummyFun1Eval;

impl partiql_catalog::BaseTableExpr for DummyFun1Eval {
    /// Prints a marker line so the call is visible in the test output, then
    /// returns the error `"this is an error"`.
    ///
    /// Neither parameter is read; the leading underscores mark that as
    /// intentional and keep the `unused_variables` lint quiet.
    fn evaluate<'c>(
        &self,
        _args: &[std::borrow::Cow<'_, Value>],
        _ctx: &'c dyn partiql_catalog::context::SessionContext<'c>,
    ) -> partiql_catalog::BaseTableExprResult<'c> {
        println!("I'm about to end this plan's entire career");
        Err("this is an error".into())
    }
}
#[derive(Debug)]
struct DummyFun2 {
call_def: CallDef,
}
impl DummyFun2 {
fn new() -> Self {
Self {
call_def: CallDef {
names: vec!["dummy_fun_2"],
overloads: vec![CallSpec {
input: vec![CallSpecArg::Positional],
output: Box::new(|args| {
partiql_logical::ValueExpr::Call(partiql_logical::CallExpr {
name: partiql_logical::CallName::ByName("dummy_fun_2".to_string()),
arguments: args,
})
}),
}],
},
}
}
}
/// Table-function metadata for `DummyFun2`: its call definition and the
/// evaluator to use for it.
impl partiql_catalog::BaseTableFunctionInfo for DummyFun2 {
    /// Returns the call definition stored in `self.call_def`.
    fn call_def(&self) -> &CallDef {
        &self.call_def
    }
    /// Returns a newly boxed `DummyFun2Eval` as the evaluator.
    fn plan_eval(&self) -> Box<dyn partiql_catalog::BaseTableExpr> {
        Box::new(DummyFun2Eval)
    }
}
/// Evaluator for `dummy_fun_2` that always fails; its marker line in the
/// output shows whether the right side of the join was evaluated at all.
#[derive(Debug)]
struct DummyFun2Eval;

impl partiql_catalog::BaseTableExpr for DummyFun2Eval {
    /// Prints a marker line so the call is visible in the test output, then
    /// returns the error `"some error in dummy fun 2"`.
    ///
    /// Neither parameter is read; the leading underscores mark that as
    /// intentional and keep the `unused_variables` lint quiet.
    fn evaluate<'c>(
        &self,
        _args: &[std::borrow::Cow<'_, Value>],
        _ctx: &'c dyn partiql_catalog::context::SessionContext<'c>,
    ) -> partiql_catalog::BaseTableExprResult<'c> {
        println!("Hello from dummy fun 2!");
        Err("some error in dummy fun 2".into())
    }
}
/// Reproduction: a LEFT JOIN whose two inputs are scans over `dummy_fun_1()`
/// and `dummy_fun_2()`, both of which always return an error.
///
/// The test makes no assertions. It prints the compiled plan as a dot graph
/// and the result of executing it, so the evaluation order can be read from
/// the output.
#[test]
fn left_join_evaluates_both_branches_even_if_left_branch_fails() {
    // Hand-built logical plan: Join(Left, Scan(dummy_fun_1()), Scan(dummy_fun_2())) -> Sink.
    // NOTE(review): both calls are built with an empty argument list, while the
    // CallSpecs for these functions declare one positional input -- presumably
    // constructing the CallExpr by hand sidesteps that; confirm.
    let mut logical_plan = LogicalPlan::new();
    let join_op_id =
        logical_plan.add_operator(partiql_logical::BindingsOp::Join(partiql_logical::Join {
            kind: partiql_logical::JoinKind::Left,
            // Left input: always-failing dummy_fun_1, bound as `_1`.
            left: Box::new(partiql_logical::BindingsOp::Scan(partiql_logical::Scan {
                expr: partiql_logical::ValueExpr::Call(partiql_logical::CallExpr {
                    name: partiql_logical::CallName::ByName("dummy_fun_1".to_string()),
                    arguments: vec![],
                }),
                as_key: "_1".to_string(),
                at_key: None,
            })),
            // Right input: always-failing dummy_fun_2, bound as `_2`.
            right: Box::new(partiql_logical::BindingsOp::Scan(partiql_logical::Scan {
                expr: partiql_logical::ValueExpr::Call(partiql_logical::CallExpr {
                    name: partiql_logical::CallName::ByName("dummy_fun_2".to_string()),
                    arguments: vec![],
                }),
                as_key: "_2".to_string(),
                at_key: None,
            })),
            // No join condition.
            on: None,
        }));
    let sink_op_id = logical_plan.add_operator(partiql_logical::BindingsOp::Sink);
    logical_plan.add_flow(join_op_id, sink_op_id);
    // I've omitted the `SELECT *` from here, but it doesn't change things
    let bindings = MapBindings::default();
    // Register both dummy table functions in a default catalog.
    let mut catalog = PartiqlCatalog::default();
    DummyCatalogExtensions.load(&mut catalog).unwrap();
    // Compile the logical plan in strict evaluation mode.
    let mut planner = plan::EvaluatorPlanner::new(EvaluationMode::Strict, &catalog);
    let mut plan = planner.compile(&logical_plan).expect("Expect no plan error");
    println!("{}", plan.to_dot_graph());
    let sys = SystemContext {
        now: DateTime::from_system_now_utc(),
    };
    let ctx = BasicContext::new(bindings, sys);
    // The result is only printed via `dbg!` and then discarded; an `Err` does
    // not fail the test.
    let _ = dbg!(plan.execute_mut(&ctx));
}
Pasting this into one of the test modules in the partiql-logical-planner subcrate and running:
$ cargo test -p partiql-logical-planner -- left_join_evaluates_both_branches_even_if_left_branch_fails --nocapture
<snip>
Running unittests src/lib.rs (target/debug/deps/partiql_logical_planner-ffc8a2604465b618)
running 1 test
digraph {
0 [ label = "Left JOIN" ]
1 [ label = "SINK" ]
0 -> 1 [ label = "0" ]
}
I'm about to end this plan's entire career
Hello from dummy fun 2!
[partiql-logical-planner/src/lib.rs:300:17] plan.execute_mut(&ctx) = Err(
EvalErr {
errors: [
ExtensionResultError(
"this is an error",
),
ExtensionResultError(
"some error in dummy fun 2",
),
],
},
)
test tests::left_join_evaluates_both_branches_even_if_left_branch_fails ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 9 filtered out; finished in 0.00s
Doc-tests partiql-logical-planner
running 0 tests
test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
I don't think this should happen -- I would expect that plan execution would halt after the first failure instead of barrelling onwards. Worse yet, dummy_fun_2() will get passed arguments it may not be expecting if dummy_fun_2()'s arguments depend on the output from dummy_fun_1(), as all of those arguments are set to MISSING.
I haven't tried this with other kinds of joins, but I suspect there are probably similar issues there.