Skip to content

Commit 45e990b

Browse files
authored
refactor: propagate flush reasons through FlushRegions path (GreptimeTeam#8051)
* feat: propagate flush reasons through FlushRegions path

  Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* refactor: address flush reason review feedback

  Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* refactor: keep flush instruction helper name

  Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

---------

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>
1 parent b8951a3 commit 45e990b

24 files changed

Lines changed: 378 additions & 127 deletions

src/common/meta/src/instruction.rs

Lines changed: 49 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ use std::time::Duration;
1818

1919
use serde::{Deserialize, Deserializer, Serialize, Serializer};
2020
use store_api::region_engine::SyncRegionFromRequest;
21+
use store_api::region_request::RegionFlushReason;
2122
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
2223
use strum::Display;
2324
use table::metadata::TableId;
@@ -338,14 +339,17 @@ pub struct FlushRegions {
338339
/// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
339340
#[serde(default)]
340341
pub error_strategy: FlushErrorStrategy,
342+
/// The source that triggered this flush.
343+
#[serde(default, skip_serializing_if = "Option::is_none")]
344+
pub reason: Option<RegionFlushReason>,
341345
}
342346

343347
impl Display for FlushRegions {
344348
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
345349
write!(
346350
f,
347-
"FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
348-
self.region_ids, self.strategy, self.error_strategy
351+
"FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?}, reason={:?})",
352+
self.region_ids, self.strategy, self.error_strategy, self.reason
349353
)
350354
}
351355
}
@@ -357,6 +361,7 @@ impl FlushRegions {
357361
region_ids: vec![region_id],
358362
strategy: FlushStrategy::Sync,
359363
error_strategy: FlushErrorStrategy::FailFast,
364+
reason: None,
360365
}
361366
}
362367

@@ -366,6 +371,7 @@ impl FlushRegions {
366371
region_ids,
367372
strategy: FlushStrategy::Async,
368373
error_strategy: FlushErrorStrategy::TryAll,
374+
reason: None,
369375
}
370376
}
371377

@@ -375,9 +381,15 @@ impl FlushRegions {
375381
region_ids,
376382
strategy: FlushStrategy::Sync,
377383
error_strategy,
384+
reason: None,
378385
}
379386
}
380387

