Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ chrono.workspace = true
client = { workspace = true, features = ["testing"] }
common-meta = { workspace = true, features = ["testing"] }
common-procedure-test.workspace = true
common-wal = { workspace = true, features = ["testing"] }
session.workspace = true
tracing = "0.1"
tracing-subscriber.workspace = true
62 changes: 57 additions & 5 deletions src/meta-srv/src/procedure/region_migration/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,35 @@ impl RegionMigrationManager {
ensure!(
leader_peer.id == task.from_peer.id,
error::InvalidArgumentsSnafu {
err_msg: "Invalid region migration `from_peer` argument"
err_msg: format!(
"Region's leader peer({}) is not the `from_peer`({}), region: {}",
leader_peer.id, task.from_peer.id, task.region_id
),
}
);

Ok(())
}

/// Throws an error if `to_peer` is already has a region follower.
fn verify_region_follower_peers(
&self,
region_route: &RegionRoute,
task: &RegionMigrationProcedureTask,
) -> Result<()> {
ensure!(
!region_route.follower_peers.contains(&task.to_peer),
error::InvalidArgumentsSnafu {
err_msg: format!(
"The `to_peer`({}) is already has a region follower, region: {}",
task.to_peer.id, task.region_id
),
},
);

Ok(())
}

/// Submits a new region migration procedure.
pub async fn submit_procedure(
&self,
Expand Down Expand Up @@ -308,7 +330,7 @@ impl RegionMigrationManager {
}

self.verify_region_leader_peer(&region_route, &task)?;

self.verify_region_follower_peers(&region_route, &task)?;
let table_info = self.retrieve_table_info(region_id).await?;
let TableName {
catalog_name,
Expand Down Expand Up @@ -486,9 +508,39 @@ mod test {

let err = manager.submit_procedure(task).await.unwrap_err();
assert_matches!(err, error::Error::InvalidArguments { .. });
assert!(err
.to_string()
.contains("Invalid region migration `from_peer` argument"));
assert_eq!(err.to_string(), "Invalid arguments: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)");
}

#[tokio::test]
async fn test_submit_procedure_region_follower_on_to_peer() {
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationProcedureTask {
region_id,
from_peer: Peer::empty(3),
to_peer: Peer::empty(2),
timeout: Duration::from_millis(1000),
};

let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![Peer::empty(2)],
..Default::default()
}];

env.create_physical_table_metadata(table_info, region_routes)
.await;

let err = manager.submit_procedure(task).await.unwrap_err();
assert_matches!(err, error::Error::InvalidArguments { .. });
assert_eq!(
err.to_string(),
"Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)"
);
}

#[tokio::test]
Expand Down
Loading