Skip to content

Commit 3b34488

Browse files
author
Adar Ovadia
committed
standalone az awareness
Signed-off-by: Adar Ovadia <adarov@amazon.com>
1 parent c9556e0 commit 3b34488

File tree

19 files changed

+260
-91
lines changed

19 files changed

+260
-91
lines changed

.github/workflows/redis-rs.yml

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,14 @@ jobs:
6565
working-directory: ./glide-core/redis-rs/redis
6666

6767
- name: Test
68-
run:
6968
# TODO remove the concurrency limit after we fix test flakyness.
7069
run: |
71-
versions_to_compare=$(printf "%s\n" "${{ matrix.engine.version }}" "7.1" | sort -V)
72-
# Sort versions and get the latest version between the current version and 7.1, if the latest version is 7.1, skip the test.
73-
lt_version=$(echo "$versions_to_comare" | tail -n1)
74-
if [[ "7.1" == "$lt_version" ]]; then
75-
cargo test --release -- --test-threads=1 | tee ../test-results.xml
70+
smallest_version=$(printf "%s\n" "${{ matrix.engine.version }}" "8.0" | sort -V | head -n1)
71+
# Sort versions and get the smallest version between the current version and 8.0, if the version is small than 8.0, skip the test.
72+
if [[ "8.0" == "$smallest_version" ]]; then
73+
cargo test --release --features valkey-gte-8 -- --test-threads=1 | tee ../test-results.xml
7674
else
77-
cargo test --release --features valkey-gte-7-2 -- --test-threads=1 | tee ../test-results.xml
75+
cargo test --release -- --test-threads=1 | tee ../test-results.xml
7876
fi
7977
8078
echo "### Tests passed :v:" >> $GITHUB_STEP_SUMMARY

glide-core/redis-rs/redis-test/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,12 @@ impl AioConnectionLike for MockRedisConnection {
292292
fn is_closed(&self) -> bool {
293293
false
294294
}
295+
296+
fn get_az(&self) -> Option<String> {
297+
None
298+
}
299+
300+
fn set_az(&mut self, _az: Option<String>) {}
295301
}
296302

297303
#[cfg(test)]

glide-core/redis-rs/redis/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ bigdecimal = ["dep:bigdecimal"]
153153
num-bigint = []
154154
uuid = ["dep:uuid"]
155155
disable-client-setinfo = []
156-
valkey-gte-7-2 = []
156+
valkey-gte-8 = []
157157

158158
# Deprecated features
159159
tls = ["tls-native-tls"] # use "tls-native-tls" instead

glide-core/redis-rs/redis/src/aio/connection.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ where
6565
pubsub: false,
6666
protocol: connection_info.protocol,
6767
};
68-
setup_connection(connection_info, &mut rv).await?;
68+
setup_connection(connection_info, &mut rv, false).await?;
6969
Ok(rv)
7070
}
7171

@@ -260,6 +260,12 @@ where
260260
// always false for AsyncRead + AsyncWrite (cant do better)
261261
false
262262
}
263+
264+
fn get_az(&self) -> Option<String> {
265+
None
266+
}
267+
268+
fn set_az(&mut self, _az: Option<String>) {}
263269
}
264270

265271
/// Represents a `PubSub` connection.

glide-core/redis-rs/redis/src/aio/connection_manager.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,4 +309,10 @@ impl ConnectionLike for ConnectionManager {
309309
// always return false due to automatic reconnect
310310
false
311311
}
312+
313+
fn get_az(&self) -> Option<String> {
314+
None
315+
}
316+
317+
fn set_az(&mut self, _az: Option<String>) {}
312318
}

glide-core/redis-rs/redis/src/aio/mod.rs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use crate::cmd::{cmd, Cmd};
33
use crate::connection::{
44
get_resp3_hello_command_error, PubSubSubscriptionKind, RedisConnectionInfo,
55
};
6-
use crate::types::{ErrorKind, ProtocolVersion, RedisFuture, RedisResult, Value};
6+
use crate::types::{
7+
ErrorKind, FromRedisValue, InfoDict, ProtocolVersion, RedisError, RedisFuture, RedisResult,
8+
Value,
9+
};
710
use crate::PushKind;
811
use ::tokio::io::{AsyncRead, AsyncWrite};
912
use async_trait::async_trait;
@@ -84,6 +87,12 @@ pub trait ConnectionLike {
8487

8588
/// Returns the state of the connection
8689
fn is_closed(&self) -> bool;
90+
91+
/// Get the connection availibility zone
92+
fn get_az(&self) -> Option<String>;
93+
94+
/// Set the connection availibility zone
95+
fn set_az(&mut self, az: Option<String>);
8796
}
8897

8998
/// Implements ability to notify about disconnection events
@@ -105,8 +114,40 @@ impl Clone for Box<dyn DisconnectNotifier> {
105114
}
106115
}
107116

