feat: truly limit time range by split window #6295

Open · wants to merge 10 commits into base: main
26 changes: 24 additions & 2 deletions src/flow/src/batching_mode/engine.rs
@@ -476,15 +476,37 @@ impl BatchingEngine {
Ok(())
}

/// Only flush the dirty windows of the flow task with the given flow id, by running the query on them,
/// since flushing the whole time range is usually prohibitively expensive.
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
debug!("Try flush flow {flow_id}");
// need to wait a bit to ensure the previous mirror insert has been handled;
// this only matters when flushing the flow right after inserting data into it
// TODO(discord9): find a better way to ensure the data is ready, maybe inform flownode from frontend?
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Contributor Author:
As it turned out, it's the mirror insert (an async task) that happens before flush_flow in the sql file, and it might not have completed its insert by the time admin flush_flow is called. This temporary fix waits a bit so the previous SQL has a chance to complete; a proper fix needs some kind of synchronization, but that's complex.
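
A sketch of the synchronization hinted at above (hypothetical, not part of this PR): instead of a fixed 100ms sleep, the mirror-insert path could signal the flush path once the insert has been applied, e.g. with tokio::sync::Notify:

use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let insert_applied = Arc::new(Notify::new());

    // mirror-insert side: signal once the insert has been applied
    let done = insert_applied.clone();
    tokio::spawn(async move {
        // ... perform the mirrored insert ...
        done.notify_one(); // the permit is stored even if nobody is waiting yet
    });

    // flush_flow side: wait for the signal instead of sleeping a fixed 100ms
    insert_applied.notified().await;
    // ... proceed to mark windows dirty and run the flush query ...
}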

let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;

task.mark_all_windows_as_dirty()?;
let time_window_size = task
.config
.time_window_expr
.as_ref()
.and_then(|expr| *expr.time_window_size());

let cur_dirty_window_cnt = time_window_size.map(|time_window_size| {
task.state
.read()
.unwrap()
.dirty_time_windows
.effective_count(&time_window_size)
});

        let res = task
-            .gen_exec_once(&self.query_engine, &self.frontend_client)
+            .gen_exec_once(
+                &self.query_engine,
+                &self.frontend_client,
+                cur_dirty_window_cnt,
+            )
            .await?;

let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize;
199 changes: 163 additions & 36 deletions src/flow/src/batching_mode/state.rs
@@ -22,7 +22,7 @@ use common_telemetry::tracing::warn;
use common_time::Timestamp;
use datatypes::value::Value;
use session::context::QueryContextRef;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::oneshot;
use tokio::time::Instant;

@@ -31,7 +31,8 @@ use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{
-    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
+    METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
+    METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
};
use crate::{Error, FlowId};

@@ -178,6 +179,33 @@ impl DirtyTimeWindows {
self.windows.len()
}

/// Get the effective count of time windows, i.e. the number of time windows that can
/// be used for a query, computed as the total time window range divided by `window_size`.
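/// E.g. (hypothetical values): dirty windows `{0 -> Some(60s), 100s -> None, 200s -> Some(230s)}`
/// with a 30s `window_size` cover 60 + 30 + 30 = 120 seconds in total (an open-ended
/// window counts as one `window_size`), so the effective count is 120 / 30 = 4.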
pub fn effective_count(&self, window_size: &Duration) -> usize {
if self.windows.is_empty() {
return 0;
}
let window_size =
chrono::Duration::from_std(*window_size).unwrap_or(chrono::Duration::zero());
let total_window_time_range =
self.windows
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end {
acc + end.sub(start).unwrap_or(chrono::Duration::zero())
} else {
acc + window_size
}
});

// unclear whether a zero window_size is meaningful, but guard against it just in case
if window_size.num_seconds() == 0 {
0
} else {
(total_window_time_range.num_seconds() / window_size.num_seconds()) as usize
}
}

/// Generate all filter expressions, consuming all time windows
///
/// There are two limits:
@@ -192,6 +220,13 @@ impl DirtyTimeWindows {
flow_id: FlowId,
task_ctx: Option<&BatchingTask>,
) -> Result<Option<datafusion_expr::Expr>, Error> {
ensure!(
window_size.num_seconds() > 0,
UnexpectedSnafu {
reason: "window_size is zero, can't generate filter exprs",
}
);

debug!(
"expire_lower_bound: {:?}, window_size: {:?}",
expire_lower_bound.map(|t| t.to_iso8601_string()),
@@ -228,62 +263,94 @@

// get the first `window_cnt` time windows
let max_time_range = window_size * window_cnt as i32;
-        let nth = {
-            let mut cur_time_range = chrono::Duration::zero();
-            let mut nth_key = None;
-            for (idx, (start, end)) in self.windows.iter().enumerate() {
-                // if time range is too long, stop
-                if cur_time_range > max_time_range {
-                    nth_key = Some(*start);
-                    break;
-                }
-
-                // if we have enough time windows, stop
-                if idx >= window_cnt {
-                    nth_key = Some(*start);
-                    break;
-                }
-
-                if let Some(end) = end {
-                    if let Some(x) = end.sub(start) {
-                        cur_time_range += x;
-                    }
-                }
-            }
-
-            nth_key
-        };
-        let first_nth = {
-            if let Some(nth) = nth {
-                let mut after = self.windows.split_off(&nth);
-                std::mem::swap(&mut self.windows, &mut after);
-
-                after
-            } else {
-                std::mem::take(&mut self.windows)
-            }
-        };
+        let mut to_be_query = BTreeMap::new();
+        let mut new_windows = self.windows.clone();
+        let mut cur_time_range = chrono::Duration::zero();
+        for (idx, (start, end)) in self.windows.iter().enumerate() {
+            let first_end = start
+                .add_duration(window_size.to_std().unwrap())
+                .context(TimeSnafu)?;
+            let end = end.unwrap_or(first_end);
+
+            // if time range is too long, stop
+            if cur_time_range >= max_time_range {
+                break;
+            }
+
+            // if we have enough time windows, stop
+            if idx >= window_cnt {
+                break;
+            }
+
+            let Some(x) = end.sub(start) else {
+                continue;
+            };
+            if cur_time_range + x <= max_time_range {
+                to_be_query.insert(*start, Some(end));
+                new_windows.remove(start);
+                cur_time_range += x;
+            } else {
+                // too large a window, split it
+                // split at window_size * times
+                let surplus = max_time_range - cur_time_range;
+                if surplus.num_seconds() <= window_size.num_seconds() {
+                    // skip splitting if the surplus is smaller than window_size
+                    break;
+                }
+                let times = surplus.num_seconds() / window_size.num_seconds();
+
+                let split_offset = window_size * times as i32;
+                let split_at = start
+                    .add_duration(split_offset.to_std().unwrap())
+                    .context(TimeSnafu)?;
+                to_be_query.insert(*start, Some(split_at));
+
+                // remove the original window; keep the remainder as dirty
+                new_windows.remove(start);
+                new_windows.insert(split_at, Some(end));
+                cur_time_range += split_offset;
+                break;
+            }
+        }
+
+        self.windows = new_windows;

        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
            .with_label_values(&[flow_id.to_string().as_str()])
-            .observe(first_nth.len() as f64);
+            .observe(to_be_query.len() as f64);

-        let full_time_range = first_nth
+        let full_time_range = to_be_query
            .iter()
            .fold(chrono::Duration::zero(), |acc, (start, end)| {
                if let Some(end) = end {
                    acc + end.sub(start).unwrap_or(chrono::Duration::zero())
                } else {
-                    acc
+                    acc + window_size
                }
            })
            .num_seconds() as f64;
-        METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
+        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
            .with_label_values(&[flow_id.to_string().as_str()])
            .observe(full_time_range);

+        let stalled_time_range =
+            self.windows
+                .iter()
+                .fold(chrono::Duration::zero(), |acc, (start, end)| {
+                    if let Some(end) = end {
+                        acc + end.sub(start).unwrap_or(chrono::Duration::zero())
+                    } else {
+                        acc + window_size
+                    }
+                });
+
+        METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
+            .with_label_values(&[flow_id.to_string().as_str()])
+            .observe(stalled_time_range.num_seconds() as f64);

        let mut expr_lst = vec![];
-        for (start, end) in first_nth.into_iter() {
+        for (start, end) in to_be_query.into_iter() {
            // align using time window exprs
            let (start, end) = if let Some(ctx) = task_ctx {
                let Some(time_window_expr) = &ctx.config.time_window_expr else {
Expand Down Expand Up @@ -517,6 +584,64 @@ mod test {
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
)
),
// split range
(
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
60
)),
),
(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
Some(Timestamp::new_second(
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 2 min into 1 min
(
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
40 * 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 3s + 1min into 3s + 57s
(
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
3
)),
),(
Timestamp::new_second(20),
Some(Timestamp::new_second(
140
)),
)]),
Some(
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
)
),
// expired
(
vec![
Expand All @@ -533,6 +658,8 @@ mod test {
None
),
];
// let len = testcases.len();
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
testcases
{
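For readers skimming the diff, a minimal standalone sketch of the new splitting rule (assumed simplification: timestamps as i64 seconds instead of common_time::Timestamp and chrono::Duration): a dirty window larger than the remaining query budget is split at a multiple of window_size; the head is queried now and the tail stays dirty for the next run.

// Sketch of the split rule in the filter-generation loop above (hypothetical helper).
fn split_window(
    start: i64,
    end: i64,
    window_size: i64,
    surplus: i64, // remaining budget: max_time_range - cur_time_range
) -> Option<((i64, i64), (i64, i64))> {
    // skip splitting if the surplus is not larger than one window
    if surplus <= window_size {
        return None;
    }
    // split at `window_size * (surplus / window_size)`, mirroring the diff
    let split_at = start + window_size * (surplus / window_size);
    Some(((start, split_at), (split_at, end)))
}

fn main() {
    // a 3-minute dirty window with a 1-minute window_size and 2 minutes of
    // budget left: query [0, 120) now, keep [120, 180) dirty
    assert_eq!(split_window(0, 180, 60, 120), Some(((0, 120), (120, 180))));
}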
15 changes: 9 additions & 6 deletions src/flow/src/batching_mode/task.rs
@@ -211,8 +211,9 @@ impl BatchingTask {
        &self,
        engine: &QueryEngineRef,
        frontend_client: &Arc<FrontendClient>,
+        max_window_cnt: Option<usize>,
    ) -> Result<Option<(u32, Duration)>, Error> {
-        if let Some(new_query) = self.gen_insert_plan(engine).await? {
+        if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
            debug!("Generate new query: {}", new_query);
            self.execute_logical_plan(frontend_client, &new_query).await
        } else {
@@ -224,6 +225,7 @@
pub async fn gen_insert_plan(
&self,
engine: &QueryEngineRef,
+        max_window_cnt: Option<usize>,
) -> Result<Option<LogicalPlan>, Error> {
let (table, df_schema) = get_table_info_df_schema(
self.config.catalog_manager.clone(),
Expand All @@ -232,7 +234,7 @@ impl BatchingTask {
.await?;

let new_query = self
-            .gen_query_with_time_window(engine.clone(), &table.meta.schema)
+            .gen_query_with_time_window(engine.clone(), &table.meta.schema, max_window_cnt)
.await?;

let insert_into = if let Some((new_query, _column_cnt)) = new_query {
@@ -437,7 +439,7 @@
.with_label_values(&[&flow_id_str])
.inc();

-        let new_query = match self.gen_insert_plan(&engine).await {
+        let new_query = match self.gen_insert_plan(&engine, None).await {
Ok(new_query) => new_query,
Err(err) => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
@@ -521,6 +523,7 @@
&self,
engine: QueryEngineRef,
sink_table_schema: &Arc<Schema>,
+        max_window_cnt: Option<usize>,
) -> Result<Option<(LogicalPlan, usize)>, Error> {
let query_ctx = self.state.read().unwrap().query_ctx.clone();
let start = SystemTime::now();
@@ -574,8 +577,8 @@
};

debug!(
"Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?}",
self.config.flow_id, l, u
"Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
self.config.flow_id, l, u, self.state.read().unwrap().dirty_time_windows
);
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
reason: format!("Can't get window size from {u:?} - {l:?}"),
Expand All @@ -601,7 +604,7 @@ impl BatchingTask {
&col_name,
Some(l),
window_size,
-                DirtyTimeWindows::MAX_FILTER_NUM,
+                max_window_cnt.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM),
self.config.flow_id,
Some(self),
)?;
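Taken together with the engine.rs change above, the effect of threading max_window_cnt through can be summarized in a small sketch (assumed constant value; the real MAX_FILTER_NUM lives on DirtyTimeWindows):

const MAX_FILTER_NUM: usize = 20; // assumed value, for illustration only

fn window_budget(effective_dirty_cnt: Option<usize>) -> usize {
    // admin flush_flow passes the effective dirty-window count so one flush
    // can cover the whole backlog; the periodic loop passes None and stays capped
    effective_dirty_cnt.unwrap_or(MAX_FILTER_NUM)
}

fn main() {
    assert_eq!(window_budget(Some(137)), 137); // explicit flush
    assert_eq!(window_budget(None), MAX_FILTER_NUM); // periodic execution
}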
14 changes: 11 additions & 3 deletions src/flow/src/metrics.rs
@@ -50,10 +50,18 @@ lazy_static! {
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
-    pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE: HistogramVec =
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE: HistogramVec =
        register_histogram_vec!(
-            "greptime_flow_batching_engine_query_time_range_secs",
-            "flow batching engine query time range(seconds)",
+            "greptime_flow_batching_engine_query_window_size_secs",
+            "flow batching engine query window size(seconds)",
&["flow_id"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_stalled_window_size_secs",
"flow batching engine stalled window size(seconds)",
&["flow_id"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)