Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 327 additions & 8 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion::logical_expr::{
Filter, LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::optimizer::utils;
use datafusion::prelude::{Column, Expr as DfExpr};
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::planner::ContextProvider;
use datafusion::sql::TableReference;
Expand All @@ -44,6 +44,8 @@ use crate::error::{
};
use crate::extension_plan::{InstantManipulate, Millisecond, RangeManipulate, SeriesNormalize};

/// Alias applied to the left-hand plan when two plans are joined on the time
/// index, so that its columns can be addressed as `lhs.<column>`.
const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";

#[derive(Default, Debug, Clone)]
struct PromPlannerContext {
// query parameters
Expand Down Expand Up @@ -139,14 +141,58 @@ impl<S: ContextProvider> PromPlanner<S> {
name: "Prom Unary Expr",
}
.fail()?,
PromExpr::Binary(PromBinaryExpr { lhs, rhs, .. }) => {
let _left_input = self.prom_expr_to_plan(*lhs.clone())?;
let _right_input = self.prom_expr_to_plan(*rhs.clone())?;

UnsupportedExprSnafu {
name: "Prom Binary Expr",
PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => {
match (
Self::try_build_literal_expr(lhs),
Self::try_build_literal_expr(rhs),
) {
// TODO(ruihang): handle literal-only expressions
(Some(_lhs), Some(_rhs)) => UnsupportedExprSnafu {
name: "Literal-only expression",
}
.fail()?,
// lhs is a literal, rhs is a column
(Some(expr), None) => {
let input = self.prom_expr_to_plan(*rhs.clone())?;
self.projection_for_each_value_column(input, |col| {
Ok(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(expr.clone()),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(col.into())),
}))
})?
}
// lhs is a column, rhs is a literal
(None, Some(expr)) => {
let input = self.prom_expr_to_plan(*lhs.clone())?;
self.projection_for_each_value_column(input, |col| {
Ok(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::Column(col.into())),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(expr.clone()),
}))
})?
}
// both are columns. join them on time index
(None, None) => {
let left_input = self.prom_expr_to_plan(*lhs.clone())?;
let right_input = self.prom_expr_to_plan(*rhs.clone())?;
let join_plan = self.join_on_time_index(left_input, right_input)?;
self.projection_for_each_value_column(join_plan, |col| {
Ok(DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::Column(Column::new(
Some(LEFT_PLAN_JOIN_ALIAS),
col,
))),
op: Self::prom_token_to_binary_op(*op)?,
right: Box::new(DfExpr::Column(Column::new(
self.ctx.table_name.as_ref(),
col,
))),
}))
})?
}
}
.fail()?
}
PromExpr::Paren(ParenExpr { .. }) => UnsupportedExprSnafu {
name: "Prom Paren Expr",
Expand Down Expand Up @@ -511,6 +557,105 @@ impl<S: ContextProvider> PromPlanner<S> {
.collect();
Ok(exprs)
}

/// Try to build a DataFusion literal expression from a PromQL expression.
///
/// Returns `Some(expr)` when the input is made of literals only:
/// - a number literal becomes a `Float64` scalar,
/// - a string literal becomes a `Utf8` scalar,
/// - a parenthesized expression is unwrapped and inspected recursively,
/// - a binary expression is literal when both operands are literal and the
///   operator is accepted by [`Self::prom_token_to_binary_op`].
///
/// Returns `None` for everything else.
///
/// Unary expressions are deliberately reported as non-literal. Unwrapping the
/// operand while ignoring the operator would turn `-1` into the literal `1`
/// and silently produce wrong query results; returning `None` lets the caller
/// fall through to the regular planning path instead.
fn try_build_literal_expr(expr: &PromExpr) -> Option<DfExpr> {
    match expr {
        PromExpr::NumberLiteral(NumberLiteral { val }) => {
            Some(DfExpr::Literal(ScalarValue::Float64(Some(*val))))
        }
        PromExpr::StringLiteral(StringLiteral { val }) => {
            Some(DfExpr::Literal(ScalarValue::Utf8(Some(val.to_string()))))
        }
        PromExpr::Paren(ParenExpr { expr }) => Self::try_build_literal_expr(expr),
        PromExpr::Binary(PromBinaryExpr { lhs, rhs, op, .. }) => {
            // Any non-literal operand or unsupported operator makes the whole
            // expression non-literal.
            let lhs = Self::try_build_literal_expr(lhs)?;
            let rhs = Self::try_build_literal_expr(rhs)?;
            let op = Self::prom_token_to_binary_op(*op).ok()?;
            Some(DfExpr::BinaryExpr(BinaryExpr {
                left: Box::new(lhs),
                op,
                right: Box::new(rhs),
            }))
        }
        // TODO(ruihang): support Unary operator. Until the operator is applied
        // to the operand, a unary expression must not be folded into a literal.
        PromExpr::Unary(_)
        | PromExpr::VectorSelector(_)
        | PromExpr::MatrixSelector(_)
        | PromExpr::Call(_)
        | PromExpr::Aggregate(_)
        | PromExpr::Subquery(_) => None,
    }
}

/// Translate a PromQL operator token into the matching DataFusion binary
/// [`Operator`].
///
/// Arithmetic (`+ - * / %`) and comparison (`== != > < >= <=`) tokens are
/// supported.
///
/// # Errors
///
/// Fails with `UnexpectedToken` for every other token.
fn prom_token_to_binary_op(token: TokenType) -> Result<Operator> {
    let operator = match token {
        // arithmetic
        token::T_ADD => Operator::Plus,
        token::T_SUB => Operator::Minus,
        token::T_MUL => Operator::Multiply,
        token::T_DIV => Operator::Divide,
        token::T_MOD => Operator::Modulo,
        // comparison
        token::T_EQLC => Operator::Eq,
        token::T_NEQ => Operator::NotEq,
        token::T_GTR => Operator::Gt,
        token::T_LSS => Operator::Lt,
        token::T_GTE => Operator::GtEq,
        token::T_LTE => Operator::LtEq,
        // TODO(ruihang): `token::T_POW` and `token::T_ATAN2` are not mapped yet
        // and currently end up here as unexpected tokens.
        _ => return UnexpectedTokenSnafu { token }.fail(),
    };
    Ok(operator)
}

/// Combine two logical plans with an inner join on the time index column.
///
/// The left plan is aliased as [`LEFT_PLAN_JOIN_ALIAS`] before joining; the
/// right plan is joined as-is. Both sides use the time index column name
/// recorded in the planner context as their join key.
///
/// # Errors
///
/// Fails with `TimeIndexNotFound` when the context holds no time index
/// column, and wraps any DataFusion error raised while aliasing, joining or
/// building the plan.
fn join_on_time_index(&self, left: LogicalPlan, right: LogicalPlan) -> Result<LogicalPlan> {
    let time_index_name = self
        .ctx
        .time_index_column
        .clone()
        .context(TimeIndexNotFoundSnafu { table: "unknown" })?;
    // The same unqualified column serves as the join key on both sides.
    let left_key = Column::from_name(time_index_name);
    let right_key = left_key.clone();

    let aliased_left = LogicalPlanBuilder::from(left)
        .alias(LEFT_PLAN_JOIN_ALIAS)
        .context(DataFusionPlanningSnafu)?;
    let joined = aliased_left
        .join(
            right,
            JoinType::Inner,
            (vec![left_key], vec![right_key]),
            None,
        )
        .context(DataFusionPlanningSnafu)?;

    joined.build().context(DataFusionPlanningSnafu)
}

/// Build a projection on top of `input` that holds one expression per value
/// column known to the planner context.
///
/// `name_to_expr` receives each value column name and returns the expression
/// to project for it. Only these expressions appear in the projection.
///
/// # Errors
///
/// Returns the first error produced by `name_to_expr`, or a wrapped
/// DataFusion error if the projection cannot be planned.
fn projection_for_each_value_column<F>(
    &self,
    input: LogicalPlan,
    name_to_expr: F,
) -> Result<LogicalPlan>
where
    F: Fn(&String) -> Result<DfExpr>,
{
    let mut projected_exprs = Vec::new();
    for column_name in self.ctx.value_columns.iter() {
        // Stop at the first column whose expression cannot be built.
        projected_exprs.push(name_to_expr(column_name)?);
    }

    let projection = LogicalPlanBuilder::from(input)
        .project(projected_exprs)
        .context(DataFusionPlanningSnafu)?;
    projection.build().context(DataFusionPlanningSnafu)
}
}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -970,4 +1115,178 @@ mod test {
}

