diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs
index e18e466867cf..83e2f90b27b0 100644
--- a/src/flow/src/batching_mode/engine.rs
+++ b/src/flow/src/batching_mode/engine.rs
@@ -476,15 +476,37 @@ impl BatchingEngine {
         Ok(())
     }
 
+    /// Only flush the dirty windows of the flow task with the given flow id, by running the query on them,
+    /// as flushing the whole time range is usually prohibitively expensive.
    pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
        debug!("Try flush flow {flow_id}");
+        // need to wait a bit to ensure the previous mirror insert is handled
+        // this is only useful when we are flushing the flow right after inserting data into it
+        // TODO(discord9): find a better way to ensure the data is ready, maybe inform flownode from frontend?
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        let task = self.tasks.read().await.get(&flow_id).cloned();
        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
-        task.mark_all_windows_as_dirty()?;
+        let time_window_size = task
+            .config
+            .time_window_expr
+            .as_ref()
+            .and_then(|expr| *expr.time_window_size());
+
+        let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
+            task.state
+                .read()
+                .unwrap()
+                .dirty_time_windows
+                .effective_count(&time_window_size)
+        });
 
        let res = task
-            .gen_exec_once(&self.query_engine, &self.frontend_client)
+            .gen_exec_once(
+                &self.query_engine,
+                &self.frontend_client,
+                cur_dirty_window_cnt,
+            )
            .await?;
 
        let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
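Note: `flush_flow_inner` now sizes the one-shot flush by `DirtyTimeWindows::effective_count` (added in `state.rs` below) instead of marking every window dirty. A minimal standalone sketch of that computation, using plain `i64` second offsets in place of the crate's `Timestamp` type (an assumption for illustration only):

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Dirty windows keyed by start second; a `None` end means the window is
/// exactly one `window_size` wide, mirroring `DirtyTimeWindows`.
fn effective_count(windows: &BTreeMap<i64, Option<i64>>, window_size: Duration) -> usize {
    if windows.is_empty() {
        return 0;
    }
    let size_secs = window_size.as_secs() as i64;
    if size_secs == 0 {
        // mirror the patch's guard against a zero-sized window
        return 0;
    }
    // Total covered range; open-ended windows count as one window size.
    let total_secs: i64 = windows
        .iter()
        .map(|(start, end)| match end {
            Some(end) => end - start,
            None => size_secs,
        })
        .sum();
    (total_secs / size_secs) as usize
}

fn main() {
    let mut windows = BTreeMap::new();
    windows.insert(0, Some(60)); // a merged 60s dirty range
    windows.insert(120, Some(150)); // a separate 30s dirty range
    // 90s of dirty data in 30s windows => 3 effective windows
    assert_eq!(effective_count(&windows, Duration::from_secs(30)), 3);
}
```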
diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs
index 7050f7ab94b4..a816b663bb12 100644
--- a/src/flow/src/batching_mode/state.rs
+++ b/src/flow/src/batching_mode/state.rs
@@ -22,7 +22,7 @@ use common_telemetry::tracing::warn;
 use common_time::Timestamp;
 use datatypes::value::Value;
 use session::context::QueryContextRef;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use tokio::sync::oneshot;
 use tokio::time::Instant;
 
@@ -31,7 +31,8 @@ use crate::batching_mode::time_window::TimeWindowExpr;
 use crate::batching_mode::MIN_REFRESH_DURATION;
 use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
 use crate::metrics::{
-    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
+    METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
+    METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
 };
 use crate::{Error, FlowId};
 
@@ -178,6 +179,33 @@ impl DirtyTimeWindows {
         self.windows.len()
     }
 
+    /// Get the effective count of time windows, which is the number of time windows that can be
+    /// used for a query, computed as the total time window range divided by `window_size`.
+    pub fn effective_count(&self, window_size: &Duration) -> usize {
+        if self.windows.is_empty() {
+            return 0;
+        }
+        let window_size =
+            chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
+        let total_window_time_range =
+            self.windows
+                .iter()
+                .fold(chrono::Duration::zero(), |acc, (start, end)| {
+                    if let Some(end) = end {
+                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
+                    } else {
+                        acc + window_size
+                    }
+                });
+
+        // not sure a zero window_size has any meaning, but guard against dividing by zero just in case
+        if window_size.num_seconds() == 0 {
+            0
+        } else {
+            (total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
+        }
+    }
+
     /// Generate all filter expressions consuming all time windows
     ///
     /// there are two limits:
@@ -192,6 +220,13 @@ impl DirtyTimeWindows {
         flow_id: FlowId,
         task_ctx: Option<&BatchingTask>,
     ) -> Result<Option<Expr>, Error> {
+        ensure!(
+            window_size.num_seconds() > 0,
+            UnexpectedSnafu {
+                reason: "window_size is zero, can't generate filter exprs",
+            }
+        );
+
         debug!(
             "expire_lower_bound: {:?}, window_size: {:?}",
             expire_lower_bound.map(|t| t.to_iso8601_string()),
@@ -228,62 +263,94 @@ impl DirtyTimeWindows {
 
         // get the first `window_cnt` time windows
         let max_time_range = window_size * window_cnt as i32;
-        let nth = {
-            let mut cur_time_range = chrono::Duration::zero();
-            let mut nth_key = None;
-            for (idx, (start, end)) in self.windows.iter().enumerate() {
-                // if time range is too long, stop
-                if cur_time_range > max_time_range {
-                    nth_key = Some(*start);
-                    break;
-                }
-                // if we have enough time windows, stop
-                if idx >= window_cnt {
-                    nth_key = Some(*start);
-                    break;
-                }
-
-                if let Some(end) = end {
-                    if let Some(x) = end.sub(start) {
-                        cur_time_range += x;
-                    }
-                }
+        let mut to_be_query = BTreeMap::new();
+        let mut new_windows = self.windows.clone();
+        let mut cur_time_range = chrono::Duration::zero();
+        for (idx, (start, end)) in self.windows.iter().enumerate() {
+            let first_end = start
+                .add_duration(window_size.to_std().unwrap())
+                .context(TimeSnafu)?;
+            let end = end.unwrap_or(first_end);
+
+            // if time range is too long, stop
+            if cur_time_range >= max_time_range {
+                break;
             }
-            nth_key
-        };
-        let first_nth = {
-            if let Some(nth) = nth {
-                let mut after = self.windows.split_off(&nth);
-                std::mem::swap(&mut self.windows, &mut after);
+            // if we have enough time windows, stop
+            if idx >= window_cnt {
+                break;
+            }
 
-                after
+            let Some(x) = end.sub(start) else {
+                continue;
+            };
+            if cur_time_range + x <= max_time_range {
+                to_be_query.insert(*start, Some(end));
+                new_windows.remove(start);
+                cur_time_range += x;
             } else {
-                std::mem::take(&mut self.windows)
+                // too large a window, split it
+                // split at window_size * times
+                let surplus = max_time_range - cur_time_range;
+                if surplus.num_seconds() <= window_size.num_seconds() {
+                    // Skip splitting if surplus is smaller than window_size
+                    break;
+                }
+                let times = surplus.num_seconds() / window_size.num_seconds();
+
+                let split_offset = window_size * times as i32;
+                let split_at = start
+                    .add_duration(split_offset.to_std().unwrap())
+                    .context(TimeSnafu)?;
+                to_be_query.insert(*start, Some(split_at));
+
+                // remove the original window
+                new_windows.remove(start);
+                new_windows.insert(split_at, Some(end));
+                cur_time_range += split_offset;
+                break;
             }
-        };
+        }
+
+        self.windows = new_windows;
 
         METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
             .with_label_values(&[flow_id.to_string().as_str()])
-            .observe(first_nth.len() as f64);
+            .observe(to_be_query.len() as f64);
-        let full_time_range = first_nth
+        let full_time_range = to_be_query
             .iter()
             .fold(chrono::Duration::zero(), |acc, (start, end)| {
                 if let Some(end) = end {
                     acc + end.sub(start).unwrap_or(chrono::Duration::zero())
                 } else {
-                    acc
+                    acc + window_size
                 }
             })
             .num_seconds() as f64;
 
-        METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
+        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
             .with_label_values(&[flow_id.to_string().as_str()])
             .observe(full_time_range);
 
+        let stalled_time_range =
+            self.windows
+                .iter()
+                .fold(chrono::Duration::zero(), |acc, (start, end)| {
+                    if let Some(end) = end {
+                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
+                    } else {
+                        acc + window_size
+                    }
+                });
+
+        METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
+            .with_label_values(&[flow_id.to_string().as_str()])
+            .observe(stalled_time_range.num_seconds() as f64);
+
         let mut expr_lst = vec![];
-        for (start, end) in first_nth.into_iter() {
+        for (start, end) in to_be_query.into_iter() {
             // align using time window exprs
             let (start, end) = if let Some(ctx) = task_ctx {
                 let Some(time_window_expr) = &ctx.config.time_window_expr else {
@@ -517,6 +584,64 @@ mod test {
                 "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
             )
         ),
+        // split range
+        (
+            Vec::from_iter((0..20).map(|i| Timestamp::new_second(i * 3)).chain(std::iter::once(
+                Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
+            ))),
+            (chrono::Duration::seconds(3), None),
+            BTreeMap::from([
+                (
+                    Timestamp::new_second(0),
+                    Some(Timestamp::new_second(60)),
+                ),
+                (
+                    Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
+                    Some(Timestamp::new_second(
+                        60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3,
+                    )),
+                ),
+            ]),
+            Some(
+                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
+            )
+        ),
+        // split 2 min into 1 min
+        (
+            Vec::from_iter((0..40).map(|i| Timestamp::new_second(i * 3))),
+            (chrono::Duration::seconds(3), None),
+            BTreeMap::from([
+                (
+                    Timestamp::new_second(0),
+                    Some(Timestamp::new_second(40 * 3)),
+                ),
+            ]),
+            Some(
+                "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
+            )
+        ),
+        // split 3s + 1min into 3s + 57s
+        (
+            Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i| Timestamp::new_second(20 + i * 3)))),
+            (chrono::Duration::seconds(3), None),
+            BTreeMap::from([
+                (
+                    Timestamp::new_second(0),
+                    Some(Timestamp::new_second(3)),
+                ),
+                (
+                    Timestamp::new_second(20),
+                    Some(Timestamp::new_second(140)),
+                ),
+            ]),
+            Some(
+                "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
            )
+        ),
         // expired
         (
             vec![
@@ -533,6 +658,8 @@ mod test {
                 None
             ),
         ];
+        // let len = testcases.len();
+        // let testcases = testcases[(len - 2)..(len - 1)].to_vec();
         for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
             testcases
         {
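The splitting branch above is the behavioral core of this change: an oversized dirty window is cut at a multiple of `window_size`, the head is queried, and the tail is kept dirty for the next run (which the new stalled-window metric then measures). A simplified sketch of the cut-point rule, again with `i64` seconds standing in for `Timestamp` (illustration only, not the crate's API):

```rust
/// Given a dirty window [start, end), the window size, and the remaining
/// budget before `max_time_range` is hit, return the still-dirty remainder
/// left behind after the split, or `None` if no split happens.
fn split_window(
    start: i64,
    end: i64,
    window_size_secs: i64,
    surplus_secs: i64, // budget left before hitting max_time_range
) -> Option<(i64, i64)> {
    // Mirrors the patch: skip splitting when the surplus is no larger
    // than a single window.
    if surplus_secs <= window_size_secs {
        return None;
    }
    // Cut at the largest multiple of window_size that fits the budget.
    let times = surplus_secs / window_size_secs;
    let split_at = start + window_size_secs * times;
    // [start, split_at) is queried now; [split_at, end) stays dirty.
    Some((split_at.min(end), end))
}

fn main() {
    // 120s-wide dirty window, 3s windows, 60s of budget:
    // query [0, 60), keep [60, 120) stalled for the next run.
    assert_eq!(split_window(0, 120, 3, 60), Some((60, 120)));
    // Budget no larger than one window: the window is left untouched.
    assert_eq!(split_window(0, 120, 3, 2), None);
}
```

This matches the "split 2 min into 1 min" test case above, where a merged 120s dirty range yields a filter covering only the first 60s.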
diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs
index aef9eea5e8fc..9f203bb8d34d 100644
--- a/src/flow/src/batching_mode/task.rs
+++ b/src/flow/src/batching_mode/task.rs
@@ -211,8 +211,9 @@ impl BatchingTask {
         &self,
         engine: &QueryEngineRef,
         frontend_client: &Arc<FrontendClient>,
+        max_window_cnt: Option<usize>,
     ) -> Result<Option<(u32, Duration)>, Error> {
-        if let Some(new_query) = self.gen_insert_plan(engine).await? {
+        if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
             debug!("Generate new query: {}", new_query);
             self.execute_logical_plan(frontend_client, &new_query).await
         } else {
@@ -224,6 +225,7 @@ impl BatchingTask {
     pub async fn gen_insert_plan(
         &self,
         engine: &QueryEngineRef,
+        max_window_cnt: Option<usize>,
     ) -> Result<Option<LogicalPlan>, Error> {
         let (table, df_schema) = get_table_info_df_schema(
             self.config.catalog_manager.clone(),
@@ -232,7 +234,7 @@ impl BatchingTask {
         .await?;
 
         let new_query = self
-            .gen_query_with_time_window(engine.clone(), &table.meta.schema)
+            .gen_query_with_time_window(engine.clone(), &table.meta.schema, max_window_cnt)
             .await?;
 
         let insert_into = if let Some((new_query, _column_cnt)) = new_query {
@@ -437,7 +439,7 @@ impl BatchingTask {
             .with_label_values(&[&flow_id_str])
             .inc();
 
-        let new_query = match self.gen_insert_plan(&engine).await {
+        let new_query = match self.gen_insert_plan(&engine, None).await {
             Ok(new_query) => new_query,
             Err(err) => {
                 common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
@@ -521,6 +523,7 @@ impl BatchingTask {
         &self,
         engine: QueryEngineRef,
         sink_table_schema: &Arc<Schema>,
+        max_window_cnt: Option<usize>,
     ) -> Result<Option<(LogicalPlan, usize)>, Error> {
         let query_ctx = self.state.read().unwrap().query_ctx.clone();
         let start = SystemTime::now();
@@ -574,8 +577,8 @@ impl BatchingTask {
         };
 
         debug!(
-            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?}",
-            self.config.flow_id, l, u
+            "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
+            self.config.flow_id, l, u, self.state.read().unwrap().dirty_time_windows
         );
         let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
             reason: format!("Can't get window size from {u:?} - {l:?}"),
@@ -601,7 +604,7 @@ impl BatchingTask {
             &col_name,
             Some(l),
             window_size,
-            DirtyTimeWindows::MAX_FILTER_NUM,
+            max_window_cnt.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM),
             self.config.flow_id,
             Some(self),
         )?;
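Both call sites above funnel through the same cap: periodic background runs pass `None` and keep the fixed `MAX_FILTER_NUM` limit, while a manual flush passes the effective dirty-window count so a single query drains everything. A small sketch of that defaulting rule (the constant's value here is an assumption for illustration; only the `unwrap_or` shape comes from the patch):

```rust
// Hypothetical stand-in value; the real cap is DirtyTimeWindows::MAX_FILTER_NUM.
const MAX_FILTER_NUM: usize = 20;

/// `None`    => background tick, bounded by the fixed cap.
/// `Some(n)` => ADMIN FLUSH_FLOW, sized to the flow's dirty windows.
fn window_cap(max_window_cnt: Option<usize>) -> usize {
    max_window_cnt.unwrap_or(MAX_FILTER_NUM)
}

fn main() {
    assert_eq!(window_cap(None), MAX_FILTER_NUM); // periodic run
    assert_eq!(window_cap(Some(137)), 137); // flush with 137 dirty windows
}
```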
diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs
index 66ec7423e0c3..58c01793ccdb 100644
--- a/src/flow/src/metrics.rs
+++ b/src/flow/src/metrics.rs
@@ -50,10 +50,18 @@ lazy_static! {
         vec![0.0, 5., 10., 20., 40.]
     )
     .unwrap();
-    pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE: HistogramVec =
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE: HistogramVec =
         register_histogram_vec!(
-            "greptime_flow_batching_engine_query_time_range_secs",
-            "flow batching engine query time range(seconds)",
+            "greptime_flow_batching_engine_query_window_size_secs",
+            "flow batching engine query window size(seconds)",
+            &["flow_id"],
+            vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
+        )
+        .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE: HistogramVec =
+        register_histogram_vec!(
+            "greptime_flow_batching_engine_stalled_window_size_secs",
+            "flow batching engine stalled window size(seconds)",
             &["flow_id"],
             vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
         )
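The renamed query-window histogram and the new stalled-window histogram share one bucket ladder: 60s, then 4x steps up to 256 minutes. A quick check that the literal bucket list is the geometric ladder 60 * 4^i:

```rust
// Reproduce the bucket vector from the metrics above as 60 * 4^i, i in 0..5.
fn window_size_buckets() -> Vec<f64> {
    (0..5).map(|i| 60.0 * 4f64.powi(i)).collect()
}

fn main() {
    assert_eq!(
        window_size_buckets(),
        vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
    );
}
```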
diff --git a/tests/cases/standalone/common/flow/flow_flush.result b/tests/cases/standalone/common/flow/flow_flush.result
index f9a8a43af8cd..44906218d950 100644
--- a/tests/cases/standalone/common/flow/flow_flush.result
+++ b/tests/cases/standalone/common/flow/flow_flush.result
@@ -42,11 +42,8 @@ SELECT
 FROM
     out_num_cnt_basic;
 
-+---------------------------------+-------------------------+
-| sum(numbers_input_basic.number) | time_window             |
-+---------------------------------+-------------------------+
-| 42                              | 2021-07-01T00:00:00.100 |
-+---------------------------------+-------------------------+
+++
+++
 
 DROP FLOW test_numbers_basic;
 
diff --git a/tests/cases/standalone/common/flow/flow_rebuild.result b/tests/cases/standalone/common/flow/flow_rebuild.result
index 70946a173943..51daec97160a 100644
--- a/tests/cases/standalone/common/flow/flow_rebuild.result
+++ b/tests/cases/standalone/common/flow/flow_rebuild.result
@@ -690,25 +690,21 @@ CREATE TABLE IF NOT EXISTS `api_stats` (
     `key` STRING NULL,
     `qpm` BIGINT NULL,
     `rpm` BIGINT NULL,
-    `update_at` TIMESTAMP(3) NULL,
     TIME INDEX (`time`),
     PRIMARY KEY (`key`)
-) ENGINE=mito WITH(
-    append_mode = 'false',
-    merge_mode = 'last_row'
-);
+) ENGINE=mito;
 
 Affected Rows: 0
 
 CREATE FLOW IF NOT EXISTS api_stats_flow
-SINK TO api_stats EXPIRE AFTER '10 minute'::INTERVAL AS
+SINK TO api_stats AS
 SELECT
     date_trunc('minute', `time`::TimestampSecond) AS `time1`,
     `key`,
     count(*),
     sum(`count`)
 FROM api_log GROUP BY `time1`, `key`;
 
 Affected Rows: 0
 
-INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '1', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
+INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (0::TimestampSecond, '1', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
 
 Affected Rows: 1
 
@@ -721,13 +717,13 @@ ADMIN FLUSH_FLOW('api_stats_flow');
 | FLOW_FLUSHED |
 +------------------------------------+
 
-SELECT key FROM api_stats;
+SELECT * FROM api_stats;
 
-+-----+
-| key |
-+-----+
-| 1   |
-+-----+
++---------------------+-----+-----+-----+
+| time                | key | qpm | rpm |
++---------------------+-----+-----+-----+
+| 1970-01-01T00:00:00 | 1   | 1   | 1   |
++---------------------+-----+-----+-----+
 
 -- SQLNESS ARG restart=true
 SELECT 1;
 
@@ -739,12 +735,11 @@ SELECT 1;
 +----------+
 
 -- SQLNESS SLEEP 5s
-INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
+INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (0::TimestampSecond, '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
 
 Affected Rows: 1
 
 -- wait more time so flownode have time to recover flows
--- SQLNESS SLEEP 5s
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('api_stats_flow');
 
@@ -754,14 +749,15 @@ ADMIN FLUSH_FLOW('api_stats_flow');
 | FLOW_FLUSHED |
 +------------------------------------+
 
-SELECT key FROM api_stats;
-
-+-----+
-| key |
-+-----+
-| 1   |
-| 2   |
-+-----+
+-- SQLNESS SLEEP 5s
+SELECT * FROM api_stats;
+
++---------------------+-----+-----+-----+
+| time                | key | qpm | rpm |
++---------------------+-----+-----+-----+
+| 1970-01-01T00:00:00 | 1   | 1   | 1   |
+| 1970-01-01T00:00:00 | 2   | 1   | 1   |
++---------------------+-----+-----+-----+
 
 DROP FLOW api_stats_flow;
 
diff --git a/tests/cases/standalone/common/flow/flow_rebuild.sql b/tests/cases/standalone/common/flow/flow_rebuild.sql
index 170696ed26c7..9e62309c0b43 100644
--- a/tests/cases/standalone/common/flow/flow_rebuild.sql
+++ b/tests/cases/standalone/common/flow/flow_rebuild.sql
@@ -377,39 +377,35 @@ CREATE TABLE IF NOT EXISTS `api_stats` (
     `key` STRING NULL,
     `qpm` BIGINT NULL,
     `rpm` BIGINT NULL,
-    `update_at` TIMESTAMP(3) NULL,
     TIME INDEX (`time`),
     PRIMARY KEY (`key`)
-) ENGINE=mito WITH(
-    append_mode = 'false',
-    merge_mode = 'last_row'
-);
+) ENGINE=mito;
 
 CREATE FLOW IF NOT EXISTS api_stats_flow
-SINK TO api_stats EXPIRE AFTER '10 minute'::INTERVAL AS
+SINK TO api_stats AS
 SELECT
     date_trunc('minute', `time`::TimestampSecond) AS `time1`,
     `key`,
     count(*),
     sum(`count`)
 FROM api_log GROUP BY `time1`, `key`;
 
-INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '1', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
+INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (0::TimestampSecond, '1', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
 
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('api_stats_flow');
 
-SELECT key FROM api_stats;
+SELECT * FROM api_stats;
 
 -- SQLNESS ARG restart=true
 SELECT 1;
 
 -- SQLNESS SLEEP 5s
-INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
+INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (0::TimestampSecond, '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
 
 -- wait more time so flownode have time to recover flows
--- SQLNESS SLEEP 5s
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('api_stats_flow');
 
-SELECT key FROM api_stats;
+-- SQLNESS SLEEP 5s
+SELECT * FROM api_stats;
 
 DROP FLOW api_stats_flow;