Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "8da84a04b137c4104262459807eab1c04b92f3cc" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "04d78b6e025ceb518040fdd10858c2a9d9345820" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
3 changes: 3 additions & 0 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use snafu::{ensure, ResultExt};
use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu};
use crate::{error, from_grpc_response, metrics, Client, Result, StreamInserter};

pub const DEFAULT_LOOKBACK_STRING: &str = "5m";

#[derive(Clone, Debug, Default)]
pub struct Database {
// The "catalog" and "schema" to be used in processing the requests at the server side.
Expand Down Expand Up @@ -215,6 +217,7 @@ impl Database {
start: start.to_string(),
end: end.to_string(),
step: step.to_string(),
lookback: DEFAULT_LOOKBACK_STRING.to_string(),
})),
}))
.await
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl GrpcQueryHandler for Instance {
start: promql.start,
end: promql.end,
step: promql.step,
lookback: promql.lookback,
};
let mut result =
SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
Expand Down
9 changes: 8 additions & 1 deletion src/operator/src/statement/tql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common_query::Output;
use common_telemetry::tracing;
use query::parser::{
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
};
use session::context::QueryContextRef;
use snafu::ResultExt;
Expand All @@ -37,12 +37,16 @@ impl StatementExecutor {
end: eval.end,
step: eval.step,
query: eval.query,
lookback: eval.lookback.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
Comment thread
etolbakov marked this conversation as resolved.
};
QueryLanguageParser::parse_promql(&promql, &query_ctx).context(ParseQuerySnafu)?
}
Tql::Explain(explain) => {
let promql = PromQuery {
query: explain.query,
lookback: explain
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
..PromQuery::default()
};
let explain_node_name = if explain.is_verbose {
Expand All @@ -63,6 +67,9 @@ impl StatementExecutor {
end: analyze.end,
step: analyze.step,
query: analyze.query,
lookback: analyze
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
};
let analyze_node_name = if analyze.is_verbose {
ANALYZE_VERBOSE_NODE_NAME
Expand Down
17 changes: 14 additions & 3 deletions src/query/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::error::{
};
use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};

const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m
pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
Expand Down Expand Up @@ -98,6 +97,7 @@ pub struct PromQuery {
pub start: String,
pub end: String,
pub step: String,
pub lookback: String,
}

impl Default for PromQuery {
Expand All @@ -107,6 +107,7 @@ impl Default for PromQuery {
start: String::from("0"),
end: String::from("0"),
step: String::from("5m"),
lookback: String::from(DEFAULT_LOOKBACK_STRING),
}
}
}
Expand Down Expand Up @@ -165,13 +166,22 @@ impl QueryLanguageParser {
query: &query.query,
})?;

let lookback_delta = query
.lookback
.parse::<u64>()
.map(Duration::from_secs)
.or_else(|_| promql_parser::util::parse_duration(&query.lookback))
.map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
.context(QueryParseSnafu {
query: &query.query,
})?;

let eval_stmt = EvalStmt {
expr,
start,
end,
interval: step,
// TODO(ruihang): provide a way to adjust this parameter.
lookback_delta: Duration::from_secs(DEFAULT_LOOKBACK),
lookback_delta,
};

Ok(QueryStatement::Promql(eval_stmt))
Expand Down Expand Up @@ -353,6 +363,7 @@ mod test {
start: "2022-02-13T17:14:00Z".to_string(),
end: "2023-02-13T17:14:00Z".to_string(),
step: "1d".to_string(),
lookback: "5m".to_string(),
};

#[cfg(not(windows))]
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ pub enum Error {

#[snafu(display("Failed to parse PromQL: {query:?}"))]
ParsePromQL {
query: PromQuery,
query: Box<PromQuery>,
location: Location,
source: query::error::Error,
},
Expand Down
2 changes: 2 additions & 0 deletions src/servers/src/grpc/prom_query_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl PrometheusGateway for PrometheusGatewayService {
start: range_query.start,
end: range_query.end,
step: range_query.step,
lookback: range_query.lookback,
}
}
Promql::InstantQuery(instant_query) => {
Expand All @@ -71,6 +72,7 @@ impl PrometheusGateway for PrometheusGatewayService {
start: time.clone(),
end: time,
step: String::from("1s"),
lookback: instant_query.lookback,
}
}
};
Expand Down
6 changes: 5 additions & 1 deletion src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_plugins::GREPTIME_EXEC_WRITE_COST;
use common_query::{Output, OutputData};
use common_recordbatch::util;
use common_telemetry::tracing;
use query::parser::PromQuery;
use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand Down Expand Up @@ -205,6 +205,7 @@ pub struct PromqlQuery {
pub start: String,
pub end: String,
pub step: String,
pub lookback: Option<String>,
pub db: Option<String>,
}

