Skip to content

Commit 1e394af

Browse files
authored
feat: prevent migrating a leader region to a peer that already has a region follower (#5923)
* feat: prevent migrating a leader region to a peer that already has a region follower * chore: refine err msg
1 parent a9065f5 commit 1e394af

2 files changed

Lines changed: 58 additions & 5 deletions

File tree

src/meta-srv/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ chrono.workspace = true
8383
client = { workspace = true, features = ["testing"] }
8484
common-meta = { workspace = true, features = ["testing"] }
8585
common-procedure-test.workspace = true
86+
common-wal = { workspace = true, features = ["testing"] }
8687
session.workspace = true
8788
tracing = "0.1"
8889
tracing-subscriber.workspace = true

src/meta-srv/src/procedure/region_migration/manager.rs

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -268,13 +268,35 @@ impl RegionMigrationManager {
268268
ensure!(
269269
leader_peer.id == task.from_peer.id,
270270
error::InvalidArgumentsSnafu {
271-
err_msg: "Invalid region migration `from_peer` argument"
271+
err_msg: format!(
272+
"Region's leader peer({}) is not the `from_peer`({}), region: {}",
273+
leader_peer.id, task.from_peer.id, task.region_id
274+
),
272275
}
273276
);
274277

275278
Ok(())
276279
}
277280

281+
/// Throws an error if `to_peer` is already has a region follower.
282+
fn verify_region_follower_peers(
283+
&self,
284+
region_route: &RegionRoute,
285+
task: &RegionMigrationProcedureTask,
286+
) -> Result<()> {
287+
ensure!(
288+
!region_route.follower_peers.contains(&task.to_peer),
289+
error::InvalidArgumentsSnafu {
290+
err_msg: format!(
291+
"The `to_peer`({}) is already has a region follower, region: {}",
292+
task.to_peer.id, task.region_id
293+
),
294+
},
295+
);
296+
297+
Ok(())
298+
}
299+
278300
/// Submits a new region migration procedure.
279301
pub async fn submit_procedure(
280302
&self,
@@ -308,7 +330,7 @@ impl RegionMigrationManager {
308330
}
309331

310332
self.verify_region_leader_peer(&region_route, &task)?;
311-
333+
self.verify_region_follower_peers(&region_route, &task)?;
312334
let table_info = self.retrieve_table_info(region_id).await?;
313335
let TableName {
314336
catalog_name,
@@ -486,9 +508,39 @@ mod test {
486508

487509
let err = manager.submit_procedure(task).await.unwrap_err();
488510
assert_matches!(err, error::Error::InvalidArguments { .. });
489-
assert!(err
490-
.to_string()
491-
.contains("Invalid region migration `from_peer` argument"));
511+
assert_eq!(err.to_string(), "Invalid arguments: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)");
512+
}
513+
514+
#[tokio::test]
515+
async fn test_submit_procedure_region_follower_on_to_peer() {
516+
let env = TestingEnv::new();
517+
let context_factory = env.context_factory();
518+
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
519+
let region_id = RegionId::new(1024, 1);
520+
let task = RegionMigrationProcedureTask {
521+
region_id,
522+
from_peer: Peer::empty(3),
523+
to_peer: Peer::empty(2),
524+
timeout: Duration::from_millis(1000),
525+
};
526+
527+
let table_info = new_test_table_info(1024, vec![1]).into();
528+
let region_routes = vec![RegionRoute {
529+
region: Region::new_test(region_id),
530+
leader_peer: Some(Peer::empty(3)),
531+
follower_peers: vec![Peer::empty(2)],
532+
..Default::default()
533+
}];
534+
535+
env.create_physical_table_metadata(table_info, region_routes)
536+
.await;
537+
538+
let err = manager.submit_procedure(task).await.unwrap_err();
539+
assert_matches!(err, error::Error::InvalidArguments { .. });
540+
assert_eq!(
541+
err.to_string(),
542+
"Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)"
543+
);
492544
}
493545

494546
#[tokio::test]

0 commit comments

Comments
 (0)