
Commit 0621529

Merge pull request #854 from Migorithm/fix/election-failure
fix: election failure due to scheduler synchronization
2 parents b186a65 + 121436b commit 0621529

File tree: 7 files changed, +30 −20 lines changed


duva/src/domains/cluster_actors/actor.rs

Lines changed: 16 additions & 3 deletions
@@ -363,6 +363,10 @@ impl<T: TWriteAheadLog> ClusterActor<T> {

     #[instrument(level = tracing::Level::DEBUG, skip(self))]
     pub(crate) async fn send_rpc(&mut self) {
+        if !self.replication.is_leader() {
+            return;
+        }
+
         if self.replicas().count() == 0 {
             return;
         }
@@ -409,6 +413,9 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
         from: &PeerIdentifier,
         repl_res: ReplicationAck,
     ) {
+        if !self.replication.is_leader() {
+            return;
+        }
         let Some(peer) = self.members.get_mut(from) else {
             return;
         };
@@ -426,6 +433,9 @@ impl<T: TWriteAheadLog> ClusterActor<T> {

     #[instrument(level = tracing::Level::DEBUG, skip(self, heartbeat), fields(peer_id = %heartbeat.from))]
     pub(crate) async fn append_entries_rpc(&mut self, heartbeat: HeartBeat) {
+        if self.replication.is_leader() {
+            return;
+        }
         if self.check_term_outdated(&heartbeat).await {
             err!("Term Outdated received:{} self:{}", heartbeat.term, self.replication.term);
             return;
@@ -875,7 +885,8 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
     }

     async fn replicate(&mut self, mut heartbeat: HeartBeat) {
-        if self.replication.is_leader() {
+        if heartbeat.leader_commit_idx.is_none() {
+            err!("It must have leader commit index!");
             return;
         }

@@ -951,15 +962,17 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
             // ! Term mismatch -> triggers log truncation
             err!("Term mismatch: {} != {}", prev_entry.term, prev_log_term);
             self.replication.logger.truncate_after(prev_log_index);
-
-            return Err(RejectionReason::LogInconsistency);
         }

         Ok(())
     }

     #[instrument(level = tracing::Level::INFO, skip(self))]
     pub(crate) async fn run_for_election(&mut self) {
+        if self.replication.is_leader() {
+            return;
+        }
+
         self.become_candidate();
         let request_vote = RequestVote::new(self.replication.info(), &self.replication.logger);

duva/src/domains/cluster_actors/actor/tests/replications.rs

Lines changed: 2 additions & 2 deletions
@@ -374,8 +374,8 @@ async fn follower_truncates_log_on_term_mismatch() {
     let result = cluster_actor.replicate_log_entries(&mut heartbeat).await;

     // THEN: Expect truncation and rejection
-    assert_eq!(cluster_actor.replication.logger.target.writer.len(), 1);
-    assert!(result.is_err(), "Should reject due to term mismatch");
+    assert_eq!(cluster_actor.replication.logger.target.writer.len(), 2);
+    assert!(result.is_ok(), "Should pass as follower will just truncate and write");
 }

 #[tokio::test]
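
The flipped assertions track the consistency-check change in actor.rs above: a term mismatch at the previous log position no longer rejects the heartbeat with LogInconsistency; the follower truncates the conflicting suffix and then writes the incoming entry, so the log ends with two entries and the call returns Ok. A control-flow sketch with toy types (the term-vector log, 1-based positions, and the exact truncation point are assumptions; only the control flow mirrors the diff):

    #[allow(dead_code)]
    enum RejectionReason { LogInconsistency }

    struct Log { terms: Vec<u64> } // term of each entry, positions are 1-based

    impl Log {
        // Toy semantics (assumed): drop the mismatched entry and its suffix.
        fn truncate_after(&mut self, index: usize) {
            self.terms.truncate(index - 1);
        }
    }

    fn ensure_prev_log_consistency(
        log: &mut Log,
        prev_log_index: usize,
        prev_log_term: u64,
    ) -> Result<(), RejectionReason> {
        if let Some(&term) = log.terms.get(prev_log_index - 1) {
            if term != prev_log_term {
                // ! Term mismatch -> triggers log truncation
                log.truncate_after(prev_log_index);
                // Old: return Err(RejectionReason::LogInconsistency);
                // New: fall through; the caller appends after truncating.
            }
        }
        Ok(())
    }

    fn main() {
        let mut log = Log { terms: vec![1, 2] };
        // The leader claims position 2 holds term 3; we stored term 2 there.
        assert!(ensure_prev_log_consistency(&mut log, 2, 3).is_ok());
        log.terms.push(3); // follower writes the incoming entry
        assert_eq!(log.terms.len(), 2); // matches the updated test expectation
    }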

duva/src/domains/cluster_actors/service.rs

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
         Ok(self)
     }

+    // ! Beware: the scheduler is a separate task that asynchronously sends messages, so a leadership change in between may not be observed.
     async fn process_scheduler_message(&mut self, msg: SchedulerMessage) {
         use SchedulerMessage::*;
         match msg {
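
This comment is the crux of the fix: the scheduler runs as a separate task and talks to the actor over a channel, so a message enqueued while the node held one role can be dequeued after the role has flipped. A sketch of that shape with hypothetical wiring (the mpsc channel, tick period, and single message variant are illustrative; only the SchedulerMessage name comes from the diff):

    use tokio::sync::mpsc;
    use tokio::time::{interval, Duration};

    enum SchedulerMessage { SendHeartbeat }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<SchedulerMessage>(16);

        // Scheduler: a separate task that fires on a timer. It only enqueues
        // messages; it never observes the actor's current role.
        tokio::spawn(async move {
            let mut tick = interval(Duration::from_millis(50));
            loop {
                tick.tick().await;
                if tx.send(SchedulerMessage::SendHeartbeat).await.is_err() {
                    break; // actor side dropped
                }
            }
        });

        // Actor loop: by the time a tick is dequeued, leadership may have
        // changed, which is why each handler re-checks is_leader() itself.
        let mut is_leader = true;
        for _ in 0..3 {
            if let Some(SchedulerMessage::SendHeartbeat) = rx.recv().await {
                if is_leader {
                    println!("send_rpc: broadcast heartbeat");
                } else {
                    println!("stale tick dropped: no longer leader");
                }
            }
            is_leader = false; // e.g. stepped down after seeing a higher term
        }
    }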

duva/tests/replication_ops/mod.rs

Lines changed: 3 additions & 4 deletions
@@ -8,15 +8,14 @@ mod test_raft_happy_case;
 mod test_set_twice_after_election;
 mod test_sync;

-fn panic_if_election_not_done(port1: u16, port2: u16) {
+fn panic_if_election_not_done(order: &str, port1: u16, port2: u16, num_possible_nodes: u32) {
     let mut first_election_cnt = 0;
     let mut flag = false;
     let mut h1 = Client::new(port1);
-    h1.read_timeout = Duration::from_secs(4);

     let start = std::time::Instant::now();
     while first_election_cnt < 50 {
-        let res = h1.send_and_get_vec("role", 2);
+        let res = h1.send_and_get_vec("role", num_possible_nodes);
         println!(
             "[{}ms] Poll {}: port1={} port2={} res={:?}",
             start.elapsed().as_millis(),
@@ -43,5 +42,5 @@ fn panic_if_election_not_done(port1: u16, port2: u16) {
             port2
         );
     }
-    assert!(flag, "first election fail");
+    assert!(flag, "{order} election fail");
 }
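
Only the helper's changed lines are shown; its polling body is elided by the diff. For orientation, a loop of roughly this shape fits the visible pieces. This is a sketch under stated assumptions: the Client stub, the leader-detection test, the sleep, and the counter increment are guesses, while the signature, the send_and_get_vec call, the println!, and the final assert come from the diff.

    // Stub standing in for the test Client (assumed API from the diff).
    struct Client { port: u16 }
    impl Client {
        fn new(port: u16) -> Self { Client { port } }
        fn send_and_get_vec(&mut self, _cmd: &str, _n: u32) -> Vec<String> {
            vec![format!("127.0.0.1:{}:leader", self.port)] // canned response
        }
    }

    fn panic_if_election_not_done(order: &str, port1: u16, port2: u16, num_possible_nodes: u32) {
        let mut first_election_cnt = 0;
        let mut flag = false;
        let mut h1 = Client::new(port1);

        let start = std::time::Instant::now();
        while first_election_cnt < 50 {
            let res = h1.send_and_get_vec("role", num_possible_nodes);
            println!(
                "[{}ms] Poll {}: port1={} port2={} res={:?}",
                start.elapsed().as_millis(),
                first_election_cnt, port1, port2, res
            );
            // assumed: each line is "host:port:role"; stop once someone leads
            if res.iter().any(|line| line.ends_with(":leader")) {
                flag = true;
                break;
            }
            first_election_cnt += 1;
            std::thread::sleep(std::time::Duration::from_millis(500)); // assumed
        }
        assert!(flag, "{order} election fail");
    }

    fn main() {
        panic_if_election_not_done("first", 6001, 6002, 3);
    }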

duva/tests/replication_ops/test_leader_election.rs

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@ fn run_leader_election(with_append_only: bool) -> anyhow::Result<()> {
     drop(leader_p);

     // THEN
-    panic_if_election_not_done(follower_p1.port, follower_p2.port);
+    panic_if_election_not_done("first", follower_p1.port, follower_p2.port, 3);

     Ok(())
 }

duva/tests/replication_ops/test_leader_election_twice.rs

Lines changed: 6 additions & 9 deletions
@@ -6,25 +6,22 @@ use crate::{
 /// following test is to see if election works even after the first election.
 fn run_leader_election_twice(with_append_only: bool) -> anyhow::Result<()> {
     // GIVEN
-    let server_env = ServerEnv::default().with_append_only(with_append_only);
-    let server_env = server_env;
-    let mut leader_env = server_env;
+    let mut leader_env = ServerEnv::default().with_append_only(with_append_only);
     let mut follower_env1 = ServerEnv::default().with_append_only(with_append_only);
     let mut follower_env2 = ServerEnv::default().with_append_only(with_append_only);

     let [leader_p, follower_p1, follower_p2] =
         form_cluster([&mut leader_env, &mut follower_env1, &mut follower_env2]);

-    // !first leader is killed -> election happens
-
+    // WHEN: The first leader is killed, triggering an election between F1 and F2
     drop(leader_p);
-    panic_if_election_not_done(follower_p1.port, follower_p2.port);
+    panic_if_election_not_done("first", follower_p1.port, follower_p2.port, 3);

     let mut processes = vec![];
     for f in [follower_p1, follower_p2] {
         let mut handler = Client::new(f.port);
-        let res = handler.send_and_get_vec("info replication", 4);
-        if !res.contains(&"role:leader".to_string()) {
+        let res = handler.send_and_get_vec("role", 3);
+        if !res.contains(&format!("127.0.0.1:{}:{}", f.port, "leader")) {
             processes.push(f);
             continue;
         }
@@ -41,7 +38,7 @@ fn run_leader_election_twice(with_append_only: bool) -> anyhow::Result<()> {
     }
     assert_eq!(processes.len(), 2);

-    panic_if_election_not_done(processes[0].port, processes[1].port);
+    panic_if_election_not_done("second", processes[0].port, processes[1].port, 3);

     Ok(())
 }
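
Note the detection change inside the loop: instead of grepping the info replication output for role:leader, the test now matches the role listing, where each node reports as host:port:role. A small self-contained illustration of the new check (ports and responses are made up):

    fn is_leader(res: &[String], port: u16) -> bool {
        res.contains(&format!("127.0.0.1:{}:{}", port, "leader"))
    }

    fn main() {
        // Hypothetical "role" output from a 3-node cluster after the election.
        let res = vec![
            "127.0.0.1:6001:follower".to_string(),
            "127.0.0.1:6002:leader".to_string(),
            "127.0.0.1:6003:follower".to_string(),
        ];
        // The node whose own port carries "leader" is excluded from `processes`;
        // the remaining two run the second election.
        assert!(is_leader(&res, 6002));
        assert!(!is_leader(&res, 6001));
    }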

duva/tests/replication_ops/test_set_twice_after_election.rs

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ fn run_set_twice_after_election(with_append_only: bool) -> anyhow::Result<()> {
     let mut h1 = Client::new(follower_p1.port);
     let mut h2 = Client::new(follower_p2.port);

-    panic_if_election_not_done(follower_p1.port, follower_p2.port);
+    panic_if_election_not_done("first", follower_p1.port, follower_p2.port, 3);

     let res = h1.send_and_get_vec("role", 3);
