Skip to content

Commit 47691cf

Browse files
author
Adar Ovadia
committed
Core: az affinity strategy
Signed-off-by: Adar Ovadia <adarov@amazon.com>
1 parent c15c76f commit 47691cf

7 files changed

Lines changed: 40 additions & 40 deletions

File tree

.github/workflows/rust.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ jobs:
9999

100100
- name: Run tests
101101
working-directory: ./glide-core
102-
# TODO remove the concurrency limit after we fix test flakyness.
103-
run: cargo test --all-features --release -- --test-threads=1
102+
run: cargo test --all-features --release -- --test-threads=1 # TODO: remove the concurrency limit once test flakiness is fixed.
104103

105104
- name: Run logger tests
106105
working-directory: ./logger_core

glide-core/redis-rs/redis/src/cluster_async/connections_container.rs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use dashmap::DashMap;
66
use futures::FutureExt;
77
use rand::seq::IteratorRandom;
88
use std::net::IpAddr;
9+
use std::sync::atomic::Ordering;
910
use telemetrylib::Telemetry;
1011

1112
/// Count the number of connections in a connections_map object
@@ -23,12 +24,12 @@ macro_rules! count_connections {
2324
}};
2425
}
2526

26-
/// A struct that encapsulates a network connection along with its associated IP address.
27+
/// A struct that encapsulates a network connection along with its associated IP address and AZ.
2728
#[derive(Clone, Eq, PartialEq, Debug)]
2829
pub struct ConnectionDetails<Connection> {
2930
/// The actual connection
3031
pub conn: Connection,
31-
/// The IP associated with the connection
32+
/// The IP associated with the connection
3233
pub ip: Option<IpAddr>,
3334
/// The AZ associated with the connection
3435
pub az: Option<String>,
@@ -209,9 +210,7 @@ where
209210
slot_map_value: &SlotMapValue,
210211
) -> Option<ConnectionAndAddress<Connection>> {
211212
let addrs = &slot_map_value.addrs;
212-
let initial_index = slot_map_value
213-
.latest_used_replica
214-
.load(std::sync::atomic::Ordering::Relaxed);
213+
let initial_index = slot_map_value.latest_used_replica.load(Ordering::Relaxed);
215214
let mut check_count = 0;
216215
loop {
217216
check_count += 1;
@@ -225,8 +224,8 @@ where
225224
let _ = slot_map_value.latest_used_replica.compare_exchange_weak(
226225
initial_index,
227226
index,
228-
std::sync::atomic::Ordering::Relaxed,
229-
std::sync::atomic::Ordering::Relaxed,
227+
Ordering::Relaxed,
228+
Ordering::Relaxed,
230229
);
231230
return Some(connection);
232231
}
@@ -239,16 +238,13 @@ where
239238
client_az: String,
240239
) -> Option<ConnectionAndAddress<Connection>> {
241240
let addrs = &slot_map_value.addrs;
242-
let initial_index = slot_map_value
243-
.latest_used_replica
244-
.load(std::sync::atomic::Ordering::Relaxed);
245-
let mut check_count = 0;
241+
let initial_index = slot_map_value.latest_used_replica.load(Ordering::Relaxed);
242+
let mut retries = 0usize;
246243

247244
loop {
248-
check_count += 1;
249-
245+
retries = retries.saturating_add(1);
250246
// Looped through all replicas; no connected replica found in the same AZ.
251-
if check_count > addrs.replicas.len() {
247+
if retries > addrs.replicas.len() {
252248
// Attempt a fallback to any available replica in other AZs.
253249
for replica in &addrs.replicas {
254250
if let Some(connection) = self.connection_for_address(replica.as_str()) {
@@ -260,7 +256,7 @@ where
260256
}
261257

262258
// Calculate index based on initial index and retry count.
263-
let index = (initial_index + check_count) % addrs.replicas.len();
259+
let index = (initial_index + retries) % addrs.replicas.len();
264260
let replica = &addrs.replicas[index];
265261

266262
// Check if this replica’s AZ matches the user’s AZ.
@@ -272,8 +268,8 @@ where
272268
let _ = slot_map_value.latest_used_replica.compare_exchange_weak(
273269
initial_index,
274270
index,
275-
std::sync::atomic::Ordering::Relaxed,
276-
std::sync::atomic::Ordering::Relaxed,
271+
Ordering::Relaxed,
272+
Ordering::Relaxed,
277273
);
278274
return Some((address, connection_details.conn));
279275
}

glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use super::{
22
connections_container::{ClusterNode, ConnectionDetails},
33
Connect,
44
};
5-
use crate::cmd;
65
use crate::FromRedisValue;
76
use crate::InfoDict;
87
use crate::{
@@ -12,6 +11,7 @@ use crate::{
1211
cluster_client::ClusterParams,
1312
ErrorKind, RedisError, RedisResult,
1413
};
14+
use crate::{cluster_slotmap::ReadFromReplicaStrategy, cmd};
1515
use std::net::SocketAddr;
1616

1717
use futures::prelude::*;
@@ -348,8 +348,8 @@ async fn setup_user_connection<C>(
348348
where
349349
C: ConnectionLike + Connect + Send + 'static,
350350
{
351-
let read_from_replicas = params.read_from_replicas
352-
!= crate::cluster_slotmap::ReadFromReplicaStrategy::AlwaysFromPrimary;
351+
let read_from_replicas =
352+
params.read_from_replicas != ReadFromReplicaStrategy::AlwaysFromPrimary;
353353
let connection_timeout = params.connection_timeout;
354354
check_connection(&mut conn_details.conn, connection_timeout).await?;
355355
if read_from_replicas {
@@ -359,7 +359,12 @@ where
359359
.await?;
360360
}
361361

362-
update_az_from_info(conn_details).await?;
362+
if matches!(
363+
params.read_from_replicas,
364+
ReadFromReplicaStrategy::AZAffinity(_)
365+
) {
366+
update_az_from_info(conn_details).await?;
367+
}
363368
Ok(())
364369
}
365370

glide-core/redis-rs/redis/src/cluster_async/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2051,7 +2051,7 @@ where
20512051
.conn_lock
20522052
.read()
20532053
.expect(MUTEX_READ_ERR)
2054-
.connection_for_route_with_params(&route, Some(core.cluster_params.clone()))
2054+
.connection_for_route(&route)
20552055
{
20562056
ConnectionCheck::Found((conn, address))
20572057
} else {

glide-core/redis-rs/redis/src/cluster_client.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -387,17 +387,16 @@ impl ClusterClientBuilder {
387387
self
388388
}
389389

390-
/// Sets the read strategy on all new connections, based on the specified policy.
390+
/// Set the read strategy for this client.
391391
///
392-
/// Using the specified `read_strategy`, this function configures whether read queries will be
393-
/// routed to replica nodes or primary nodes. If `ReadFromReplicaStrategy::AZAffinity` is set,
394-
/// read requests will first attempt to access replicas in the same availability zone, falling
395-
/// back to other replicas or the primary if needed. If `ReadFromReplicaStrategy::RoundRobin` is chosen, reads are distributed
396-
/// across replicas for load balancing, while `ReadFromReplicaStrategy::AlwaysFromPrimary` ensures all read and write queries
397-
/// are directed to the primary node.
392+
/// The parameter `read_strategy` can be one of:
393+
/// `ReadFromReplicaStrategy::AZAffinity(availability_zone)` - attempt to access replicas in the same availability zone.
394+
/// If no replica is found in the requested availability zone, any available replica is chosen, falling back to the primary if needed.
395+
/// `ReadFromReplicaStrategy::RoundRobin` - reads are distributed across replicas for load balancing using a round-robin algorithm, falling back to the primary if needed.
396+
/// `ReadFromReplicaStrategy::AlwaysFromPrimary` ensures all read and write queries are directed to the primary node.
398397
///
399398
/// # Parameters
400-
/// - `read_strategy`: Defines the replica routing strategy.
399+
/// - `read_strategy`: defines the replica routing strategy.
401400
pub fn read_from(mut self, read_strategy: ReadFromReplicaStrategy) -> ClusterClientBuilder {
402401
self.builder_params.read_from_replicas = read_strategy;
403402
self

glide-core/redis-rs/redis/src/cluster_slotmap.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@ impl SlotMapValue {
2424
}
2525

2626
#[derive(Debug, Default, Clone, PartialEq)]
27-
/** Represents the client's read from strategy. */
27+
/// Represents the client's read from strategy.
2828
pub enum ReadFromReplicaStrategy {
2929
#[default]
30-
/** Always get from primary, in order to get the freshest data.*/
30+
/// Always get from primary, in order to get the freshest data.
3131
AlwaysFromPrimary,
32-
/** Spread the read requests between all replicas in a round robin manner.
33-
If no replica is available, route the requests to the primary.*/
32+
/// Spread the read requests between all replicas in a round robin manner.
33+
/// If no replica is available, route the requests to the primary.
3434
RoundRobin,
35-
/** Spread the read requests between replicas in the same client's AZ (Aviliablity zone) in a round robin manner,
36-
falling back to other replicas or the primary if needed.*/
35+
/// Spread the read requests between replicas in the client's own AZ (availability zone) in a round robin manner,
36+
/// falling back to other replicas or the primary if needed.
3737
AZAffinity(String),
3838
}
3939

@@ -60,7 +60,7 @@ fn get_address_from_slot(
6060
% slot.addrs.replicas.len();
6161
slot.addrs.replicas[index].as_str()
6262
}
63-
ReadFromReplicaStrategy::AZAffinity(_az) => todo!(), // todo thrrow exception for sync client
63+
ReadFromReplicaStrategy::AZAffinity(_az) => todo!(), // Drop sync client
6464
}
6565
}
6666

glide-core/redis-rs/redis/tests/test_cluster_async.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ mod cluster_async {
206206
#[cfg(feature = "valkey-gte-7-2")]
207207
#[tokio::test]
208208
async fn test_routing_by_slot_to_replica_with_az_affinity_strategy_to_all_replicas() {
209-
let replica_num: u16 = 3;
209+
let replica_num: u16 = 4;
210210
let primaries_num: u16 = 3;
211211
let cluster =
212212
TestClusterContext::new((replica_num * primaries_num) + primaries_num, replica_num);
@@ -260,6 +260,7 @@ mod cluster_async {
260260
.unwrap();
261261

262262
let info_result = redis::from_owned_redis_value::<HashMap<String, String>>(info).unwrap();
263+
println!("{:?}", info_result);
263264
let get_cmdstat = format!("cmdstat_get:calls={}", n);
264265
let client_az = format!("availability_zone:{}", az);
265266

0 commit comments

Comments
 (0)