388+
pub fn with_reason(mut self, reason: RegionFlushReason) -> Self {
389+
self.reason = Some(reason);
390+
self
391+
}
392+
381393
/// Check if this is a single region flush.
382394
pub fn is_single_region(&self) -> bool {
383395
self.region_ids.len() == 1
@@ -1363,6 +1375,7 @@ mod tests {
13631375
assert!(!single_sync.is_hint());
13641376
assert!(single_sync.is_sync());
13651377
assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
1378+
assert_eq!(single_sync.reason, None);
13661379
assert!(single_sync.is_single_region());
13671380
assert_eq!(single_sync.single_region_id(), Some(region_id));
13681381

@@ -1374,6 +1387,7 @@ mod tests {
13741387
assert!(batch_async.is_hint());
13751388
assert!(!batch_async.is_sync());
13761389
assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
1390+
assert_eq!(batch_async.reason, None);
13771391
assert!(!batch_async.is_single_region());
13781392
assert_eq!(batch_async.single_region_id(), None);
13791393

@@ -1384,6 +1398,10 @@ mod tests {
13841398
assert!(!batch_sync.is_hint());
13851399
assert!(batch_sync.is_sync());
13861400
assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
1401+
assert_eq!(batch_sync.reason, None);
1402+
1403+
let with_reason = batch_sync.with_reason(RegionFlushReason::RemoteWalPrune);
1404+
assert_eq!(with_reason.reason, Some(RegionFlushReason::RemoteWalPrune));
13871405
}
13881406

13891407
#[test]
@@ -1401,6 +1419,7 @@ mod tests {
14011419
region_ids: vec![region_id],
14021420
strategy: FlushStrategy::Async,
14031421
error_strategy: FlushErrorStrategy::TryAll,
1422+
reason: None,
14041423
};
14051424
assert_eq!(flush_regions.region_ids, vec![region_id]);
14061425
assert_eq!(flush_regions.strategy, FlushStrategy::Async);
@@ -1450,13 +1469,40 @@ mod tests {
14501469
let instruction = Instruction::FlushRegions(flush_regions.clone());
14511470

14521471
let serialized = serde_json::to_string(&instruction).unwrap();
1472+
assert!(!serialized.contains("reason"));
14531473
let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
14541474

14551475
match deserialized {
14561476
Instruction::FlushRegions(fr) => {
14571477
assert_eq!(fr.region_ids, vec![region_id]);
14581478
assert_eq!(fr.strategy, FlushStrategy::Sync);
14591479
assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1480+
assert_eq!(fr.reason, None);
1481+
}
1482+
_ => panic!("Expected FlushRegions instruction"),
1483+
}
1484+
1485+
let legacy = r#"{"FlushRegions":{"region_ids":[4398046511105],"strategy":"Sync","error_strategy":"FailFast"}}"#;
1486+
let deserialized: Instruction = serde_json::from_str(legacy).unwrap();
1487+
match deserialized {
1488+
Instruction::FlushRegions(fr) => {
1489+
assert_eq!(fr.region_ids, vec![region_id]);
1490+
assert_eq!(fr.strategy, FlushStrategy::Sync);
1491+
assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
1492+
assert_eq!(fr.reason, None);
1493+
}
1494+
_ => panic!("Expected FlushRegions instruction"),
1495+
}
1496+
1497+
let flush_regions = FlushRegions::async_batch(vec![region_id])
1498+
.with_reason(RegionFlushReason::RemoteWalPrune);
1499+
let instruction = Instruction::FlushRegions(flush_regions);
1500+
let serialized = serde_json::to_string(&instruction).unwrap();
1501+
assert!(serialized.contains(r#""reason":"RemoteWalPrune""#));
1502+
let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
1503+
match deserialized {
1504+
Instruction::FlushRegions(fr) => {
1505+
assert_eq!(fr.reason, Some(RegionFlushReason::RemoteWalPrune));
14601506
}
14611507
_ => panic!("Expected FlushRegions instruction"),
14621508
}
@@ -1479,6 +1525,7 @@ mod tests {
14791525
assert!(!fr.is_hint());
14801526
assert!(fr.is_sync());
14811527
assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
1528+
assert_eq!(fr.reason, None);
14821529
}
14831530
_ => panic!("Expected FlushRegions instruction"),
14841531
}

src/datanode/src/heartbeat/handler/downgrade_region.rs

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -113,9 +113,7 @@ impl DowngradeRegionsHandler {
113113
region_server_moved
114114
.handle_request(
115115
region_id,
116-
RegionRequest::Flush(RegionFlushRequest {
117-
row_group_size: None,
118-
}),
116+
RegionRequest::Flush(RegionFlushRequest::default()),
119117
)
120118
.await?;
121119

src/datanode/src/heartbeat/handler/flush_region.rs

Lines changed: 66 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ use common_meta::instruction::{
1818
FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
1919
};
2020
use common_telemetry::{debug, warn};
21-
use store_api::region_request::{RegionFlushRequest, RegionRequest};
21+
use store_api::region_request::{RegionFlushReason, RegionFlushRequest, RegionRequest};
2222
use store_api::storage::RegionId;
2323

2424
use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
@@ -39,14 +39,17 @@ impl InstructionHandler for FlushRegionsHandler {
3939
let strategy = flush_regions.strategy;
4040
let region_ids = flush_regions.region_ids;
4141
let error_strategy = flush_regions.error_strategy;
42+
let reason = flush_regions.reason;
4243

4344
let reply = if matches!(strategy, FlushStrategy::Async) {
4445
// Asynchronous hint mode: fire-and-forget, no reply expected
45-
ctx.handle_flush_hint(region_ids).await;
46+
ctx.handle_flush_hint(region_ids, reason).await;
4647
None
4748
} else {
4849
// Synchronous mode: return reply with results
49-
let reply = ctx.handle_flush_sync(region_ids, error_strategy).await;
50+
let reply = ctx
51+
.handle_flush_sync(region_ids, error_strategy, reason)
52+
.await;
5053
Some(InstructionReply::FlushRegions(reply))
5154
};
5255

@@ -62,9 +65,14 @@ impl InstructionHandler for FlushRegionsHandler {
6265

6366
impl HandlerContext {
6467
/// Performs the actual region flush operation.
65-
async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> {
68+
async fn perform_region_flush(
69+
&self,
70+
region_id: RegionId,
71+
reason: Option<RegionFlushReason>,
72+
) -> Result<()> {
6673
let request = RegionRequest::Flush(RegionFlushRequest {
67-
row_group_size: None,
74+
reason,
75+
..Default::default()
6876
});
6977
self.region_server
7078
.handle_request(region_id, request)
@@ -73,10 +81,14 @@ impl HandlerContext {
7381
}
7482

7583
/// Handles asynchronous flush hints (fire-and-forget).
76-
async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
84+
async fn handle_flush_hint(
85+
&self,
86+
region_ids: Vec<RegionId>,
87+
reason: Option<RegionFlushReason>,
88+
) {
7789
let start_time = Instant::now();
7890
for region_id in &region_ids {
79-
let result = self.perform_region_flush(*region_id).await;
91+
let result = self.perform_region_flush(*region_id, reason).await;
8092
match result {
8193
Ok(_) => {}
8294
Err(error::Error::RegionNotFound { .. }) => {
@@ -102,11 +114,12 @@ impl HandlerContext {
102114
&self,
103115
region_ids: Vec<RegionId>,
104116
error_strategy: FlushErrorStrategy,
117+
reason: Option<RegionFlushReason>,
105118
) -> FlushRegionReply {
106119
let mut results = Vec::with_capacity(region_ids.len());
107120

108121
for region_id in region_ids {
109-
let result = self.flush_single_region_sync(region_id).await;
122+
let result = self.flush_single_region_sync(region_id, reason).await;
110123

111124
match &result {
112125
Ok(_) => results.push((region_id, Ok(()))),
@@ -127,7 +140,11 @@ impl HandlerContext {
127140
}
128141

129142
/// Flushes a single region synchronously with proper error handling.
130-
async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> {
143+
async fn flush_single_region_sync(
144+
&self,
145+
region_id: RegionId,
146+
reason: Option<RegionFlushReason>,
147+
) -> Result<()> {
131148
// Check if region is leader and writable
132149
let Some(writable) = self.region_server.is_region_leader(region_id) else {
133150
return Err(RegionNotFoundSnafu { region_id }.build());
@@ -148,7 +165,8 @@ impl HandlerContext {
148165
.handle_request(
149166
region_id,
150167
RegionRequest::Flush(RegionFlushRequest {
151-
row_group_size: None,
168+
reason,
169+
..Default::default()
152170
}),
153171
)
154172
.await?;
@@ -184,19 +202,27 @@ mod tests {
184202
use super::*;
185203
use crate::tests::{MockRegionEngine, mock_region_server};
186204

205+
type FlushedRequests = Arc<RwLock<Vec<(RegionId, Option<RegionFlushReason>)>>>;
206+
187207
#[tokio::test]
188208
async fn test_handle_flush_region_hint() {
189-
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
209+
let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new()));
190210

191211
let mock_region_server = mock_region_server();
192212
let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
193213
for region_id in &region_ids {
194-
let flushed_region_ids_ref = flushed_region_ids.clone();
214+
let flushed_requests_ref = flushed_requests.clone();
195215
let (mock_engine, _) =
196216
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
197217
region_engine.handle_request_mock_fn =
198-
Some(Box::new(move |region_id, _request| {
199-
flushed_region_ids_ref.write().unwrap().push(region_id);
218+
Some(Box::new(move |region_id, request| {
219+
let RegionRequest::Flush(request) = request else {
220+
panic!("Expected flush request");
221+
};
222+
flushed_requests_ref
223+
.write()
224+
.unwrap()
225+
.push((region_id, request.reason));
200226
Ok(0)
201227
}))
202228
});
@@ -206,44 +232,56 @@ mod tests {
206232
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
207233

208234
// Async hint mode
209-
let flush_instruction = FlushRegions::async_batch(region_ids.clone());
235+
let flush_instruction = FlushRegions::async_batch(region_ids.clone())
236+
.with_reason(RegionFlushReason::RemoteWalPrune);
210237
let reply = FlushRegionsHandler
211238
.handle(&handler_context, flush_instruction)
212239
.await;
213240
assert!(reply.is_none()); // Hint mode returns no reply
214-
assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
241+
let expected = region_ids
242+
.iter()
243+
.map(|region_id| (*region_id, Some(RegionFlushReason::RemoteWalPrune)))
244+
.collect::<Vec<_>>();
245+
assert_eq!(*flushed_requests.read().unwrap(), expected);
215246

216247
// Non-existent regions
217-
flushed_region_ids.write().unwrap().clear();
248+
flushed_requests.write().unwrap().clear();
218249
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
219250
let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
220251
let reply = FlushRegionsHandler
221252
.handle(&handler_context, flush_instruction)
222253
.await;
223254
assert!(reply.is_none());
224-
assert!(flushed_region_ids.read().unwrap().is_empty());
255+
assert!(flushed_requests.read().unwrap().is_empty());
225256
}
226257

227258
#[tokio::test]
228259
async fn test_handle_flush_region_sync_single() {
229-
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
260+
let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new()));
230261

231262
let mock_region_server = mock_region_server();
232263
let region_id = RegionId::new(1024, 1);
233264

234-
let flushed_region_ids_ref = flushed_region_ids.clone();
265+
let flushed_requests_ref = flushed_requests.clone();
235266
let (mock_engine, _) =
236267
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
237-
region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
238-
flushed_region_ids_ref.write().unwrap().push(region_id);
268+
region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, request| {
269+
let RegionRequest::Flush(request) = request else {
270+
panic!("Expected flush request");
271+
};
272+
flushed_requests_ref
273+
.write()
274+
.unwrap()
275+
.push((region_id, request.reason));
239276
Ok(0)
240277
}))
241278
});
242279
mock_region_server.register_test_region(region_id, mock_engine);
243280
let kv_backend = Arc::new(MemoryKvBackend::new());
244281
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
245282

246-
let flush_instruction = FlushRegions::sync_single(region_id);
283+
let flush_instruction =
284+
FlushRegions::sync_single(region_id).with_reason(RegionFlushReason::Repartition);
247285
let reply = FlushRegionsHandler
248286
.handle(&handler_context, flush_instruction)
249287
.await;
@@ -252,7 +290,10 @@ mod tests {
252290
assert_eq!(flush_reply.results.len(), 1);
253291
assert_eq!(flush_reply.results[0].0, region_id);
254292
assert!(flush_reply.results[0].1.is_ok());
255-
assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
293+
assert_eq!(
294+
*flushed_requests.read().unwrap(),
295+
vec![(region_id, Some(RegionFlushReason::Repartition))]
296+
);
256297
}
257298

258299
#[tokio::test]
@@ -333,7 +374,7 @@ mod tests {
333374
let display = format!("{}", flush_regions);
334375
assert_eq!(
335376
display,
336-
"FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast)"
377+
"FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast, reason=None)"
337378
);
338379
}
339380
}

src/meta-srv/src/procedure/region_migration/flush_leader_region.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ use std::any::Any;
1717
use common_procedure::{Context as ProcedureContext, Status};
1818
use serde::{Deserialize, Serialize};
1919
use snafu::OptionExt;
20+
use store_api::region_request::RegionFlushReason;
2021
use tokio::time::Instant;
2122

2223
use crate::error::{self, Result};
@@ -86,6 +87,7 @@ impl PreFlushRegion {
8687
leader,
8788
operation_timeout,
8889
utils::ErrorStrategy::Ignore,
90+
Some(RegionFlushReason::RegionMigration),
8991
)
9092
.await
9193
}

0 commit comments

Comments
 (0)