Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use query::parser::PromQuery;
use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
Expand Down Expand Up @@ -85,6 +85,7 @@ impl GrpcQueryHandler for Instance {
start: promql.start,
end: promql.end,
step: promql.step,
lookback: DEFAULT_LOOKBACK_STRING.to_string(),
Comment thread
tisonkun marked this conversation as resolved.
Outdated
};
let mut result =
SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
Expand Down
9 changes: 8 additions & 1 deletion src/operator/src/statement/tql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common_query::Output;
use common_telemetry::tracing;
use query::parser::{
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
};
use session::context::QueryContextRef;
use snafu::ResultExt;
Expand All @@ -37,12 +37,16 @@ impl StatementExecutor {
end: eval.end,
step: eval.step,
query: eval.query,
lookback: eval.lookback.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
Comment thread
etolbakov marked this conversation as resolved.
};
QueryLanguageParser::parse_promql(&promql, &query_ctx).context(ParseQuerySnafu)?
}
Tql::Explain(explain) => {
let promql = PromQuery {
query: explain.query,
lookback: explain
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
..PromQuery::default()
};
let explain_node_name = if explain.is_verbose {
Expand All @@ -63,6 +67,9 @@ impl StatementExecutor {
end: analyze.end,
step: analyze.step,
query: analyze.query,
lookback: analyze
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
};
let analyze_node_name = if analyze.is_verbose {
ANALYZE_VERBOSE_NODE_NAME
Expand Down
17 changes: 14 additions & 3 deletions src/query/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::error::{
};
use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};

const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m
pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
Expand Down Expand Up @@ -98,6 +97,7 @@ pub struct PromQuery {
pub start: String,
pub end: String,
pub step: String,
pub lookback: String,
}

impl Default for PromQuery {
Expand All @@ -107,6 +107,7 @@ impl Default for PromQuery {
start: String::from("0"),
end: String::from("0"),
step: String::from("5m"),
lookback: String::from(DEFAULT_LOOKBACK_STRING),
}
}
}
Expand Down Expand Up @@ -165,13 +166,22 @@ impl QueryLanguageParser {
query: &query.query,
})?;

let lookback_delta = query
.lookback
.parse::<u64>()
.map(Duration::from_secs)
.or_else(|_| promql_parser::util::parse_duration(&query.lookback))
.map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
.context(QueryParseSnafu {
query: &query.query,
})?;

let eval_stmt = EvalStmt {
expr,
start,
end,
interval: step,
// TODO(ruihang): provide a way to adjust this parameter.
lookback_delta: Duration::from_secs(DEFAULT_LOOKBACK),
lookback_delta,
};

Ok(QueryStatement::Promql(eval_stmt))
Expand Down Expand Up @@ -353,6 +363,7 @@ mod test {
start: "2022-02-13T17:14:00Z".to_string(),
end: "2023-02-13T17:14:00Z".to_string(),
step: "1d".to_string(),
lookback: "5m".to_string(),
};

#[cfg(not(windows))]
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ pub enum Error {

#[snafu(display("Failed to parse PromQL: {query:?}"))]
ParsePromQL {
query: PromQuery,
query: Box<PromQuery>,
location: Location,
source: query::error::Error,
},
Expand Down
4 changes: 3 additions & 1 deletion src/servers/src/grpc/prom_query_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_time::util::current_time_rfc3339;
use promql_parser::parser::ValueType;
use query::parser::PromQuery;
use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use session::context::QueryContext;
use snafu::OptionExt;
use tonic::{Request, Response};
Expand Down Expand Up @@ -58,6 +58,7 @@ impl PrometheusGateway for PrometheusGatewayService {
start: range_query.start,
end: range_query.end,
step: range_query.step,
lookback: DEFAULT_LOOKBACK_STRING.to_string(),
}
}
Promql::InstantQuery(instant_query) => {
Expand All @@ -71,6 +72,7 @@ impl PrometheusGateway for PrometheusGatewayService {
start: time.clone(),
end: time,
step: String::from("1s"),
lookback: String::from(DEFAULT_LOOKBACK_STRING),
}
}
};
Expand Down
6 changes: 5 additions & 1 deletion src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_plugins::GREPTIME_EXEC_WRITE_COST;
use common_query::{Output, OutputData};
use common_recordbatch::util;
use common_telemetry::tracing;
use query::parser::PromQuery;
use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand Down Expand Up @@ -205,6 +205,7 @@ pub struct PromqlQuery {
pub start: String,
pub end: String,
pub step: String,
pub lookback: Option<String>,
pub db: Option<String>,
}

