Skip to content

Commit 9d5e64d

Browse files
author
Adar Ovadia
committed
redis-core: add az awareness to read strategy
Signed-off-by: Adar Ovadia <adarov@amazon.com>
1 parent c4eb769 commit 9d5e64d

File tree

16 files changed

+221
-119
lines changed

16 files changed

+221
-119
lines changed

glide-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ authors = ["Valkey GLIDE Maintainers"]
99

1010
[dependencies]
1111
bytes = "1"
12+
copstr = "0"
1213
futures = "^0.3"
1314
redis = { path = "./redis-rs/redis", features = [
1415
"aio",

glide-core/redis-rs/redis/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ bench = false
2222
# These two are generally really common simple dependencies so it does not seem
2323
# much of a point to optimize these, but these could in theory be removed for
2424
# an indirection through std::Formatter.
25+
copstr = "0"
2526
ryu = "1.0"
2627
itoa = "1.0"
2728

glide-core/redis-rs/redis/src/cluster.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,10 @@ where
229229
) -> RedisResult<Self> {
230230
let connection = Self {
231231
connections: RefCell::new(HashMap::new()),
232-
slots: RefCell::new(SlotMap::new(vec![], cluster_params.read_from_replicas)),
232+
slots: RefCell::new(SlotMap::new(
233+
vec![],
234+
cluster_params.read_from_replicas.clone(),
235+
)),
233236
auto_reconnect: RefCell::new(true),
234237
cluster_params,
235238
read_timeout: RefCell::new(None),
@@ -387,7 +390,7 @@ where
387390
"can't parse node address",
388391
)))?;
389392
match parse_and_count_slots(&value, self.cluster_params.tls, addr).map(|slots_data| {
390-
SlotMap::new(slots_data.1, self.cluster_params.read_from_replicas)
393+
SlotMap::new(slots_data.1, self.cluster_params.read_from_replicas.clone())
391394
}) {
392395
Ok(new_slots) => {
393396
result = Ok(new_slots);

glide-core/redis-rs/redis/src/cluster_async/connections_container.rs

Lines changed: 51 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::cluster_async::{ClusterParams, ConnectionFuture};
1+
use crate::cluster_async::ConnectionFuture;
22
use crate::cluster_routing::{Route, SlotAddr};
33
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
44
use crate::cluster_topology::TopologyHash;
@@ -233,10 +233,10 @@ where
233233
}
234234
}
235235

