Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions redis/_parsers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,26 @@ def parse_oss_maintenance_start_msg(response):
@staticmethod
def parse_oss_maintenance_completed_msg(response):
# Expected message format is:
# SMIGRATED <seq_number> [<host:port> <slot, range1-range2,...>, ...]
# SMIGRATED <seq_number> [[<src_host:port> <dest_host:port> <slot_range>], ...]
id = response[1]
nodes_to_slots_mapping_data = response[2]
# Build the nodes_to_slots_mapping dict structure:
# {
# "src_host:port": [
# {"dest_host:port": "slot_range"},
# ...
# ],
# ...
# }
nodes_to_slots_mapping = {}
for node, slots in nodes_to_slots_mapping_data:
nodes_to_slots_mapping[safe_str(node)] = safe_str(slots)
for src_node, dest_node, slots in nodes_to_slots_mapping_data:
src_node_str = safe_str(src_node)
dest_node_str = safe_str(dest_node)
slots_str = safe_str(slots)

if src_node_str not in nodes_to_slots_mapping:
nodes_to_slots_mapping[src_node_str] = []
nodes_to_slots_mapping[src_node_str].append({dest_node_str: slots_str})

return OSSNodeMigratedNotification(id, nodes_to_slots_mapping)

Expand Down Expand Up @@ -341,17 +355,20 @@ def handle_push_response(self, response, **kwargs):

if notification is not None:
return self.maintenance_push_handler_func(notification)
if (
msg_type == _SMIGRATED_MESSAGE
and self.oss_cluster_maint_push_handler_func
if msg_type == _SMIGRATED_MESSAGE and (
self.oss_cluster_maint_push_handler_func
or self.maintenance_push_handler_func
):
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][1]
notification = parser_function(response)

if notification is not None:
return self.oss_cluster_maint_push_handler_func(notification)
if self.maintenance_push_handler_func:
self.maintenance_push_handler_func(notification)
if self.oss_cluster_maint_push_handler_func:
self.oss_cluster_maint_push_handler_func(notification)
except Exception as e:
logger.error(
"Error handling {} message ({}): {}".format(msg_type, response, e)
Expand Down
3 changes: 3 additions & 0 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,9 @@ def __repr__(self) -> str:
def __eq__(self, obj: Any) -> bool:
return isinstance(obj, ClusterNode) and obj.name == self.name

def __hash__(self) -> int:
return hash(self.name)

_DEL_MESSAGE = "Unclosed ClusterNode object"

def __del__(
Expand Down
3 changes: 3 additions & 0 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,9 @@ def __repr__(self):
def __eq__(self, obj):
return isinstance(obj, ClusterNode) and obj.name == self.name

def __hash__(self):
return hash(self.name)


class LoadBalancingStrategy(Enum):
ROUND_ROBIN = "round_robin"
Expand Down
41 changes: 19 additions & 22 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,23 +440,17 @@ def _configure_maintenance_notifications(
else:
self._maint_notifications_pool_handler = None

self._maint_notifications_connection_handler = (
MaintNotificationsConnectionHandler(self, self.maint_notifications_config)
)

if oss_cluster_maint_notifications_handler:
# Extract a reference to a new handler that copies all properties
# of the original one and has a different connection reference
# This is needed because when we attach the handler to the parser
# we need to make sure that the handler has a reference to the
# connection that the parser is attached to.
self._oss_cluster_maint_notifications_handler = (
oss_cluster_maint_notifications_handler.get_handler_for_connection()
oss_cluster_maint_notifications_handler
)
self._oss_cluster_maint_notifications_handler.set_connection(self)
else:
self._oss_cluster_maint_notifications_handler = None

self._maint_notifications_connection_handler = (
MaintNotificationsConnectionHandler(self, self.maint_notifications_config)
)

# Set up OSS cluster handler to parser if available
if self._oss_cluster_maint_notifications_handler:
parser.set_oss_cluster_maint_push_handler(
Expand Down Expand Up @@ -521,21 +515,12 @@ def set_maint_notifications_pool_handler_for_connection(
def set_maint_notifications_cluster_handler_for_connection(
self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
):
# Deep copy the cluster handler to avoid sharing the same handler
# between multiple connections, because otherwise each connection will override
# the connection reference and the handler will only hold a reference
# to the last connection that was set.
maint_notifications_cluster_handler_copy = (
oss_cluster_maint_notifications_handler.get_handler_for_connection()
)

maint_notifications_cluster_handler_copy.set_connection(self)
self._get_parser().set_oss_cluster_maint_push_handler(
maint_notifications_cluster_handler_copy.handle_notification
oss_cluster_maint_notifications_handler.handle_notification
)

self._oss_cluster_maint_notifications_handler = (
maint_notifications_cluster_handler_copy
oss_cluster_maint_notifications_handler
)

# Update maintenance notification connection handler if it doesn't exist
Expand Down Expand Up @@ -1142,6 +1127,7 @@ def disconnect(self, *args):
self._sock = None
# reset the reconnect flag
self.reset_should_reconnect()

if conn_sock is None:
return

Expand All @@ -1156,6 +1142,17 @@ def disconnect(self, *args):
except OSError:
pass

if self.maintenance_state == MaintenanceState.MAINTENANCE:
# this block will be executed only if the connection was in maintenance state
# and the connection was closed.
# The state change won't be applied on connections that are in Moving state
# because their state and configurations will be handled when the moving ttl expires.
self.reset_tmp_settings(reset_relaxed_timeout=True)
self.maintenance_state = MaintenanceState.NONE
# reset the sets that keep track of received start maint
# notifications and skipped end maint notifications
self.reset_received_notifications()

def mark_for_reconnect(self):
self._should_reconnect = True

Expand Down
Loading