From 60bb4473d0481fbb480928e9e5485a3dc49e2f52 Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Tue, 2 Apr 2024 09:35:06 +0100 Subject: [PATCH 1/9] feat(promql): parameterize lookback --- src/frontend/src/instance/grpc.rs | 1 + src/operator/src/statement/tql.rs | 3 +++ src/query/src/parser.rs | 17 ++++++++++++++--- src/servers/src/error.rs | 2 +- src/servers/src/grpc/prom_query_gateway.rs | 2 ++ src/servers/src/http/handler.rs | 1 + src/servers/src/http/prometheus.rs | 5 +++++ src/servers/tests/mod.rs | 1 + 8 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 5dd20808f0e8..0819606d5017 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -85,6 +85,7 @@ impl GrpcQueryHandler for Instance { start: promql.start, end: promql.end, step: promql.step, + lookback: "5m".to_string(), }; let mut result = SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await; diff --git a/src/operator/src/statement/tql.rs b/src/operator/src/statement/tql.rs index ffd470030bf2..ba2a56e85cf4 100644 --- a/src/operator/src/statement/tql.rs +++ b/src/operator/src/statement/tql.rs @@ -37,12 +37,14 @@ impl StatementExecutor { end: eval.end, step: eval.step, query: eval.query, + lookback: eval.lookback.unwrap_or("5m".to_string()), }; QueryLanguageParser::parse_promql(&promql, &query_ctx).context(ParseQuerySnafu)? } Tql::Explain(explain) => { let promql = PromQuery { query: explain.query, + lookback: explain.lookback.unwrap_or("5m".to_string()), ..PromQuery::default() }; let explain_node_name = if explain.is_verbose { @@ -63,6 +65,7 @@ impl StatementExecutor { end: analyze.end, step: analyze.step, query: analyze.query, + lookback: analyze.lookback.unwrap_or("5m".to_string()), }; let analyze_node_name = if analyze.is_verbose { ANALYZE_VERBOSE_NODE_NAME diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index 92cab91481b3..0f030c1be690 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -36,7 +36,6 @@ use crate::error::{ }; use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED}; -const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m pub const DEFAULT_LOOKBACK_STRING: &str = "5m"; pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN"; pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE"; @@ -98,6 +97,7 @@ pub struct PromQuery { pub start: String, pub end: String, pub step: String, + pub lookback: String, } impl Default for PromQuery { @@ -107,6 +107,7 @@ impl Default for PromQuery { start: String::from("0"), end: String::from("0"), step: String::from("5m"), + lookback: DEFAULT_LOOKBACK_STRING.to_string(), } } } @@ -165,13 +166,22 @@ impl QueryLanguageParser { query: &query.query, })?; + let lookback_delta = query + .lookback + .parse::() + .map(Duration::from_secs) + .or_else(|_| promql_parser::util::parse_duration(&query.lookback)) + .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments))) + .context(QueryParseSnafu { + query: &query.query, + })?; + let eval_stmt = EvalStmt { expr, start, end, interval: step, - // TODO(ruihang): provide a way to adjust this parameter. - lookback_delta: Duration::from_secs(DEFAULT_LOOKBACK), + lookback_delta, }; Ok(QueryStatement::Promql(eval_stmt)) @@ -353,6 +363,7 @@ mod test { start: "2022-02-13T17:14:00Z".to_string(), end: "2023-02-13T17:14:00Z".to_string(), step: "1d".to_string(), + lookback: "5m".to_string(), }; #[cfg(not(windows))] diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 2d47547e65a6..0ccd874993ae 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -340,7 +340,7 @@ pub enum Error { #[snafu(display("Failed to parse PromQL: {query:?}"))] ParsePromQL { - query: PromQuery, + query: Box, location: Location, source: query::error::Error, }, diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index e7e20c65337e..0e6b595ba7eb 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -58,6 +58,7 @@ impl PrometheusGateway for PrometheusGatewayService { start: range_query.start, end: range_query.end, step: range_query.step, + lookback: "5m".to_string(), } } Promql::InstantQuery(instant_query) => { @@ -71,6 +72,7 @@ impl PrometheusGateway for PrometheusGatewayService { start: time.clone(), end: time, step: String::from("1s"), + lookback: "5m".to_string(), } } }; diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 002bc2dd286c..0e896aea0298 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -215,6 +215,7 @@ impl From for PromQuery { start: query.start, end: query.end, step: query.step, + lookback: "5m".to_string(), } } } diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index bb208fb1cee3..8913e4786831 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -178,6 +178,7 @@ pub async fn instant_query( start: time.clone(), end: time, step: "1s".to_string(), + lookback: "5m".to_string(), }; let result = handler.do_query(&prom_query, query_ctx).await; @@ -216,6 +217,7 @@ pub async fn range_query( start: params.start.or(form_params.start).unwrap_or_default(), end: params.end.or(form_params.end).unwrap_or_default(), step: params.step.or(form_params.step).unwrap_or_default(), + lookback: "5m".to_string(), }; let result = handler.do_query(&prom_query, query_ctx).await; @@ -320,6 +322,7 @@ pub async fn labels_query( start: start.clone(), end: end.clone(), step: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: DEFAULT_LOOKBACK_STRING.to_string(), }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; @@ -597,6 +600,7 @@ pub async fn label_values_query( start: start.clone(), end: end.clone(), step: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: DEFAULT_LOOKBACK_STRING.to_string(), }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; if let Err(err) = @@ -737,6 +741,7 @@ pub async fn series_query( end: end.clone(), // TODO: find a better value for step step: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: DEFAULT_LOOKBACK_STRING.to_string(), }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 284935673650..e892bbe33915 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -193,6 +193,7 @@ impl GrpcQueryHandler for DummyInstance { start: promql.start, end: promql.end, step: promql.step, + lookback: "5m".to_string(), }; let mut result = SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await; From 7dcd7055a4f370545180da4d1042c5af3aa403fe Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Wed, 10 Apr 2024 15:41:03 +0100 Subject: [PATCH 2/9] chore(promql): address CR, adjusted sqlness --- src/frontend/src/instance/grpc.rs | 4 +-- src/operator/src/statement/tql.rs | 12 ++++++--- src/query/src/parser.rs | 2 +- src/servers/src/grpc/prom_query_gateway.rs | 6 ++--- src/servers/src/http/handler.rs | 4 +-- src/servers/src/http/prometheus.rs | 2 +- src/servers/tests/mod.rs | 4 +-- .../common/tql-explain-analyze/analyze.result | 2 +- .../common/tql-explain-analyze/explain.result | 26 +++++++++---------- .../cases/standalone/common/tql/basic.result | 8 ------ 10 files changed, 33 insertions(+), 37 deletions(-) diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 0819606d5017..eda9b9f5e6eb 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -21,7 +21,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_meta::table_name::TableName; use common_query::Output; use common_telemetry::tracing; -use query::parser::PromQuery; +use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING}; use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef}; use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; @@ -85,7 +85,7 @@ impl GrpcQueryHandler for Instance { start: promql.start, end: promql.end, step: promql.step, - lookback: "5m".to_string(), + lookback: DEFAULT_LOOKBACK_STRING.to_string(), }; let mut result = SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await; diff --git a/src/operator/src/statement/tql.rs b/src/operator/src/statement/tql.rs index ba2a56e85cf4..02b739c28502 100644 --- a/src/operator/src/statement/tql.rs +++ b/src/operator/src/statement/tql.rs @@ -18,7 +18,7 @@ use common_query::Output; use common_telemetry::tracing; use query::parser::{ PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, - EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME, + DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME, }; use session::context::QueryContextRef; use snafu::ResultExt; @@ -37,14 +37,16 @@ impl StatementExecutor { end: eval.end, step: eval.step, query: eval.query, - lookback: eval.lookback.unwrap_or("5m".to_string()), + lookback: eval.lookback.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), }; QueryLanguageParser::parse_promql(&promql, &query_ctx).context(ParseQuerySnafu)? } Tql::Explain(explain) => { let promql = PromQuery { query: explain.query, - lookback: explain.lookback.unwrap_or("5m".to_string()), + lookback: explain + .lookback + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), ..PromQuery::default() }; let explain_node_name = if explain.is_verbose { @@ -65,7 +67,9 @@ impl StatementExecutor { end: analyze.end, step: analyze.step, query: analyze.query, - lookback: analyze.lookback.unwrap_or("5m".to_string()), + lookback: analyze + .lookback + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), }; let analyze_node_name = if analyze.is_verbose { ANALYZE_VERBOSE_NODE_NAME diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index 0f030c1be690..4da25d364936 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -107,7 +107,7 @@ impl Default for PromQuery { start: String::from("0"), end: String::from("0"), step: String::from("5m"), - lookback: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: String::from(DEFAULT_LOOKBACK_STRING), } } } diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index 0e6b595ba7eb..9e3c398c7870 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -26,7 +26,7 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_time::util::current_time_rfc3339; use promql_parser::parser::ValueType; -use query::parser::PromQuery; +use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING}; use session::context::QueryContext; use snafu::OptionExt; use tonic::{Request, Response}; @@ -58,7 +58,7 @@ impl PrometheusGateway for PrometheusGatewayService { start: range_query.start, end: range_query.end, step: range_query.step, - lookback: "5m".to_string(), + lookback: DEFAULT_LOOKBACK_STRING.to_string(), } } Promql::InstantQuery(instant_query) => { @@ -72,7 +72,7 @@ impl PrometheusGateway for PrometheusGatewayService { start: time.clone(), end: time, step: String::from("1s"), - lookback: "5m".to_string(), + lookback: String::from(DEFAULT_LOOKBACK_STRING), } } }; diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 0e896aea0298..38192f182356 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -26,7 +26,7 @@ use common_plugins::GREPTIME_EXEC_WRITE_COST; use common_query::{Output, OutputData}; use common_recordbatch::util; use common_telemetry::tracing; -use query::parser::PromQuery; +use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -215,7 +215,7 @@ impl From for PromQuery { start: query.start, end: query.end, step: query.step, - lookback: "5m".to_string(), + lookback: DEFAULT_LOOKBACK_STRING.to_string(), } } } diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index 8913e4786831..d3a9e33f70ac 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -217,7 +217,7 @@ pub async fn range_query( start: params.start.or(form_params.start).unwrap_or_default(), end: params.end.or(form_params.end).unwrap_or_default(), step: params.step.or(form_params.step).unwrap_or_default(), - lookback: "5m".to_string(), + lookback: DEFAULT_LOOKBACK_STRING.to_string(), }; let result = handler.do_query(&prom_query, query_ctx).await; diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index e892bbe33915..7f2ca8268369 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -21,7 +21,7 @@ use async_trait::async_trait; use catalog::memory::MemoryCatalogManager; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; -use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; +use query::parser::{PromQuery, QueryLanguageParser, QueryStatement, DEFAULT_LOOKBACK_STRING}; use query::plan::LogicalPlan; use query::query_engine::DescribeResult; use query::{QueryEngineFactory, QueryEngineRef}; @@ -193,7 +193,7 @@ impl GrpcQueryHandler for DummyInstance { start: promql.start, end: promql.end, step: promql.step, - lookback: "5m".to_string(), + lookback: DEFAULT_LOOKBACK_STRING.to_string(), }; let mut result = SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await; diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result index 96b2548503df..9d063adcdc82 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result @@ -39,7 +39,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test; +-+-+ | plan_type_| plan_| +-+-+ -| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[1000], time index=[j], REDACTED +| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j], REDACTED |_|_RepartitionExec: partitioning=REDACTED |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED |_|_PromSeriesDivideExec: tags=["k"], REDACTED diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index e50d1892f351..2666a1a4d799 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -34,21 +34,21 @@ TQL EXPLAIN (0, 10, '5s') test; -- SQLNESS REPLACE (peers.*) REDACTED TQL EXPLAIN (0, 10, '1s', '2s') test; -+---------------+-----------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------------------------------------+ -| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | -| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | -| | PromSeriesDivide: tags=["k"] | -| | MergeScan [is_placeholder=false] | -| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | ++---------------+---------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------+ +| logical_plan | PromInstantManipulate: range=[0..0], lookback=[2000], interval=[300000], time index=[j] | +| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | +| | PromSeriesDivide: tags=["k"] | +| | MergeScan [is_placeholder=false] | +| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] | | | RepartitionExec: partitioning=REDACTED -| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | -| | PromSeriesDivideExec: tags=["k"] | -| | SortExec: expr=[k@2 ASC NULLS LAST] | +| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | +| | PromSeriesDivideExec: tags=["k"] | +| | SortExec: expr=[k@2 ASC NULLS LAST] | | | MergeScanExec: REDACTED -| | | -+---------------+-----------------------------------------------------------------------------------------------+ +| | | ++---------------+---------------------------------------------------------------------------------------------+ -- explain at 0s, 5s and 10s. No point at 0s. -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED diff --git a/tests/cases/standalone/common/tql/basic.result b/tests/cases/standalone/common/tql/basic.result index a0c244a48b31..053a536a7acf 100644 --- a/tests/cases/standalone/common/tql/basic.result +++ b/tests/cases/standalone/common/tql/basic.result @@ -84,14 +84,6 @@ TQL EVAL (0, 10, '1s', '2s') test{k="a"}; +-----+---------------------+---+ | 2.0 | 1970-01-01T00:00:01 | a | | 2.0 | 1970-01-01T00:00:02 | a | -| 2.0 | 1970-01-01T00:00:03 | a | -| 2.0 | 1970-01-01T00:00:04 | a | -| 2.0 | 1970-01-01T00:00:05 | a | -| 2.0 | 1970-01-01T00:00:06 | a | -| 2.0 | 1970-01-01T00:00:07 | a | -| 2.0 | 1970-01-01T00:00:08 | a | -| 2.0 | 1970-01-01T00:00:09 | a | -| 2.0 | 1970-01-01T00:00:10 | a | +-----+---------------------+---+ TQL EVAL ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '1s') test{k="a"}; From f192e46d421996c802d199923d4ea48ac87c7477 Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Wed, 10 Apr 2024 15:44:09 +0100 Subject: [PATCH 3/9] chore(promql): fmt --- src/servers/src/error.rs | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 82730b3e7a85..b3c0f3205063 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -219,20 +219,13 @@ pub enum Error { error: prost::DecodeError, }, - #[snafu(display("Failed to decompress snappy prometheus remote request"))] - DecompressSnappyPromRemoteRequest { + #[snafu(display("Failed to decompress prometheus remote request"))] + DecompressPromRemoteRequest { location: Location, #[snafu(source)] error: snap::Error, }, - #[snafu(display("Failed to decompress zstd prometheus remote request"))] - DecompressZstdPromRemoteRequest { - location: Location, - #[snafu(source)] - error: std::io::Error, - }, - #[snafu(display("Failed to send prometheus remote request"))] SendPromRemoteRequest { location: Location, @@ -347,7 +340,7 @@ pub enum Error { #[snafu(display("Failed to parse PromQL: {query:?}"))] ParsePromQL { - query: Box, + query: Box, location: Location, source: query::error::Error, }, @@ -511,8 +504,7 @@ impl ErrorExt for Error { | DecodePromRemoteRequest { .. } | DecodeOtlpRequest { .. } | CompressPromRemoteRequest { .. } - | DecompressSnappyPromRemoteRequest { .. } - | DecompressZstdPromRemoteRequest { .. } + | DecompressPromRemoteRequest { .. } | InvalidPromRemoteRequest { .. } | InvalidExportMetricsConfig { .. } | InvalidFlightTicket { .. } @@ -665,8 +657,7 @@ impl IntoResponse for Error { | Error::InvalidOpentsdbJsonRequest { .. } | Error::DecodePromRemoteRequest { .. } | Error::DecodeOtlpRequest { .. } - | Error::DecompressSnappyPromRemoteRequest { .. } - | Error::DecompressZstdPromRemoteRequest { .. } + | Error::DecompressPromRemoteRequest { .. } | Error::InvalidPromRemoteRequest { .. } | Error::InvalidQuery { .. } | Error::TimePrecision { .. } => HttpStatusCode::BAD_REQUEST, From 9cf9c64c5670e62e47d72b25f76b1649db3df7e7 Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Wed, 10 Apr 2024 15:56:22 +0100 Subject: [PATCH 4/9] chore(promql): fix accidental removal --- src/servers/src/error.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index b3c0f3205063..ba8144a7c512 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -219,13 +219,20 @@ pub enum Error { error: prost::DecodeError, }, - #[snafu(display("Failed to decompress prometheus remote request"))] - DecompressPromRemoteRequest { + #[snafu(display("Failed to decompress snappy prometheus remote request"))] + DecompressSnappyPromRemoteRequest { location: Location, #[snafu(source)] error: snap::Error, }, + #[snafu(display("Failed to decompress zstd prometheus remote request"))] + DecompressZstdPromRemoteRequest { + location: Location, + #[snafu(source)] + error: std::io::Error, + }, + #[snafu(display("Failed to send prometheus remote request"))] SendPromRemoteRequest { location: Location, @@ -504,7 +511,8 @@ impl ErrorExt for Error { | DecodePromRemoteRequest { .. } | DecodeOtlpRequest { .. } | CompressPromRemoteRequest { .. } - | DecompressPromRemoteRequest { .. } + | DecompressSnappyPromRemoteRequest { .. } + | DecompressZstdPromRemoteRequest { .. } | InvalidPromRemoteRequest { .. } | InvalidExportMetricsConfig { .. } | InvalidFlightTicket { .. } @@ -657,7 +665,8 @@ impl IntoResponse for Error { | Error::InvalidOpentsdbJsonRequest { .. } | Error::DecodePromRemoteRequest { .. } | Error::DecodeOtlpRequest { .. } - | Error::DecompressPromRemoteRequest { .. } + | Error::DecompressSnappyPromRemoteRequest { .. } + | Error::DecompressZstdPromRemoteRequest { .. } | Error::InvalidPromRemoteRequest { .. } | Error::InvalidQuery { .. } | Error::TimePrecision { .. } => HttpStatusCode::BAD_REQUEST, From 3be16e4cb2d6c30f7bc0ccebd5fa102be3b87fb3 Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Thu, 11 Apr 2024 21:32:51 +0100 Subject: [PATCH 5/9] fix(promql): address CR --- src/servers/src/http/handler.rs | 5 ++++- src/servers/src/http/prometheus.rs | 33 +++++++++++++++++++++++++----- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 38192f182356..bf91f11a9a99 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -205,6 +205,7 @@ pub struct PromqlQuery { pub start: String, pub end: String, pub step: String, + pub lookback: Option, pub db: Option, } @@ -215,7 +216,9 @@ impl From for PromQuery { start: query.start, end: query.end, step: query.step, - lookback: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: query + .lookback + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), } } } diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index d3a9e33f70ac..0eebf43367c1 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -152,6 +152,7 @@ pub async fn build_info_query() -> PrometheusJsonResponse { #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct InstantQuery { query: Option, + lookback: Option, time: Option, timeout: Option, db: Option, @@ -178,7 +179,10 @@ pub async fn instant_query( start: time.clone(), end: time, step: "1s".to_string(), - lookback: "5m".to_string(), + lookback: params + .lookback + .or(form_params.lookback) + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), }; let result = handler.do_query(&prom_query, query_ctx).await; @@ -197,6 +201,7 @@ pub struct RangeQuery { start: Option, end: Option, step: Option, + lookback: Option, timeout: Option, db: Option, } @@ -217,7 +222,10 @@ pub async fn range_query( start: params.start.or(form_params.start).unwrap_or_default(), end: params.end.or(form_params.end).unwrap_or_default(), step: params.step.or(form_params.step).unwrap_or_default(), - lookback: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: params + .lookback + .or(form_params.lookback) + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()), }; let result = handler.do_query(&prom_query, query_ctx).await; @@ -237,6 +245,7 @@ struct Matches(Vec); pub struct LabelsQuery { start: Option, end: Option, + lookback: Option, #[serde(flatten)] matches: Matches, db: Option, @@ -312,6 +321,11 @@ pub async fn labels_query( .or(form_params.end) .unwrap_or_else(current_time_rfc3339); + let lookback = params + .lookback + .or(form_params.lookback) + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()); + let mut labels = HashSet::new(); let _ = labels.insert(METRIC_NAME.to_string()); @@ -322,7 +336,7 @@ pub async fn labels_query( start: start.clone(), end: end.clone(), step: DEFAULT_LOOKBACK_STRING.to_string(), - lookback: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: lookback.clone(), }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; @@ -549,6 +563,7 @@ fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option { pub struct LabelValueQuery { start: Option, end: Option, + lookback: Option, #[serde(flatten)] matches: Matches, db: Option, @@ -590,6 +605,9 @@ pub async fn label_values_query( let start = params.start.unwrap_or_else(yesterday_rfc3339); let end = params.end.unwrap_or_else(current_time_rfc3339); + let lookback = params + .lookback + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()); let mut label_values = HashSet::new(); @@ -600,7 +618,7 @@ pub async fn label_values_query( start: start.clone(), end: end.clone(), step: DEFAULT_LOOKBACK_STRING.to_string(), - lookback: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: lookback.clone(), }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; if let Err(err) = @@ -699,6 +717,7 @@ async fn retrieve_label_values_from_record_batch( pub struct SeriesQuery { start: Option, end: Option, + lookback: Option, #[serde(flatten)] matches: Matches, db: Option, @@ -730,6 +749,10 @@ pub async fn series_query( .end .or(form_params.end) .unwrap_or_else(current_time_rfc3339); + let lookback = params + .lookback + .or(form_params.lookback) + .unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()); let mut series = Vec::new(); let mut merge_map = HashMap::new(); @@ -741,7 +764,7 @@ pub async fn series_query( end: end.clone(), // TODO: find a better value for step step: DEFAULT_LOOKBACK_STRING.to_string(), - lookback: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: lookback.clone(), }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; From f7bd923bd24e49ec756faa03284c3b3a0bfbdf39 Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Sat, 13 Apr 2024 14:56:46 +0100 Subject: [PATCH 6/9] fix(promql): address CR --- tests/cases/standalone/common/tql/basic.result | 1 - tests/cases/standalone/common/tql/basic.sql | 1 - 2 files changed, 2 deletions(-) diff --git a/tests/cases/standalone/common/tql/basic.result b/tests/cases/standalone/common/tql/basic.result index 053a536a7acf..f38055ea1a08 100644 --- a/tests/cases/standalone/common/tql/basic.result +++ b/tests/cases/standalone/common/tql/basic.result @@ -76,7 +76,6 @@ TQL EVAL (0, 10, '5s') test{k="a"}; | 2.0 | 1970-01-01T00:00:10 | a | +-----+---------------------+---+ --- 'lookback' parameter is not fully supported, the test has to be updated TQL EVAL (0, 10, '1s', '2s') test{k="a"}; +-----+---------------------+---+ diff --git a/tests/cases/standalone/common/tql/basic.sql b/tests/cases/standalone/common/tql/basic.sql index 27ed5153f134..3cd8da8fcf4e 100644 --- a/tests/cases/standalone/common/tql/basic.sql +++ b/tests/cases/standalone/common/tql/basic.sql @@ -25,7 +25,6 @@ TQL EVAL (0, 10, '5s') {__name__!="test"}; -- the point at 1ms will be shadowed by the point at 2ms TQL EVAL (0, 10, '5s') test{k="a"}; --- 'lookback' parameter is not fully supported, the test has to be updated TQL EVAL (0, 10, '1s', '2s') test{k="a"}; TQL EVAL ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '1s') test{k="a"}; From 312a1b924b557374eccb7374b9600f3cf58f316b Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Sat, 13 Apr 2024 15:27:33 +0100 Subject: [PATCH 7/9] feat(promql): add initial lookback parameter grpc support --- Cargo.toml | 2 +- src/client/src/database.rs | 1 + src/frontend/src/instance/grpc.rs | 4 ++-- src/servers/src/grpc/prom_query_gateway.rs | 6 +++--- src/servers/tests/mod.rs | 4 ++-- tests-integration/src/grpc.rs | 1 + tests-integration/tests/grpc.rs | 3 +++ 7 files changed, 13 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0ecd7d2cafcd..ac91b4eb83b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b97efbf92a0bf9abcfa1d8fe0ffe8741a2e7309e" } +greptime-proto = { git = "https://github.com/etolbakov/greptime-proto.git", rev = "1da8c466f9920ec85141bc673c544c627fe79416" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/client/src/database.rs b/src/client/src/database.rs index fe02032a7be6..8776f90f61fe 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -215,6 +215,7 @@ impl Database { start: start.to_string(), end: end.to_string(), step: step.to_string(), + lookback: "".to_string(), })), })) .await diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index ed0e35237f80..73ec35df5d49 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -21,7 +21,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_meta::table_name::TableName; use common_query::Output; use common_telemetry::tracing; -use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING}; +use query::parser::PromQuery; use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef}; use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; @@ -85,7 +85,7 @@ impl GrpcQueryHandler for Instance { start: promql.start, end: promql.end, step: promql.step, - lookback: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: promql.lookback, }; let mut result = SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await; diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index 9e3c398c7870..ebd1a6d2556b 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -26,7 +26,7 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_time::util::current_time_rfc3339; use promql_parser::parser::ValueType; -use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING}; +use query::parser::PromQuery; use session::context::QueryContext; use snafu::OptionExt; use tonic::{Request, Response}; @@ -58,7 +58,7 @@ impl PrometheusGateway for PrometheusGatewayService { start: range_query.start, end: range_query.end, step: range_query.step, - lookback: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: range_query.lookback, } } Promql::InstantQuery(instant_query) => { @@ -72,7 +72,7 @@ impl PrometheusGateway for PrometheusGatewayService { start: time.clone(), end: time, step: String::from("1s"), - lookback: String::from(DEFAULT_LOOKBACK_STRING), + lookback: instant_query.lookback, } } }; diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 7f2ca8268369..8913c879d74b 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -21,7 +21,7 @@ use async_trait::async_trait; use catalog::memory::MemoryCatalogManager; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; -use query::parser::{PromQuery, QueryLanguageParser, QueryStatement, DEFAULT_LOOKBACK_STRING}; +use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::plan::LogicalPlan; use query::query_engine::DescribeResult; use query::{QueryEngineFactory, QueryEngineRef}; @@ -193,7 +193,7 @@ impl GrpcQueryHandler for DummyInstance { start: promql.start, end: promql.end, step: promql.step, - lookback: DEFAULT_LOOKBACK_STRING.to_string(), + lookback: promql.lookback, }; let mut result = SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await; diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index c115de52ab07..6d7179b18da1 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -790,6 +790,7 @@ CREATE TABLE {table_name} ( start: "1672557973".to_owned(), end: "1672557978".to_owned(), step: "1s".to_owned(), + lookback: "5m".to_string(), })), }); let output = query(instance, request).await; diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 3298ead4e4a9..0b38ac252c30 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -504,6 +504,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { let instant_query = PromInstantQuery { query: "test".to_string(), time: "5".to_string(), + lookback: "5m".to_string(), }; let instant_query_request = PromqlRequest { header: Some(header.clone()), @@ -555,6 +556,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { start: "0".to_string(), end: "10".to_string(), step: "5s".to_string(), + lookback: "5m".to_string(), }; let range_query_request: PromqlRequest = PromqlRequest { header: Some(header.clone()), @@ -605,6 +607,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { start: "1000000000".to_string(), end: "1000001000".to_string(), step: "5s".to_string(), + lookback: "5m".to_string(), }; let range_query_request: PromqlRequest = PromqlRequest { header: Some(header), From 643f254fc105e055263b9fa72e83c1499319db06 Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Mon, 15 Apr 2024 09:24:50 +0100 Subject: [PATCH 8/9] fix: update greptime-proto revision --- Cargo.lock | 3 +-- Cargo.toml | 2 +- src/client/src/database.rs | 4 +++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d321df6a510c..d9a18be84680 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1632,7 +1632,6 @@ dependencies = [ "substrait 0.7.2", "table", "temp-env", - "tempfile", "tikv-jemallocator", "tokio", "toml 0.8.8", @@ -3775,7 +3774,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b97efbf92a0bf9abcfa1d8fe0ffe8741a2e7309e#b97efbf92a0bf9abcfa1d8fe0ffe8741a2e7309e" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=04d78b6e025ceb518040fdd10858c2a9d9345820#04d78b6e025ceb518040fdd10858c2a9d9345820" dependencies = [ "prost 0.12.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index ac91b4eb83b5..788bc68798e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/etolbakov/greptime-proto.git", rev = "1da8c466f9920ec85141bc673c544c627fe79416" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "04d78b6e025ceb518040fdd10858c2a9d9345820" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 8776f90f61fe..70dc7397f5ed 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -37,6 +37,8 @@ use snafu::{ensure, ResultExt}; use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu}; use crate::{error, from_grpc_response, metrics, Client, Result, StreamInserter}; +pub const DEFAULT_LOOKBACK_STRING: &str = "5m"; + #[derive(Clone, Debug, Default)] pub struct Database { // The "catalog" and "schema" to be used in processing the requests at the server side. @@ -215,7 +217,7 @@ impl Database { start: start.to_string(), end: end.to_string(), step: step.to_string(), - lookback: "".to_string(), + lookback: DEFAULT_LOOKBACK_STRING.to_string(), })), })) .await From 934bbcf30d64797042fbc46ee963ba2af4deca25 Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Mon, 15 Apr 2024 09:59:00 +0100 Subject: [PATCH 9/9] chore: restore accidental removal --- Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.lock b/Cargo.lock index d9a18be84680..d4f44c1d8374 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1632,6 +1632,7 @@ dependencies = [ "substrait 0.7.2", "table", "temp-env", + "tempfile", "tikv-jemallocator", "tokio", "toml 0.8.8",