236-
fn round_robin_read_from_az_awareness_replica(
236+
pub(crate) fn round_robin_read_from_az_awareness_replica(
237237
&self,
238238
slot_map_value: &SlotMapValue,
239-
user_az: String,
239+
client_az: String,
240240
) -> Option<ConnectionAndAddress<Connection>> {
241241
let addrs = &slot_map_value.addrs;
242242
let initial_index = slot_map_value
@@ -266,7 +266,7 @@ where
266266
// Check if this replica’s AZ matches the user’s AZ.
267267
if let Some(connection_details) = self.connection_details_for_address(replica.as_str())
268268
{
269-
if connection_details.1.az.as_deref() == Some(&user_az) {
269+
if connection_details.1.az.as_deref() == Some(&client_az) {
270270
// Attempt to update `latest_used_replica` with the index of this replica.
271271
let _ = slot_map_value.latest_used_replica.compare_exchange_weak(
272272
initial_index,
@@ -280,42 +280,33 @@ where
280280
}
281281
}
282282

283-
fn lookup_route(
284-
&self,
285-
route: &Route,
286-
cluster_params: &Option<ClusterParams>,
287-
) -> Option<ConnectionAndAddress<Connection>> {
283+
fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
288284
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
289285
let addrs = &slot_map_value.addrs;
290286
if addrs.replicas.is_empty() {
291287
return self.connection_for_address(addrs.primary.as_str());
292288
}
293289

294-
//
295290
match route.slot_addr() {
296291
// Master strategy will be in use when the command is not read_only
297292
SlotAddr::Master => self.connection_for_address(addrs.primary.as_str()),
298293
// ReplicaOptional strategy will be in use when the command is read_only
299-
SlotAddr::ReplicaOptional => match self.read_from_replica_strategy {
294+
SlotAddr::ReplicaOptional => match &self.read_from_replica_strategy {
300295
ReadFromReplicaStrategy::AlwaysFromPrimary => {
301296
self.connection_for_address(addrs.primary.as_str())
302297
}
303298
ReadFromReplicaStrategy::RoundRobin => {
304299
self.round_robin_read_from_replica(slot_map_value)
305300
}
306-
ReadFromReplicaStrategy::AZAffinity => self
307-
.round_robin_read_from_az_awareness_replica(
308-
slot_map_value,
309-
cluster_params.as_ref().unwrap().client_az.clone()?,
310-
),
301+
ReadFromReplicaStrategy::AZAffinity(az) => {
302+
self.round_robin_read_from_az_awareness_replica(slot_map_value, az.to_string())
303+
}
311304
},
312305
// when the user strategy per command is replica_preffered
313-
SlotAddr::ReplicaRequired => match self.read_from_replica_strategy {
314-
ReadFromReplicaStrategy::AZAffinity => self
315-
.round_robin_read_from_az_awareness_replica(
316-
slot_map_value,
317-
cluster_params.as_ref().unwrap().client_az.clone()?,
318-
),
306+
SlotAddr::ReplicaRequired => match &self.read_from_replica_strategy {
307+
ReadFromReplicaStrategy::AZAffinity(az) => {
308+
self.round_robin_read_from_az_awareness_replica(slot_map_value, az.to_string())
309+
}
319310
_ => self.round_robin_read_from_replica(slot_map_value),
320311
},
321312
}
@@ -325,17 +316,9 @@ where
325316
&self,
326317
route: &Route,
327318
) -> Option<ConnectionAndAddress<Connection>> {
328-
self.connection_for_route_with_params(route, None)
329-
}
330-
331-
pub(crate) fn connection_for_route_with_params(
332-
&self,
333-
route: &Route,
334-
cluster_params: Option<ClusterParams>,
335-
) -> Option<ConnectionAndAddress<Connection>> {
336-
self.lookup_route(route, &cluster_params).or_else(|| {
319+
self.lookup_route(route).or_else(|| {
337320
if route.slot_addr() != SlotAddr::Master {
338-
self.lookup_route(&Route::new(route.slot(), SlotAddr::Master), &cluster_params)
321+
self.lookup_route(&Route::new(route.slot(), SlotAddr::Master))
339322
} else {
340323
None
341324
}
@@ -513,7 +496,7 @@ mod tests {
513496
}
514497

515498
fn create_container_with_az_strategy(
516-
strategy: ReadFromReplicaStrategy,
499+
// strategy: ReadFromReplicaStrategy,
517500
use_management_connections: bool,
518501
) -> ConnectionsContainer<usize> {
519502
let slot_map = SlotMap::new(
@@ -571,7 +554,7 @@ mod tests {
571554
ConnectionsContainer {
572555
slot_map,
573556
connection_map,
574-
read_from_replica_strategy: strategy,
557+
read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinity("use-1a".to_string()),
575558
topology_hash: 0,
576559
}
577560
}
@@ -802,28 +785,17 @@ mod tests {
802785

803786
#[test]
804787
fn get_connection_for_az_affinity_route() {
805-
let container =
806-
create_container_with_az_strategy(ReadFromReplicaStrategy::AZAffinity, false);
807-
let mut cluster_params = ClusterParams::default();
808-
809-
cluster_params.client_az = Some("use-1a".to_string());
788+
let container = create_container_with_az_strategy(false);
810789

811790
// slot number is not exits
812791
assert!(container
813-
.connection_for_route_with_params(
814-
&Route::new(1001, SlotAddr::ReplicaOptional),
815-
Some(cluster_params.clone())
816-
)
792+
.connection_for_route(&Route::new(1001, SlotAddr::ReplicaOptional))
817793
.is_none());
818-
819794
// Get the replica that holds the slot 1002
820795
assert_eq!(
821796
21,
822797
container
823-
.connection_for_route_with_params(
824-
&Route::new(1002, SlotAddr::ReplicaOptional),
825-
Some(cluster_params.clone())
826-
)
798+
.connection_for_route(&Route::new(1002, SlotAddr::ReplicaOptional))
827799
.unwrap()
828800
.1
829801
);
@@ -832,20 +804,14 @@ mod tests {
832804
assert_eq!(
833805
2,
834806
container
835-
.connection_for_route_with_params(
836-
&Route::new(1500, SlotAddr::Master),
837-
Some(cluster_params.clone())
838-
)
807+
.connection_for_route(&Route::new(1500, SlotAddr::Master))
839808
.unwrap()
840809
.1
841810
);
842811

843812
// receive one of the replicas that holds the slot 2001 and is in the availability zone of the client ("use-1a")
844813
assert!(one_of(
845-
container.connection_for_route_with_params(
846-
&Route::new(2001, SlotAddr::ReplicaRequired),
847-
Some(cluster_params.clone())
848-
),
814+
container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)),
849815
&[31, 33],
850816
));
851817

@@ -854,10 +820,7 @@ mod tests {
854820
assert_eq!(
855821
31,
856822
container
857-
.connection_for_route_with_params(
858-
&Route::new(2001, SlotAddr::ReplicaOptional),
859-
Some(cluster_params.clone())
860-
)
823+
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
861824
.unwrap()
862825
.1
863826
);
@@ -867,10 +830,7 @@ mod tests {
867830
assert_eq!(
868831
32,
869832
container
870-
.connection_for_route_with_params(
871-
&Route::new(2001, SlotAddr::ReplicaOptional),
872-
Some(cluster_params.clone())
873-
)
833+
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
874834
.unwrap()
875835
.1
876836
);
@@ -880,15 +840,38 @@ mod tests {
880840
assert_eq!(
881841
3,
882842
container
883-
.connection_for_route_with_params(
884-
&Route::new(2001, SlotAddr::ReplicaOptional),
885-
Some(cluster_params.clone())
886-
)
843+
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
887844
.unwrap()
888845
.1
889846
);
890847
}
891848

849+
#[test]
850+
fn get_connection_for_az_affinity_route_round_robin() {
851+
let container = create_container_with_az_strategy(false);
852+
853+
let mut addresses = vec![
854+
container
855+
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
856+
.unwrap()
857+
.1,
858+
container
859+
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
860+
.unwrap()
861+
.1,
862+
container
863+
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
864+
.unwrap()
865+
.1,
866+
container
867+
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
868+
.unwrap()
869+
.1,
870+
];
871+
addresses.sort();
872+
assert_eq!(addresses, vec![31, 31, 33, 33]);
873+
}
874+
892875
#[test]
893876
fn get_connection_by_address() {
894877
let container = create_container();

glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use crate::{
1414
};
1515
use std::net::SocketAddr;
1616

17-
use future::ok;
1817
use futures::prelude::*;
1918
use futures_util::{future::BoxFuture, join};
2019
use tracing::warn;
@@ -374,22 +373,8 @@ where
374373
match info_res {
375374
Ok(value) => {
376375
let info_dict: Result<InfoDict, RedisError> = FromRedisValue::from_redis_value(&value);
377-
if let Ok(info_dict) = info_dict {
378-
if let Some(az) = info_dict.get::<String>("availability_zone") {
379-
conn_details.az = Some(az);
380-
Ok(())
381-
} else {
382-
Err(RedisError::from((
383-
ErrorKind::ResponseError,
384-
"Failed to get availability_zone from info",
385-
)))
386-
}
387-
} else {
388-
Err(RedisError::from((
389-
ErrorKind::ResponseError,
390-
"Failed to parse info command",
391-
)))
392-
}
376+
conn_details.az = info_dict?.get::<String>("availability_zone");
377+
Ok(())
393378
}
394379
Err(_) => {
395380
// Handle the error case for the INFO command

glide-core/redis-rs/redis/src/cluster_async/mod.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,7 +1043,7 @@ where
10431043
conn_lock: RwLock::new(ConnectionsContainer::new(
10441044
Default::default(),
10451045
connections,
1046-
cluster_params.read_from_replicas,
1046+
cluster_params.read_from_replicas.clone(),
10471047
0,
10481048
)),
10491049
cluster_params: cluster_params.clone(),
@@ -1807,7 +1807,7 @@ where
18071807
*write_guard = ConnectionsContainer::new(
18081808
new_slots,
18091809
new_connections,
1810-
inner.cluster_params.read_from_replicas,
1810+
inner.cluster_params.read_from_replicas.clone(),
18111811
topology_hash,
18121812
);
18131813
Ok(())
@@ -2026,9 +2026,7 @@ where
20262026
)
20272027
}
20282028
InternalSingleNodeRouting::SpecificNode(route) => {
2029-
match read_guard
2030-
.connection_for_route_with_params(&route, Some(core.cluster_params.clone()))
2031-
{
2029+
match read_guard.connection_for_route(&route) {
20322030
Some((conn, address)) => ConnectionCheck::Found((conn, address)),
20332031
None => {
20342032
// No connection is found for the given route:
@@ -2498,7 +2496,7 @@ where
24982496
curr_retry,
24992497
inner.cluster_params.tls,
25002498
num_of_nodes_to_query,
2501-
inner.cluster_params.read_from_replicas,
2499+
inner.cluster_params.read_from_replicas.clone(),
25022500
),
25032501
failed_addresses,
25042502
)

glide-core/redis-rs/redis/src/cluster_client.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use tokio::sync::mpsc;
3333
struct BuilderParams {
3434
password: Option<String>,
3535
username: Option<String>,
36-
client_az: Option<String>,
3736
read_from_replicas: ReadFromReplicaStrategy,
3837
tls: Option<TlsMode>,
3938
#[cfg(feature = "tls-rustls")]
@@ -131,7 +130,6 @@ impl SlotsRefreshRateLimit {
131130
pub struct ClusterParams {
132131
pub(crate) password: Option<String>,
133132
pub(crate) username: Option<String>,
134-
pub(crate) client_az: Option<String>,
135133
pub(crate) read_from_replicas: ReadFromReplicaStrategy,
136134
/// tls indicates tls behavior of connections.
137135
/// When Some(TlsMode), connections use tls and verify certification depends on TlsMode.
@@ -167,7 +165,6 @@ impl ClusterParams {
167165
Ok(Self {
168166
password: value.password,
169167
username: value.username,
170-
client_az: value.client_az,
171168
read_from_replicas: value.read_from_replicas,
172169
tls: value.tls,
173170
retry_params: value.retries_configuration,
@@ -390,6 +387,22 @@ impl ClusterClientBuilder {
390387
self
391388
}
392389

390+
/// Sets the read strategy on all new connections, based on the specified policy.
391+
///
392+
/// Using the specified `read_strategy`, this function configures whether read queries will be
393+
/// routed to replica nodes or primary nodes. If `ReadFromReplicaStrategy::AZAffinity` is set,
394+
/// read requests will first attempt to access replicas in the same availability zone, falling
395+
/// back to other replicas or the primary if needed. If `ReadFromReplicaStrategy::RoundRobin` is chosen, reads are distributed
396+
/// across replicas for load balancing, while `ReadFromReplicaStrategy::AlwaysFromPrimary` ensures all read and write queries
397+
/// are directed to the primary node.
398+
///
399+
/// # Parameters
400+
/// - `read_strategy`: Defines the replica routing strategy.
401+
pub fn read_from(mut self, read_strategy: ReadFromReplicaStrategy) -> ClusterClientBuilder {
402+
self.builder_params.read_from_replicas = read_strategy;
403+
self
404+
}
405+
393406
/// Enables periodic topology checks for this client.
394407
///
395408
/// If enabled, periodic topology checks will be executed at the configured intervals to examine whether there

0 commit comments

Comments
 (0)