117+
// Helper function to extract and update availability zone from INFO command
118+
async fn update_az_from_info<C>(con: &mut C) -> RedisResult<()>
119+
where
120+
C: ConnectionLike,
121+
{
122+
let info_res = con.req_packed_command(&cmd("INFO")).await;
123+
124+
match info_res {
125+
Ok(value) => {
126+
let info_dict: InfoDict = FromRedisValue::from_redis_value(&value)?;
127+
if let Some(node_az) = info_dict.get::<String>("availability_zone") {
128+
con.set_az(Some(node_az));
129+
}
130+
Ok(())
131+
}
132+
Err(e) => {
133+
// Handle the error case for the INFO command
134+
Err(RedisError::from((
135+
ErrorKind::ResponseError,
136+
"Failed to execute INFO command. ",
137+
format!("{:?}", e),
138+
)))
139+
}
140+
}
141+
}
142+
108143
// Initial setup for every connection.
109-
async fn setup_connection<C>(connection_info: &RedisConnectionInfo, con: &mut C) -> RedisResult<()>
144+
async fn setup_connection<C>(
145+
connection_info: &RedisConnectionInfo,
146+
con: &mut C,
147+
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity.
148+
// An INFO command will be triggered in the connection's setup to update the 'availability_zone' property.
149+
discover_az: bool,
150+
) -> RedisResult<()>
110151
where
111152
C: ConnectionLike,
112153
{
@@ -181,6 +222,10 @@ where
181222
}
182223
}
183224

225+
if discover_az {
226+
update_az_from_info(con).await?;
227+
}
228+
184229
// result is ignored, as per the command's instructions.
185230
// https://redis.io/commands/client-setinfo/
186231
#[cfg(not(feature = "disable-client-setinfo"))]

glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ pub struct MultiplexedConnection {
417417
response_timeout: Duration,
418418
protocol: ProtocolVersion,
419419
push_manager: PushManager,
420+
availability_zone: Option<String>,
420421
password: Option<String>,
421422
}
422423

