Skip to content

Commit 42f0411

Browse files
authored
Refactoring the SMIGRATED flow - the notification is changed to contain the src node address for each slot range movement. (#3925)
* Refactoring the SMIGRATED flow - the notification is changed to contain the src node address for each slot range movement. * Applying review comments and adding an additional test step to verify connection states * Migrating the last test to use the new FI actions structure * Completed migration of all OSS API hitless tests to use the new FI helpers. * Applying review comments
1 parent e07381c commit 42f0411

13 files changed

+1061
-1098
lines changed

redis/_parsers/base.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -192,12 +192,26 @@ def parse_oss_maintenance_start_msg(response):
192192
@staticmethod
193193
def parse_oss_maintenance_completed_msg(response):
194194
# Expected message format is:
195-
# SMIGRATED <seq_number> [<host:port> <slot, range1-range2,...>, ...]
195+
# SMIGRATED <seq_number> [[<src_host:port> <dest_host:port> <slot_range>], ...]
196196
id = response[1]
197197
nodes_to_slots_mapping_data = response[2]
198+
# Build the nodes_to_slots_mapping dict structure:
199+
# {
200+
# "src_host:port": [
201+
# {"dest_host:port": "slot_range"},
202+
# ...
203+
# ],
204+
# ...
205+
# }
198206
nodes_to_slots_mapping = {}
199-
for node, slots in nodes_to_slots_mapping_data:
200-
nodes_to_slots_mapping[safe_str(node)] = safe_str(slots)
207+
for src_node, dest_node, slots in nodes_to_slots_mapping_data:
208+
src_node_str = safe_str(src_node)
209+
dest_node_str = safe_str(dest_node)
210+
slots_str = safe_str(slots)
211+
212+
if src_node_str not in nodes_to_slots_mapping:
213+
nodes_to_slots_mapping[src_node_str] = []
214+
nodes_to_slots_mapping[src_node_str].append({dest_node_str: slots_str})
201215

202216
return OSSNodeMigratedNotification(id, nodes_to_slots_mapping)
203217

@@ -341,17 +355,20 @@ def handle_push_response(self, response, **kwargs):
341355

342356
if notification is not None:
343357
return self.maintenance_push_handler_func(notification)
344-
if (
345-
msg_type == _SMIGRATED_MESSAGE
346-
and self.oss_cluster_maint_push_handler_func
358+
if msg_type == _SMIGRATED_MESSAGE and (
359+
self.oss_cluster_maint_push_handler_func
360+
or self.maintenance_push_handler_func
347361
):
348362
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
349363
msg_type
350364
][1]
351365
notification = parser_function(response)
352366

353367
if notification is not None:
354-
return self.oss_cluster_maint_push_handler_func(notification)
368+
if self.maintenance_push_handler_func:
369+
self.maintenance_push_handler_func(notification)
370+
if self.oss_cluster_maint_push_handler_func:
371+
self.oss_cluster_maint_push_handler_func(notification)
355372
except Exception as e:
356373
logger.error(
357374
"Error handling {} message ({}): {}".format(msg_type, response, e)

redis/asyncio/cluster.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,6 +1212,9 @@ def __repr__(self) -> str:
12121212
def __eq__(self, obj: Any) -> bool:
12131213
return isinstance(obj, ClusterNode) and obj.name == self.name
12141214

1215+
def __hash__(self) -> int:
1216+
return hash(self.name)
1217+
12151218
_DEL_MESSAGE = "Unclosed ClusterNode object"
12161219

12171220
def __del__(

redis/cluster.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1676,6 +1676,9 @@ def __repr__(self):
16761676
def __eq__(self, obj):
16771677
return isinstance(obj, ClusterNode) and obj.name == self.name
16781678

1679+
def __hash__(self):
1680+
return hash(self.name)
1681+
16791682

16801683
class LoadBalancingStrategy(Enum):
16811684
ROUND_ROBIN = "round_robin"

redis/connection.py

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -440,23 +440,17 @@ def _configure_maintenance_notifications(
440440
else:
441441
self._maint_notifications_pool_handler = None
442442

443+
self._maint_notifications_connection_handler = (
444+
MaintNotificationsConnectionHandler(self, self.maint_notifications_config)
445+
)
446+
443447
if oss_cluster_maint_notifications_handler:
444-
# Extract a reference to a new handler that copies all properties
445-
# of the original one and has a different connection reference
446-
# This is needed because when we attach the handler to the parser
447-
# we need to make sure that the handler has a reference to the
448-
# connection that the parser is attached to.
449448
self._oss_cluster_maint_notifications_handler = (
450-
oss_cluster_maint_notifications_handler.get_handler_for_connection()
449+
oss_cluster_maint_notifications_handler
451450
)
452-
self._oss_cluster_maint_notifications_handler.set_connection(self)
453451
else:
454452
self._oss_cluster_maint_notifications_handler = None
455453

456-
self._maint_notifications_connection_handler = (
457-
MaintNotificationsConnectionHandler(self, self.maint_notifications_config)
458-
)
459-
460454
# Set up OSS cluster handler to parser if available
461455
if self._oss_cluster_maint_notifications_handler:
462456
parser.set_oss_cluster_maint_push_handler(
@@ -521,21 +515,12 @@ def set_maint_notifications_pool_handler_for_connection(
521515
def set_maint_notifications_cluster_handler_for_connection(
522516
self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
523517
):
524-
# Deep copy the cluster handler to avoid sharing the same handler
525-
# between multiple connections, because otherwise each connection will override
526-
# the connection reference and the handler will only hold a reference
527-
# to the last connection that was set.
528-
maint_notifications_cluster_handler_copy = (
529-
oss_cluster_maint_notifications_handler.get_handler_for_connection()
530-
)
531-
532-
maint_notifications_cluster_handler_copy.set_connection(self)
533518
self._get_parser().set_oss_cluster_maint_push_handler(
534-
maint_notifications_cluster_handler_copy.handle_notification
519+
oss_cluster_maint_notifications_handler.handle_notification
535520
)
536521

537522
self._oss_cluster_maint_notifications_handler = (
538-
maint_notifications_cluster_handler_copy
523+
oss_cluster_maint_notifications_handler
539524
)
540525

541526
# Update maintenance notification connection handler if it doesn't exist
@@ -1142,6 +1127,7 @@ def disconnect(self, *args):
11421127
self._sock = None
11431128
# reset the reconnect flag
11441129
self.reset_should_reconnect()
1130+
11451131
if conn_sock is None:
11461132
return
11471133

@@ -1156,6 +1142,17 @@ def disconnect(self, *args):
11561142
except OSError:
11571143
pass
11581144

1145+
if self.maintenance_state == MaintenanceState.MAINTENANCE:
1146+
# this block will be executed only if the connection was in maintenance state
1147+
# and the connection was closed.
1148+
# The state change won't be applied on connections that are in Moving state
1149+
# because their state and configurations will be handled when the moving ttl expires.
1150+
self.reset_tmp_settings(reset_relaxed_timeout=True)
1151+
self.maintenance_state = MaintenanceState.NONE
1152+
# reset the sets that keep track of received start maint
1153+
# notifications and skipped end maint notifications
1154+
self.reset_received_notifications()
1155+
11591156
def mark_for_reconnect(self):
11601157
self._should_reconnect = True
11611158

0 commit comments

Comments
 (0)