// TODO(ruihang): add range fn tests once exprs are ready.

// {
// input: "some_metric{tag_0="foo"} + some_metric{tag_0="bar"}",
// expected: &BinaryExpr{
// Op: ADD,
// LHS: &VectorSelector{
// Name: "some_metric",
// LabelMatchers: []*labels.Matcher{
// MustLabelMatcher(labels.MatchEqual, "tag_0", "foo"),
// MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
// },
// },
// RHS: &VectorSelector{
// Name: "some_metric",
// LabelMatchers: []*labels.Matcher{
// MustLabelMatcher(labels.MatchEqual, "tag_0", "bar"),
// MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "some_metric"),
// },
// },
// VectorMatching: &VectorMatching{},
// },
// },
// Plans a vector + vector binary expression,
// `some_metric{tag_0="foo"} + some_metric{tag_0="bar"}`, and compares the
// rendered logical plan (with schema) against the expected text: a projection
// of `lhs.field_0 + some_metric.field_0` over an inner join on `timestamp`,
// where the left input is wrapped in a `SubqueryAlias: lhs`.
#[tokio::test]
async fn binary_op_column_column() {
// Both operands select the same metric and differ only in the `tag_0` matcher.
let prom_expr = PromExpr::Binary(PromBinaryExpr {
lhs: Box::new(PromExpr::VectorSelector(VectorSelector {
name: Some("some_metric".to_owned()),
offset: None,
start_or_end: None,
label_matchers: Matchers {
matchers: vec![
Matcher {
op: MatchOp::Equal,
name: "tag_0".to_string(),
value: "foo".to_string(),
},
Matcher {
op: MatchOp::Equal,
name: METRIC_NAME.to_string(),
value: "some_metric".to_string(),
},
],
},
})),
op: token::T_ADD,
rhs: Box::new(PromExpr::VectorSelector(VectorSelector {
name: Some("some_metric".to_owned()),
offset: None,
start_or_end: None,
label_matchers: Matchers {
matchers: vec![
Matcher {
op: MatchOp::Equal,
name: "tag_0".to_string(),
value: "bar".to_string(),
},
Matcher {
op: MatchOp::Equal,
name: METRIC_NAME.to_string(),
value: "some_metric".to_string(),
},
],
},
})),
matching: None,
return_bool: false,
});

// Evaluate from the Unix epoch to epoch + 100_000 s, with a 5 s interval and
// a 1 s lookback delta.
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};

