diff --git a/Cargo.lock b/Cargo.lock index 1a6a468e7054..d4f44c1d8374 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3775,7 +3775,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=8da84a04b137c4104262459807eab1c04b92f3cc#8da84a04b137c4104262459807eab1c04b92f3cc" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=04d78b6e025ceb518040fdd10858c2a9d9345820#04d78b6e025ceb518040fdd10858c2a9d9345820" dependencies = [ "prost 0.12.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index bbba87e3ae81..788bc68798e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "8da84a04b137c4104262459807eab1c04b92f3cc" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "04d78b6e025ceb518040fdd10858c2a9d9345820" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/client/src/database.rs b/src/client/src/database.rs index fe02032a7be6..70dc7397f5ed 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -37,6 +37,8 @@ use snafu::{ensure, ResultExt}; use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu}; use crate::{error, from_grpc_response, metrics, Client, Result, StreamInserter}; +pub const DEFAULT_LOOKBACK_STRING: &str = "5m"; + #[derive(Clone, Debug, Default)] pub struct Database { // The "catalog" and "schema" to be used in processing the requests at the server side. @@ -215,6 +217,7 @@ impl Database { start: start.to_string(), end: end.to_string(), step: step.to_string(), + lookback: DEFAULT_LOOKBACK_STRING.to_string(), })), })) .await diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 30fdb0489c16..73ec35df5d49 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -85,6 +85,7 @@ impl GrpcQueryHandler for Instance { start: promql.start, end: promql.end, step: promql.step, + lookback: promql.lookback, }; let mut result = SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await; diff --git a/src/operator/src/statement/tql.rs b/src/operator/src/statement/tql.rs index ffd470030bf2..02b739c28502 100644 --- a/src/operator/src/statement/tql.rs +++ b/src/operator/src/statement/tql.rs @@ -18,7 +18,7 @@ use common_query::Output; use common_telemetry::tracing; use query::parser::{ PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, - EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME, + DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME, }; use session::context::QueryContextRef; use snafu::ResultExt; @@ -37,12 +37,16 @@ impl StatementExecutor { end: eval.end, step: eval.step, query: eval.query, + lookback: eval.lookback.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), }; QueryLanguageParser::parse_promql(&promql, &query_ctx).context(ParseQuerySnafu)? } Tql::Explain(explain) => { let promql = PromQuery { query: explain.query, + lookback: explain + .lookback + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), ..PromQuery::default() }; let explain_node_name = if explain.is_verbose { @@ -63,6 +67,9 @@ impl StatementExecutor { end: analyze.end, step: analyze.step, query: analyze.query, + lookback: analyze + .lookback + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), }; let analyze_node_name = if analyze.is_verbose { ANALYZE_VERBOSE_NODE_NAME diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index 92cab91481b3..4da25d364936 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -36,7 +36,6 @@ use crate::error::{ }; use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED}; -const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m pub const DEFAULT_LOOKBACK_STRING: &str = "5m"; pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN"; pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE"; @@ -98,6 +97,7 @@ pub struct PromQuery { pub start: String, pub end: String, pub step: String, + pub lookback: String, } impl Default for PromQuery { @@ -107,6 +107,7 @@ impl Default for PromQuery { start: String::from("0"), end: String::from("0"), step: String::from("5m"), + lookback: String::from(DEFAULT_LOOKBACK_STRING), } } } @@ -165,13 +166,22 @@ impl QueryLanguageParser { query: &query.query, })?; + let lookback_delta = query + .lookback + .parse::() + .map(Duration::from_secs) + .or_else(|_| promql_parser::util::parse_duration(&query.lookback)) + .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments))) + .context(QueryParseSnafu { + query: &query.query, + })?; + let eval_stmt = EvalStmt { expr, start, end, interval: step, - // TODO(ruihang): provide a way to adjust this parameter. - lookback_delta: Duration::from_secs(DEFAULT_LOOKBACK), + lookback_delta, }; Ok(QueryStatement::Promql(eval_stmt)) @@ -353,6 +363,7 @@ mod test { start: "2022-02-13T17:14:00Z".to_string(), end: "2023-02-13T17:14:00Z".to_string(), step: "1d".to_string(), + lookback: "5m".to_string(), }; #[cfg(not(windows))] diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 1d4baa605e77..ba8144a7c512 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -347,7 +347,7 @@ pub enum Error { #[snafu(display("Failed to parse PromQL: {query:?}"))] ParsePromQL { - query: PromQuery, + query: Box, location: Location, source: query::error::Error, }, diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index e7e20c65337e..ebd1a6d2556b 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -58,6 +58,7 @@ impl PrometheusGateway for PrometheusGatewayService { start: range_query.start, end: range_query.end, step: range_query.step, + lookback: range_query.lookback, } } Promql::InstantQuery(instant_query) => { @@ -71,6 +72,7 @@ impl PrometheusGateway for PrometheusGatewayService { start: time.clone(), end: time, step: String::from("1s"), + lookback: instant_query.lookback, } } }; diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 002bc2dd286c..bf91f11a9a99 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -26,7 +26,7 @@ use common_plugins::GREPTIME_EXEC_WRITE_COST; use common_query::{Output, OutputData}; use common_recordbatch::util; use common_telemetry::tracing; -use query::parser::PromQuery; +use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -205,6 +205,7 @@ pub struct PromqlQuery { pub start: String, pub end: String, pub step: String, + pub lookback: Option, pub db: Option, } @@ -215,6 +216,9 @@ impl From for PromQuery { start: query.start, end: query.end, step: query.step, + lookback: query + .lookback + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), } } } diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index bb208fb1cee3..0eebf43367c1 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -152,6 +152,7 @@ pub async fn build_info_query() -> PrometheusJsonResponse { #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct InstantQuery { query: Option, + lookback: Option, time: Option, timeout: Option, db: Option, @@ -178,6 +179,10 @@ pub async fn instant_query( start: time.clone(), end: time, step: "1s".to_string(), + lookback: params + .lookback + .or(form_params.lookback) + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), }; let result = handler.do_query(&prom_query, query_ctx).await; @@ -196,6 +201,7 @@ pub struct RangeQuery { start: Option, end: Option, step: Option, + lookback: Option, timeout: Option, db: Option, } @@ -216,6 +222,10 @@ pub async fn range_query( start: params.start.or(form_params.start).unwrap_or_default(), end: params.end.or(form_params.end).unwrap_or_default(), step: params.step.or(form_params.step).unwrap_or_default(), + lookback: params + .lookback + .or(form_params.lookback) + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), }; let result = handler.do_query(&prom_query, query_ctx).await; @@ -235,6 +245,7 @@ struct Matches(Vec); pub struct LabelsQuery { start: Option, end: Option, + lookback: Option, #[serde(flatten)] matches: Matches, db: Option, @@ -310,6 +321,11 @@ pub async fn labels_query( .or(form_params.end) .unwrap_or_else(current_time_rfc3339); + let lookback = params + .lookback + .or(form_params.lookback) + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()); + let mut labels = HashSet::new(); let _ = labels.insert(METRIC_NAME.to_string()); @@ -320,6 +336,7 @@ pub async fn labels_query( start: start.clone(), end: end.clone(), step: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: lookback.clone(), }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; @@ -546,6 +563,7 @@ fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option { pub struct LabelValueQuery { start: Option, end: Option, + lookback: Option, #[serde(flatten)] matches: Matches, db: Option, @@ -587,6 +605,9 @@ pub async fn label_values_query( let start = params.start.unwrap_or_else(yesterday_rfc3339); let end = params.end.unwrap_or_else(current_time_rfc3339); + let lookback = params + .lookback + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()); let mut label_values = HashSet::new(); @@ -597,6 +618,7 @@ pub async fn label_values_query( start: start.clone(), end: end.clone(), step: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: lookback.clone(), }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; if let Err(err) = @@ -695,6 +717,7 @@ async fn retrieve_label_values_from_record_batch( pub struct SeriesQuery { start: Option, end: Option, + lookback: Option, #[serde(flatten)] matches: Matches, db: Option, @@ -726,6 +749,10 @@ pub async fn series_query( .end .or(form_params.end) .unwrap_or_else(current_time_rfc3339); + let lookback = params + .lookback + .or(form_params.lookback) + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()); let mut series = Vec::new(); let mut merge_map = HashMap::new(); @@ -737,6 +764,7 @@ pub async fn series_query( end: end.clone(), // TODO: find a better value for step step: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: lookback.clone(), }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 284935673650..8913c879d74b 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -193,6 +193,7 @@ impl GrpcQueryHandler for DummyInstance { start: promql.start, end: promql.end, step: promql.step, + lookback: promql.lookback, }; let mut result = SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await; diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index c115de52ab07..6d7179b18da1 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -790,6 +790,7 @@ CREATE TABLE {table_name} ( start: "1672557973".to_owned(), end: "1672557978".to_owned(), step: "1s".to_owned(), + lookback: "5m".to_string(), })), }); let output = query(instance, request).await; diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 3298ead4e4a9..0b38ac252c30 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -504,6 +504,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { let instant_query = PromInstantQuery { query: "test".to_string(), time: "5".to_string(), + lookback: "5m".to_string(), }; let instant_query_request = PromqlRequest { header: Some(header.clone()), @@ -555,6 +556,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { start: "0".to_string(), end: "10".to_string(), step: "5s".to_string(), + lookback: "5m".to_string(), }; let range_query_request: PromqlRequest = PromqlRequest { header: Some(header.clone()), @@ -605,6 +607,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { start: "1000000000".to_string(), end: "1000001000".to_string(), step: "5s".to_string(), + lookback: "5m".to_string(), }; let range_query_request: PromqlRequest = PromqlRequest { header: Some(header), diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result index 96b2548503df..9d063adcdc82 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result @@ -39,7 +39,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test; +-+-+ | plan_type_| plan_| +-+-+ -| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[1000], time index=[j], REDACTED +| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j], REDACTED |_|_RepartitionExec: partitioning=REDACTED |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED |_|_PromSeriesDivideExec: tags=["k"], REDACTED diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index e50d1892f351..2666a1a4d799 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -34,21 +34,21 @@ TQL EXPLAIN (0, 10, '5s') test; -- SQLNESS REPLACE (peers.*) REDACTED TQL EXPLAIN (0, 10, '1s', '2s') test; -+---------------+-----------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------------------------------------+ -| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | -| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | -| | PromSeriesDivide: tags=["k"] | -| | MergeScan [is_placeholder=false] | -| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | ++---------------+---------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------+ +| logical_plan | PromInstantManipulate: range=[0..0], lookback=[2000], interval=[300000], time index=[j] | +| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | +| | PromSeriesDivide: tags=["k"] | +| | MergeScan [is_placeholder=false] | +| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] | | | RepartitionExec: partitioning=REDACTED -| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | -| | PromSeriesDivideExec: tags=["k"] | -| | SortExec: expr=[k@2 ASC NULLS LAST] | +| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | +| | PromSeriesDivideExec: tags=["k"] | +| | SortExec: expr=[k@2 ASC NULLS LAST] | | | MergeScanExec: REDACTED -| | | -+---------------+-----------------------------------------------------------------------------------------------+ +| | | ++---------------+---------------------------------------------------------------------------------------------+ -- explain at 0s, 5s and 10s. No point at 0s. -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED diff --git a/tests/cases/standalone/common/tql/basic.result b/tests/cases/standalone/common/tql/basic.result index a0c244a48b31..f38055ea1a08 100644 --- a/tests/cases/standalone/common/tql/basic.result +++ b/tests/cases/standalone/common/tql/basic.result @@ -76,7 +76,6 @@ TQL EVAL (0, 10, '5s') test{k="a"}; | 2.0 | 1970-01-01T00:00:10 | a | +-----+---------------------+---+ --- 'lookback' parameter is not fully supported, the test has to be updated TQL EVAL (0, 10, '1s', '2s') test{k="a"}; +-----+---------------------+---+ @@ -84,14 +83,6 @@ TQL EVAL (0, 10, '1s', '2s') test{k="a"}; +-----+---------------------+---+ | 2.0 | 1970-01-01T00:00:01 | a | | 2.0 | 1970-01-01T00:00:02 | a | -| 2.0 | 1970-01-01T00:00:03 | a | -| 2.0 | 1970-01-01T00:00:04 | a | -| 2.0 | 1970-01-01T00:00:05 | a | -| 2.0 | 1970-01-01T00:00:06 | a | -| 2.0 | 1970-01-01T00:00:07 | a | -| 2.0 | 1970-01-01T00:00:08 | a | -| 2.0 | 1970-01-01T00:00:09 | a | -| 2.0 | 1970-01-01T00:00:10 | a | +-----+---------------------+---+ TQL EVAL ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '1s') test{k="a"}; diff --git a/tests/cases/standalone/common/tql/basic.sql b/tests/cases/standalone/common/tql/basic.sql index 27ed5153f134..3cd8da8fcf4e 100644 --- a/tests/cases/standalone/common/tql/basic.sql +++ b/tests/cases/standalone/common/tql/basic.sql @@ -25,7 +25,6 @@ TQL EVAL (0, 10, '5s') {__name__!="test"}; -- the point at 1ms will be shadowed by the point at 2ms TQL EVAL (0, 10, '5s') test{k="a"}; --- 'lookback' parameter is not fully supported, the test has to be updated TQL EVAL (0, 10, '1s', '2s') test{k="a"}; TQL EVAL ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '1s') test{k="a"};