Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Duration;

use serde::{Deserialize, Deserializer, Serialize, Serializer};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::region_request::RegionFlushReason;
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
Expand Down Expand Up @@ -338,14 +339,17 @@ pub struct FlushRegions {
/// Error handling strategy for batch operations (only applies when multiple regions and sync strategy).
#[serde(default)]
pub error_strategy: FlushErrorStrategy,
/// The source that triggered this flush.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<RegionFlushReason>,
}

impl Display for FlushRegions {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?})",
self.region_ids, self.strategy, self.error_strategy
"FlushRegions(region_ids={:?}, strategy={:?}, error_strategy={:?}, reason={:?})",
self.region_ids, self.strategy, self.error_strategy, self.reason
)
}
}
Expand All @@ -357,6 +361,7 @@ impl FlushRegions {
region_ids: vec![region_id],
strategy: FlushStrategy::Sync,
error_strategy: FlushErrorStrategy::FailFast,
reason: None,
}
}

Expand All @@ -366,6 +371,7 @@ impl FlushRegions {
region_ids,
strategy: FlushStrategy::Async,
error_strategy: FlushErrorStrategy::TryAll,
reason: None,
}
}

Expand All @@ -375,9 +381,15 @@ impl FlushRegions {
region_ids,
strategy: FlushStrategy::Sync,
error_strategy,
reason: None,
}
}

/// Attaches the source that triggered this flush and returns the updated value.
///
/// Consumes `self` and stores `reason` as `Some(reason)`, so the call can be
/// chained directly onto one of the constructors.
pub fn with_reason(mut self, reason: RegionFlushReason) -> Self {
self.reason = Some(reason);
self
}

