Skip to content

Commit e66d888

Browse files
committed
feat: add log-level validation
1 parent 6580aec commit e66d888

File tree

6 files changed

+54
-38
lines changed

6 files changed

+54
-38
lines changed

.config/nextest.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[profile.default]
2-
retries = 0 # this is the default, so it doesn't need to be specified
3-
slow-timeout = "15s"
2+
retries = 0 # this is the default, so it doesn't need to be specified
3+
slow-timeout = { period = "15s", terminate-after = 4 }
44

55

66
[profile.ci]

duva/src/domains/cluster_actors/actor.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -385,20 +385,42 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
385385
if self.find_replica_mut(&request_vote.candidate_id).is_none() {
386386
return;
387387
};
388-
REQUESTS_BLOCKED_BY_ELECTION.store(true, Ordering::Relaxed);
389388

390-
let grant_vote = self.logger().last_log_index <= request_vote.last_log_index
391-
&& self.replication.become_follower_if_term_higher_and_votable(
392-
&request_vote.candidate_id,
393-
request_vote.term,
389+
let mut grant_vote = false;
390+
391+
// 1. If candidate's term is less than current term, reject
392+
if request_vote.term < self.replication.term {
393+
info!(
394+
"Rejecting vote for {} (term {}). My term is higher ({}).",
395+
request_vote.candidate_id, request_vote.term, self.replication.term
394396
);
397+
}
398+
// 2. If candidate's term is greater than or equal to current term
399+
else {
400+
// If candidate's term is higher, update own term and step down
401+
if request_vote.term > self.replication.term {
402+
self.replication.term = request_vote.term;
403+
self.replication.vote_for(None); // Clear votedFor
404+
self.step_down().await; // Ensure follower mode
405+
}
406+
407+
// 3. Check if log is up-to-date and if not already voted in this term or voted for this candidate
408+
if self
409+
.replication
410+
.is_log_up_to_date(request_vote.last_log_index, request_vote.last_log_term)
411+
&& self.replication.election_state.is_votable(&request_vote.candidate_id)
412+
{
413+
self.replication.vote_for(Some(request_vote.candidate_id.clone()));
414+
grant_vote = true;
415+
}
416+
REQUESTS_BLOCKED_BY_ELECTION.store(true, Ordering::Relaxed);
417+
}
395418

396419
info!(
397420
"Voting for {} with term {} and granted: {grant_vote}",
398421
request_vote.candidate_id, request_vote.term
399422
);
400423

401-
// ! vote may have been made for requester, maybe not. Therefore, we have to set the term for self
402424
let vote = ElectionVote { term: self.replication.term, vote_granted: grant_vote };
403425
let Some(peer) = self.find_replica_mut(&request_vote.candidate_id) else {
404426
return;
@@ -1041,7 +1063,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
10411063
/// 2) step down operation is given from user
10421064
async fn step_down(&mut self) {
10431065
self.replication.vote_for(None);
1044-
self.heartbeat_scheduler.turn_follower_mode().await;
1066+
self.heartbeat_scheduler.turn_follower_mode();
10451067
}
10461068

10471069
async fn become_leader(&mut self) {

duva/src/domains/cluster_actors/actor/heartbeat_scheduler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl HeartBeatScheduler {
124124
self.controller = controller;
125125
}
126126

127-
pub(crate) async fn turn_follower_mode(&mut self) {
127+
pub(crate) fn turn_follower_mode(&mut self) {
128128
let controller = match self.controller.take() {
129129
| Some(SchedulerMode::Leader(sender)) => {
130130
sender.send(());

duva/src/domains/cluster_actors/actor/tests/elections.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ async fn test_vote_election_deny_vote_older_log() {
131131

132132
assert_expected_queryio(
133133
&candidate_fake_buf,
134-
ElectionVote { term: initial_term, vote_granted: false },
134+
ElectionVote { term: initial_term + 1, vote_granted: false },
135135
)
136136
.await;
137137
}

duva/src/domains/cluster_actors/replication.rs

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::consensus::election::ElectionState;
22

3+
use crate::domains::operation_logs::interfaces::TWriteAheadLog;
34
use crate::domains::operation_logs::logger::ReplicatedLogs;
45
use crate::domains::peers::command::BannedPeer;
56
use crate::domains::peers::command::HeartBeat;
@@ -25,7 +26,7 @@ pub(crate) struct ReplicationState<T> {
2526
pub(crate) last_applied: u64,
2627
}
2728

28-
impl<T> ReplicationState<T> {
29+
impl<T: TWriteAheadLog> ReplicationState<T> {
2930
pub(crate) fn new(
3031
replid: ReplicationId,
3132
role: ReplicationRole,
@@ -91,31 +92,6 @@ impl<T> ReplicationState<T> {
9192
}
9293
}
9394

94-
pub(super) fn become_follower_if_term_higher_and_votable(
95-
&mut self,
96-
candidate_id: &PeerIdentifier,
97-
election_term: u64,
98-
) -> bool {
99-
// If the candidate's term is less than mine -> reject
100-
if election_term < self.term {
101-
return false;
102-
}
103-
104-
// When a node sees a higher term, it must forget any vote it cast in a prior term, because:
105-
if election_term > self.term {
106-
self.term = election_term;
107-
self.vote_for(None);
108-
}
109-
110-
if !self.election_state.is_votable(candidate_id) {
111-
return false;
112-
}
113-
114-
self.vote_for(Some(candidate_id.clone()));
115-
116-
true
117-
}
118-
11995
pub(super) fn vote_for(&mut self, leader_id: Option<PeerIdentifier>) {
12096
self.election_state = ElectionState::Follower { voted_for: leader_id };
12197
self.role = ReplicationRole::Follower;
@@ -124,6 +100,20 @@ impl<T> ReplicationState<T> {
124100
pub(crate) fn is_leader(&self) -> bool {
125101
self.role == ReplicationRole::Leader
126102
}
103+
104+
pub(super) fn is_log_up_to_date(
105+
&self,
106+
candidate_last_log_index: u64,
107+
candidate_last_log_term: u64,
108+
) -> bool {
109+
if candidate_last_log_term > self.logger.last_log_term {
110+
true
111+
} else if candidate_last_log_term == self.logger.last_log_term {
112+
candidate_last_log_index >= self.logger.last_log_index
113+
} else {
114+
false
115+
}
116+
}
127117
}
128118

129119
pub(crate) fn time_in_secs() -> anyhow::Result<u64> {

duva/tests/replication_ops/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@ fn panic_if_election_not_done(order: &str, port1: u16, port2: u16, num_possible_
1212
let mut first_election_cnt = 0;
1313
let mut flag = false;
1414
let mut h1 = Client::new(port1);
15+
let mut h2 = Client::new(port2);
1516

1617
let start = std::time::Instant::now();
1718
while first_election_cnt < 50 {
18-
let res = h1.send_and_get_vec("role", num_possible_nodes);
19+
let mut res = h1.send_and_get_vec("role", num_possible_nodes);
20+
if res.is_empty() {
21+
res = h2.send_and_get_vec("role", num_possible_nodes);
22+
}
1923
println!(
2024
"[{}ms] Poll {}: port1={} port2={} res={:?}",
2125
start.elapsed().as_millis(),

0 commit comments

Comments
 (0)