Expand All @@ -215,6 +216,9 @@ impl From<PromqlQuery> for PromQuery {
start: query.start,
end: query.end,
step: query.step,
lookback: query
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions src/servers/src/http/prometheus.rs
Comment thread
tisonkun marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ pub async fn build_info_query() -> PrometheusJsonResponse {
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct InstantQuery {
query: Option<String>,
lookback: Option<String>,
time: Option<String>,
timeout: Option<String>,
db: Option<String>,
Expand All @@ -178,6 +179,10 @@ pub async fn instant_query(
start: time.clone(),
end: time,
step: "1s".to_string(),
lookback: params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
};

let result = handler.do_query(&prom_query, query_ctx).await;
Expand All @@ -196,6 +201,7 @@ pub struct RangeQuery {
start: Option<String>,
end: Option<String>,
step: Option<String>,
lookback: Option<String>,
timeout: Option<String>,
db: Option<String>,
}
Expand All @@ -216,6 +222,10 @@ pub async fn range_query(
start: params.start.or(form_params.start).unwrap_or_default(),
end: params.end.or(form_params.end).unwrap_or_default(),
step: params.step.or(form_params.step).unwrap_or_default(),
lookback: params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
};

let result = handler.do_query(&prom_query, query_ctx).await;
Expand All @@ -235,6 +245,7 @@ struct Matches(Vec<String>);
pub struct LabelsQuery {
start: Option<String>,
end: Option<String>,
lookback: Option<String>,
#[serde(flatten)]
matches: Matches,
db: Option<String>,
Expand Down Expand Up @@ -310,6 +321,11 @@ pub async fn labels_query(
.or(form_params.end)
.unwrap_or_else(current_time_rfc3339);

let lookback = params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());

let mut labels = HashSet::new();
let _ = labels.insert(METRIC_NAME.to_string());

Expand All @@ -320,6 +336,7 @@ pub async fn labels_query(
start: start.clone(),
end: end.clone(),
step: DEFAULT_LOOKBACK_STRING.to_string(),
lookback: lookback.clone(),
};

let result = handler.do_query(&prom_query, query_ctx.clone()).await;
Expand Down Expand Up @@ -546,6 +563,7 @@ fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
pub struct LabelValueQuery {
start: Option<String>,
end: Option<String>,
lookback: Option<String>,
#[serde(flatten)]
matches: Matches,
db: Option<String>,
Expand Down Expand Up @@ -587,6 +605,9 @@ pub async fn label_values_query(

let start = params.start.unwrap_or_else(yesterday_rfc3339);
let end = params.end.unwrap_or_else(current_time_rfc3339);
let lookback = params
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());

let mut label_values = HashSet::new();

Expand All @@ -597,6 +618,7 @@ pub async fn label_values_query(
start: start.clone(),
end: end.clone(),
step: DEFAULT_LOOKBACK_STRING.to_string(),
Comment thread
tisonkun marked this conversation as resolved.
lookback: lookback.clone(),
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
if let Err(err) =
Expand Down Expand Up @@ -695,6 +717,7 @@ async fn retrieve_label_values_from_record_batch(
pub struct SeriesQuery {
start: Option<String>,
end: Option<String>,
lookback: Option<String>,
#[serde(flatten)]
matches: Matches,
db: Option<String>,
Expand Down Expand Up @@ -726,6 +749,10 @@ pub async fn series_query(
.end
.or(form_params.end)
.unwrap_or_else(current_time_rfc3339);
let lookback = params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());

let mut series = Vec::new();
let mut merge_map = HashMap::new();
Expand All @@ -737,6 +764,7 @@ pub async fn series_query(
end: end.clone(),
// TODO: find a better value for step
step: DEFAULT_LOOKBACK_STRING.to_string(),
lookback: lookback.clone(),
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;

Expand Down
3 changes: 2 additions & 1 deletion src/servers/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use async_trait::async_trait;
use catalog::memory::MemoryCatalogManager;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement, DEFAULT_LOOKBACK_STRING};
use query::plan::LogicalPlan;
use query::query_engine::DescribeResult;
use query::{QueryEngineFactory, QueryEngineRef};
Expand Down Expand Up @@ -193,6 +193,7 @@ impl GrpcQueryHandler for DummyInstance {
start: promql.start,
end: promql.end,
step: promql.step,
lookback: DEFAULT_LOOKBACK_STRING.to_string(),
};
let mut result =
SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
+-+-+
| plan_type_| plan_|
+-+-+
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[1000], time index=[j], REDACTED
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j], REDACTED
Comment thread
tisonkun marked this conversation as resolved.
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
Expand Down
26 changes: 13 additions & 13 deletions tests/cases/standalone/common/tql-explain-analyze/explain.result
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ TQL EXPLAIN (0, 10, '5s') test;
-- SQLNESS REPLACE (peers.*) REDACTED
TQL EXPLAIN (0, 10, '1s', '2s') test;

+---------------+-----------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
+---------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 ASC NULLS LAST] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 ASC NULLS LAST] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------+
| | |
+---------------+---------------------------------------------------------------------------------------------+

-- explain at 0s, 5s and 10s. No point at 0s.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
Expand Down
8 changes: 0 additions & 8 deletions tests/cases/standalone/common/tql/basic.result
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,6 @@ TQL EVAL (0, 10, '1s', '2s') test{k="a"};
+-----+---------------------+---+
| 2.0 | 1970-01-01T00:00:01 | a |
| 2.0 | 1970-01-01T00:00:02 | a |
| 2.0 | 1970-01-01T00:00:03 | a |
Comment thread
etolbakov marked this conversation as resolved.
| 2.0 | 1970-01-01T00:00:04 | a |
| 2.0 | 1970-01-01T00:00:05 | a |
| 2.0 | 1970-01-01T00:00:06 | a |
| 2.0 | 1970-01-01T00:00:07 | a |
| 2.0 | 1970-01-01T00:00:08 | a |
| 2.0 | 1970-01-01T00:00:09 | a |
| 2.0 | 1970-01-01T00:00:10 | a |
+-----+---------------------+---+

TQL EVAL ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '1s') test{k="a"};
Expand Down