/// Check if this is a single region flush.
pub fn is_single_region(&self) -> bool {
self.region_ids.len() == 1
Expand Down Expand Up @@ -1363,6 +1375,7 @@ mod tests {
assert!(!single_sync.is_hint());
assert!(single_sync.is_sync());
assert_eq!(single_sync.error_strategy, FlushErrorStrategy::FailFast);
assert_eq!(single_sync.reason, None);
assert!(single_sync.is_single_region());
assert_eq!(single_sync.single_region_id(), Some(region_id));

Expand All @@ -1374,6 +1387,7 @@ mod tests {
assert!(batch_async.is_hint());
assert!(!batch_async.is_sync());
assert_eq!(batch_async.error_strategy, FlushErrorStrategy::TryAll);
assert_eq!(batch_async.reason, None);
assert!(!batch_async.is_single_region());
assert_eq!(batch_async.single_region_id(), None);

Expand All @@ -1384,6 +1398,10 @@ mod tests {
assert!(!batch_sync.is_hint());
assert!(batch_sync.is_sync());
assert_eq!(batch_sync.error_strategy, FlushErrorStrategy::FailFast);
assert_eq!(batch_sync.reason, None);

let with_reason = batch_sync.with_reason(RegionFlushReason::RemoteWalPrune);
assert_eq!(with_reason.reason, Some(RegionFlushReason::RemoteWalPrune));
}

#[test]
Expand All @@ -1401,6 +1419,7 @@ mod tests {
region_ids: vec![region_id],
strategy: FlushStrategy::Async,
error_strategy: FlushErrorStrategy::TryAll,
reason: None,
};
assert_eq!(flush_regions.region_ids, vec![region_id]);
assert_eq!(flush_regions.strategy, FlushStrategy::Async);
Expand Down Expand Up @@ -1450,13 +1469,40 @@ mod tests {
let instruction = Instruction::FlushRegions(flush_regions.clone());

let serialized = serde_json::to_string(&instruction).unwrap();
assert!(!serialized.contains("reason"));
let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();

match deserialized {
Instruction::FlushRegions(fr) => {
assert_eq!(fr.region_ids, vec![region_id]);
assert_eq!(fr.strategy, FlushStrategy::Sync);
assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
assert_eq!(fr.reason, None);
}
_ => panic!("Expected FlushRegions instruction"),
}

let legacy = r#"{"FlushRegions":{"region_ids":[4398046511105],"strategy":"Sync","error_strategy":"FailFast"}}"#;
let deserialized: Instruction = serde_json::from_str(legacy).unwrap();
match deserialized {
Instruction::FlushRegions(fr) => {
assert_eq!(fr.region_ids, vec![region_id]);
assert_eq!(fr.strategy, FlushStrategy::Sync);
assert_eq!(fr.error_strategy, FlushErrorStrategy::FailFast);
assert_eq!(fr.reason, None);
}
_ => panic!("Expected FlushRegions instruction"),
}

let flush_regions = FlushRegions::async_batch(vec![region_id])
.with_reason(RegionFlushReason::RemoteWalPrune);
let instruction = Instruction::FlushRegions(flush_regions);
let serialized = serde_json::to_string(&instruction).unwrap();
assert!(serialized.contains(r#""reason":"RemoteWalPrune""#));
let deserialized: Instruction = serde_json::from_str(&serialized).unwrap();
match deserialized {
Instruction::FlushRegions(fr) => {
assert_eq!(fr.reason, Some(RegionFlushReason::RemoteWalPrune));
}
_ => panic!("Expected FlushRegions instruction"),
}
Expand All @@ -1479,6 +1525,7 @@ mod tests {
assert!(!fr.is_hint());
assert!(fr.is_sync());
assert_eq!(fr.error_strategy, FlushErrorStrategy::TryAll);
assert_eq!(fr.reason, None);
}
_ => panic!("Expected FlushRegions instruction"),
}
Expand Down
4 changes: 1 addition & 3 deletions src/datanode/src/heartbeat/handler/downgrade_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ impl DowngradeRegionsHandler {
region_server_moved
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await?;

Expand Down
91 changes: 66 additions & 25 deletions src/datanode/src/heartbeat/handler/flush_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common_meta::instruction::{
FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
};
use common_telemetry::{debug, warn};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::region_request::{RegionFlushReason, RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
Expand All @@ -39,14 +39,17 @@ impl InstructionHandler for FlushRegionsHandler {
let strategy = flush_regions.strategy;
let region_ids = flush_regions.region_ids;
let error_strategy = flush_regions.error_strategy;
let reason = flush_regions.reason;

let reply = if matches!(strategy, FlushStrategy::Async) {
// Asynchronous hint mode: fire-and-forget, no reply expected
ctx.handle_flush_hint(region_ids).await;
ctx.handle_flush_hint(region_ids, reason).await;
None
} else {
// Synchronous mode: return reply with results
let reply = ctx.handle_flush_sync(region_ids, error_strategy).await;
let reply = ctx
.handle_flush_sync(region_ids, error_strategy, reason)
.await;
Some(InstructionReply::FlushRegions(reply))
};

Expand All @@ -62,9 +65,14 @@ impl InstructionHandler for FlushRegionsHandler {

impl HandlerContext {
/// Performs the actual region flush operation.
async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> {
async fn perform_region_flush(
&self,
region_id: RegionId,
reason: Option<RegionFlushReason>,
) -> Result<()> {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
reason,
..Default::default()
});
self.region_server
.handle_request(region_id, request)
Expand All @@ -73,10 +81,14 @@ impl HandlerContext {
}

/// Handles asynchronous flush hints (fire-and-forget).
async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
async fn handle_flush_hint(
&self,
region_ids: Vec<RegionId>,
reason: Option<RegionFlushReason>,
) {
let start_time = Instant::now();
for region_id in &region_ids {
let result = self.perform_region_flush(*region_id).await;
let result = self.perform_region_flush(*region_id, reason).await;
match result {
Ok(_) => {}
Err(error::Error::RegionNotFound { .. }) => {
Expand All @@ -102,11 +114,12 @@ impl HandlerContext {
&self,
region_ids: Vec<RegionId>,
error_strategy: FlushErrorStrategy,
reason: Option<RegionFlushReason>,
) -> FlushRegionReply {
let mut results = Vec::with_capacity(region_ids.len());

for region_id in region_ids {
let result = self.flush_single_region_sync(region_id).await;
let result = self.flush_single_region_sync(region_id, reason).await;

match &result {
Ok(_) => results.push((region_id, Ok(()))),
Expand All @@ -127,7 +140,11 @@ impl HandlerContext {
}

/// Flushes a single region synchronously with proper error handling.
async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> {
async fn flush_single_region_sync(
&self,
region_id: RegionId,
reason: Option<RegionFlushReason>,
) -> Result<()> {
// Check if region is leader and writable
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return Err(RegionNotFoundSnafu { region_id }.build());
Expand All @@ -148,7 +165,8 @@ impl HandlerContext {
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
reason,
..Default::default()
}),
)
.await?;
Expand Down Expand Up @@ -184,19 +202,27 @@ mod tests {
use super::*;
use crate::tests::{MockRegionEngine, mock_region_server};

/// Shared log of the flush requests observed by the mock engine in these tests:
/// each entry pairs the target region id with the `reason` carried by its flush request.
type FlushedRequests = Arc<RwLock<Vec<(RegionId, Option<RegionFlushReason>)>>>;

#[tokio::test]
async fn test_handle_flush_region_hint() {
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new()));

let mock_region_server = mock_region_server();
let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
for region_id in &region_ids {
let flushed_region_ids_ref = flushed_region_ids.clone();
let flushed_requests_ref = flushed_requests.clone();
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
region_engine.handle_request_mock_fn =
Some(Box::new(move |region_id, _request| {
flushed_region_ids_ref.write().unwrap().push(region_id);
Some(Box::new(move |region_id, request| {
let RegionRequest::Flush(request) = request else {
panic!("Expected flush request");
};
flushed_requests_ref
.write()
.unwrap()
.push((region_id, request.reason));
Ok(0)
}))
});
Expand All @@ -206,44 +232,56 @@ mod tests {
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

// Async hint mode
let flush_instruction = FlushRegions::async_batch(region_ids.clone());
let flush_instruction = FlushRegions::async_batch(region_ids.clone())
.with_reason(RegionFlushReason::RemoteWalPrune);
let reply = FlushRegionsHandler
.handle(&handler_context, flush_instruction)
.await;
assert!(reply.is_none()); // Hint mode returns no reply
assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
let expected = region_ids
.iter()
.map(|region_id| (*region_id, Some(RegionFlushReason::RemoteWalPrune)))
.collect::<Vec<_>>();
assert_eq!(*flushed_requests.read().unwrap(), expected);

// Non-existent regions
flushed_region_ids.write().unwrap().clear();
flushed_requests.write().unwrap().clear();
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
let reply = FlushRegionsHandler
.handle(&handler_context, flush_instruction)
.await;
assert!(reply.is_none());
assert!(flushed_region_ids.read().unwrap().is_empty());
assert!(flushed_requests.read().unwrap().is_empty());
}

#[tokio::test]
async fn test_handle_flush_region_sync_single() {
let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new()));

let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);

let flushed_region_ids_ref = flushed_region_ids.clone();
let flushed_requests_ref = flushed_requests.clone();
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
flushed_region_ids_ref.write().unwrap().push(region_id);
region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, request| {
let RegionRequest::Flush(request) = request else {
panic!("Expected flush request");
};
flushed_requests_ref
.write()
.unwrap()
.push((region_id, request.reason));
Ok(0)
}))
});
mock_region_server.register_test_region(region_id, mock_engine);
let kv_backend = Arc::new(MemoryKvBackend::new());
let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

let flush_instruction = FlushRegions::sync_single(region_id);
let flush_instruction =
FlushRegions::sync_single(region_id).with_reason(RegionFlushReason::Repartition);
let reply = FlushRegionsHandler
.handle(&handler_context, flush_instruction)
.await;
Expand All @@ -252,7 +290,10 @@ mod tests {
assert_eq!(flush_reply.results.len(), 1);
assert_eq!(flush_reply.results[0].0, region_id);
assert!(flush_reply.results[0].1.is_ok());
assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
assert_eq!(
*flushed_requests.read().unwrap(),
vec![(region_id, Some(RegionFlushReason::Repartition))]
);
}

#[tokio::test]
Expand Down Expand Up @@ -333,7 +374,7 @@ mod tests {
let display = format!("{}", flush_regions);
assert_eq!(
display,
"FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast)"
"FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast, reason=None)"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::region_request::RegionFlushReason;
use tokio::time::Instant;

use crate::error::{self, Result};
Expand Down Expand Up @@ -86,6 +87,7 @@ impl PreFlushRegion {
leader,
operation_timeout,
utils::ErrorStrategy::Ignore,
Some(RegionFlushReason::RegionMigration),
)
.await
}
Expand Down
Loading
Loading