// NOTE(review): the two `1` arguments presumably request one tag column and
// one field column (the expected schema shows `tag_0` and `field_0`) — confirm
// against `build_test_context_provider`.
let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();

let expected = String::from(
"Projection: lhs.field_0 + some_metric.field_0 [lhs.field_0 + some_metric.field_0:Float64;N]\
\n Inner Join: lhs.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: tag_0 = Utf8(\"foo\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: tag_0 = Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);

// The comparison is textual, so any change in plan shape or schema rendering
// fails the test.
assert_eq!(plan.display_indent_schema().to_string(), expected);
}

// Plans a literal + vector binary expression, `1 + some_metric{tag_0="bar"}`,
// and compares the rendered logical plan (with schema) against the expected
// text: a projection of `Float64(1) + some_metric.field_0` directly on top of
// the selector's plan, with no join involved.
#[tokio::test]
async fn binary_op_literal_column() {
// The left operand is the number literal 1.0; the right operand is a selector.
let prom_expr = PromExpr::Binary(PromBinaryExpr {
lhs: Box::new(PromExpr::NumberLiteral(NumberLiteral { val: 1.0 })),
op: token::T_ADD,
rhs: Box::new(PromExpr::VectorSelector(VectorSelector {
name: Some("some_metric".to_owned()),
offset: None,
start_or_end: None,
label_matchers: Matchers {
matchers: vec![
Matcher {
op: MatchOp::Equal,
name: "tag_0".to_string(),
value: "bar".to_string(),
},
Matcher {
op: MatchOp::Equal,
name: METRIC_NAME.to_string(),
value: "some_metric".to_string(),
},
],
},
})),
matching: None,
return_bool: false,
});

// Evaluate from the Unix epoch to epoch + 100_000 s, with a 5 s interval and
// a 1 s lookback delta.
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};

// NOTE(review): the two `1` arguments presumably request one tag column and
// one field column (the expected schema shows `tag_0` and `field_0`) — confirm
// against `build_test_context_provider`.
let context_provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
let plan = PromPlanner::stmt_to_plan(eval_stmt, context_provider).unwrap();

let expected = String::from(
"Projection: Float64(1) + some_metric.field_0 [Float64(1) + some_metric.field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n Filter: tag_0 = Utf8(\"bar\") AND timestamp >= TimestampMillisecond(0, None) AND timestamp <= TimestampMillisecond(100000000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(0, None), timestamp <= TimestampMillisecond(100000000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
);

// The comparison is textual, so any change in plan shape or schema rendering
// fails the test.
assert_eq!(plan.display_indent_schema().to_string(), expected);
}

// Planning a literal-only binary expression (`1 + 1`) is expected to fail.
#[tokio::test]
async fn binary_op_literal_literal() {
    // Both operands are the same number literal.
    let one = || Box::new(PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }));

    let stmt = EvalStmt {
        expr: PromExpr::Binary(PromBinaryExpr {
            lhs: one(),
            op: token::T_ADD,
            rhs: one(),
            matching: None,
            return_bool: false,
        }),
        start: UNIX_EPOCH,
        end: UNIX_EPOCH
            .checked_add(Duration::from_secs(100_000))
            .unwrap(),
        interval: Duration::from_secs(5),
        lookback_delta: Duration::from_secs(1),
    };

    let provider = build_test_context_provider("some_metric".to_string(), 1, 1).await;
    assert!(PromPlanner::stmt_to_plan(stmt, provider).is_err());
}
}