Expand All @@ -215,6 +216,9 @@ impl From<PromqlQuery> for PromQuery {
start: query.start,
end: query.end,
step: query.step,
lookback: query
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions src/servers/src/http/prometheus.rs
Comment thread
tisonkun marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ pub async fn build_info_query() -> PrometheusJsonResponse {
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct InstantQuery {
query: Option<String>,
lookback: Option<String>,
time: Option<String>,
timeout: Option<String>,
db: Option<String>,
Expand All @@ -178,6 +179,10 @@ pub async fn instant_query(
start: time.clone(),
end: time,
step: "1s".to_string(),
lookback: params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
};

let result = handler.do_query(&prom_query, query_ctx).await;
Expand All @@ -196,6 +201,7 @@ pub struct RangeQuery {
start: Option<String>,
end: Option<String>,
step: Option<String>,
lookback: Option<String>,
timeout: Option<String>,
db: Option<String>,
}
Expand All @@ -216,6 +222,10 @@ pub async fn range_query(
start: params.start.or(form_params.start).unwrap_or_default(),
end: params.end.or(form_params.end).unwrap_or_default(),
step: params.step.or(form_params.step).unwrap_or_default(),
lookback: params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
};

let result = handler.do_query(&prom_query, query_ctx).await;
Expand All @@ -235,6 +245,7 @@ struct Matches(Vec<String>);
pub struct LabelsQuery {
start: Option<String>,
end: Option<String>,
lookback: Option<String>,
#[serde(flatten)]
matches: Matches,
db: Option<String>,
Expand Down Expand Up @@ -310,6 +321,11 @@ pub async fn labels_query(
.or(form_params.end)
.unwrap_or_else(current_time_rfc3339);

let lookback = params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());

let mut labels = HashSet::new();
let _ = labels.insert(METRIC_NAME.to_string());

Expand All @@ -320,6 +336,7 @@ pub async fn labels_query(
start: start.clone(),
end: end.clone(),
step: DEFAULT_LOOKBACK_STRING.to_string(),
lookback: lookback.clone(),
};

let result = handler.do_query(&prom_query, query_ctx.clone()).await;
Expand Down Expand Up @@ -546,6 +563,7 @@ fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
pub struct LabelValueQuery {
start: Option<String>,
end: Option<String>,
lookback: Option<String>,
#[serde(flatten)]
matches: Matches,
db: Option<String>,
Expand Down Expand Up @@ -587,6 +605,9 @@ pub async fn label_values_query(

let start = params.start.unwrap_or_else(yesterday_rfc3339);
let end = params.end.unwrap_or_else(current_time_rfc3339);
let lookback = params
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());

let mut label_values = HashSet::new();

Expand All @@ -597,6 +618,7 @@ pub async fn label_values_query(
start: start.clone(),
end: end.clone(),
step: DEFAULT_LOOKBACK_STRING.to_string(),
Comment thread
tisonkun marked this conversation as resolved.
lookback: lookback.clone(),
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
if let Err(err) =
Expand Down Expand Up @@ -695,6 +717,7 @@ async fn retrieve_label_values_from_record_batch(
pub struct SeriesQuery {
start: Option<String>,
end: Option<String>,
lookback: Option<String>,
#[serde(flatten)]
matches: Matches,
db: Option<String>,
Expand Down Expand Up @@ -726,6 +749,10 @@ pub async fn series_query(
.end
.or(form_params.end)
.unwrap_or_else(current_time_rfc3339);
let lookback = params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());

let mut series = Vec::new();
let mut merge_map = HashMap::new();
Expand All @@ -737,6 +764,7 @@ pub async fn series_query(
end: end.clone(),
// TODO: find a better value for step
step: DEFAULT_LOOKBACK_STRING.to_string(),
lookback: lookback.clone(),
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;

Expand Down
1 change: 1 addition & 0 deletions src/servers/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl GrpcQueryHandler for DummyInstance {
start: promql.start,
end: promql.end,
step: promql.step,
lookback: promql.lookback,
};
let mut result =
SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await;
Expand Down
1 change: 1 addition & 0 deletions tests-integration/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ CREATE TABLE {table_name} (
start: "1672557973".to_owned(),
end: "1672557978".to_owned(),
step: "1s".to_owned(),
lookback: "5m".to_string(),
})),
});
let output = query(instance, request).await;
Expand Down
3 changes: 3 additions & 0 deletions tests-integration/tests/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
let instant_query = PromInstantQuery {
query: "test".to_string(),
time: "5".to_string(),
lookback: "5m".to_string(),
};
let instant_query_request = PromqlRequest {
header: Some(header.clone()),
Expand Down Expand Up @@ -555,6 +556,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
start: "0".to_string(),
end: "10".to_string(),
step: "5s".to_string(),
lookback: "5m".to_string(),
};
let range_query_request: PromqlRequest = PromqlRequest {
header: Some(header.clone()),
Expand Down Expand Up @@ -605,6 +607,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
start: "1000000000".to_string(),
end: "1000001000".to_string(),
step: "5s".to_string(),
lookback: "5m".to_string(),
};
let range_query_request: PromqlRequest = PromqlRequest {
header: Some(header),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
+-+-+
| plan_type_| plan_|
+-+-+
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[1000], time index=[j], REDACTED
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j], REDACTED
Comment thread
tisonkun marked this conversation as resolved.
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
Expand Down
26 changes: 13 additions & 13 deletions tests/cases/standalone/common/tql-explain-analyze/explain.result
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ TQL EXPLAIN (0, 10, '5s') test;
-- SQLNESS REPLACE (peers.*) REDACTED
TQL EXPLAIN (0, 10, '1s', '2s') test;

+---------------+-----------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
+---------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 ASC NULLS LAST] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 ASC NULLS LAST] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------+
| | |
+---------------+---------------------------------------------------------------------------------------------+

-- explain at 0s, 5s and 10s. No point at 0s.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
Expand Down
Loading