@@ -479,11 +480,16 @@ impl MultiplexedConnection {
479480
.with_push_manager(pm)
480481
.with_protocol(connection_info.redis.protocol)
481482
.with_password(connection_info.redis.password.clone())
483+
.with_availability_zone(None)
482484
.build()
483485
.await?;
484486

485487
let driver = {
486-
let auth = setup_connection(&connection_info.redis, &mut con);
488+
let auth = setup_connection(
489+
&connection_info.redis,
490+
&mut con,
491+
glide_connection_options.discover_az,
492+
);
487493

488494
futures_util::pin_mut!(auth);
489495

@@ -575,6 +581,11 @@ impl MultiplexedConnection {
575581
self.pipeline.set_push_manager(push_manager).await;
576582
}
577583

584+
/// For external visibilty (glide-core)
585+
pub fn get_availability_zone(&self) -> Option<String> {
586+
self.availability_zone.clone()
587+
}
588+
578589
/// Replace the password used to authenticate with the server.
579590
/// If `None` is provided, the password will be removed.
580591
pub async fn update_connection_password(
@@ -599,6 +610,8 @@ pub struct MultiplexedConnectionBuilder {
599610
push_manager: Option<PushManager>,
600611
protocol: Option<ProtocolVersion>,
601612
password: Option<String>,
613+
/// Represents the node's availability zone
614+
availability_zone: Option<String>,
602615
}
603616

604617
impl MultiplexedConnectionBuilder {
@@ -611,6 +624,7 @@ impl MultiplexedConnectionBuilder {
611624
push_manager: None,
612625
protocol: None,
613626
password: None,
627+
availability_zone: None,
614628
}
615629
}
616630

@@ -644,6 +658,12 @@ impl MultiplexedConnectionBuilder {
644658
self
645659
}
646660

661+
/// Sets the avazilability zone for the `MultiplexedConnectionBuilder`.
662+
pub fn with_availability_zone(mut self, az: Option<String>) -> Self {
663+
self.availability_zone = az;
664+
self
665+
}
666+
647667
/// Builds and returns a new `MultiplexedConnection` instance using the configured settings.
648668
pub async fn build(self) -> RedisResult<MultiplexedConnection> {
649669
let db = self.db.unwrap_or_default();
@@ -661,6 +681,7 @@ impl MultiplexedConnectionBuilder {
661681
push_manager,
662682
protocol,
663683
password,
684+
availability_zone: self.availability_zone,
664685
};
665686

666687
Ok(con)
@@ -688,6 +709,16 @@ impl ConnectionLike for MultiplexedConnection {
688709
fn is_closed(&self) -> bool {
689710
self.pipeline.is_closed()
690711
}
712+
713+
/// Get the node's availability zone
714+
fn get_az(&self) -> Option<String> {
715+
self.availability_zone.clone()
716+
}
717+
718+
/// Set the node's availability zone
719+
fn set_az(&mut self, az: Option<String>) {
720+
self.availability_zone = az;
721+
}
691722
}
692723
impl MultiplexedConnection {
693724
/// Subscribes to a new channel.

glide-core/redis-rs/redis/src/client.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ pub struct GlideConnectionOptions {
8686
#[cfg(feature = "aio")]
8787
/// Passive disconnect notifier
8888
pub disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
89+
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
90+
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
91+
pub discover_az: bool,
8992
}
9093

9194
/// To enable async support you need to enable the feature: `tokio-comp`

glide-core/redis-rs/redis/src/cluster_async/connections_container.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ macro_rules! count_connections {
3030
pub struct ConnectionDetails<Connection> {
3131
/// The actual connection
3232
pub conn: Connection,
33-
/// The IP and AZ associated with the connection
33+
/// The IP associated with the connection
3434
pub ip: Option<IpAddr>,
35-
/// The AZ associated with the connection
35+
/// The availability zone associated with the connection
3636
pub az: Option<String>,
3737
}
3838

@@ -207,6 +207,13 @@ where
207207
Telemetry::incr_total_connections(conn_count_after.saturating_sub(conn_count_before));
208208
}
209209

210+
/// Returns the availability zone associated with the connection in address
211+
pub(crate) fn az_for_address(&self, address: &str) -> Option<String> {
212+
self.connection_map
213+
.get(address)
214+
.map(|item| item.value().user_connection.az.clone())?
215+
}
216+
210217
/// Returns true if the address represents a known primary node.
211218
pub(crate) fn is_primary(&self, address: &String) -> bool {
212219
self.connection_for_address(address).is_some() && self.slot_map.is_primary(address)
@@ -240,7 +247,9 @@ where
240247
}
241248
}
242249

243-
pub(crate) fn round_robin_read_from_az_awareness_replica(
250+
/// Returns the node's connection in the same availability zone as `client_az` in round robin strategy if exits,
251+
/// if not, will fall back to any available replica or primary.
252+
pub(crate) fn round_robin_read_from_replica_with_az_awareness(
244253
&self,
245254
slot_map_value: &SlotMapValue,
246255
client_az: String,
@@ -251,27 +260,21 @@ where
251260

252261
loop {
253262
retries = retries.saturating_add(1);
254-
// Looped through all replicas; no connected replica found in the same AZ.
263+
// Looped through all replicas; no connected replica found in the same availability zone.
255264
if retries > addrs.replicas().len() {
256-
// Attempt a fallback to any available replica in other AZs.
257-
for replica in &addrs.replicas() {
258-
if let Some(connection) = self.connection_for_address(replica.as_str()) {
259-
return Some(connection);
260-
}
261-
}
262-
// Fallback to the primary if no replica is connected.
263-
return self.connection_for_address(addrs.primary().as_str());
265+
// Attempt a fallback to any available replica or primary if needed.
266+
return self.round_robin_read_from_replica(slot_map_value);
264267
}
265268

266269
// Calculate index based on initial index and check count.
267270
let index = (initial_index + retries) % addrs.replicas().len();
268271
let replica = &addrs.replicas()[index];
269272

270-
// Check if this replica’s AZ matches the user’s AZ.
273+
// Check if this replica’s availability zone matches the user’s availability zone.
271274
if let Some((address, connection_details)) =
272275
self.connection_details_for_address(replica.as_str())
273276
{
274-
if connection_details.az.as_deref() == Some(&client_az) {
277+
if self.az_for_address(&address) == Some(client_az.clone()) {
275278
// Attempt to update `latest_used_replica` with the index of this replica.
276279
let _ = slot_map_value.last_used_replica.compare_exchange_weak(
277280
initial_index,
@@ -303,15 +306,19 @@ where
303306
ReadFromReplicaStrategy::RoundRobin => {
304307
self.round_robin_read_from_replica(slot_map_value)
305308
}
306-
ReadFromReplicaStrategy::AZAffinity(az) => {
307-
self.round_robin_read_from_az_awareness_replica(slot_map_value, az.to_string())
308-
}
309+
ReadFromReplicaStrategy::AZAffinity(az) => self
310+
.round_robin_read_from_replica_with_az_awareness(
311+
slot_map_value,
312+
az.to_string(),
313+
),
309314
},
310315
// when the user strategy per command is replica_preffered
311316
SlotAddr::ReplicaRequired => match &self.read_from_replica_strategy {
312-
ReadFromReplicaStrategy::AZAffinity(az) => {
313-
self.round_robin_read_from_az_awareness_replica(slot_map_value, az.to_string())
314-
}
317+
ReadFromReplicaStrategy::AZAffinity(az) => self
318+
.round_robin_read_from_replica_with_az_awareness(
319+
slot_map_value,
320+
az.to_string(),
321+
),
315322
_ => self.round_robin_read_from_replica(slot_map_value),
316323
},
317324
}
@@ -502,7 +509,6 @@ mod tests {
502509
}
503510

504511
fn create_container_with_az_strategy(
505-
// strategy: ReadFromReplicaStrategy,
506512
use_management_connections: bool,
507513
) -> ConnectionsContainer<usize> {
508514
let slot_map = SlotMap::new(

0 commit comments

Comments
 (0)