From 6b58c7caa86096eea40bf2910024d5be873ec694 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 17 Apr 2025 11:24:08 +0000 Subject: [PATCH 1/2] feat: prevent migrating a leader region to a peer that already has a region follower --- src/meta-srv/Cargo.toml | 1 + .../src/procedure/region_migration/manager.rs | 49 ++++++++++++++++++- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 15830a4f0520..0e0f3e5cde9a 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -83,6 +83,7 @@ chrono.workspace = true client = { workspace = true, features = ["testing"] } common-meta = { workspace = true, features = ["testing"] } common-procedure-test.workspace = true +common-wal = { workspace = true, features = ["testing"] } session.workspace = true tracing = "0.1" tracing-subscriber.workspace = true diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index e2345559d081..55cc36976448 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -275,6 +275,22 @@ impl RegionMigrationManager { Ok(()) } + /// Throws an error if `to_peer` already has a region follower. + fn verify_region_follower_peers( + &self, + region_route: &RegionRoute, + task: &RegionMigrationProcedureTask, + ) -> Result<()> { + ensure!( + !region_route.follower_peers.contains(&task.to_peer), + error::InvalidArgumentsSnafu { + err_msg: "`to_peer` is already has a region follower", + }, + ); + + Ok(()) + } + /// Submits a new region migration procedure. 
pub async fn submit_procedure( &self, @@ -308,7 +324,7 @@ impl RegionMigrationManager { } self.verify_region_leader_peer(&region_route, &task)?; - + self.verify_region_follower_peers(&region_route, &task)?; let table_info = self.retrieve_table_info(region_id).await?; let TableName { catalog_name, @@ -491,6 +507,37 @@ mod test { .contains("Invalid region migration `from_peer` argument")); } + #[tokio::test] + async fn test_submit_procedure_region_follower_on_to_peer() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationProcedureTask { + region_id, + from_peer: Peer::empty(3), + to_peer: Peer::empty(2), + timeout: Duration::from_millis(1000), + }; + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![Peer::empty(2)], + ..Default::default() + }]; + + env.create_physical_table_metadata(table_info, region_routes) + .await; + + let err = manager.submit_procedure(task).await.unwrap_err(); + assert_matches!(err, error::Error::InvalidArguments { .. 
}); + assert!(err + .to_string() + .contains("`to_peer` is already has a region follower")); + } + #[tokio::test] async fn test_submit_procedure_has_migrated() { common_telemetry::init_default_ut_logging(); From 798583feb9196f2af7235aa7bf3152bd223bc92b Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 18 Apr 2025 05:12:26 +0000 Subject: [PATCH 2/2] chore: refine err msg --- .../src/procedure/region_migration/manager.rs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index 55cc36976448..417881f54928 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -268,7 +268,10 @@ impl RegionMigrationManager { ensure!( leader_peer.id == task.from_peer.id, error::InvalidArgumentsSnafu { - err_msg: "Invalid region migration `from_peer` argument" + err_msg: format!( + "Region's leader peer({}) is not the `from_peer`({}), region: {}", + leader_peer.id, task.from_peer.id, task.region_id + ), } ); @@ -284,7 +287,10 @@ impl RegionMigrationManager { ensure!( !region_route.follower_peers.contains(&task.to_peer), error::InvalidArgumentsSnafu { - err_msg: "`to_peer` is already has a region follower", + err_msg: format!( + "The `to_peer`({}) is already has a region follower, region: {}", + task.to_peer.id, task.region_id + ), }, ); @@ -502,9 +508,7 @@ mod test { let err = manager.submit_procedure(task).await.unwrap_err(); assert_matches!(err, error::Error::InvalidArguments { .. 
}); - assert!(err - .to_string() - .contains("Invalid region migration `from_peer` argument")); + assert_eq!(err.to_string(), "Invalid arguments: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)"); } #[tokio::test] @@ -533,9 +537,10 @@ mod test { let err = manager.submit_procedure(task).await.unwrap_err(); assert_matches!(err, error::Error::InvalidArguments { .. }); - assert!(err - .to_string() - .contains("`to_peer` is already has a region follower")); + assert_eq!( + err.to_string(), + "Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)" + ); } #[tokio::test]